In [2]:
from pyspark import SparkConf, SparkContext
## set up spark context
conf = SparkConf().setAppName("myApp")
sc = SparkContext(conf=conf)

# create sparksession object
from pyspark.sql import SparkSession
sparksession = SparkSession(sc)


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-2-c7badb8b399c> in <module>()
      2 ## set up spark context
      3 conf = SparkConf().setAppName("myApp")
----> 4 sc = SparkContext(conf=conf)
      5 
      6 # create sparksession object

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    113         """
    114         self._callsite = first_spark_call() or CallSite(None, None, None)
--> 115         SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    116         try:
    117             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.pyc in _ensure_initialized(cls, instance, gateway, conf)
    270                         " created by %s at %s:%s "
    271                         % (currentAppName, currentMaster,
--> 272                             callsite.function, callsite.file, callsite.linenum))
    273                 else:
    274                     SparkContext._active_spark_context = instance

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by <module> at /Users/mingchen/anaconda2/lib/python2.7/site-packages/IPython/utils/py3compat.py:288 
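
The error above means a SparkContext already exists in this session (the notebook was launched from the PySpark shell, which creates one as `sc`). A minimal workaround, assuming Spark 2.x, is to reuse the active context and session instead of constructing new ones:

In [ ]:
# reuse the running context/session rather than creating a second one
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()  # returns the active SparkContext if one exists
sparksession = SparkSession.builder.appName("myApp").getOrCreate()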

In [ ]:
# list the names in the current namespace (e.g., to check for an existing SparkContext)
dir()

In [11]:
## Import Data
# read the CSV with a header row, inferring column types
wine_data = sparksession.read.csv("../data/WineData.csv", inferSchema=True, header=True)

In [14]:
wine_data.show(5)  # preview the first five rows
wine_data.dtypes   # column names and types


+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|                60.0|  0.998|3.16|     0.58|    9.8|      6|
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
only showing top 5 rows

Out[14]:
[('fixed acidity', 'double'),
 ('volatile acidity', 'double'),
 ('citric acid', 'double'),
 ('residual sugar', 'double'),
 ('chlorides', 'double'),
 ('free sulfur dioxide', 'double'),
 ('total sulfur dioxide', 'double'),
 ('density', 'double'),
 ('pH', 'double'),
 ('sulphates', 'double'),
 ('alcohol', 'double'),
 ('quality', 'int')]

In [16]:
wine_data.count()   # row count (not echoed: only the last expression in a cell is displayed)
wine_data.columns


Out[16]:
['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [19]:
# distinct values of the label column
wine_data.select(['quality']).distinct().show()


+-------+
|quality|
+-------+
|      6|
|      3|
|      5|
|      4|
|      8|
|      7|
+-------+
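
Beyond the distinct values, it helps to check how the quality scores are distributed before modeling. A quick sketch using the standard DataFrame API:

In [ ]:
# class balance of the label column
wine_data.groupBy('quality').count().orderBy('quality').show()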


In [21]:
# convert the data into the features/label structure expected by Spark ML
# (features = all columns except the last; label = the last column, quality)
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
ml_wine_data = wine_data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])


---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-21-691f1b8c5d2c> in <module>()
      2 from pyspark.sql import Row
      3 from pyspark.ml.linalg import Vectors
----> 4 ml_wine_data = wine_data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/sql/session.pyc in toDF(self, schema, sampleRatio)
     55         [Row(name=u'Alice', age=1)]
     56         """
---> 57         return sparkSession.createDataFrame(self, schema, sampleRatio)
     58 
     59     RDD.toDF = toDF

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
--> 524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
    525         df = DataFrame(jdf, self._wrapped)
    526         df._schema = schema

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
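
The IllegalArgumentException above is an environment issue rather than a problem with the conversion itself: in Spark 2.1 this message typically appears when the session cannot instantiate its Hive state, commonly because a stale Derby metastore lock (a leftover metastore_db directory) from another Spark process sits in the working directory; restarting the kernel or removing the stale lock usually clears it. As an alternative to the RDD round-trip, the same features/label DataFrame can be built with VectorAssembler; a minimal sketch, assuming the wine_data DataFrame loaded above:

In [ ]:
# build the features column with VectorAssembler instead of mapping over the RDD
from pyspark.ml.feature import VectorAssembler

feature_cols = [c for c in wine_data.columns if c != 'quality']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
ml_wine_data = (assembler.transform(wine_data)
                .select('features', 'quality')
                .withColumnRenamed('quality', 'label'))
ml_wine_data.show(5)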

In [ ]: