In [1]:
import pyspark as ps
from sentimentAnalysis import dataProcessing as dp
In [2]:
# create (or reuse) the Spark session
# the builder picks up the active SparkContext when one exists, so this cell
# does not rely on a pre-defined `sc` variable being injected by the kernel
spark = ps.sql.SparkSession.builder.getOrCreate()
In [7]:
# get dataframes
# specify S3 as the source with the s3a:// scheme
#df = spark.read.json("s3a://amazon-review-data/user_dedup.json.gz")
#df_meta = spark.read.json("s3a://amazon-review-data/metadata.json.gz")
# read a single shard (Musical Instruments reviews) instead of the full files above
df_raw_data = spark.read.json("s3a://amazon-review-data/reviews_Musical_Instruments_5.json.gz")
# keep only the "asin" and "reviewText" columns
df_subset = df_raw_data.select("asin", "reviewText")
# add token columns via the project's dataProcessing helper
# NOTE(review): later cells read "tokens" and "rawTokens" from this frame - confirm add_tokens creates both
df_tokens = dp.add_tokens(df_subset)
In [8]:
from pyspark.ml.feature import NGram
In [10]:
# instantiate an NGram transformer that builds tri-grams (n=3) from the "rawTokens" column
ngram = NGram(n=3, inputCol="rawTokens", outputCol="triGrams")
# add the "triGrams" column to the tokenized dataframe
df_triGrams = ngram.transform(df_tokens)
In [11]:
# preview the first 3 rows, including the new "triGrams" column
df_triGrams.show(3)
In [18]:
import nltk
In [13]:
# get test row (first row of the tri-gram dataframe)
test_row = df_triGrams.first()
# check which Python type the "triGrams" field comes back as
type(test_row["triGrams"])
Out[13]:
In [7]:
# test the NLTK POS tagger on the "tokens" field of the test row
nltk.pos_tag(test_row["tokens"])
Out[7]:
In [32]:
from pyspark.sql.types import ArrayType, StringType
In [17]:
# create a UDF that POS-tags a token list with NLTK
# (declared Spark return type: array of arrays of strings)
pos_udf = ps.sql.functions.udf(lambda x: nltk.pos_tag(x), ArrayType(ArrayType(StringType())))
# apply the UDF to the "tokens" column and store the result in a new "posTags" column
df_posTag = df_tokens.withColumn("posTags", pos_udf(df_tokens["tokens"]))
# preview the first 3 rows
df_posTag.show(3)
In [19]:
# inspect the "posTags" value of the first row
df_posTag.select("posTags").first()
Out[19]:
In [15]:
# look at the first ten tri-grams of the test row
test_row["triGrams"][:10]
Out[15]:
In [28]:
def tag_triGrams(triGrams):
    """POS-tag every tri-gram in a collection.

    Each tri-gram string is split on whitespace into its words, and the
    resulting word list is handed to ``nltk.pos_tag``.

    Args:
        triGrams: iterable of tri-gram strings.

    Returns:
        list holding one ``nltk.pos_tag`` result per tri-gram, in input order.
    """
    return [nltk.pos_tag(phrase.split()) for phrase in triGrams]
In [21]:
# check that splitting a tri-gram string yields its individual words
test_row["triGrams"][0].split()
Out[21]:
In [30]:
# try the tagger on the test row and show the first ten tagged tri-grams
tag_triGrams(test_row["triGrams"])[:10]
Out[30]:
In [38]:
# create a UDF wrapping tag_triGrams
# (declared Spark return type: array of arrays of arrays of strings)
pos_triTag_udf = ps.sql.functions.udf(lambda x: tag_triGrams(x), ArrayType(ArrayType(ArrayType(StringType()))))
# apply the UDF to the "triGrams" column and store the result in a new "triPosTags" column
df_triPosTags = df_triGrams.withColumn("triPosTags", pos_triTag_udf(df_triGrams["triGrams"]))
# preview the first 3 rows
df_triPosTags.show(3)
In [39]:
# refresh the test row so that it carries the "triPosTags" column
test_row = df_triPosTags.first()
In [40]:
# inspect the tagged tri-grams of the test row
test_row["triPosTags"]
Out[40]:
In [40]:
# import nltk
# from pyspark.sql.types import ArrayType, StringType
def addPosTags(df_tokens):
    """Append NLTK POS-tag columns to a tokenized dataframe.

    Args:
        df_tokens: Spark dataframe with "tokens" and "rawTokens" columns
            (presumably lists of token strings - confirm against
            dataProcessing.add_tokens).

    Returns:
        A new dataframe with two additional columns: "posTags", the tagged
        "tokens" column, and "raw_posTags", the tagged "rawTokens" column.
    """
    # declared Spark return type of the tagger: array of arrays of strings
    tag_schema = ArrayType(ArrayType(StringType()))
    tagger = ps.sql.functions.udf(lambda words: nltk.pos_tag(words), tag_schema)
    return (
        df_tokens
        .withColumn("posTags", tagger(df_tokens["tokens"]))
        .withColumn("raw_posTags", tagger(df_tokens["rawTokens"]))
    )
In [41]:
# test
df_posTag = addPosTags(df_tokens)
df_posTag.show(3)
We are interested in nouns and adjectives. Nouns identify product features, and adjectives express customer opinions of those features.
However, we cannot use consecutive adjective/noun or noun/adjective pairs alone. Consider this phrase: "The chair was not great." If we only extracted the noun "chair" and the adjective "great", the resulting pair "chair great" would not accurately reflect the sentiment expressed in the sentence: the adverb "not" negates the positive connotation of "great". This scenario illustrates one of a number of ways in which the meaning of an adjective/noun pair is influenced by neighboring words.
We need a set of POS sequences that helps identify the sequences we are interested in. Thankfully, such a set exists (Turney, 2002), and we can use it here:
| Word 1 | Word 2 | Word 3 |
|---|---|---|
| JJ | NN/NNS | anything |
| RB/RBR/RBS | JJ | Not NN or NNS |
| JJ | JJ | Not NN or NNS |
| NN/ NNS | JJ | Not NN or NNS |
| RB/ RBR/ RBS | VB/ VBN/ VBD/ VBG | anything |
Turney, Peter D. 2002. Thumbs Up or Thumbs
Down? Semantic Orientation Applied to
Unsupervised Classification of Reviews.
Proceedings of the 40th Annual Meeting of
the Association for Computational
Linguistics (ACL'02), Philadelphia,
Pennsylvania, USA, July 8-10, 2002, pp.
417-424. NRC 44946.
Feature-based Customer Review Mining
Jingye Wang Heng Ren
Department of Computer Science
Stanford University
In [50]:
# POS-tag patterns for the three words of a tri-gram (Turney, 2002), one
# regex per word position.
# Every tag pattern is anchored with ^...$ so that a tag must match in full:
# "JJ" does not also accept "JJR"/"JJS", and "NN" does not also accept
# "NNP"/"NNPS". The plural-noun tag is "NNS" ("NS" is not a Penn Treebank tag).
# The negative lookahead rejects exactly "NN" and "NNS" as the third word.
tag_seqs_re = [
    ('^JJ$', '^(NN|NNS)$', '.*'),
    ('^(RB|RBR|RBS)$', '^JJ$', '^(?!(NN|NNS)$).*'),
    ('^JJ$', '^JJ$', '^(?!(NN|NNS)$).*'),
    ('^(NN|NNS)$', '^JJ$', '^(?!(NN|NNS)$).*'),
    ('^(RB|RBR|RBS)$', '^(VB|VBN|VBD|VBG)$', '.*'),
]
In [52]:
# get python regex
import re
In [42]:
# get test row
# taken from df_triPosTags because the cells below read test_row["triPosTags"],
# a column that is created on df_triPosTags and not on df_posTag
test_row = df_triPosTags.first()
In [42]:
# check tri-gram tags - we want the tagged raw tokens (stopwords not removed)
# NOTE(review): requires test_row to come from a dataframe that has a "triPosTags" column
test_row["triPosTags"][:10]
Out[42]:
In [66]:
# function to check if a tagged triGram matches a single sequence
def is_match(triPosTag, seq):
    """Check whether a tagged tri-gram matches a single POS-tag sequence.

    Args:
        triPosTag: sequence of (word, tag) pairs,
            e.g. [["a", "NN"], ["b", "JJ"], ["c", "RR"]].
        seq: sequence of regex patterns, one per word position.

    Returns:
        True if the tri-gram has exactly one tag per pattern and every tag
        matches the pattern at its position (``re.match`` semantics, i.e.
        anchored at the start of the tag); False otherwise.
    """
    # a tri-gram with more or fewer words than patterns cannot match
    if len(triPosTag) != len(seq):
        return False
    # re.match takes (pattern, string): the sequence entry is the pattern and
    # the POS tag (second element of the pair) is the string being tested
    return all(
        re.match(pattern, tagged[1]) is not None
        for tagged, pattern in zip(triPosTag, seq)
    )
def match_pos_seq(taggedTriGram):
    """Placeholder: walks over the tagged tri-gram without acting on it.

    The loop body is empty, so the function always returns None.
    TODO: presumably meant to test the tri-gram against the POS-tag
    sequences - confirm the intended behavior before implementing.
    """
    for _tagged_word in taggedTriGram:
        continue
In [70]:
# get test tag: the first tagged tri-gram of the test row
test_triPosTag = test_row["triPosTags"][0]
# create a hand-made tagged tri-gram (tags NN, JJ, RR) for exercising the matcher
test_triPosTag_match = [["a", "NN"], ["b", "JJ"], ["c", "RR"]]
In [71]:
# test that the regex match works: show the sequence used in the checks below
tag_seqs_re[3]
Out[71]:
In [80]:
# the first pattern of that sequence should match the tag "NN"
re.match(tag_seqs_re[3][0], "NN")
Out[80]:
In [69]:
# test is_match() with the hand-made tri-gram against the same sequence
is_match(test_triPosTag_match, tag_seqs_re[3])
Out[69]:
In [ ]:
#df_obj_only.write.json("s3a://amazon-review-data/review-data")