In [ ]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.apache.spark::spark-mllib:2.1.0`
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)
In [ ]:
scala.util.Properties.versionString // check which Scala version the kernel is running
In [ ]:
import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, StopWordsRemover, Tokenizer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.{LDA, LDAModel}
In [ ]:
case class NewsData(content: String) // wrapper type so .toDS() below can derive a schema
In [ ]:
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
  .jupyter() // this method must be called straightaway after builder()
  // .yarn("/etc/hadoop/conf") // optional, for Spark on YARN - argument is the Hadoop conf directory
  // .emr("2.6.4") // on AWS Elastic MapReduce, this adds AWS-related dependencies to the Spark jar list
  .master("local[4]") // change to "yarn-client" on YARN
  // .config("spark.executor.instances", "10")
  // .config("spark.executor.memory", "3g")
  // .config("spark.hadoop.fs.s3a.access.key", awsCredentials._1)
  // .config("spark.hadoop.fs.s3a.secret.key", awsCredentials._2)
  .appName("notebook")
  .getOrCreate()

import spark.implicits._ // required for the .toDS() call below
In [ ]:
val df = spark
  .sparkContext
  .wholeTextFiles("../../../resources/dataset/20news-bydate/20news-bydate-train/*") // yields (path, content) pairs
  .map(x => NewsData(x._2)) // keep the file content, drop the path
  .toDS()
df.show()
In [ ]:
df.take(10)
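Before wiring up the pipeline, it is worth confirming the dataset's size and schema: the column name that `printSchema` reports ("content", derived from the `NewsData` case class) is the one the tokenizer below must consume. A minimal sanity-check cell:
In [ ]:
// Sanity check: how many documents were loaded, and under which column name
df.count()
df.printSchema() // should show a single "content" column (from NewsData)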
In [ ]:
val tokenizer = new Tokenizer()
  .setInputCol("content") // column name comes from the NewsData case class
  .setOutputCol("words")
val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("words_filtered")
val cv = new CountVectorizer()
  .setInputCol(remover.getOutputCol) // i.e. "words_filtered"
  .setOutputCol("features")
  .setVocabSize(5000)
val lda = new LDA()
  .setOptimizer("online")
  .setK(20) // one topic per newsgroup in the 20 newsgroups corpus
  .setMaxIter(5)
val pipeline = new Pipeline().setStages(
  Array(tokenizer, remover, cv, lda))
In [ ]:
println("Starting pipeline fit...") // no logger is defined in this notebook, so use println
val pipeLineModel = pipeline.fit(df)
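Once the fit completes, the fitted stages can be pulled back out of the `PipelineModel` to inspect what LDA learned. A minimal sketch, assuming the four-stage pipeline defined above (so the `CountVectorizerModel` is stage 2 and the `LDAModel` stage 3): `describeTopics` returns each topic's top term indices, which the CountVectorizer vocabulary maps back to readable words, and `transform` appends a `topicDistribution` column with each document's topic mixture.
In [ ]:
// Recover the fitted models from the pipeline (indices match the stages array above)
val cvModel = pipeLineModel.stages(2).asInstanceOf[CountVectorizerModel]
val ldaModel = pipeLineModel.stages(3).asInstanceOf[LDAModel]
val vocab = cvModel.vocabulary

// Top 5 terms per topic: map term indices back through the vocabulary
ldaModel.describeTopics(5)
  .collect()
  .foreach { row =>
    val topic = row.getInt(0)
    val terms = row.getSeq[Int](1).map(vocab(_))
    println(s"topic $topic: " + terms.mkString(", "))
  }

// Per-document topic mixtures
pipeLineModel.transform(df).select("topicDistribution").show(5)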