In [ ]:
    
import os
master = '--master local[1]'
#master = '--master spark://apachespark-master-2-1-0:7077'
conf = '--conf spark.cores.max=1 --conf spark.executor.memory=512m'
packages = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1'
jars = '--jars /root/lib/jpmml-sparkml-package-1.0-SNAPSHOT.jar'
py_files = '--py-files /root/lib/jpmml.py'
os.environ['PYSPARK_SUBMIT_ARGS'] = master \
  + ' ' + conf \
  + ' ' + packages \
  + ' ' + jars \
  + ' ' + py_files \
  + ' ' + 'pyspark-shell'
print(os.environ['PYSPARK_SUBMIT_ARGS'])
    
In [ ]:
    
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
    
In [ ]:
    
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()
    
In [ ]:
    
df = spark_session.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .load("s3a://datapalooza/airbnb/airbnb.csv.bz2")
print(df.head())
    
In [ ]:
    
print(df.count())
    
In [ ]:
    
print(df_final.count())
    
In [ ]:
    
df_filtered = df.filter("price >= 50 AND price <= 750 AND bathrooms > 0.0 AND bedrooms is not null")
df_filtered.registerTempTable("df_filtered")
df_final = spark_session.sql("""
    select
        id,
        city,
        case when state in('NY', 'CA', 'London', 'Berlin', 'TX' ,'IL', 'OR', 'DC', 'WA')
            then state
            else 'Other'
        end as state,
        space,
        cast(price as double) as price,
        cast(bathrooms as double) as bathrooms,
        cast(bedrooms as double) as bedrooms,
        room_type,
        host_is_super_host,
        cancellation_policy,
        cast(case when security_deposit is null
            then 0.0
            else security_deposit
        end as double) as security_deposit,
        price_per_bedroom,
        cast(case when number_of_reviews is null
            then 0.0
            else number_of_reviews
        end as double) as number_of_reviews,
        cast(case when extra_people is null
            then 0.0
            else extra_people
        end as double) as extra_people,
        instant_bookable,
        cast(case when cleaning_fee is null
            then 0.0
            else cleaning_fee
        end as double) as cleaning_fee,
        cast(case when review_scores_rating is null
            then 80.0
            else review_scores_rating
        end as double) as review_scores_rating,
        cast(case when square_feet is not null and square_feet > 100
            then square_feet
            when (square_feet is null or square_feet <=100) and (bedrooms is null or bedrooms = 0)
            then 350.0
            else 380 * bedrooms
        end as double) as square_feet
    from df_filtered
""").persist()
df_final.registerTempTable("df_final")
df_final.select("square_feet", "price", "bedrooms", "bathrooms", "cleaning_fee").describe().show()
    
In [ ]:
    
print(df_final.schema)
    
In [ ]:
    
# Most popular cities
spark_session.sql("""
    select 
        state,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by state
    order by count(*) desc
""").show()
    
In [ ]:
    
# Most expensive popular cities
spark_session.sql("""
    select 
        city,
        count(*) as ct,
        avg(price) as avg_price,
        max(price) as max_price
    from df_final
    group by city
    order by avg(price) desc
""").filter("ct > 25").show()
    
In [ ]:
    
continuous_features = ["bathrooms", \
                       "bedrooms", \
                       "security_deposit", \
                       "cleaning_fee", \
                       "extra_people", \
                       "number_of_reviews", \
                       "square_feet", \
                       "review_scores_rating"]
categorical_features = ["room_type", \
                        "host_is_super_host", \
                        "cancellation_policy", \
                        "instant_bookable", \
                        "state"]
    
In [ ]:
    
continuous_feature_assembler = VectorAssembler(inputCols=continuous_features, outputCol="unscaled_continuous_features")
continuous_feature_scaler = StandardScaler(inputCol="unscaled_continuous_features", outputCol="scaled_continuous_features", \
                                           withStd=True, withMean=False)
    
In [ ]:
    
categorical_feature_indexers = [StringIndexer(inputCol=x, \
                                              outputCol="{}_index".format(x)) \
                                for x in categorical_features]
categorical_feature_one_hot_encoders = [OneHotEncoder(inputCol=x.getOutputCol(), \
                                                      outputCol="oh_encoder_{}".format(x.getOutputCol() )) \
                                        for x in categorical_feature_indexers]
    
In [ ]:
    
feature_cols_lr = [x.getOutputCol() \
                   for x in categorical_feature_one_hot_encoders]
feature_cols_lr.append("scaled_continuous_features")
feature_assembler_lr = VectorAssembler(inputCols=feature_cols_lr, \
                                       outputCol="features_lr")
    
In [ ]:
    
linear_regression = LinearRegression(featuresCol="features_lr", \
                                     labelCol="price", \
                                     predictionCol="price_prediction", \
                                     maxIter=10, \
                                     regParam=0.3, \
                                     elasticNetParam=0.8)
estimators_lr = \
  [continuous_feature_assembler, continuous_feature_scaler] \
  + categorical_feature_indexers + categorical_feature_one_hot_encoders \
  + [feature_assembler_lr] + [linear_regression]
pipeline = Pipeline(stages=estimators_lr)
pipeline_model = pipeline.fit(df_final)
print(pipeline_model)
    
In [ ]:
    
from jpmml import toPMMLBytes
model = toPMMLBytes(spark_session, df_final, pipeline_model)
with open('model.spark', 'wb') as fh:
    fh.write(model)
    
In [ ]:
    
%%bash
ls -l model.spark