In [ ]:
assert("spark" in globals())

In [ ]:
assert(sc.defaultParallelism > 0)
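
If `sc` is not already bound by the notebook kernel, it can be taken from the active session instead; a minimal sketch, assuming only that `spark` is a live `SparkSession`:

In [ ]:
# Fall back to the session's SparkContext when `sc` is not predefined.
sc = spark.sparkContext
assert(sc.defaultParallelism > 0)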

In [ ]:
from sklearn.datasets import load_iris
from pyspark.sql.types import StringType, FloatType, StructField, StructType

d = load_iris()

def make_records(features, label, label_names):
    """Build one row: the feature values as floats plus the class name as a string."""
    row = [float(f) for f in features]
    row.append(str(label_names[label]))
    return row

# One FloatType field per feature column, plus a StringType "target" field.
fields = [StructField(fname, FloatType(), False) for fname in d["feature_names"]]
fields.append(StructField("target", StringType(), False))
schema = StructType(fields)
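
Before building the DataFrame, a quick sanity check that the schema has one field per feature plus the label column can catch mistakes early; this uses only the objects defined above:

In [ ]:
# Expect one field per iris feature, plus the appended "target" field.
assert(len(schema.fields) == len(d["feature_names"]) + 1)
print(schema.fieldNames())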

In [ ]:
rows = [make_records(features, label, d["target_names"])
        for features, label in zip(d["data"], d["target"])]
df = spark.createDataFrame(rows, schema)
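
A quick preview helps confirm that the rows and the schema agree; `printSchema` and `show` are standard DataFrame methods:

In [ ]:
df.printSchema()
df.show(5)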

In [ ]:
assert(df.count() == 150)

In [ ]:
expected_columns = list(d["feature_names"]) + ["target"]
# Compare names and order directly; list equality also checks the column count.
assert(df.columns == expected_columns)
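
As an optional, stricter check, the runtime column types can be compared against the schema; this assumes PySpark's simple-string names for `FloatType` and `StringType`, namely "float" and "string":

In [ ]:
# Every feature column should be a float, the label column a string.
for name, dtype in df.dtypes:
    expected = "string" if name == "target" else "float"
    assert(dtype == expected)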