Pages

Friday, July 29, 2016

Marketo Activities -> Salesforce Tasks with REST and Python

A year and a half ago, we had to turn off 2-way syncing in Marketo. It was messing up "Contact" record fields too badly.

Unfortunately, that also meant we lost the functionality of letting Marketo create a new "Task" for a Contact's "Activity History" every time we sent them an email. So we've got a year-and-a-half gap in such "Task" records in Salesforce to fill. (It's okay for it to be a one-off, because we're consolidating mass-mailing systems between departments and getting rid of Marketo.)

There's no easy "export a CSV file of every email we've ever sent every person in Marketo" feature in Marketo's point-and-click web-browser-based user interface. Tech support said we'd have to write code to query their REST API.

My basic algorithm is as follows:

  1. Use Python ("requests" library) code against the Marketo API to get a "dict" of every "Email Sent" activity in Marketo (and which LeadId it belongs to and the name of the email according to Marketo)
  2. Use Marketo's export functions against "All Leads" to get a CSV file containing a mapping of Marketo Lead IDs to Salesforce Contact IDs & save it to my hard drive.
  3. Use Salesforce's Data Loader to get a CSV file containing all existing "Task" records from the old "Marketo Sync" email tracking & save it to my hard drive.
  4. Use Python ("pandas" library) code to append Salesforce IDs from the CSV in step #2 to the "dict" in step #1, while subtracting emails Salesforce already knows about (match on "WhoId" & "Subject" in CSV from step #3). Write the result out to CSV.
  5. Use Salesforce's Data Loader to import the CSV from step #4 into the "Tasks" table of Salesforce

The Python script is:

import requests
baseurl = 'https://xxx.mktorest.com'
clientid = 'cid'
clientsecret = 'secret'
apitaskownerid = 'xxx' #(Marketo API sf user)
activsincedatetime = 'yyyy-mm-ddThh:mm:ss-GMTOffsetHH:00'
accesstoken=requests.get(baseurl + '/identity/oauth/token?grant_type=client_credentials' + '&client_id=' + clientid + '&client_secret=' + clientsecret).json()['access_token']
firstnextpagetoken = requests.get(baseurl + '/rest/v1/activities/pagingtoken.json' + '?sinceDatetime=' + activsincedatetime + '&access_token=' + accesstoken).json()['nextPageToken']

def getactivs(pagetok):
    ems = []
    activsbatchjson = requests.get(baseurl + '/rest/v1/activities.json' + '?nextPageToken=' + pagetok + '&activityTypeIds=INSERTNUMBERHERE' + '&access_token=' + accesstoken).json()
    if 'result' in activsbatchjson:
        ems.append(activsbatchjson['result'])
    if activsbatchjson['moreResult'] != True:
        return ems
    else:
        return getactivs(activsbatchjson['nextPageToken'])

emailssent = getactivs(firstnextpagetoken)[0]

import pandas
activdf = pandas.DataFrame(emailssent, columns=['leadId', 'primaryAttributeValueId', 'primaryAttributeValue', 'activityDate'])
leaddf = pandas.read_csv('c:\\temp\\downloadedmarketoleads.csv',usecols=['Id', 'Marketo SFDC ID'])
joindf = pandas.merge(activdf, leaddf, how='left', left_on='leadId', right_on='Id')
joindf.drop(['Id'], axis=1, inplace=True, errors='ignore')
joindf.rename(columns={'Marketo SFDC ID':'WhoId'}, inplace=True)
joindf['Status'] = 'Completed'
joindf['Priority'] = 'Normal'
joindf['OwnerId'] = apitaskownerid
joindf['IsReminderSet'] = False
joindf['IsRecurrence'] = False
joindf['IsHighPriority'] = False
joindf['IsClosed'] = True
joindf['IsArchived'] = True
joindf['Custom_field__c'] = 'Marketo Sync'
joindf['Subject'] = joindf['primaryAttributeValue'].map(lambda x: 'Was Sent Email: ' + x)
joindf['Description'] = 'Marketo email history backfill'

existingtasksdf = pandas.read_csv('c:\\temp\\downloadedsalesforcetasks.csv') # SELECT ActivityDate,CreatedDate,Description,LastModifiedDate,Subject,WhoId FROM Task WHERE Custom_field__c = 'Marketo Sync' AND Subject LIKE 'Was Sent Email: %' AND IsDeleted = FALSE
existingtasksdf['matched'] = True
not_existing = pandas.merge(joindf, existingtasksdf, how='left', on=['WhoId','Subject'])
not_existing = not_existing[pandas.isnull(not_existing['matched'])]
not_existing.drop(['matched'], axis=1, inplace=True, errors='ignore')
not_existing.to_csv(path_or_buf='c:\\temp\\newtasksreadyforinsert.csv', index=False)

(Note - not a lot of through given to security with respect to this script and its use of login credentials in a GET (although HTTPS) call. It's a one-off script, we're about to shut down the system, and I deleted the credentials from Marketo's configuration shortly after running the script. Your requirements may vary - you may need a more robust way of authenticating.)


P.S.

Updated code coming soon - I had to change the "getactivs" section of the code from a recursive solution (not sure why that came to mind first...it just did that day...) to an interative one because it errored out when fetching half a million records 300 at a time.

I also ended up dumping the Marketo API half-million-record response's output to CSV and commenting out the post-processing part of the code, then commenting out the API-fetch and running post-processing against that CSV. The API-based fetch wasn't always getting a complete data set. (I suspect my authorization key might have been expiring. However, by the time I ran it with code to debug the problem, I was working at start-of-business, and - probably due to lower network traffic - the fetch finally ran without issue, so I'll never know. All I know is I got my data once, and once works for me.)

Finally, I ran into some snags with data Pandas couldn't read from CSV in the API output (some sort of weird em-dash in the data, I think), so the updated code will include even more low-performance kludges to work around that. (Using the CSV module and a loop to build a list-of-dicts and having Pandas read that worked.)

No comments:

Post a Comment