Making batch recommendations using GraphLab Create


In this notebook we will show a complete recommender system implemented using GraphLab's deployment tools. This example reflects a common batch scenario, where a new recommender is trained on a periodic basis and the generated recommendations are persisted to a relational database used by the web application.

The data we will use in this notebook is the same as in the Building a Recommender with Ratings Data notebook, but without the exploration and prototyping parts. The pipeline will contain the following tasks:

  1. Clean and transform data
  2. Train a Recommender model
  3. Generate Recommendations for users
  4. Persist Recommendations to a MySQL database

Each of these tasks will be defined as a function and executed as a Job using GraphLab. Finally, we will cover how to run and monitor these pipelines. Remember that when using GraphLab Data Pipelines, the Tasks and Jobs created are managed objects, so they must have unique names.

This notebook uses GraphLab Create 1.3.


In [1]:
import graphlab

Clean the data

The first task in this pipeline will take data, clean it, and transform it into an SFrame. In this task, the raw data is read using graphlab.SFrame.read_csv, with the file path provided as a parameter to the Task. Once the data is loaded into an SFrame, we clean it by calling dropna() on the SFrame. The code that will run when the task is executed is:


In [2]:
def clean_data(path):
    import graphlab as gl
    sf = gl.SFrame.read_csv(path, delimiter='\t')
    sf['rating'] = sf['rating'].astype(int)
    sf = sf.dropna()
    sf.rename({'user':'user_id', 'movie':'movie_id'})
    
    # To simplify this example, keep only 0.1% of the rows from the input data
    sf = sf.sample(0.001)
    return sf
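
Before wiring this into a job, it can be handy to sanity-check the function locally. A minimal sketch, assuming the same sample dataset URL used when the job is created later in this notebook:

In [ ]:
# Quick local check of clean_data; the URL is the sample dataset used
# when the job is created later in this notebook.
sample_path = 'https://static.turi.com/datasets/movie_ratings/sample.small'
cleaned = clean_data(sample_path)
print cleaned.column_names()  # expect 'user_id' and 'movie_id' among the columns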

Train the model

Now that the data is cleaned and ready as an SFrame, we need to train a model in this recommendation system. To train the model, we need the SFrame created in the previous Task.


In [3]:
def train_model(data):
    import graphlab as gl
    model = gl.recommender.create(data, user_id='user_id', item_id='movie_id', target='rating')
    return model
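
If you want a rough quality check before deploying the model, one option (not part of the batch pipeline itself) is to hold out some ratings per user and evaluate on them. A hedged sketch, reusing clean_data, train_model, and the sample dataset URL from this notebook:

In [ ]:
# Optional, outside the pipeline: hold out ratings per user and evaluate.
import graphlab as gl
data = clean_data('https://static.turi.com/datasets/movie_ratings/sample.small')
train, valid = gl.recommender.util.random_split_by_user(
    data, user_id='user_id', item_id='movie_id')
eval_model = train_model(train)
print eval_model.evaluate(valid)  # evaluation summary (e.g. RMSE)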

Generate Recommendations

The previous task produces a trained model that we can use to generate recommendations. Because training is defined as its own Task, the model can be improved independently of the task that generates recommendations from it. To generate recommendations we need the trained model and the users who need recommendations.

Here is the code for generating recommendations from a trained model:


In [4]:
def gen_recs(model, data):
    recs = model.recommend(data['user_id'])
    return recs
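
A small variant (purely illustrative; gen_recs_topk is not part of the pipeline) deduplicates the users, since the ratings data has one row per rating, and makes the number of recommendations per user explicit via recommend's k parameter:

In [ ]:
# Illustrative variant of gen_recs: deduplicate users and set k explicitly.
def gen_recs_topk(model, data, k=20):
    users = data['user_id'].unique()  # one entry per user, not per rating
    return model.recommend(users, k=k)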

Running and Monitoring this recommender

Now that the tasks are defined for this pipeline, let's compose them together to create a Job. Using the late-binding feature of the Data Pipelines framework, the parameters, inputs, and outputs that have not been specified with the Task can be specified at runtime. We will use this feature to specify the database parameters for the 'persist' task, and the raw data location for the 'clean' task.

Create a Job


In [5]:
def my_batch_job(path):
    data = clean_data(path)
    model = train_model(data)
    recs = gen_recs(model, data)
    return recs
        
job = graphlab.deploy.job.create(my_batch_job, 
        path = 'https://static.turi.com/datasets/movie_ratings/sample.small')


[INFO] Validating job.
[INFO] This commercial license of GraphLab Create is assigned to engr@turi.com.

[INFO] Start server at: ipc:///tmp/graphlab_server-28939 - Server binary: /Users/srikris/miniconda/envs/graphlab-1.5rc/lib/python2.7/site-packages/graphlab/unity_server - Server log: /tmp/graphlab_server_1437163898.log
[INFO] GraphLab Server Version: 1.4.828
[INFO] Creating a LocalAsync environment called 'async'.
[INFO] Validation complete. Job: 'my_batch_job-Jul-17-2015-13-11-37' ready for execution.
[INFO] Job: 'my_batch_job-Jul-17-2015-13-11-37' scheduled.

The job is started asynchronously in the background, and we can query its status by calling get_status on the returned Job instance:


In [8]:
job.get_status()


Out[8]:
u'Running'
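
Since get_status returns immediately, a simple way to wait is to poll it in a loop. A sketch (only the 'Running' and 'Completed' states appear in this notebook; other states may exist):

In [ ]:
import time
# Poll the job status every few seconds until it is no longer 'Running'.
while job.get_status() == 'Running':
    time.sleep(10)
print job.get_status()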

If you don't want to keep polling the status, you can use the get_results function, which waits for the job to complete and then returns the results.


In [9]:
recs = job.get_results() # Blocking call which waits for the job to complete.


[INFO] Waiting for job to finish, this may take quite a while.
[INFO] You may CTRL-C to stop this command and it will not cancel your job.

To see more information about the job, print the job object:


In [10]:
print job


Info
------
Job                : my_batch_job-Jul-17-2015-13-11-37
Function(s)        : ['my_batch_job']
Status             : Completed

Help
------
Visualize progress : self.show()
Query status       : self.get_status()
Get results        : self.get_results()

Environment
----------
LocalAsync: ["name": async]

Metrics
-------
Start time         : 2015-07-17 13:11:41
End time           : 2015-07-17 13:12:37
+--------------+-----------+---------------------+---------------+-----------+
|  task_name   |   status  |      start_time     |    run_time   | exception |
+--------------+-----------+---------------------+---------------+-----------+
| my_batch_job | Completed | 2015-07-17 13:11:42 | 55.5060768127 |    None   |
+--------------+-----------+---------------------+---------------+-----------+
+-------------------+---------------------+
| exception_message | exception_traceback |
+-------------------+---------------------+
|        None       |         None        |
+-------------------+---------------------+
[1 rows x 7 columns]


Execution Information
---------------------
Execution Directory  : /Users/srikris/.graphlab/artifacts/results/job-results-e16b81be-da34-436b-9c88-bf5a95bf5f0e
Log file             : /Users/srikris/.graphlab/artifacts/results/job-results-e16b81be-da34-436b-9c88-bf5a95bf5f0e/execution.log

Let us visualize the recommendations in Canvas.


In [11]:
graphlab.canvas.set_target('ipynb') # show Canvas inline to IPython Notebook
recs.show()
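
If you prefer a quick textual look over Canvas, the recommendations SFrame can also be printed directly:

In [ ]:
# Textual alternative to Canvas: print the first few recommendation rows.
recs.print_rows(num_rows=5)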


Persist Recommendations

Now that recommendations have been generated, the final step in this pipeline is to save them to a relational database. The main application queries this database for user recommendations as users interact with the application. For this task we will use MySQL as an example, but it can easily be substituted with a different database.

The DB table needed to run this example looks like the following:

+----------+-------------+------+-----+---------+-------+
| Field    | Type        | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| user_id  | varchar(50) | NO   |     | NULL    |       |
| movie_id | varchar(50) | NO   |     | NULL    |       |
| score    | float       | NO   |     | NULL    |       |
| rank     | int(8)      | NO   |     | NULL    |       |
+----------+-------------+------+-----+---------+-------+

To create a table in MySQL with this schema:

CREATE TABLE recommendations (user_id VARCHAR(50), 
                              movie_id VARCHAR(50), score FLOAT, rank INT(8));
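
If you would rather create the table from Python than from the mysql client, the same connector package can run the DDL. A sketch, using the placeholder connection parameters from the job below:

In [ ]:
# Create the recommendations table from Python. The host, user, password,
# port and database values are placeholders; match them to your MySQL setup.
import mysql.connector

conn = mysql.connector.connect(host='10.10.2.2', user='test',
                               password='secret', port=3306,
                               database='users')
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS recommendations ("
               "user_id VARCHAR(50), movie_id VARCHAR(50), "
               "score FLOAT, rank INT(8))")
cursor.close()
conn.close()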

In [12]:
@graphlab.deploy.required_packages(['mysql-connector-python'])
def persist_to_db(recs, dbhost, dbuser, dbpass, dbport, dbtable, dbname):
    import mysql.connector
    from mysql.connector import errorcode
    
    conn = mysql.connector.connect(host=dbhost, user=dbuser, password=dbpass, port=dbport)
    conn.database = dbname
    cursor = conn.cursor()
    # this example expects the table to be empty, minor changes here if you want to 
    # update existing users' recommendations instead.
    add_row_sql = ("INSERT INTO " + dbtable + " (user_id, movie_id, score, rank) "
                   "VALUES (%(user_id)s, %(movie_id)s, %(score)s, %(rank)s)")

    print "Begin - Writing recommendations to DB...."
    for row in recs:
        cursor.execute(add_row_sql, row)
    print "End - Writing recommendations to DB...."
    
    # commit recommendations to database
    conn.commit()

Note: An important aspect of this Task is that it requires the mysql-connector-python package, which is not part of the Python standard library. With GraphLab Create, specifying that this package is required is easily done in the Task definition (the required_packages decorator above). When running this task in a remote environment (EC2 or Hadoop), the framework will make sure this Python package is installed prior to execution.

In order to run this pipeline locally, please install the mysql-connector-python package on your machine.


In [ ]:
# install the mysql-connector-python package locally, if not running from a virtualenv then sudo may be required
!pip install --allow-external mysql-connector-python mysql-connector-python

Save Recommendations to Database

Note: Change the following database parameters to match the database you are connecting to. Also, remember to install the mysql-connector-python package on your machine before running this job.


In [11]:
job = graphlab.deploy.job.create(persist_to_db, 
                 recs = recs,
                 dbhost = '10.10.2.2', # change these db params appropriately
                 dbuser = 'test',
                 dbpass = 'secret',
                 dbname = 'users',
                 dbport = 3306,
                 dbtable = 'recommendations') 

results = job.get_results()


[INFO] Validating job.
[INFO] Validation complete. Job: 'persist_to_db-Feb-13-2015-17-19-28' ready for execution.
[INFO] Required packages are not being installed since  the job is running locally.
[INFO] Job: 'persist_to_db-Feb-13-2015-17-19-28' scheduled.

The job is now 'Completed'.
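
As a quick sanity check, you can query a few rows back from the table to confirm the recommendations were written. A sketch using the same placeholder connection parameters as the job above:

In [ ]:
# Read back a few persisted rows; connection parameters are placeholders.
import mysql.connector

conn = mysql.connector.connect(host='10.10.2.2', user='test',
                               password='secret', port=3306,
                               database='users')
cursor = conn.cursor()
cursor.execute("SELECT user_id, movie_id, score, rank "
               "FROM recommendations LIMIT 5")
for row in cursor:
    print row
cursor.close()
conn.close()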

Running in EC2 or Hadoop

Data Pipelines also supports running the same pipeline in EC2 or Hadoop YARN clusters (CDH5). In order to run this pipeline in those environments, simply add an environment parameter to the graphlab.deploy.job.create API. No code needs to change; the GraphLab Data Pipelines framework takes care of installing and configuring what is needed to run this pipeline in the specified environment.

To create an EC2 environment:


In [ ]:
ec2 = graphlab.deploy.Ec2Config(aws_access_key_id='<key>', 
                                aws_secret_key='<secret>')

c = graphlab.deploy.ec2_cluster.create(name='ec2cluster',
                                       s3_path='s3://my_bucket',
                                       ec2_config=ec2)

To create a Hadoop environment:


In [ ]:
c = graphlab.deploy.hadoop_cluster.create(name='hd',
                                          turi_dist_path='hdfs://some.domain.com/user/name/dd-deployment',
                                          hadoop_conf_dir='~/yarn-config')

The turi_dist_path parameter needs to point to a Turi Distributed deployment. The hadoop_conf_dir parameter should point to the directory that contains yarn-site.xml.

Now, with these environments created, we can pass one of them as the environment parameter when creating the Job, and the Job will be executed in that environment.
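
For example, re-running the batch job on the Hadoop cluster created above would look like the following (the cluster object c and the dataset path come from earlier cells):

In [ ]:
# Same pipeline code, now executed on the cluster by passing it as the
# environment parameter.
job = graphlab.deploy.job.create(my_batch_job,
        environment=c,
        path='https://static.turi.com/datasets/movie_ratings/sample.small')
recs = job.get_results()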

Summary

And that's it. With this example you can see how easy it is to deploy complex Python code in other environments. What pipeline will you define next?