Friday 27 March 2020

Consume Qualtrics survey results in SAP Analytics Cloud with live connection to SAP HANA service

More than ever, businesses around the world are recognizing the importance of treating their customers and employees like ‘real’ people and not “users”, “buyers”, “resources”, et al. Welcome to the experience economy! It is time everyone understood the importance of experience management and the business value generated through the insights obtained by combining experience data (X-data) and operational data (O-data).

SAP offers some powerful solutions to enable such an experience economy. SAP Business Technology Platform, at the core, helps businesses by connecting X-data from Qualtrics with O-data from various SAP Line of Business services. SAP Analytics Cloud is the go-to analytics solution on the platform; it can be leveraged for data-driven insights that connect O-data from the Lines of Business to X-data from Qualtrics, enabling decisions that create “people experiences”.

For some time now, SAP Analytics Cloud (SAC) has had a standard connector to Qualtrics to fetch and analyze survey responses.

In this blog post, however, I will discuss the technical details of another approach/use case, in which I have done the following:

◉ Pull the Qualtrics survey result and write data into SAP HANA on SAP Cloud Platform every time a response is submitted.

◉ Replicate data from Line of Business(es) into HANA

◉ Correlate this data through calculation views

◉ Consume the calculation view in SAC for analysis

Numerous studies indicate that a lack of appreciation at work is one of the major reasons for employee turnover in a company. Appreciation is also believed to have a direct impact on employee productivity. What if we could correlate responses to specific questions about employee appreciation with the number of awards given out and the number of achievements logged in a SuccessFactors system?

Here’s a quick architecture diagram:

Let’s now look at the details of the prototype:

1. Pull the Qualtrics survey result and write data into SAP HANA on SAP Cloud Platform every time a response is submitted.

Here’s the approach:

1.a. I have an employee engagement survey of type Employee XM created in Qualtrics.

1.b. I have my HANA service instance up and running, and I have created a multi-target application with a HANA DB module. For starters, here is a great developer tutorial you can refer to.

Here is a snapshot of what my entity looks like:

1.c. I have used the Qualtrics APIs to read responses in real time, every time a response is completed and submitted. This data is written into the table that I generated in the previous step.

Essentially the steps are as follows:

◉ I have hosted a Python-based webhook on SAP Cloud Platform CF, which reads a response using the Qualtrics Response API and writes it to the HANA table using the hdbcli library.

from http.server import BaseHTTPRequestHandler, HTTPServer
from hdbcli import dbapi
from urllib.parse import urlparse
import urllib
import sys
import requests
import io, os
import zipfile
import json
import re
import csv
import datetime

def onResponse(apiToken, surveyId, dataCenter):

    fileFormat = "csv"

    #Step 1 : Export Survey
    fileId = exportSurvey(apiToken,surveyId, dataCenter, fileFormat)

    #Step 2 : Parse file for records
    records = parseSurveyExport(fileId)

    #Step 3 : insert records in HANA DB
    writeRecordstoDB(records, surveyId)

def exportSurvey(apiToken, surveyId, dataCenter, fileFormat):
    # Exports the survey responses recorded since the last response stored in HANA,
    # unzips the download and returns the name of the extracted file (without extension).

    # Setting static parameters
    requestCheckProgress = 0.0
    progressStatus = "inProgress"
    baseUrl = "https://{0}.qualtrics.com/API/v3/surveys/{1}/export-responses/".format(dataCenter, surveyId)
    headers = {
        "content-type": "application/json",
        "x-api-token": apiToken
    }

    #get Last timestamp
    startDate = getLastTimeStamp(surveyId)
    
    # Step 1: Creating Data Export , get Responses after the last timestamp
    downloadRequestUrl = baseUrl

    # Build the payload as a dict so that requests takes care of the JSON encoding
    downloadRequestPayload = {"format": fileFormat, "useLabels": True}
    if startDate != '':
        downloadRequestPayload["startDate"] = startDate

    downloadRequestResponse = requests.request("POST", downloadRequestUrl, json=downloadRequestPayload, headers=headers)
    downloadRequestResponse.raise_for_status()
    progressId = downloadRequestResponse.json()["result"]["progressId"]
    print(downloadRequestResponse.text)

    # Step 2: Checking on Data Export Progress and waiting until export is ready
    while progressStatus != "complete" and progressStatus != "failed":
        print ("progressStatus=", progressStatus)
        requestCheckUrl = baseUrl + progressId
        requestCheckResponse = requests.request("GET", requestCheckUrl, headers=headers)
        requestCheckProgress = requestCheckResponse.json()["result"]["percentComplete"]
        print("Download is " + str(requestCheckProgress) + " complete")
        progressStatus = requestCheckResponse.json()["result"]["status"]

    #step 2.1: Check for error
    if progressStatus == "failed":
        raise Exception("export failed")

    fileId = requestCheckResponse.json()["result"]["fileId"]

    # Step 3: Downloading file
    requestDownloadUrl = baseUrl + fileId + '/file'
    requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)

    # Step 4: Unzipping the file
    try:
        zipfile.ZipFile(io.BytesIO(requestDownload.content)).extractall("MyQualtricsDownload")
    except Exception as e:
        raise Exception("unzip failed: " + str(e))

    # The file name inside the zip matches the name given in the content-disposition header
    fileName = requestDownload.headers['content-disposition']
    fileName = re.search(r'attachment; filename=(.+?)\.zip', fileName).group(1).replace("+"," ")

    return fileName

def parseSurveyExport(fileId):
    columnNumbers = {
                        "questionAnswerColumns":[],
                        "ResponseId": 0,
                        "managerID" : 0,
                        "employeeID" : 0,
                        "RecordedDate" : 0
                    }

    questions = []
    insertRecords = []
    
    # newline="" lets the csv module handle line breaks embedded in quoted fields
    with open("MyQualtricsDownload/" + fileId + ".csv", newline="") as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        line_count = 0
        for row in csv_reader: 
            # get column numbers
            if line_count == 0:
                columnCount = len(row)
                for i in range(columnCount): 
                    if "SID" in row[i] or re.search("^Q(.*?)[0-9]", row[i]) :
                        columnNumbers["questionAnswerColumns"].append(i)
                    elif "RecordedDate" in row[i]:
                        columnNumbers["RecordedDate"] = i
                    elif "ResponseId" in row[i]:
                        columnNumbers["ResponseId"] = i
                    elif "Employee ID" in row[i]:
                        columnNumbers["employeeID"] = i
                    elif "Manager ID" in row[i]:
                        columnNumbers["managerID"] = i
                line_count += 1
            # get questions text
            elif line_count == 1:
                for columnNumber in columnNumbers["questionAnswerColumns"]:
                    question = {}
                    question["text"] = row[columnNumber]
                    question["columnNumber"] = columnNumber
                    questions.append(question)
                line_count += 1
            # get questions id
            elif line_count == 2:
                for q in questions:
                    q["id"] = re.search('{"ImportId":"(.+?)"}', row[q["columnNumber"]]).group(1)
                line_count += 1
            # get response records
            else:
                for q in questions:
                    record = {}
                    record["responseId"] = row[columnNumbers["ResponseId"]]
                    record["questionId"] = q["id"]
                    record["language"] = "en"
                    record["question"] = q["text"]
                    record["response"] = row[q["columnNumber"]]
                    if columnNumbers["managerID"] > 0:
                        record["managerId"] = row[columnNumbers["managerID"]]
                    else:
                        record["managerId"] = ''
                    if columnNumbers["employeeID"] > 0:
                        record["employeeID"] = row[columnNumbers["employeeID"]]
                    else:
                        record["employeeID"] = ''
                    record["responseDate"] = row[columnNumbers["RecordedDate"]]
                    insertRecords.append(record)
                line_count += 1
        print(f'Processed {line_count} lines.')
    
    return insertRecords

def writeRecordstoDB(records, surveyId):
    #Step 1 : Open connection to HDB
    conn = open_hdb_conn()

    #Step 2 : Write records to HDB
    if conn and conn.isconnected():
        print("connection to HDB open")
        conn.setautocommit(False)
        cursor = conn.cursor()
        # Bind the values as parameters so that quotes in question or answer text cannot break the statement
        sql = 'INSERT INTO "<SCHEMA>"."<TABLE>" VALUES("<SCHEMA>"."rid".NEXTVAL, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
        for record in records:
            cursor.execute(sql, (record["responseId"], record["questionId"], record["language"], record["question"], record["response"], record["managerId"], record["employeeID"], record["responseDate"], surveyId))
            conn.commit()
            rowcount = cursor.rowcount
            if rowcount == 1:
                print("record is inserted")
    
    #Step 3 : close connection to HDB
    close_hdb_conn(conn)

def getLastTimeStamp(surveyId):
    #Step 1 : Open connection to HDB
    conn = open_hdb_conn()

    #Step 2 : Get latest timestamp
    startDateforExportString = ''
    if conn and conn.isconnected():
        # Bind the survey ID as a parameter instead of concatenating it into the statement
        sql = 'SELECT TOP 1 "RESPONSEDATE" FROM "<SCHEMA>"."<TABLE>" as "response" where "SURVEYID" = ? order by "response"."RESPONSEDATE" desc'
        cursor = conn.cursor()
        cursor.execute(sql, (surveyId,))
        row = cursor.fetchone()
        if row and len(row) == 1:
            lastResponseDate = row[0]
            # Start one second after the latest stored response so it is not imported again
            startDateforExport = lastResponseDate + datetime.timedelta(seconds=1)
            startDateforExportString = startDateforExport.strftime("%Y-%m-%dT%H:%M:%SZ")
            
    #Step 3 : close connection to HDB
    close_hdb_conn(conn)

    return startDateforExportString

def open_hdb_conn():
    print("opening connection to HDB")
    try:
        # Note: sslValidateCertificate='false' skips certificate validation; fine for a prototype only
        conn = dbapi.connect(address="<DB Host>", encrypt="true", port="<DB port>", user="<DB user>", sslValidateCertificate='false', password="<pwd>")
    except Exception as e:
        raise Exception("Open connection failed: " + str(e))

    return conn

def close_hdb_conn(conn):
    if conn:
        try:
            conn.close()
            print("connection to HDB closed")
        except Exception as e:
            if conn and not conn.isconnected():
                print("connection to HDB closed")
            
def getReponse(d, dataCenter, apiToken):
    responseId = d['ResponseID']
    surveyId = d['SurveyID']
    
    headers = {
        "content-type": "application/json",
        "x-api-token": apiToken,
       }

    url = "https://{0}.qualtrics.com/API/v3/surveys/{1}/responses/{2}".format(dataCenter, surveyId, responseId)

    
    rsp = requests.get(url, headers=headers)
    print(rsp.json())

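def parsey(c):
    # Decode the form-encoded webhook body (key=value pairs joined by "&").
    # parse_qsl URL-decodes every value, including CompletedDate, and copes with "=" inside values.
    d = dict(urllib.parse.parse_qsl(c.decode(), keep_blank_values=True))

    return d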

class Handler(BaseHTTPRequestHandler):

    # POST
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        d = parsey(post_data)
        surveyId = d['SurveyID']

        try:
            apiToken = os.environ["APIKEY"]
            dataCenter = os.environ["DATACENTER"]
        except KeyError:
            print("set environment variables APIKEY and DATACENTER")
            sys.exit(2)

        #import all responses for survey and write to database
        #onResponse(apiToken, surveyId, dataCenter)

        #get single response 
        getReponse(d, dataCenter, apiToken)

        # Acknowledge the call so that the sender does not wait for a timeout
        self.send_response(200)
        self.end_headers()

if __name__ == '__main__':
    
    print('starting server...')
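    # Cloud Foundry passes the port to listen on in the PORT environment variable
    server_address = ('0.0.0.0', int(os.environ.get('PORT', 8080)))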

    httpd = HTTPServer(server_address, Handler)
    print('running server...')
    httpd.serve_forever()
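
One thing to note about exportSurvey above: its progress loop calls the export-progress endpoint again immediately after every check. A minimal sketch of a gentler polling helper is shown here, with a pause between checks and an overall timeout. The names `wait_for_export`, `check_status`, `poll_interval` and `timeout` are assumptions made for this illustration and are not part of the Qualtrics API; `check_status` stands for any callable that returns the "result" object of the progress response.

```python
import time


def wait_for_export(check_status, poll_interval=1.0, timeout=300.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll check_status() until the export completes, fails or times out.

    check_status is a hypothetical callable returning a dict shaped like the
    "result" object used in exportSurvey, for example
    {"status": "inProgress", "percentComplete": 40.0}.
    """
    deadline = clock() + timeout
    while True:
        result = check_status()
        status = result["status"]
        if status == "complete":
            return result["fileId"]
        if status == "failed":
            raise RuntimeError("export failed")
        if clock() >= deadline:
            raise TimeoutError("export did not finish within the timeout")
        # Pause between checks so the API is not called in a tight loop
        sleep(poll_interval)
```

In exportSurvey, `check_status` could wrap the existing GET call on `baseUrl + progressId` and return `requestCheckResponse.json()["result"]`.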

◉ I have created an event subscription on the Qualtrics server, which calls this webhook on SAP Cloud Platform every time a response is completed and submitted.

curl -X POST -H 'X-API-TOKEN: yourapitoken'  -H 'Content-Type: application/json' -d '{
    "topics": "surveyengine.completedResponse.yoursurveyid",
    "publicationUrl": "http://<app url on CF>.hana.ondemand.com",
    "encrypt": false
}' 'https://co1.qualtrics.com/API/v3/eventsubscriptions/'
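
For readers who prefer to create the subscription from Python, here is a rough equivalent of the curl call above, using only the standard library. The function name `build_subscription_request` and its parameters are made up for this sketch; the URL, headers and payload fields are the ones from the curl call. The function only builds the request, so it can be inspected before anything is sent.

```python
import json
import urllib.request


def build_subscription_request(api_token, data_center, survey_id, publication_url):
    """Build (but do not send) the event-subscription request from the curl call."""
    payload = {
        "topics": "surveyengine.completedResponse." + survey_id,
        "publicationUrl": publication_url,
        "encrypt": False,
    }
    return urllib.request.Request(
        url="https://{0}.qualtrics.com/API/v3/eventsubscriptions/".format(data_center),
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-TOKEN": api_token, "Content-Type": "application/json"},
        method="POST",
    )


# Sending is left to the caller, for example:
# with urllib.request.urlopen(build_subscription_request(...)) as rsp:
#     print(rsp.read().decode())
```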

2. Replicating data from Line of Business(es) into HANA using Smart Data Integration (SDI)

SuccessFactors provides APIs to access data, enable open integration and allow easy extensions. The APIs are SOAP- or OData-based.

In this step I have done the following:

2.a. Set up the OData adapter on the HANA service for Smart Data Integration.

2.b. Configured the SuccessFactors system as a remote source for data replication.

2.c. Created virtual tables for the tables of interest. I am interested in getting the User, Achievement and SpotAward details from the SuccessFactors system.

2.d. Created flowgraphs to extract only the fields I need, and set up replication tasks.

While creating virtual tables from a remote source, you need to ensure the technical user of your HDI container is authorized to access the remote source. You can take a look at the following links to understand how to do this:

As always, one of the best references for anything HANA-related is SAP HANA Academy. To understand and create flowgraphs and replication tasks, refer to the SAP HANA Academy playlist on “how to use SDI in SAP HANA Service”.


Let’s see how this looks in my Web IDE:

SFSF Remote source:

Once access from my MTA project to the remote source is set up (as per the steps in the blog/video referenced previously), I can set up my virtual tables.

I have created 3 virtual tables, for User, Achievement and SpotAward.

VIRTUAL TABLE "SFSFAchievement" AT "sfsf"."Achievement"

VIRTUAL TABLE "SFSFSpotAward" AT "sfsf"."SpotAward"

VIRTUAL TABLE "SFSFUser" AT "sfsf"."User"

Here is the flowgraph in my Web IDE. It maps only the fields that I am interested in from the virtual tables on the source side to new tables on the target side.
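
To illustrate the idea of the field mapping outside of the flowgraph editor, here is a small plain-Python sketch. It is not how SDI flowgraphs are implemented; it only mimics the projection step on in-memory rows. The function name `project_fields` and the field and column names in the usage note are hypothetical.

```python
def project_fields(rows, field_map):
    """Copy only the mapped fields of each source row into a new target row.

    field_map maps source field names to target column names; both sets of
    names are hypothetical and chosen by the caller.
    """
    return [
        {target: row.get(source) for source, target in field_map.items()}
        for row in rows
    ]
```

For example, calling `project_fields(rows, {"userId": "USERID", "firstName": "FIRSTNAME"})` would keep just those two fields of every row and rename them to the target column names.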

So now I have my target tables and data, which I can consume in my calculation views.

3. Correlate this data through calculation views

Create calculation views to correlate the SuccessFactors data on how many achievements an employee has logged and how many awards the employee has received with how the employee feels, based on the survey responses.
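
The join logic behind such a view can be sketched in plain Python. This is only an illustration of the correlation idea, not the calculation view itself. It assumes survey records with the "employeeID" and "response" keys built by parseSurveyExport, and achievement and award rows that carry a hypothetical "userId" key identifying the employee.

```python
from collections import Counter


def correlate(responses, achievements, awards):
    """Join survey answers with per-employee achievement and award counts.

    responses: iterable of dicts with "employeeID" and "response" keys.
    achievements, awards: iterables of dicts with a hypothetical "userId" key.
    """
    achievement_counts = Counter(a["userId"] for a in achievements)
    award_counts = Counter(a["userId"] for a in awards)
    return [
        {
            "employeeID": r["employeeID"],
            "response": r["response"],
            # Counter returns 0 for employees without any matching rows
            "achievements": achievement_counts[r["employeeID"]],
            "awards": award_counts[r["employeeID"]],
        }
        for r in responses
    ]
```

Each output row then holds an employee's answer next to the number of achievements logged and awards received, which is the shape of data the SAC story needs.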

Here’s the view of the correlated data from the calculation view, which compares employee appreciation with actual achievements and awards given out:

4. Consume the calculation view in SAC for analysis

I have consumed calculation views created in my HANA service on Cloud Foundry through live access in SAC.

Here is a screenshot of the SAC story, which shows the X+O data together: in most cases where an employee is demotivated and feels unappreciated, there is a direct, quantifiable relation to how many achievements the employee has logged and how many awards were given out. I have set up an auto-refresh for the model every 5 seconds.

Every time a new survey response is submitted, the new data is written to HANA, and the replication tasks also write new data from the SuccessFactors system into HANA. With the auto-refresh configured on the SAC story, the data you see on the SAC dashboard will mostly be the latest.
