In [1]:
from pyspark import SparkContext
sc = SparkContext(master='local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
TF is short for Term Frequency. It is simply the frequency of a term in a document: the higher the TF of a term, the more important that term is to that document.
IDF is short for Inverse Document Frequency. It is based on the Document Frequency, the fraction of documents that contain a specific term. If a term exists in every single document, its Document Frequency takes its largest value, 1, and its Inverse Document Frequency takes its smallest value; in that situation the term is non-informative for classifying the documents. The IDF is a measure of the relevance of a term: the higher the IDF, the more relevant the term.
TF-IDF is the product of TF and IDF. A high TF-IDF is obtained when the Term Frequency is high and the Document Frequency is low (so the IDF is high).
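As a concrete reference, Spark's IDF estimator documents a smoothed formula (quoted here from the MLlib documentation rather than derived in this notebook): for a term $t$, a document $d$, and a corpus $D$ containing $|D|$ documents,
$$\mathrm{IDF}(t, D) = \log\frac{|D| + 1}{\mathrm{DF}(t, D) + 1}, \qquad \mathrm{TFIDF}(t, d, D) = \mathrm{TF}(t, d) \cdot \mathrm{IDF}(t, D),$$
where $\mathrm{DF}(t, D)$ is the number of documents in $D$ that contain $t$.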
Pyspark has two functions to calculate term frequencies from documents: HashingTF() and CountVectorizer(). The two functions work differently:
The HashingTF() function uses the MurmurHash 3 function to map a raw feature (a term) to an index (a number). Hashing is the process of transforming data of arbitrary size into fixed-size, usually shorter, data. The term frequencies are then calculated from the generated indices. For HashingTF(), the mapping process is very cheap, because each term-to-index mapping is independent of every other term-to-index mapping. The hashing function takes a unique input and generates a "unique" result. However, hashing collisions may occur, which means that different features (terms) may be hashed to the same index.
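To make the hashing trick concrete, here is a minimal pure-Python sketch of the idea. It uses Python's built-in hash() purely for illustration (an assumption for demonstration only; Spark's HashingTF() actually applies MurmurHash 3):
In [ ]:
def hashed_term_frequencies(terms, num_features=16):
    # Map each term to a bucket index and accumulate a count per bucket.
    # Distinct terms can land in the same bucket, i.e. collide.
    counts = {}
    for term in terms:
        index = hash(term) % num_features  # Spark uses MurmurHash 3 here, not hash()
        counts[index] = counts.get(index, 0) + 1
    return counts

hashed_term_frequencies(['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'])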
The CountVectorizer() function indexes terms by descending order of term frequency in the entire corpus, NOT the term frequency within each document. After the indexing process, the term frequencies are calculated per document.
In [2]:
import pandas as pd
pdf = pd.DataFrame({
    'terms': [
        ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],
        ['I', 'love', 'spark', 'very', 'very', 'much'],
        ['everyone', 'should', 'use', 'spark']
    ]
})
df = spark.createDataFrame(pdf)
df.show(truncate=False)
In [3]:
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline

# numFeatures = 2^4 = 16 hash buckets; the output column name simply documents
# the displayed format: (numFeatures, [index], [term frequency]).
hashtf = HashingTF(numFeatures=pow(2, 4), inputCol='terms',
                   outputCol='features(numFeatures), [index], [term frequency]')
stages = [hashtf]
pipeline = Pipeline(stages=stages)
In [4]:
pipeline.fit(df).transform(df).show(truncate=False)
You may note that the first document has three distinct terms, but only two term frequencies are obtained. This apparent discrepancy is due to a hashing collision: both spark and is get hashed to index 1. The term frequency for index 1 in the first document is therefore 4.0, corresponding to the three counts of spark and the one count of is. The likelihood of a hashing collision can be reduced by increasing the numFeatures parameter passed to the HashingTF() function (the default is $2^{18} = 262{,}144$).
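The collision can also be checked directly: recent Spark releases expose an indexOf() method on HashingTF that returns the bucket a given term hashes to (the Spark 3.0+ requirement is an assumption worth verifying against your installed version).
In [ ]:
from pyspark.ml.feature import HashingTF

# Check which of the 16 buckets each term maps to (requires Spark 3.0+).
ht = HashingTF(numFeatures=pow(2, 4), inputCol='terms', outputCol='tf')
for term in ['spark', 'is', 'awesome', 'I', 'love', 'very', 'much']:
    print(term, ht.indexOf(term))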
The CountVectorizer() function has three parameters to control which terms will be kept as features: minTF, minDF, and vocabSize.
In the example below, minTF=1.0, minDF=1.0, and vocabSize=20, and vocabSize is larger than the total number of distinct terms in the corpus. Therefore, all features (terms) will be kept.
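As a hedged illustration of the pruning behaviour before running the full example: raising minDF drops terms that appear in too few documents. In this toy corpus only spark occurs in more than one document, so with minDF=2.0 it should be the only surviving term (the column name 'features' below is illustrative):
In [ ]:
from pyspark.ml.feature import CountVectorizer

# Keep only terms that appear in at least 2 of the 3 documents of df.
cv_strict = CountVectorizer(minTF=1.0, minDF=2.0, vocabSize=20,
                            inputCol='terms', outputCol='features')
cv_strict.fit(df).vocabulary  # expected: ['spark']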
In [5]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
countvectorizer = CountVectorizer(minTF=1.0, minDF=1.0, vocabSize=20,
                                  inputCol='terms',
                                  outputCol='features(vocabSize), [index], [term frequency]')
stages = [countvectorizer]
pipeline = Pipeline(stages=stages)
In [6]:
pipeline.fit(df).transform(df).show(truncate=False)
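The index-to-term mapping can also be read directly from the fitted model: a CountVectorizerModel stores the learned vocabulary as a list ordered by descending corpus-wide frequency, so position i in the list corresponds to index i in the feature vectors.
In [ ]:
# Fit the CountVectorizer defined above and inspect its vocabulary.
cv_model = countvectorizer.fit(df)
for index, term in enumerate(cv_model.vocabulary):
    print(index, term)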
In [11]:
from pyspark.sql.types import StringType
# Flatten the arrays of terms into a single-column DataFrame with one term per row.
df_vocab = df.select('terms').rdd.\
    flatMap(lambda x: x[0]).\
    toDF(schema=StringType()).toDF('terms')
df_vocab.show()
In [24]:
# Count how many times each term occurs across the whole corpus. Extract the
# plain string from each Row before counting, so the keys are terms rather
# than Row objects.
vocab_freq = df_vocab.rdd.map(lambda row: row[0]).countByValue()
pdf = pd.DataFrame({
    'term': list(vocab_freq.keys()),
    'frequency': list(vocab_freq.values())
})
pdf
tf = spark.createDataFrame(pdf).orderBy('frequency', ascending=False)
tf.show()
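A more DataFrame-idiomatic route to the same corpus-wide counts, sketched here as an alternative using only standard pyspark.sql functions, is to explode the term arrays and aggregate:
In [ ]:
from pyspark.sql.functions import explode

# One row per term occurrence, then count occurrences per term.
df.select(explode('terms').alias('term')) \
  .groupBy('term').count() \
  .orderBy('count', ascending=False) \
  .show()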
In [25]:
from pyspark.ml.feature import StringIndexer
stringindexer = StringIndexer(inputCol='terms', outputCol='StringIndexer(index)')
In [26]:
stringindexer.fit(df_vocab).transform(df_vocab).\
distinct().\
orderBy('StringIndexer(index)').show()
The indexing result is consistent with the CountVectorizer() output for the first three terms. The remaining terms all have the same frequency, 1, so they cannot be ordered by frequency alone; this tie is likely the reason their indices do not match the results from the CountVectorizer() method.
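Finally, to go from raw term frequencies to the TF-IDF weights described at the top of this section, the output of either HashingTF() or CountVectorizer() can be fed into the IDF() estimator. The sketch below is a minimal example of that chaining; the column names tf and tf_idf are illustrative, not prescribed by the API.
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF

# HashingTF produces raw term-frequency vectors; IDF rescales each index by its
# inverse document frequency, yielding TF-IDF vectors.
hashing_tf = HashingTF(numFeatures=pow(2, 4), inputCol='terms', outputCol='tf')
idf = IDF(inputCol='tf', outputCol='tf_idf')
tfidf_pipeline = Pipeline(stages=[hashing_tf, idf])
tfidf_pipeline.fit(df).transform(df).select('terms', 'tf_idf').show(truncate=False)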