In [ ]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.apache.spark::spark-mllib:2.1.0`
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

In [ ]:
scala.util.Properties.versionString // sanity-check the kernel's Scala version

In [ ]:
import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, StopWordsRemover, Tokenizer}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.{LDA, LDAModel}

In [ ]:
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
  .jupyter() // this method must be called straightaway after builder()
  // .yarn("/etc/hadoop/conf") // optional, for Spark on YARN - argument is the Hadoop conf directory
  // .emr("2.6.4") // on AWS Elastic MapReduce, this adds the AWS-related dependencies to the Spark jar list
  .master("local[4]") // change to "yarn" (with client deploy mode) when running on YARN
  // .config("spark.executor.instances", "10")
  // .config("spark.executor.memory", "3g")
  // .config("spark.hadoop.fs.s3a.access.key", awsCredentials._1)
  // .config("spark.hadoop.fs.s3a.secret.key", awsCredentials._2)
  .appName("notebook")
  .getOrCreate()

import spark.implicits._ // needed for .toDS() in the next cell

In [ ]:
val df = spark
  .sparkContext
  .wholeTextFiles("../../../resources/dataset/20news-bydate/20news-bydate-train/*")
  .map(_._2) // keep the file contents, drop the file paths; toDS yields a single "value" column
  .toDS()

df.show()

In [ ]:
df.take(10)

In [ ]:
val tokenizer = new Tokenizer()
  .setInputCol("value")
  .setOutputCol("words")

val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("words_filtered")

val cv = new CountVectorizer()
  .setInputCol("filteredWords")
  .setOutputCol("features")
  .setVocabSize(5000)

val lda = new LDA()
  .setOptimizer("online")
  .setK(20)
  .setMaxIter(5)

val pipeline = new Pipeline().setStages(
  Array(tokenizer, remover, cv, lda))
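
Before running the full fit, it can help to eyeball the preprocessing output. Tokenizer and StopWordsRemover are stateless transformers, so this minimal sanity-check sketch applies them directly to df without fitting anything:

In [ ]:
// quick look at the tokenized, stopword-filtered text (sketch)
remover.transform(tokenizer.transform(df))
  .select("words_filtered")
  .show(3, truncate = false)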

In [ ]:
println("Starting pipeline fit...")
val pipeLineModel = pipeline.fit(df)
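
Once the fit completes, the fitted stages can be pulled back out of the PipelineModel to inspect the topics. A minimal sketch, assuming the stage order defined above (the fitted CountVectorizerModel at index 2, the LDAModel at index 3): map each topic's top term indices back to words through the CountVectorizer vocabulary.

In [ ]:
val cvModel = pipeLineModel.stages(2).asInstanceOf[CountVectorizerModel]
val ldaModel = pipeLineModel.stages(3).asInstanceOf[LDAModel]
val vocab = cvModel.vocabulary

// print the 7 top-weighted terms for each of the 20 topics
ldaModel.describeTopics(maxTermsPerTopic = 7).collect().foreach { row =>
  val topic = row.getAs[Int]("topic")
  val terms = row.getAs[Seq[Int]]("termIndices").map(vocab(_))
  println(s"topic $topic: ${terms.mkString(", ")}")
}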