In [ ]:
assert("spark" in globals())
In [ ]:
# The SparkContext should report at least one core available for parallel work.
assert sc.defaultParallelism > 0
In [ ]:
from sklearn.datasets import load_iris
from pyspark.sql.types import StringType, FloatType, StructField, StructType

# Load the iris dataset as a scikit-learn Bunch (features, targets, and their names).
d = load_iris()
def make_records(features, label, label_names):
    # Cast the numeric features to plain Python floats and append the
    # class name string so the row matches the schema defined below.
    temp = [float(f) for f in features]
    temp.append(str(label_names[label]))
    return temp
# One non-nullable FloatType column per iris feature, plus a non-nullable string "target" column.
col_types = [StructField(fname, FloatType(), False) for fname in d["feature_names"]]
col_types.append(StructField("target", StringType(), False))
schema = StructType(col_types)
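If you want to sanity-check the conversion first, a single converted record should be four floats followed by the class name string; this optional cell uses the `make_records` helper defined above:
In [ ]:
# Optional check: the first iris sample as a schema-compatible row.
make_records(d["data"][0], d["target"][0], d["target_names"])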
In [ ]:
df = spark.createDataFrame(
    [make_records(feature, label, d["target_names"])
     for feature, label in zip(d["data"], d["target"])],
    schema)
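As a quick look at the result, you can print the schema and a few rows; this is purely an optional inspection step:
In [ ]:
# Optional: inspect the resulting DataFrame.
df.printSchema()
df.show(3, truncate=False)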
In [ ]:
# The iris dataset contains 150 samples.
assert df.count() == 150
In [ ]:
# The DataFrame columns should be the four iris feature names followed by "target".
expected_columns = d["feature_names"].copy()
expected_columns.append("target")
assert df.columns == expected_columns
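Finally, as an optional extra check, each of the three iris species appears 50 times, so grouping on the `target` column should show three balanced classes:
In [ ]:
# Optional: the three classes are balanced, 50 rows each.
df.groupBy("target").count().show()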