In this notebook we will show a complete recommender system implemented using GraphLab Create's deployment tools. This example reflects a common batch scenario: a new recommender is trained on a periodic basis, and the generated recommendations are persisted to a relational database used by the web application.
The data we will use in this notebook is the same as in the Building a Recommender with Ratings Data notebook, but without the exploration and prototyping parts. The pipeline will contain the following tasks:
1. Clean the raw data and load it into an SFrame
2. Train a recommender model
3. Generate recommendations for users
4. Persist the recommendations to a relational database
Each of these tasks will be defined as a function and executed as a Job using GraphLab Create. Finally, we will cover how to run and monitor these pipelines. Remember, when using GraphLab Data Pipelines, the Tasks and Jobs created are managed objects, so they must have unique names.
This notebook uses GraphLab Create 1.3.
In [1]:
import graphlab
The first task in this pipeline will take data, clean it, and transform it into an SFrame. In this task, the raw data is read using graphlab.SFrame.read_csv, with the file path provided as a parameter to the Task. Once the data is loaded into an SFrame, we clean it by calling dropna() on the SFrame. The code that will run when the task is executed is:
In [2]:
def clean_data(path):
    import graphlab as gl
    sf = gl.SFrame.read_csv(path, delimiter='\t')
    sf['rating'] = sf['rating'].astype(int)
    sf = sf.dropna()
    sf.rename({'user': 'user_id', 'movie': 'movie_id'})
    # To simplify this example, only keep 0.1% of the number of rows from the input data
    sf = sf.sample(0.001)
    return sf
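Before wiring this function into a job, it can be handy to run it locally on the sample dataset to sanity-check the output schema. The snippet below is an optional local check, not part of the original pipeline; it assumes the same sample file used later in this notebook.
In [ ]:
# quick local sanity check of the cleaning function (optional)
sf = clean_data('https://static.turi.com/datasets/movie_ratings/sample.small')
print sf.column_names()  # expect: user_id, movie_id, rating
sf.head(5)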
Now that the data is cleaned and ready as an SFrame, we need to train a model in this recommendation system. To train the model, we need the SFrame created in the previous Task.
In [3]:
def train_model(data):
    import graphlab as gl
    model = gl.recommender.create(data, user_id='user_id', item_id='movie_id', target='rating')
    return model
The previous task produces a trained model that we can use to generate recommendations. Because model training is specified as its own Task, it can be improved independently of the task that generates recommendations from that model. To generate recommendations we need the trained model and the users who need recommendations.
Here is the code for generating recommendations from a trained model:
In [4]:
def gen_recs(model, data):
    recs = model.recommend(data['user_id'])
    return recs
Now that the tasks are defined for this pipeline, let's compose them together to create a Job. Using the late-binding feature of the Data Pipelines framework, parameters, inputs, and outputs that have not been specified with a Task can be specified at runtime. We will use this feature to specify the database parameters for the 'persist' task and the raw data location for the 'clean' task.
In [5]:
def my_batch_job(path):
    data = clean_data(path)
    model = train_model(data)
    recs = gen_recs(model, data)
    return recs

job = graphlab.deploy.job.create(my_batch_job,
                                 path='https://static.turi.com/datasets/movie_ratings/sample.small')
The job is started asynchronously in the background, and we can query for its status by calling get_status on the Job instance returned:
In [8]:
job.get_status()
Out[8]:
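If you prefer to poll rather than block, a simple loop like the following works. This is just a sketch: it assumes the job eventually reaches a terminal status such as 'Completed' or 'Failed'; the exact status strings may differ in your GraphLab Create version.
In [ ]:
import time

# poll the job status until it reaches a terminal state (sketch; status strings are assumptions)
while job.get_status() not in ('Completed', 'Failed'):
    time.sleep(5)
print job.get_status()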
If you do not want to poll the job's status yourself, you can call get_results, which blocks until the job completes and then returns its results.
In [9]:
recs = job.get_results() # Blocking call which waits for the job to complete.
To see more information about the job, print the job object:
In [10]:
print job
Let's visualize the recommendations.
In [11]:
graphlab.canvas.set_target('ipynb') # show Canvas inline to IPython Notebook
recs.show()
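Canvas shows summary plots; the recommendations SFrame can also be inspected directly. For example, the following optional snippet keeps only each user's top-ranked movie, relying on the rank column produced by recommend().
In [ ]:
# inspect each user's single best recommendation (optional)
top_picks = recs[recs['rank'] == 1]
top_picks.head(5)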
Now that recommendations have been generated, the final step in this pipeline is to save them to a relational database. The main application queries this database for user recommendations as users interact with the application. For this task we will use MySQL as an example, but it can easily be substituted with a different database.
The DB table needed to run this example looks like the following:
+----------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+-------+
| user_id | varchar(50) | NO | | NULL | |
| movie_id | varchar(50) | NO | | NULL | |
| score | float | NO | | NULL | |
| rank | int(8) | NO | | NULL | |
+----------+-------------+------+-----+---------+-------+
To create a table in MySQL with this schema:
CREATE TABLE recommendations (user_id VARCHAR(50),
movie_id VARCHAR(50), score FLOAT, rank INT(8));
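If you prefer to create the table from Python rather than the MySQL shell, a sketch using mysql-connector-python looks like the following; the connection parameters are placeholders matching the example job below and should be changed for your own database.
In [ ]:
import mysql.connector

# create the recommendations table from Python (placeholder connection parameters)
conn = mysql.connector.connect(host='10.10.2.2', user='test', password='secret',
                               port=3306, database='users')
cursor = conn.cursor()
cursor.execute("CREATE TABLE recommendations (user_id VARCHAR(50), "
               "movie_id VARCHAR(50), score FLOAT, rank INT(8))")
conn.close()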
In [12]:
@graphlab.deploy.required_packages(['mysql-connector-python'])
def persist_to_db(recs, dbhost, dbuser, dbpass, dbport, dbtable, dbname):
    import mysql.connector
    from mysql.connector import errorcode

    conn = mysql.connector.connect(host=dbhost, user=dbuser, password=dbpass, port=dbport)
    conn.database = dbname
    cursor = conn.cursor()

    # this example expects the table to be empty, minor changes here if you want to
    # update existing users' recommendations instead
    add_row_sql = ("INSERT INTO " + dbtable + " (user_id, movie_id, score, rank) "
                   "VALUES (%(user_id)s, %(movie_id)s, %(score)s, %(rank)s)")

    print "Begin - Writing recommendations to DB...."
    for row in recs:
        cursor.execute(add_row_sql, row)
    print "End - Writing recommendations to DB...."

    # commit recommendations to database
    conn.commit()
Note: An important point about this Task is that it requires the mysql-connector-python package, which is not part of the standard Python distribution. With GraphLab Create, specifying that this package is required is easily done in the Task definition. When running this task in a remote environment (EC2 or Hadoop), the framework makes sure this Python package is installed prior to execution.
In order to run this pipeline locally, please install the mysql-connector-python package on your machine.
In [ ]:
# install the mysql-connector-python package locally, if not running from a virtualenv then sudo may be required
!pip install --allow-external mysql-connector-python mysql-connector-python
Note: Change the following database parameters to match the database you are connecting to. Also, remember to install the mysql-connector-python package on your machine before running this job.
In [11]:
job = graphlab.deploy.job.create(persist_to_db,
                                 recs=recs,
                                 dbhost='10.10.2.2',  # change these db params appropriately
                                 dbuser='test',
                                 dbpass='secret',
                                 dbname='users',
                                 dbport=3306,
                                 dbtable='recommendations')

results = job.get_results()
The job is now 'Completed'.
Data Pipelines also supports running the same pipeline on EC2 or Hadoop YARN clusters (CDH5). In order to run this pipeline in those environments, simply add an environment parameter to the graphlab.deploy.job.create API. No code needs to change; the GraphLab Data Pipelines framework takes care of installing and configuring what is needed to run this pipeline in the specified environment.
In [ ]:
ec2 = graphlab.deploy.Ec2Config(aws_access_key_id='<key>',
                                aws_secret_key='<secret>')

c = graphlab.deploy.ec2_cluster.create(name='ec2cluster',
                                       s3_path='s3://my_bucket',
                                       ec2_config=ec2)
In [ ]:
c = graphlab.deploy.hadoop_cluster.create(name='hd',
                                          turi_dist_path='hdfs://some.domain.com/user/name/dd-deployment',
                                          hadoop_conf_dir='~/yarn-config')
The turi_dist_path parameter needs to point to a Turi Distributed deployment. The hadoop_conf_dir parameter should point to the directory that contains yarn-site.xml.

Now, with these environments created, we can specify one of them when creating the Job by setting the environment parameter, and the Job will be executed in that environment.
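For example, with the EC2 cluster created above, the batch job could be launched as sketched below; this assumes the same sample data path used earlier in the notebook and passes the cluster object as the environment.
In [ ]:
# run the same pipeline remotely by passing the cluster as the environment (sketch)
job = graphlab.deploy.job.create(my_batch_job,
                                 environment=c,
                                 path='https://static.turi.com/datasets/movie_ratings/sample.small')
recs = job.get_results()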