Create SparkContext & SparkSession


In [1]:
from pyspark import SparkContext
sc = SparkContext(master='local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

TF, IDF and TF-IDF

  • TF is short for Term Frequency. It is simply the frequency of a term in a document. The higher the TF is for a specific term, the more important that term is to that document.

  • IDF is short for Inverse Document Frequency. It is based on the fraction of documents that contain a specific term. If a term appears in every single document, its document frequency is at its maximum (a fraction of 1), so its inverse document frequency is at its minimum; in that situation, the term is non-informative for classifying the documents. The IDF is therefore a measure of the relevance of a term: the higher the IDF, the more relevant the term.

  • TF-IDF is the product of TF and IDF. A high TF-IDF is obtained when the term frequency is high and the document frequency is low (i.e., the IDF is high).
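In formula form, for a term $t$, a document $d$, and a corpus $D$ (the smoothed logarithm shown here is the variant described in the Spark MLlib docs; the plain form $\log\frac{|D|}{\mathrm{DF}(t, D)}$ is also common):

$$\mathrm{TFIDF}(t, d, D) = \mathrm{TF}(t, d) \cdot \mathrm{IDF}(t, D), \qquad \mathrm{IDF}(t, D) = \log\frac{|D| + 1}{\mathrm{DF}(t, D) + 1}$$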

Term Frequency, HashingTF and CountVectorizer

Pyspark has two functions to calculate term frequencies from documents: the HashingTF() and the CountVectorizer(). These two functions do two things:

  1. Indexing terms: converting words to numbers.
  2. Calculating term frequencies for each document.

The HashingTF() utilizes the MurmurHash 3 function to map a raw feature (a term) to an index (a number). Hashing is the process of transforming data of arbitrary size into fixed-size, usually shorter, data. The term frequencies are then calculated based on the generated indices. For the HashingTF() method, the mapping process is very cheap, because each term-to-index mapping is independent of every other term-to-index mapping. The hash function takes a unique input and generates a "unique" result. However, hashing collisions may occur, which means that different features (terms) may be hashed to the same index.
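A conceptual sketch of this mapping, using Python's built-in hash as a stand-in for MurmurHash 3 (so the indices it produces will not match Spark's):

import collections

def term_to_index(term, num_features=16):
    # Hash the term, then fold the hash into the [0, num_features) range.
    # Spark's HashingTF uses MurmurHash 3 here; hash() is only a stand-in.
    return hash(term) % num_features

def hashing_tf(doc, num_features=16):
    # Term frequencies keyed by hashed index; colliding terms share a bucket.
    return dict(collections.Counter(term_to_index(t, num_features) for t in doc))

print(hashing_tf(['spark', 'spark', 'spark', 'is', 'awesome', 'awesome']))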

The CountVectorizer() indexes terms by descending order of their frequencies in the entire corpus, NOT their frequencies within each document. After the indexing process, the term frequencies are calculated per document.
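A rough pure-Python sketch of that indexing rule (the exact tie-breaking among equally frequent terms in Spark may differ):

from collections import Counter

docs = [
    ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],
    ['I', 'love', 'spark', 'very', 'very', 'much'],
    ['everyone', 'should', 'use', 'spark'],
]

# Count terms over the whole corpus and assign index 0 to the most frequent
# term, index 1 to the next, and so on.
corpus_counts = Counter(term for doc in docs for term in doc)
vocabulary = [term for term, _ in corpus_counts.most_common()]
print(vocabulary)  # 'spark' comes first; order among frequency-1 terms may vary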

Create some data


In [2]:
import pandas as pd
pdf = pd.DataFrame({
        'terms': [
            ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],
            ['I', 'love', 'spark', 'very', 'very', 'much'],
            ['everyone', 'should', 'use', 'spark']
        ]
    })
df = spark.createDataFrame(pdf)
df.show(truncate=False)


+-------------------------------------------+
|terms                                      |
+-------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|
|[I, love, spark, very, very, much]         |
|[everyone, should, use, spark]             |
+-------------------------------------------+

HashingTF

The numFeatures parameter takes an integer, which should be larger than the total number of distinct terms in the corpus. It should also be a power of two so that features are mapped evenly to columns.


In [3]:
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline

hashtf = HashingTF(numFeatures=pow(2, 4), inputCol='terms', outputCol='features(numFeatures), [index], [term frequency]')
stages = [hashtf]
pipeline = Pipeline(stages=stages)

In [4]:
pipeline.fit(df).transform(df).show(truncate=False)


+-------------------------------------------+------------------------------------------------+
|terms                                      |features(numFeatures), [index], [term frequency]|
+-------------------------------------------+------------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|(16,[1,15],[4.0,2.0])                           |
|[I, love, spark, very, very, much]         |(16,[0,1,2,8,12],[1.0,1.0,1.0,2.0,1.0])         |
|[everyone, should, use, spark]             |(16,[1,9,13],[2.0,1.0,1.0])                     |
+-------------------------------------------+------------------------------------------------+

You may note that the first document has three distinct terms, but only two term frequencies are obtained. This apparent discrepancy is due to a hashing collision: both spark and is are hashed to index 1. The term frequency of 4.0 for index 1 in the first document corresponds to the three counts of spark plus the one count of is. The likelihood of a hashing collision can be reduced by increasing the numFeatures parameter passed to the HashingTF function (the default is $2^{18}$, i.e. 262,144).
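To see the collision disappear, a sketch that re-runs the pipeline with a larger feature space (the resulting indices depend on the hash values, so they are not shown here):

from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline

# With 2^8 buckets instead of 2^4, 'spark' and 'is' are unlikely to collide,
# so the first document should yield three (index, frequency) pairs.
hashtf_256 = HashingTF(numFeatures=pow(2, 8), inputCol='terms', outputCol='features')
Pipeline(stages=[hashtf_256]).fit(df).transform(df).show(truncate=False)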

CountVectorizer

The CountVectorizer() function has three parameters to control which terms will be kept as features.

  • minTF: features with a term frequency below minTF will be removed. If minTF=1, then no features will be removed.
  • minDF: features with a document frequency below minDF will be removed. If minDF=1, then no features will be removed.
  • vocabSize: keep only the vocabSize most frequent terms.

In the example below, minTF=1.0, minDF=1.0, and vocabSize=20, which is larger than the total number of terms. Therefore, all features (terms) will be kept.


In [5]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline

countvectorizer = CountVectorizer(minTF=1.0, minDF=1.0, vocabSize=20, 
                                  inputCol='terms', outputCol='features(vocabSize), [index], [term frequency]')
stages = [countvectorizer]
pipeline = Pipeline(stages=stages)

In [6]:
pipeline.fit(df).transform(df).show(truncate=False)


+-------------------------------------------+----------------------------------------------+
|terms                                      |features(vocabSize), [index], [term frequency]|
+-------------------------------------------+----------------------------------------------+
|[spark, spark, spark, is, awesome, awesome]|(10,[0,1,3],[3.0,2.0,1.0])                    |
|[I, love, spark, very, very, much]         |(10,[0,2,4,5,6],[1.0,2.0,1.0,1.0,1.0])        |
|[everyone, should, use, spark]             |(10,[0,7,8,9],[1.0,1.0,1.0,1.0])              |
+-------------------------------------------+----------------------------------------------+
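The vocabulary learned by CountVectorizer() can also be inspected directly on the fitted model; a quick sketch (the relative order of the frequency-1 terms may vary):

# The first stage of the fitted pipeline is the CountVectorizerModel; its
# vocabulary lists terms in index order, most frequent first.
cv_model = pipeline.fit(df).stages[0]
print(cv_model.vocabulary)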

Now, let's use the StringIndexer() to index the corpus and see if the results are consistent with the CountVectorizer() method.

First, flatMap the documents so that each row contains a single term.


In [11]:
from pyspark.sql.types import StringType
# Flatten each document into individual terms: one term per row.
df_vocab = df.select('terms').rdd.\
            flatMap(lambda x: x[0]).\
            toDF(schema=StringType()).toDF('terms')
df_vocab.show()


+--------+
|   terms|
+--------+
|   spark|
|   spark|
|   spark|
|      is|
| awesome|
| awesome|
|       I|
|    love|
|   spark|
|    very|
|    very|
|    much|
|everyone|
|  should|
|     use|
|   spark|
+--------+
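The same flattening can also be done without dropping to the RDD API, using explode from pyspark.sql.functions; a sketch:

from pyspark.sql.functions import explode

# One row per term, equivalent to the flatMap above.
df.select(explode('terms').alias('terms')).show()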

Calculate term frequencies in the corpus


In [24]:
# countByValue() on an RDD of Rows returns a dict keyed by Row objects,
# which is why the terms in the output below appear wrapped in brackets.
vocab_freq = df_vocab.rdd.countByValue()
pdf = pd.DataFrame({
        'term': list(vocab_freq.keys()),
        'frequency': list(vocab_freq.values())
    })
tf = spark.createDataFrame(pdf).orderBy('frequency', ascending=False)
tf.show()


+---------+----------+
|frequency|      term|
+---------+----------+
|        5|   [spark]|
|        2|    [very]|
|        2| [awesome]|
|        1|      [is]|
|        1|       [I]|
|        1|    [love]|
|        1|[everyone]|
|        1|     [use]|
|        1|    [much]|
|        1|  [should]|
+---------+----------+
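An equivalent count with the DataFrame API, which keeps the terms as plain strings; a sketch:

# Count terms directly on the DataFrame and sort by descending frequency.
df_vocab.groupBy('terms').count().orderBy('count', ascending=False).show()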

Apply StringIndexer() to df_vocab


In [25]:
from pyspark.ml.feature import StringIndexer
stringindexer = StringIndexer(inputCol='terms', outputCol='StringIndexer(index)')

In [26]:
stringindexer.fit(df_vocab).transform(df_vocab).\
    distinct().\
    orderBy('StringIndexer(index)').show()


+--------+--------------------+
|   terms|StringIndexer(index)|
+--------+--------------------+
|   spark|                 0.0|
| awesome|                 1.0|
|    very|                 2.0|
|      is|                 3.0|
|everyone|                 4.0|
|       I|                 5.0|
|    love|                 6.0|
|  should|                 7.0|
|    much|                 8.0|
|     use|                 9.0|
+--------+--------------------+

The indexing result is consistent for the first three terms. The remaining terms all have the same frequency of 1, so they cannot be ordered by frequency; this is likely why their indices don't match the results from the CountVectorizer() method.
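If a fully deterministic ordering is needed, StringIndexer() accepts a stringOrderType parameter (available since Spark 2.3); a sketch using alphabetical rather than frequency ordering:

# Index terms alphabetically instead of by descending frequency.
stringindexer_alpha = StringIndexer(inputCol='terms',
                                    outputCol='StringIndexer(index)',
                                    stringOrderType='alphabetAsc')
stringindexer_alpha.fit(df_vocab).transform(df_vocab).\
    distinct().orderBy('StringIndexer(index)').show()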

