In [ ]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

val sqlContext = new SQLContext(sc)

// All four columns are read as strings; tags come as a single space-separated column
val schema = StructType(Array(
  StructField("id", StringType, true),
  StructField("title", StringType, true),
  StructField("body", StringType, true),
  StructField("tags", StringType, true)))
Out[ ]:
In [ ]:
// Read the CSV pieces with the spark-csv package; DROPMALFORMED silently
// discards rows that do not match the schema
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .load("/home/felipe/auto-tagger/data/stackoverflow/pieces")
Out[ ]:
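A quick sanity check on the load before computing any statistics; printSchema and show are standard DataFrame methods, so this only assumes the df defined above.

In [ ]:
// Verify the schema and eyeball a few rows
df.printSchema()
df.show(5)
Out[ ]: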
In [ ]:
// old count was 6034194
val count = df.count()
Out[ ]:
In [ ]:
// Label cardinality: the average number of tags per document;
// rows with a null tags column count as zero labels
val labelCardinality = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) } // wrap so nulls become None
  .map {
    case Some(tags) => tags.toString.split(" ").size.toDouble
    case None       => 0.0
  }
  .mean()
Out[ ]:
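As a hedged sanity check of the cardinality computation, a tiny in-memory RDD (assuming the same SparkContext sc) with documents of 2, 3 and 1 tags should yield a mean of exactly 2.0.

In [ ]:
// Toy check: (2 + 3 + 1) / 3 = 2.0 tags per document
val toyTags = sc.parallelize(Seq("java python", "scala spark sql", "git"))
val toyCardinality = toyTags.map(_.split(" ").size.toDouble).mean()
Out[ ]: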
In [ ]:
// Number of distinct labels: flatten every document's tags into
// individual labels and count the unique ones
val distinctLabelCount = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .flatMap(tags => tags.toString.split(" "))
  .map(tag => (tag, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .map { case (label, count) => label } // drop the counts; only unique labels remain
  .count()
Out[ ]:
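Since the map after reduceByKey only discards the per-label counts, the same number can be obtained more directly with distinct(); a sketch of the shorter equivalent form, assuming the same df:

In [ ]:
// Equivalent form: distinct() replaces reduceByKey + map
val distinctLabelCountAlt = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(_.isDefined)
  .flatMap(opt => opt.get.toString.split(" "))
  .distinct()
  .count()
Out[ ]: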
In [ ]:
// Number of distinct label sets: using a set rather than a sequence,
// so tag order does not produce spurious distinct combinations
val labelSetCount = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .map(tags => (tags.toString.split(" ").toSet, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .map { case (labelSet, count) => labelSet }
  .count()
Out[ ]:
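A small plain-Scala illustration of why toSet matters here: two tag strings containing the same tags in different orders compare equal as sets but not as sequences.

In [ ]:
val asSets = "java python".split(" ").toSet == "python java".split(" ").toSet // true
val asSeqs = "java python".split(" ").toSeq == "python java".split(" ").toSeq // false
Out[ ]: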
In [ ]:
// Label sets that occur exactly once; use toSet (as above, not toSeq)
// so that differently ordered tags are not counted as distinct sets
val numberOfLabelSetsThatOccurOnlyOnce = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .map(tags => (tags.toString.split(" ").toSet, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .filter { case (labelSet, count) => count == 1 }
  .count()
Out[ ]:
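Putting the two counts together gives the fraction of label sets that are singletons, a rough indicator of how sparse the label-set space is; a small sketch using the values computed above.

In [ ]:
// Fraction of distinct label sets that occur exactly once
val singletonFraction = numberOfLabelSetsThatOccurOnlyOnce.toDouble / labelSetCount
Out[ ]: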
In [ ]:
println("labelDensity with and without scientific notation:\n")
val labelDensity = labelCardinality / labelSetCount
print(f"labelDensity: Double = $labelDensity%.10f \n")
Out[ ]:
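As a worked check of the formula (toy numbers, not from this dataset): a corpus with label cardinality 2.0 and 4 distinct labels has density 2.0 / 4 = 0.5, i.e. the average document uses half the label vocabulary.

In [ ]:
// Toy numbers only: cardinality 2.0 over 4 distinct labels
val toyLabelDensity = 2.0 / 4 // 0.5
Out[ ]: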