Friday, July 29, 2016

Marketo Activities -> Salesforce Tasks with REST and Python

A year and a half ago, we had to turn off 2-way syncing in Marketo. It was messing up "Contact" record fields too badly.

Unfortunately, that also meant we lost the functionality of letting Marketo create a new "Task" for a Contact's "Activity History" every time we sent them an email. So we've got a year-and-a-half gap in such "Task" records in Salesforce to fill. (It's okay for it to be a one-off, because we're consolidating mass-mailing systems between departments and getting rid of Marketo.)

There's no easy "export a CSV file of every email we've ever sent every person in Marketo" feature in Marketo's point-and-click web-browser-based user interface. Tech support said we'd have to write code to query their REST API.

My basic algorithm is as follows:

  1. Use Python ("requests" library) code against the Marketo API to get a list of every "Email Sent" activity in Marketo (each activity is a "dict" that includes the LeadId it belongs to and the name of the email according to Marketo)
  2. Use Marketo's export functions against "All Leads" to get a CSV file containing a mapping of Marketo Lead IDs to Salesforce Contact IDs & save it to my hard drive.
  3. Use Salesforce's Data Loader to get a CSV file containing all existing "Task" records from the old "Marketo Sync" email tracking & save it to my hard drive.
  4. Use Python ("pandas" library) code to append Salesforce IDs from the CSV in step #2 to the activity list from step #1, while subtracting emails Salesforce already knows about (match on "WhoId" & "Subject" in CSV from step #3). Write the result out to CSV.
  5. Use Salesforce's Data Loader to import the CSV from step #4 into the "Tasks" table of Salesforce

The Python script is:

import requests
baseurl = 'https://xxx.mktorest.com'
clientid = 'cid'
clientsecret = 'secret'
apitaskownerid = 'xxx' #(Marketo API sf user)
activsincedatetime = 'yyyy-mm-ddThh:mm:ss-GMTOffsetHH:00'
accesstoken=requests.get(baseurl + '/identity/oauth/token?grant_type=client_credentials' + '&client_id=' + clientid + '&client_secret=' + clientsecret).json()['access_token']
firstnextpagetoken = requests.get(baseurl + '/rest/v1/activities/pagingtoken.json' + '?sinceDatetime=' + activsincedatetime + '&access_token=' + accesstoken).json()['nextPageToken']

def getactivs(pagetok):
    ems = []
    activsbatchjson = requests.get(baseurl + '/rest/v1/activities.json' + '?nextPageToken=' + pagetok + '&activityTypeIds=INSERTNUMBERHERE' + '&access_token=' + accesstoken).json()
    if 'result' in activsbatchjson:
        ems.extend(activsbatchjson['result']) # extend, not append, so every batch lands in one flat list
    if not activsbatchjson['moreResult']:
        return ems
    return ems + getactivs(activsbatchjson['nextPageToken']) # carry this batch forward, or earlier pages get dropped

emailssent = getactivs(firstnextpagetoken)

import pandas
activdf = pandas.DataFrame(emailssent, columns=['leadId', 'primaryAttributeValueId', 'primaryAttributeValue', 'activityDate'])
leaddf = pandas.read_csv('c:\\temp\\downloadedmarketoleads.csv',usecols=['Id', 'Marketo SFDC ID'])
joindf = pandas.merge(activdf, leaddf, how='left', left_on='leadId', right_on='Id')
joindf.drop(['Id'], axis=1, inplace=True, errors='ignore')
joindf.rename(columns={'Marketo SFDC ID':'WhoId'}, inplace=True)
joindf['Status'] = 'Completed'
joindf['Priority'] = 'Normal'
joindf['OwnerId'] = apitaskownerid
joindf['IsReminderSet'] = False
joindf['IsRecurrence'] = False
joindf['IsHighPriority'] = False
joindf['IsClosed'] = True
joindf['IsArchived'] = True
joindf['Custom_field__c'] = 'Marketo Sync'
joindf['Subject'] = joindf['primaryAttributeValue'].map(lambda x: 'Was Sent Email: ' + x)
joindf['Description'] = 'Marketo email history backfill'

existingtasksdf = pandas.read_csv('c:\\temp\\downloadedsalesforcetasks.csv') # SELECT ActivityDate,CreatedDate,Description,LastModifiedDate,Subject,WhoId FROM Task WHERE Custom_field__c = 'Marketo Sync' AND Subject LIKE 'Was Sent Email: %' AND IsDeleted = FALSE
existingtasksdf['matched'] = True
not_existing = pandas.merge(joindf, existingtasksdf, how='left', on=['WhoId','Subject'])
not_existing = not_existing[pandas.isnull(not_existing['matched'])]
not_existing.drop(['matched'], axis=1, inplace=True, errors='ignore')
not_existing.to_csv(path_or_buf='c:\\temp\\newtasksreadyforinsert.csv', index=False)
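As an aside, the "matched" sentinel column above is a hand-rolled left anti-join; pandas' merge can also do that bookkeeping itself via its indicator parameter. A minimal sketch on toy data (the IDs and subjects are made up):

```python
import pandas

# Stand-ins for the real joindf / existingtasksdf
joindf = pandas.DataFrame({
    'WhoId': ['003A', '003B', '003C'],
    'Subject': ['Was Sent Email: X', 'Was Sent Email: Y', 'Was Sent Email: Z'],
})
existingtasksdf = pandas.DataFrame({
    'WhoId': ['003B'],
    'Subject': ['Was Sent Email: Y'],
})

# indicator=True adds a '_merge' column marking each row's origin,
# so "subtract what Salesforce already knows" becomes a simple filter.
merged = pandas.merge(joindf, existingtasksdf, how='left',
                      on=['WhoId', 'Subject'], indicator=True)
not_existing = merged[merged['_merge'] == 'left_only'].drop('_merge', axis=1)
```

Same result, one fewer throwaway column to remember to drop.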

(Note - not a lot of thought was given to security here: the script passes login credentials as query-string parameters in GET calls (albeit over HTTPS). It's a one-off script, we're about to shut down the system, and I deleted the credentials from Marketo's configuration shortly after running it. Your requirements may vary - you may need a more robust way of authenticating.)


P.S.

Updated code coming soon - I had to change the "getactivs" section of the code from a recursive solution (not sure why that came to mind first...it just did that day...) to an iterative one, because it errored out when fetching half a million records 300 at a time.
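Until the full updated post lands, here's a sketch of what the iterative replacement looks like. The HTTP call is factored out into a fetchbatch callable (a name I made up for this sketch) so the paging loop itself can be exercised without hitting the API:

```python
def getactivs_iter(pagetok, fetchbatch):
    """Iterative replacement for the recursive getactivs.

    fetchbatch(pagetok) should return the parsed JSON for one
    /rest/v1/activities.json page: a dict with 'result',
    'moreResult', and 'nextPageToken' keys.
    """
    ems = []
    while True:
        batch = fetchbatch(pagetok)
        # extend, not append: accumulate activity dicts across pages
        ems.extend(batch.get('result', []))
        if not batch.get('moreResult'):
            return ems
        pagetok = batch['nextPageToken']
```

In the real script, fetchbatch would wrap the requests.get call; since it's re-invoked each page, it's also a natural place to re-request the access token if expiry turns out to be the problem. No recursion means no recursion-depth limit to hit at half a million records.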

I also ended up dumping the Marketo API half-million-record response's output to CSV and commenting out the post-processing part of the code, then commenting out the API-fetch and running post-processing against that CSV. The API-based fetch wasn't always getting a complete data set. (I suspect my authorization key might have been expiring. However, by the time I ran it with code to debug the problem, I was working at start-of-business, and - probably due to lower network traffic - the fetch finally ran without issue, so I'll never know. All I know is I got my data once, and once works for me.)

Finally, I ran into some snags with data Pandas couldn't read from CSV in the API output (some sort of weird em-dash in the data, I think), so the updated code will include even more low-performance kludges to work around that. (Using the CSV module and a loop to build a list-of-dicts and having Pandas read that worked.)
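That workaround looks roughly like this; the function name, the file path in the usage note, and the assumption that the offender was U+2014 (an em-dash) are mine:

```python
import csv
import pandas

def read_messy_csv(path):
    """Read a CSV pandas' parser chokes on by going row by row with
    the csv module and handing pandas a list-of-dicts instead."""
    rows = []
    with open(path, newline='', encoding='utf-8', errors='replace') as f:
        for row in csv.DictReader(f):
            # scrub the character pandas tripped on (assumed: U+2014)
            rows.append({k: v.replace('\u2014', '-') for k, v in row.items()})
    return pandas.DataFrame(rows)
```

Low-performance, as advertised, but for a one-off pass over an API dump (e.g. read_messy_csv('c:\\temp\\apidump.csv')), it gets the data into a DataFrame.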

Wednesday, July 20, 2016

Big Job: Apex vs. Manual CSV File Manipulation vs. Python-Is-Cool

In our Salesforce org, we have an object called "Extended Contact Information" that's in a "detail" relationship hanging off of "Contact."

It lets us use record-types to cluster information about Contacts that is only of interest to certain internal departments (and lets us segment, department-by-department, their answers to questions about a Contact that they might feel differently about ... such as whether they're the best way to get ahold of a company).

We also have a checkbox field for each internal department that lives on Contact, and that needs to be checked "true" if a department is working with someone. (Whenever a department attaches an "Extended Contact Information" object to someone, we have a trigger in place that checks that department's corresponding box on "Contact," but it can also be checked by hand.)

Our computers mostly care about this checkbox, but humans care about the "Extended Contact Information" record, so today I did an audit and noticed 4,000 people from our "Continuing Ed" department were flagged in the checkbox as working with that department but didn't have "Extended Contact Information" records of that type attached.

I wrote the following Apex to execute anonymously, but due to a bazillion triggers downstream of "Extended Contact Information" record inserts, it hits CPU time execution limits somewhere between 100 at a time & 300 at a time.

List<Contact> cons = [SELECT Id, Name FROM Contact WHERE Continuing_Ed__c = TRUE AND Id NOT IN (SELECT Contact_Name__c FROM Extended_Contact_Information__c WHERE RecordTypeId='082I8294817IWfiIWX') LIMIT 100];

List<Extended_Contact_Information__c> ecisToInsert = new List<Extended_Contact_Information__c>();

for (Contact c : cons) {
    ecisToInsert.add(new Extended_Contact_Information__c(
        Contact_Name__c = c.Id,
        RecordTypeId='082I8294817IWfiIWX'
    ));
}

insert ecisToInsert;

Probably the fastest thing to do is one of the following:

  1. run this code manually 40 times at "LIMIT 100"
  2. export the 4,000 Contact IDs as a CSV, change the name of the "Id" column to "Contact_Name__c," add a "RecordTypeId" field with "082I8294817IWfiIWX" as the value in every row, and re-import it to the "Extended Contact Information" table through a data-loading tool

But, of course, the XKCD Automation Theory part of my brain wants to write a Python script to imitate option #2 and "save me the trouble" of exporting, copying/pasting, & re-importing data. Especially since, in theory, I may need to do this again.

TBD what I'll actually go with. Python code will be added to this post if I let XKCD-programmer-brain take over.

As a reminder to myself, here's a basic Python "hello-world":

from simple_salesforce import Salesforce
sf = Salesforce(username='un', password='pw', security_token='tk')
print(sf.query_all("SELECT Id, Name FROM Contact WHERE IsDeleted=false LIMIT 2"))

Output:

OrderedDict([('totalSize', 2), ('done', True), ('records', [OrderedDict([('attributes', OrderedDict([('type', 'Contact'), ('url', '/services/data/v29.0/sobjects/Contact/xyzzyID1xyzzy')])), ('Id', 'xyzzyID1xyzzy'), ('Name', 'Person One')]), OrderedDict([('attributes', OrderedDict([('type', 'Contact'), ('url', '/services/data/v29.0/sobjects/Contact/abccbID2abccb')])), ('Id', 'abccbID2abccb'), ('Name', 'Person Two')])])])

And this:

from simple_salesforce import Salesforce
sf = Salesforce(username='un', password='pw', security_token='tk')
cons = sf.query_all("SELECT Id, Name FROM Contact WHERE IsDeleted=false LIMIT 2")
for con in cons['records']:
    print(con['Id'])

Does this:

xyzzyID1xyzzy
abccbID2abccb

Whoops. Looks like there aren't any batch/bulk insert options in "simple-salesforce," and I don't feel like learning a new package. Splitting the difference and grabbing my CSV file with Python, but inserting it into Salesforce with a traditional data-loading tool.

from simple_salesforce import Salesforce
sf = Salesforce(username='un', password='pw', security_token='tk')
cons = sf.query_all("SELECT Id, Name FROM Contact WHERE Continuing_Ed__c = TRUE AND Id NOT IN (SELECT Contact_Name__c FROM Extended_Contact_Information__c WHERE RecordTypeId='082I8294817IWfiIWX') LIMIT 2")

import csv
with open('c:\\temp\\sfoutput.csv', 'w', newline='') as csvfile:
    fieldnames = ['contact_name__c', 'recordtypeid']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for con in cons['records']:    
        writer.writerow({'contact_name__c': con['Id'], 'recordtypeid': '082I8294817IWfiIWX'})

print('done')

And then the CSV file looks like this:

contact_name__c,recordtypeid
xyzzyID1xyzzy,082I8294817IWfiIWX
abccbID2abccb,082I8294817IWfiIWX