In [14]:
## set up a SparkSession (Spark 2.0+); in a pyspark shell or notebook kernel
## `spark` and `sc` are usually created for you already
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
## the SparkContext and a SQLContext can be derived from the session
from pyspark.sql import SQLContext
sc = spark.sparkContext
sqlContext = SQLContext(sc)
In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors    # Spark 2.0+
#from pyspark.mllib.linalg import Vectors  # Spark 1.x
In [16]:
# create a toy dataset: two well-separated pairs of 2-D points
data = [(Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),),
        (Vectors.dense([8.0, 9.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
df.show()
In [17]:
# fit a k-means model with two clusters
kmeans = KMeans(k=2, seed=1)
Kmodel = kmeans.fit(df)
# number of cluster centers
centers = Kmodel.clusterCenters()
len(centers)
Out[17]:
2
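Beyond the number of centers, the tightness of the fit can be checked with the
within-set sum of squared errors. A minimal sketch, assuming Spark 2.x where
KMeansModel.computeCost is available (Spark 3.x deprecates it in favour of
ClusteringEvaluator, shown later on the iris data):
# within-set sum of squared errors (WSSSE); lower means tighter clusters
wssse = Kmodel.computeCost(df)
print("WSSSE = %.3f" % wssse)
# the learned centers themselves
for c in centers:
    print(c)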
In [18]:
# assign each toy point to a cluster
transformed = Kmodel.transform(df).select("features", "prediction")
rows = transformed.collect()
rows
Out[18]:
In [19]:
rows[0].prediction == rows[1].prediction
#True
rows[2].prediction == rows[3].prediction
# True
Out[19]:
True
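Rather than comparing rows one by one, the cluster sizes give the same sanity
check at a glance; a small sketch using the transformed frame from above:
# count how many points fall into each cluster
transformed.groupBy("prediction").count().show()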
In [20]:
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
In [21]:
from pyspark import SparkContext
# sc already exists from the setup above; creating a second SparkContext would fail
sqlContext = SQLContext(sc)
In [22]:
# read the iris data; 'com.databricks.spark.csv' is the legacy package name,
# which Spark 2.x resolves to the built-in csv source
data = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('./data/iris.csv')
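With Spark 2.0 or later the same load can be written with the built-in reader;
a shorter equivalent, assuming the same file path:
# built-in CSV reader (Spark 2.0+)
data = spark.read.csv('./data/iris.csv', header=True, inferSchema=True)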
In [23]:
data.printSchema()
data.show()
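Before clustering it helps to glance at the feature ranges; a quick look using
the DataFrame just loaded:
# basic summary statistics (count, mean, stddev, min, max) per column
data.describe().show()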
In [24]:
from pyspark.ml.feature import StringIndexer
# encode the species string as a numeric label
feature = StringIndexer(inputCol="species", outputCol="targetlabel")
target = feature.fit(data).transform(data)
In [25]:
target.printSchema()
target.show()
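To see which index was assigned to which species, the fitted indexer exposes
its label list; a sketch reusing the StringIndexer defined above:
indexer_model = feature.fit(data)
# labels are ordered by frequency: index 0.0 is the most common species
print(indexer_model.labels)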
In [26]:
from pyspark.sql import Row
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
In [27]:
# convert each row to (label, dense feature vector)
def transData(row):
    return Row(label=row["targetlabel"],
               features=Vectors.dense([row["sepal_length"],
                                       row["sepal_width"],
                                       row["petal_length"],
                                       row["petal_width"]]))
In [28]:
# convert to a DataFrame of (label, features)
# Note: in PySpark 2.0+ you must go through .rdd before mapping
transformed = target.rdd.map(transData).toDF()
transformed.show()
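An alternative to the hand-written transData is Spark's VectorAssembler, which
builds the features column directly from the numeric columns; a sketch assuming
the column names used above:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features")
assembled = assembler.transform(target) \
    .select(col("targetlabel").alias("label"), "features")
assembled.show(5)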
In [29]:
# fit a 3-cluster model and attach a prediction column
kmeans = KMeans(k=3)
model = kmeans.fit(transformed)
predict_data = model.transform(transformed)
predict_data.show()
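Cluster quality can also be scored without looking at the labels; a sketch
using ClusteringEvaluator, which is available from Spark 2.3 onwards:
from pyspark.ml.evaluation import ClusteringEvaluator

# silhouette ranges from -1 to 1; closer to 1 means better-separated clusters
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
print(evaluator.evaluate(predict_data))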
In [30]:
# count rows where the cluster id differs from the indexed label
train_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count()
total = predict_data.count()
In [31]:
train_err, total, 1 - float(train_err)/total, float(train_err)/total
Out[31]:
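Because k-means assigns cluster ids arbitrarily, this error rate is only
meaningful when the ids happen to line up with the indexed labels. A more
robust check maps each cluster to its majority label first; a sketch reusing
predict_data and total from above:
from pyspark.sql import functions as F

# majority label per cluster: sort counts descending, keep the first seen per cluster
counts = predict_data.groupBy("prediction", "label").count() \
    .orderBy("prediction", F.desc("count")).collect()
cluster_to_label = {}
for r in counts:
    cluster_to_label.setdefault(r["prediction"], r["label"])

# accuracy after mapping each cluster to its majority label
matched = predict_data.rdd.filter(
    lambda row: cluster_to_label[row["prediction"]] == row["label"]).count()
print("accuracy after remapping: %.3f" % (matched / float(total)))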
In [ ]: