Training a Sentiment Analysis LSTM Using Noisy Crowd Labels

In this tutorial, we'll provide a simple walkthrough of how to use Snorkel to resolve conflicts in a noisy crowdsourced dataset for a sentiment analysis task, and then use these denoised labels to train an LSTM sentiment analysis model which can be applied to new, unseen data to automatically make predictions!

Specifically, we'll look at:

  1. Loading data via SparkSQL
  2. Creating basic Snorkel objects: Candidates, Contexts, and Labels
  3. Training the GenerativeModel to resolve labeling conflicts
  4. Training a simple LSTM sentiment analysis model, which can then be used on new, unseen data!

Note that this is a simple tutorial meant to give an overview of the mechanics of using Snorkel-- we'll note places where more careful fine-tuning could be done!

Installing PySpark

Please see the official instructions!

Task Detail: Weather Sentiments in Tweets

In this tutorial we focus on the Weather sentiment task from Crowdflower.

In this task, contributors were asked to grade the sentiment of a particular tweet relating to the weather. Contributors could choose among the following categories:

  1. Positive
  2. Negative
  3. I can't tell
  4. Neutral / author is just sharing information
  5. Tweet not related to weather condition

The catch is that 20 contributors graded each tweet. Thus, in many cases contributors assigned conflicting sentiment labels to the same tweet.

The task comes with two data files (to be found in the data directory of the tutorial:

  1. weather-non-agg-DFE.csv contains the raw contributor answers for each of the 1,000 tweets.
  2. weather-evaluated-agg-DFE.csv contains gold sentiment labels by trusted workers for each of the 1,000 tweets.

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import numpy as np
from snorkel import SnorkelSession
session = SnorkelSession()

Step 1: Preprocessing - Data Loading with Spark SQL and Dataframes

First, we initialize a SparkSession, which manages a connection to a local Spark master which allows us to preprocess the raw data and prepare convert them to the necessary Snorkel format:


In [2]:
# Initialize Spark Environment and Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Snorkel Crowdsourcing Demo") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

We can now load the raw data for our crowdsourcing task (stored in a local csv file) into a dataframe.


In [3]:
# Load Raw Crowdsourcing Data
raw_crowd_answers = spark.read.format("csv").option("header", "true").csv("data/weather-non-agg-DFE.csv")
raw_crowd_answers.printSchema()

# Load Groundtruth Crowdsourcing Data
gold_crowd_answers = spark.read.format("csv").option("header", "true").csv("data/weather-evaluated-agg-DFE.csv")
gold_crowd_answers.createOrReplaceTempView("gold_crowd_answers")
# Filter out low-confidence answers
gold_answers = spark.sql("SELECT tweet_id, sentiment, tweet_body FROM gold_crowd_answers WHERE correct_category ='Yes' and correct_category_conf = 1").orderBy("tweet_id")

# Keep Only the Tweets with Available Groundtruth
candidate_labeled_tweets = raw_crowd_answers.join(gold_answers, raw_crowd_answers.tweet_id == gold_answers.tweet_id).select(raw_crowd_answers.tweet_id,raw_crowd_answers.tweet_body,raw_crowd_answers.worker_id,raw_crowd_answers.emotion)


root
 |-- _unit_id_: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- trust: string (nullable = true)
 |-- worker_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- city: string (nullable = true)
 |-- emotion: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- tweet_body: string (nullable = true)

As mentioned above, contributors can provide conflicting labels for the same tweet:


In [4]:
candidate_labeled_tweets.select("worker_id", "emotion", "tweet_body").orderBy("tweet_id").show()


+---------+--------------------+--------------------+
|worker_id|             emotion|          tweet_body|
+---------+--------------------+--------------------+
|  6498214|        I can't tell|I dunno which ass...|
|  7450342|Neutral / author ...|I dunno which ass...|
| 10752241|            Positive|I dunno which ass...|
| 10235355|            Negative|I dunno which ass...|
| 17475684|            Negative|I dunno which ass...|
|  6346694|Neutral / author ...|I dunno which ass...|
| 14806909|Neutral / author ...|I dunno which ass...|
| 19028457|            Positive|I dunno which ass...|
|  6737418|            Negative|I dunno which ass...|
| 14584835|            Negative|I dunno which ass...|
| 18381123|Neutral / author ...|I dunno which ass...|
| 16498372|Tweet not related...|I dunno which ass...|
|  7012325|            Positive|I dunno which ass...|
|  9333400|            Negative|I dunno which ass...|
| 10379699|            Positive|I dunno which ass...|
| 14298198|            Positive|I dunno which ass...|
| 20043586|            Negative|I dunno which ass...|
|  9289735|        I can't tell|I dunno which ass...|
| 16738677|            Negative|I dunno which ass...|
| 15846764|            Negative|I dunno which ass...|
+---------+--------------------+--------------------+
only showing top 20 rows

Step 2: Generating Snorkel Objects

Candidates

Candidates are the core objects in Snorkel representing objects to be classified. We'll use a helper function to create a custom Candidate sub-class, Tweet, with values representing the possible labels that it can be classified with:


In [5]:
from snorkel.models import candidate_subclass

values = list(map(
    lambda r: r.emotion,
    candidate_labeled_tweets.select("emotion").distinct().collect()
))

Tweet = candidate_subclass('Tweet', ['tweet'], values=values)

Contexts

All Candidate objects point to one or more Context objects, which represent the raw data that they are rooted in. In this case, our candidates will each point to a single Context object representing the raw text of the tweet.

Once we have defined the Context for each Candidate, we can commit them to the database. Note that we also split into two sets while doing this:

  1. Training set (split=0): The tweets for which we have noisy, conflicting crowd labels; we will resolve these conflicts using the GenerativeModel and then use them as training data for the LSTM

  2. Test set (split=1): We will pretend that we do not have any crowd labels for this split of the data, and use these to test the LSTM's performance on unseen data


In [6]:
from snorkel.models import Context, Candidate
from snorkel.contrib.models.text import RawText

# Make sure DB is cleared
session.query(Context).delete()
session.query(Candidate).delete()

# Now we create the candidates with a simple loop
tweet_bodies = candidate_labeled_tweets \
    .select("tweet_id", "tweet_body") \
    .orderBy("tweet_id") \
    .distinct()

# Generate and store the tweet candidates to be classified
# Note: We split the tweets in two sets: one for which the crowd 
# labels are not available to Snorkel (test, 10%) and one for which we assume
# crowd labels are obtained (to be used for training, 90%)
total_tweets = tweet_bodies.count()
test_split = total_tweets*0.1
for i, t in enumerate(tweet_bodies.collect()):
    split = 1 if i <= test_split else 0
    raw_text = RawText(stable_id=t.tweet_id, name=t.tweet_id, text=t.tweet_body)
    tweet = Tweet(tweet=raw_text, split=split)
    session.add(tweet)
session.commit()

Labels

Next, we'll store the labels for each of the training candidates in a sparse matrix (which will also automatically be saved to the Snorkel database), with one row for each candidate and one column for each crowd worker:


In [7]:
from snorkel.annotations import LabelAnnotator
from collections import defaultdict

# Extract worker votes
# Cache locally to speed up for this small set
worker_labels = candidate_labeled_tweets.select("tweet_id", "worker_id", "emotion").collect()
wls = defaultdict(list)
for row in worker_labels:
    wls[row.tweet_id].append((row.worker_id, row.emotion))

# Create a label generator
def worker_label_generator(t):
    """A generator over the different (worker_id, label_id) pairs for a Tweet."""
    for worker_id, label in wls[t.tweet.name]:
        yield worker_id, label

labeler = LabelAnnotator(label_generator=worker_label_generator)
%time L_train = labeler.apply(split=0)
L_train


  3%|▎         | 19/568 [00:00<00:02, 189.53it/s]
Clearing existing...
Running UDF...
[========================================] 100%

CPU times: user 3.69 s, sys: 217 ms, total: 3.91 s
Wall time: 3.85 s
Out[7]:
<568x102 sparse matrix of type '<class 'numpy.int64'>'
	with 11360 stored elements in Compressed Sparse Row format>

Finally, we load the ground truth ("gold") labels for both the training and test sets, and store as numpy arrays"


In [8]:
gold_labels = defaultdict(list)

# Get gold labels in verbose form
verbose_labels = dict([(t.tweet_id, t.sentiment) 
                       for t in gold_answers.select("tweet_id", "sentiment").collect()])

# Iterate over splits, align with Candidate ordering
for split in range(2):
    cands = session.query(Tweet).filter(Tweet.split == split).order_by(Tweet.id).all() 
    for c in cands:
        gold_labels[split].append(values.index(verbose_labels[c.tweet.name]) + 1)
          
train_cand_labels = np.array(gold_labels[0])
test_cand_labels = np.array(gold_labels[1])

Step 3: Resolving Crowd Conflicts with the Generative Model

Until now we have converted the raw crowdsourced data into a labeling matrix that can be provided as input to Snorkel. We will now show how to:

  1. Use Snorkel's generative model to learn the accuracy of each crowd contributor.
  2. Use the learned model to estimate a marginal distribution over the domain of possible labels for each task.
  3. Use the estimated marginal distribution to obtain the maximum a posteriori probability estimate for the label that each task takes.

In [9]:
# Imports
from snorkel.learning.gen_learning import GenerativeModel

# Initialize Snorkel's generative model for
# learning the different worker accuracies.
gen_model = GenerativeModel(lf_propensity=True)

In [10]:
# Train the generative model
gen_model.train(
    L_train,
    reg_type=2,
    reg_param=0.1,
    epochs=30
)


Inferred cardinality: 5

Infering the MAP assignment for each task

Each task corresponds to an indipendent random variable. Thus, we can simply associate each task with the most probably label based on the estimated marginal distribution and get an accuracy score:


In [11]:
accuracy = gen_model.score(L_train, train_cand_labels)
print("Accuracy: {:.10f}".format(accuracy))


Accuracy: 0.9964788732

Majority vote

It seems like we did well- but how well? Given that this is a fairly simple task--we have 20 contributors per tweet (and most of them are far better than random)--we expect majority voting to perform extremely well, so we can check against majority vote:


In [12]:
from collections import Counter

# Collect the majority vote answer for each tweet
mv = []
for i in range(L_train.shape[0]):
    c = Counter([L_train[i,j] for j in L_train[i].nonzero()[1]])
    mv.append(c.most_common(1)[0][0])
mv = np.array(mv)

# Count the number correct by majority vote
n_correct = np.sum([1 for i in range(L_train.shape[0]) if mv[i] == train_cand_labels[i]])
print("Accuracy: {:.10f}".format(n_correct / float(L_train.shape[0])))
print("Number incorrect: {:.0f}".format(L_train.shape[0] - n_correct))


Accuracy: 0.9823943662
Number incorrect: 10

We see that while majority vote makes 9 errors, the Snorkel model makes only 2! What about an average crowd worker?

Average human accuracy

We see that the average accuracy of a single crowd worker is in fact much lower:


In [13]:
accs = []
for j in range(L_train.shape[1]):
    n_correct = np.sum([1 for i in range(L_train.shape[0]) if L_train[i,j] == train_cand_labels[i]])
    acc = n_correct / float(L_train[:,j].nnz)
    accs.append(acc)
print("Mean Accuracy: {:.10f}".format(np.mean(accs)))


Mean Accuracy: 0.7336496552

Step 4: Training an ML Model with Snorkel for Sentiment Analysis over Unseen Tweets

In the previous step, we saw that Snorkel's generative model can help to denoise crowd labels automatically. However, what happens when we don't have noisy crowd labels for a tweet?

In this step, we'll use the estimates of the generative model as probabilistic training labels to train a simple LSTM sentiment analysis model, which takes as input a tweet for which no crowd labels are available and predicts its sentiment.

First, we get the probabilistic training labels (training marginals) which are just the marginal estimates of the generative model:


In [14]:
train_marginals = gen_model.marginals(L_train)

In [15]:
from snorkel.annotations import save_marginals
save_marginals(session, L_train, train_marginals)


Saved 568 marginals

Next, we'll train a simple LSTM:


In [16]:
# from snorkel.learning import TextRNN - v0.6.3
from snorkel.learning.tensorflow import TextRNN # v0.7-beta

train_kwargs = {
    'lr':         0.01,
    'dim':        100,
    'n_epochs':   200,
    'dropout':    0.2,
    'print_freq': 5
}

lstm = TextRNN(seed=1701, cardinality=Tweet.cardinality)
train_cands = session.query(Tweet).filter(Tweet.split == 0).order_by(Tweet.id).all()
lstm.train(train_cands, train_marginals, **train_kwargs)


/home/vagrant/miniconda3/envs/snorkel/lib/python3.6/site-packages/snorkel/learning/tensorflow/rnn/rnn_base.py:36: UserWarning: Candidate 557 has argument past max length for model:	[arg ends at index 28; max len 28]
  warnings.warn('\t'.join([w.format(i), info]))
WARNING:tensorflow:From /home/vagrant/miniconda3/envs/snorkel/lib/python3.6/site-packages/snorkel/learning/tensorflow/noise_aware_model.py:77: softmax_cross_entropy_with_logits (from tensorflow.python.ops.nn_ops) is deprecated and will be removed in a future version.
Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See @{tf.nn.softmax_cross_entropy_with_logits_v2}.

/home/vagrant/miniconda3/envs/snorkel/lib/python3.6/site-packages/tensorflow/python/ops/gradients_impl.py:100: UserWarning: Converting sparse IndexedSlices to a dense Tensor of unknown shape. This may consume a large amount of memory.
  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "
[TextRNN] Training model
[TextRNN] n_train=568  #epochs=200  batch size=256
[TextRNN] Epoch 0 (0.97s)	Average loss=1.539561
[TextRNN] Epoch 5 (4.83s)	Average loss=0.178020
[TextRNN] Epoch 10 (8.63s)	Average loss=0.093538
[TextRNN] Epoch 15 (12.54s)	Average loss=0.039405
[TextRNN] Epoch 20 (16.29s)	Average loss=0.040064
[TextRNN] Epoch 25 (19.96s)	Average loss=0.037989
[TextRNN] Epoch 30 (23.68s)	Average loss=0.042556
[TextRNN] Epoch 35 (27.41s)	Average loss=0.026167
[TextRNN] Epoch 40 (31.10s)	Average loss=0.032155
[TextRNN] Epoch 45 (35.22s)	Average loss=0.031120
[TextRNN] Epoch 50 (38.93s)	Average loss=0.025568
[TextRNN] Epoch 55 (42.64s)	Average loss=0.020803
[TextRNN] Epoch 60 (46.39s)	Average loss=0.024915
[TextRNN] Epoch 65 (50.04s)	Average loss=0.020802
[TextRNN] Epoch 70 (53.71s)	Average loss=0.021762
[TextRNN] Epoch 75 (57.46s)	Average loss=0.022686
[TextRNN] Epoch 80 (61.32s)	Average loss=0.024695
[TextRNN] Epoch 85 (65.23s)	Average loss=0.043518
[TextRNN] Epoch 90 (69.04s)	Average loss=0.025970
[TextRNN] Epoch 95 (72.79s)	Average loss=0.025604
[TextRNN] Epoch 100 (76.77s)	Average loss=0.020448
[TextRNN] Epoch 105 (81.73s)	Average loss=0.025158
[TextRNN] Epoch 110 (85.81s)	Average loss=0.025862
[TextRNN] Epoch 115 (89.73s)	Average loss=0.022250
[TextRNN] Epoch 120 (93.43s)	Average loss=0.020078
[TextRNN] Epoch 125 (97.10s)	Average loss=0.023311
[TextRNN] Epoch 130 (100.84s)	Average loss=0.014641
[TextRNN] Epoch 135 (104.45s)	Average loss=0.029586
[TextRNN] Epoch 140 (108.11s)	Average loss=0.018842
[TextRNN] Epoch 145 (111.75s)	Average loss=0.020960
[TextRNN] Epoch 150 (115.39s)	Average loss=0.016926
[TextRNN] Epoch 155 (119.03s)	Average loss=0.036516
[TextRNN] Epoch 160 (122.70s)	Average loss=0.015394
[TextRNN] Epoch 165 (126.36s)	Average loss=0.016710
[TextRNN] Epoch 170 (130.04s)	Average loss=0.021226
[TextRNN] Epoch 175 (133.65s)	Average loss=0.013755
[TextRNN] Epoch 180 (137.29s)	Average loss=0.014988
[TextRNN] Epoch 185 (140.92s)	Average loss=0.017595
[TextRNN] Epoch 190 (144.55s)	Average loss=0.020424
[TextRNN] Epoch 195 (148.19s)	Average loss=0.023391
[TextRNN] Epoch 199 (151.09s)	Average loss=0.019140
[TextRNN] Training done (151.09s)

In [17]:
test_cands = session.query(Tweet).filter(Tweet.split == 1).order_by(Tweet.id).all()
accuracy = lstm.score(test_cands, test_cand_labels)
print("Accuracy: {:.10f}".format(accuracy))


Accuracy: 0.6250000000

We see that we're already close to the accuracy of an average crowd worker! If we wanted to improve the score, we could tune the LSTM model using grid search (see the Intro tutorial), use pre-trained word embeddings, or many other common techniques for getting state-of-the-art scores. Notably, we're doing this without using gold labels, but rather noisy crowd-labels!

For more, checkout the other tutorials!


In [ ]: