In [1]:
"""Run a linear regression using Apache Spark ML.

In the following PySpark (Spark Python API) code, we take the following actions:

  * Load a previously created linear regression (Google BigQuery) input table
    into our Google Cloud Dataproc Spark cluster as an RDD (Resilient
    Distributed Dataset)
  * Transform the RDD into a Spark DataFrame
  * Vectorize the features on which the model will be trained
  * Compute a linear regression using Spark ML

"""


Out[1]:
'Run a linear regression using Apache Spark ML.\n\nIn the following PySpark (Spark Python API) code, we take the following actions:\n\n  * Load a previously created linear regression (Google BigQuery) input table\n    into our Google Cloud Dataproc Spark cluster as an RDD (Resilient\n    Distributed Dataset)\n  * Transform the RDD into a Spark DataFrame\n  * Vectorize the features on which the model will be trained\n  * Compute a linear regression using Spark ML\n\n'

In [1]:
from datetime import datetime
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
# The imports above provide access to the Spark ML linear regression classes
# and the Vectors type.


# Define a function that collects the features of interest
# (mother_age, father_age, gestation_weeks, weight_gain_pounds, and apgar_5min)
# into a dense vector.
# Package the vector in a tuple containing the label (`weight_pounds`) for that
# row.


def vector_from_inputs(r):
  return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]),
                                            float(r["father_age"]),
                                            float(r["gestation_weeks"]),
                                            float(r["weight_gain_pounds"]),
                                            float(r["apgar_5min"])))

# Use Cloud Dataproc's automatically propagated configuration to get
# the Google Cloud Storage bucket and Google Cloud Platform project for this
# cluster.
bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")

# Set an input directory for reading data from BigQuery.
todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
input_directory = "gs://{}/tmp/natality-{}".format(bucket, todays_date)

# Set the configuration for importing data from BigQuery.
# Specifically, make sure to set the project ID and bucket for Cloud Dataproc,
# and the project ID, dataset, and table names for BigQuery.

conf = {
    # Input Parameters
    "mapred.bq.project.id": project,
    "mapred.bq.gcs.bucket": bucket,
    "mapred.bq.temp.gcs.path": input_directory,
    "mapred.bq.input.project.id": project,
    "mapred.bq.input.dataset.id": "natality_regression",
    "mapred.bq.input.table.id": "regression_input",
}

In [2]:
# Read the data from BigQuery into Spark as an RDD.
table_data = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)
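
In [ ]:
# Optional sanity check (sketch, not executed in this run): with
# JsonTextBigQueryInputFormat each RDD element is a (row key, JSON string)
# pair, so taking a single record shows one raw BigQuery row as JSON text.
table_data.take(1)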

In [3]:
# Extract the JSON strings from the RDD.
table_json = table_data.map(lambda x: x[1])

In [4]:
# Load the JSON strings as a Spark DataFrame.
natality_data = spark.read.json(table_json)
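
In [ ]:
# Optional (sketch, not executed in this run): inspect the inferred schema and
# a few rows before querying the DataFrame.
natality_data.printSchema()
natality_data.show(5)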

In [5]:
%%timeit
# Create a view so that Spark SQL queries can be run against the data.
natality_data.createOrReplaceTempView("natality")


The slowest run took 93.16 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 1.81 ms per loop
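
In [ ]:
# Optional (sketch, not executed in this run): the temporary view can now be
# queried with Spark SQL, for example to count the imported rows.
spark.sql("SELECT COUNT(*) AS n FROM natality").show()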

In [8]:
%%time

# As a precaution, run a Spark SQL query to drop any rows in which a column
# used for training is NULL.
sql_query = """
SELECT *
FROM natality
WHERE weight_pounds IS NOT NULL
AND mother_age IS NOT NULL
AND father_age IS NOT NULL
AND gestation_weeks IS NOT NULL
AND weight_gain_pounds IS NOT NULL
AND apgar_5min IS NOT NULL
"""
clean_data = spark.sql(sql_query)


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.38 ms
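
In [ ]:
# Alternative sketch (not part of the original run): the same NULL filtering
# can be expressed with the DataFrame API via dropna() on the columns used for
# training. The variable name clean_data_alt is illustrative and is not used
# below.
clean_data_alt = natality_data.dropna(
    subset=["weight_pounds", "mother_age", "father_age",
            "gestation_weeks", "weight_gain_pounds", "apgar_5min"])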

In [9]:
%%time
# Create an input DataFrame for Spark ML using the above function.
training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label",
                                                             "features"])


CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 1.31 s
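
In [ ]:
# Alternative sketch (not part of the original run): build the features column
# with pyspark.ml.feature.VectorAssembler instead of the RDD round trip above.
# The explicit casts mirror the float() conversions in vector_from_inputs,
# since columns read from JSON may not be numeric. assembled_data is
# illustrative and is not used below.
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

feature_cols = ["mother_age", "father_age", "gestation_weeks",
                "weight_gain_pounds", "apgar_5min"]
numeric_data = clean_data.select(
    col("weight_pounds").cast("double").alias("label"),
    *[col(c).cast("double") for c in feature_cols])
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_data = assembler.transform(numeric_data).select("label", "features")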

In [10]:
%%time
training_data.cache()


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 14.6 ms
Out[10]:
DataFrame[label: double, features: vector]
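
In [ ]:
# Note (sketch, not executed in this run): cache() is lazy, so an action such
# as count() would materialize the cached DataFrame before training starts.
training_data.count()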

In [11]:
%%time

# Construct a new LinearRegression object and fit the training data.
lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
model = lr.fit(training_data)
# Print the model summary.
print("Coefficients:" + str(model.coefficients))
print("Intercept:" + str(model.intercept))
print("R^2:" + str(model.summary.r2))


Coefficients:[0.016665745454019634,-0.0029675198410929033,0.23571439290511598,0.002130020702190186,-0.0004857725176710444]
Intercept:-2.26130330609
R^2:0.295200579027
CPU times: user 112 ms, sys: 20 ms, total: 132 ms
Wall time: 13min 9s
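
In [ ]:
# Sketch (not executed in this run): applying the fitted model back to a
# DataFrame with a `features` column adds a `prediction` column that can be
# compared against the label.
predictions = model.transform(training_data)
predictions.select("label", "prediction").show(5)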

In [12]:
model.summary.residuals.show()


+--------------------+
|           residuals|
+--------------------+
|-0.28279492738973744|
| -2.0245746169931875|
|  -1.535849017855412|
| -0.6959088260511317|
| 0.07542292383583682|
|  1.5918559091748268|
| 0.47091615108549867|
|-3.40576354993515...|
|   4.406963739356298|
| -1.8700567826233057|
| -0.6877534369071245|
|  0.5257065108149863|
|  0.4431331446646336|
|  1.3007407263071986|
| -1.2707145879274657|
| -1.3794738537146607|
|  0.9073908409451175|
| -0.9974428255804781|
| -0.3427353132085349|
| -2.5438139719030115|
+--------------------+
only showing top 20 rows
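
In [ ]:
# Sketch (not executed in this run): the training summary also exposes common
# error metrics directly, e.g. root mean squared error and mean absolute error.
print("RMSE: " + str(model.summary.rootMeanSquaredError))
print("MAE: " + str(model.summary.meanAbsoluteError))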


In [20]:
type(training_data)


Out[20]:
pyspark.sql.dataframe.DataFrame

In [21]:
%%time
# from pyspark.mllib.tree import GradientBoostedTrees
# algo = GradientBoostedTrees()
# model = algo.trainRegressor(data=training_data,categoricalFeaturesInfo={},numIterations=10)
# AssertionError: the data should be RDD of LabeledPoint


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 12.2 µs
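
In [ ]:
# Sketch (not executed): the RDD-based pyspark.mllib GradientBoostedTrees API
# above expects an RDD of LabeledPoint rather than a DataFrame, which explains
# the AssertionError. A conversion would look roughly like this
# (trainRegressor is a classmethod, so no instance is needed).
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees

labeled_rdd = training_data.rdd.map(
    lambda row: LabeledPoint(row["label"], row["features"].toArray()))
gbt_mllib_model = GradientBoostedTrees.trainRegressor(
    labeled_rdd, categoricalFeaturesInfo={}, numIterations=10)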

In [24]:
%%time
from pyspark.ml.regression import GBTRegressor
algo = GBTRegressor()
model = algo.fit(dataset=training_data)
# algo.fit??



KeyboardInterruptTraceback (most recent call last)
<ipython-input-24-721a860c9529> in <module>()
----> 1 get_ipython().run_cell_magic(u'time', u'', u'from pyspark.ml.regression import GBTRegressor\nalgo = GBTRegressor()\nmodel = algo.fit(dataset=training_data)\n# algo.fit??')

/usr/local/envs/py2env/lib/python2.7/site-packages/datalab/kernel/__init__.pyc in _run_cell_magic(self, magic_name, line, cell)
    102       # IPython will complain if cell is empty string but not if it is None
    103       cell = None
--> 104     return _orig_run_cell_magic(self, magic_name, line, cell)
    105 
    106   _shell.InteractiveShell.run_cell_magic = _run_cell_magic

/usr/local/envs/py2env/lib/python2.7/site-packages/google/datalab/kernel/__init__.pyc in _run_cell_magic(self, magic_name, line, cell)
     90       # IPython will complain if cell is empty string but not if it is None
     91       cell = None
---> 92     return _orig_run_cell_magic(self, magic_name, line, cell)
     93 
     94   _shell.InteractiveShell.run_cell_magic = _run_cell_magic

/usr/local/envs/py2env/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_cell_magic(self, magic_name, line, cell)
   2115             magic_arg_s = self.var_expand(line, stack_depth)
   2116             with self.builtin_trap:
-> 2117                 result = fn(magic_arg_s, cell)
   2118             return result
   2119 

<decorator-gen-60> in time(self, line, cell, local_ns)

/usr/local/envs/py2env/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189 
    190         if callable(arg):

/usr/local/envs/py2env/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
   1191         else:
   1192             st = clock2()
-> 1193             exec(code, glob, local_ns)
   1194             end = clock2()
   1195             out = None

<timed exec> in <module>()

/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py in _fit(self, dataset)
    263 
    264     def _fit(self, dataset):
--> 265         java_model = self._fit_java(dataset)
    266         return self._create_model(java_model)
    267 

/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    260         """
    261         self._transfer_params_to_java()
--> 262         return self._java_obj.fit(dataset._jdf)
    263 
    264     def _fit(self, dataset):

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1129             proto.END_COMMAND_PART
   1130 
-> 1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
   1133             answer, self.gateway_client, self.target_id, self.name)

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    881         connection = self._get_connection()
    882         try:
--> 883             response = connection.send_command(command)
    884             if binary:
    885                 return response, self._create_connection_guard(connection)

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in send_command(self, command)
   1026 
   1027         try:
-> 1028             answer = smart_decode(self.stream.readline()[:-1])
   1029             logger.debug("Answer received: {0}".format(answer))
   1030             if answer.startswith(proto.RETURN_MESSAGE):

/usr/local/envs/py2env/lib/python2.7/socket.pyc in readline(self, size)
    449             while True:
    450                 try:
--> 451                     data = self._sock.recv(self._rbufsize)
    452                 except error, e:
    453                     if e.args[0] == EINTR:

KeyboardInterrupt: 
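
In [ ]:
# Sketch (not executed): the GBTRegressor fit above was interrupted by hand.
# Gradient-boosted trees train one tree per iteration, so runtime can be
# reduced by capping iterations and depth, or by fitting on a sample first.
# The parameter values and the 10% sample fraction here are illustrative only.
algo = GBTRegressor(maxIter=5, maxDepth=3)
sample = training_data.sample(withReplacement=False, fraction=0.1, seed=42)
gbt_model = algo.fit(sample)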

In [ ]: