In [ ]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// SQLContext over the existing SparkContext (Spark 1.x entry point)
val sqlContext = new SQLContext(sc)

// all four columns are read as nullable strings
val schema = StructType(Array(
  StructField("id", StringType, true),
  StructField("title", StringType, true),
  StructField("body", StringType, true),
  StructField("tags", StringType, true)))


import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@6a8c4ec0
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(title,StringType,true), StructField(body,StringType,true), StructField(tags,StringType,true))
Out[ ]:


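For anyone on Spark 2.x instead, the same setup can be sketched with a SparkSession and the built-in CSV reader (the com.databricks.spark.csv package was merged into Spark core in 2.0). A minimal, untested equivalent:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("auto-tagger").getOrCreate()
val df = spark.read
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .csv("/home/felipe/auto-tagger/data/stackoverflow/pieces")
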
In [ ]:
// spark-csv (Databricks package); DROPMALFORMED silently drops any row
// that does not parse against the schema
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("mode", "DROPMALFORMED")
  .schema(schema)
  .load("/home/felipe/auto-tagger/data/stackoverflow/pieces")


df: org.apache.spark.sql.DataFrame = [id: string, title: string, body: string, tags: string]
Out[ ]:

In [ ]:
// count from an earlier run was 6034194; the current data has one more row
val count = df.count()


count: Long = 6034195
Out[ ]:
6034195

In [ ]:
// label cardinality: the average number of tags per post
// (a null tags field counts as zero labels)
val labelCardinality = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .map {
    case Some(tags) => tags.toString.split(" ").size.toDouble
    case None => 0.0
  }
  .mean()


labelCardinality: Double = 2.885222303886431
Out[ ]:
2.885222303886431

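The same statistic can be computed without dropping to the RDD API; a sketch, assuming Spark 1.5+ (where split() and size() are available):

import org.apache.spark.sql.functions.{avg, col, size, split, when}

// null tags contribute 0 labels, matching the Option handling above
val lc = df
  .select(when(col("tags").isNull, 0)
    .otherwise(size(split(col("tags"), " "))).as("numTags"))
  .agg(avg("numTags"))
  .first()
  .getDouble(0)
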
In [ ]:
// number of distinct labels (tags) across the whole corpus;
// reduceByKey collapses each tag to a single key before counting
val distinctLabelCount = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .flatMap(tags => tags.toString.split(" "))
  .map(tag => (tag, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .map { case (label, count) => label }
  .count()


distinctLabelCount: Long = 42048
Out[ ]:
42048

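A DataFrame-API sketch of the same distinct-tag count (again assuming Spark 1.5+ for split()):

import org.apache.spark.sql.functions.{col, explode, split}

// explode turns each tag array into one row per tag
val distinctTags = df
  .filter(col("tags").isNotNull)
  .select(explode(split(col("tags"), " ")).as("tag"))
  .distinct()
  .count()
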
In [ ]:
// number of distinct label sets; a Set ignores tag order,
// so "a b" and "b a" count as the same label set
val labelSetCount = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .map(tags => (tags.toString.split(" ").toSet, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .map { case (label, count) => label }
  .count()


labelSetCount: Long = 2075362
Out[ ]:
2075362

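With 6,034,195 posts and 2,075,362 distinct tag sets, each label set occurs only about 2.9 times on average, so the label-set space is very sparse relative to the corpus.
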
In [ ]:
// label sets that occur exactly once in the corpus;
// note this uses toSeq, so unlike the cell above it is order-sensitive
val numberOfLabelSetsThatOccurOnlyOnce = df
  .select(df("tags"))
  .map { case Row(tags) => Option(tags) }
  .filter(opt => opt.isDefined)
  .map(opt => opt.get)
  .map(tags => (tags.toString.split(" ").toSeq, 1))
  .reduceByKey((a: Int, b: Int) => a + b)
  .filter { case (label, count) => count == 1 }
  .count()


numberOfLabelSetsThatOccurOnlyOnce: Long = 1155192
Out[ ]:
1155192

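In other words, 1,155,192 of the 2,075,362 distinct label sets (about 56%) occur exactly once, which suggests that treating each full tag set as a single class (the label-powerset approach) would be impractical on this dataset.
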
In [ ]:
// the formatted print shows the value in fixed-point notation;
// the REPL's echo of the val shows it in scientific notation
println("labelDensity with and without scientific notation:\n")
val labelDensity = labelCardinality / labelSetCount
print(f"labelDensity: Double = $labelDensity%.10f \n")


labelDensity with and without scientific notation:

labelDensity: Double = 0.0000013902 
labelDensity: Double = 1.3902260443654798E-6
Out[ ]:
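
As an aside: the conventional definition of label density (Tsoumakas & Katakis) divides label cardinality by the number of distinct labels, not by the number of distinct label sets. Under that definition the value here would be roughly 2.885 / 42048 ≈ 6.86e-5; reusing the values computed above:

val labelDensityConventional = labelCardinality / distinctLabelCount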