In this tutorial, we'll walk through how to use Snorkel to resolve conflicts in a noisy crowdsourced dataset for a sentiment analysis task, and then use the denoised labels to train an LSTM sentiment analysis model that can be applied to new, unseen data to make predictions automatically.
Specifically, we'll look at:
Creating Candidates, Contexts, and Labels from the raw crowd answers
Training the GenerativeModel to resolve labeling conflicts
Using the resulting probabilistic labels to train an LSTM sentiment analysis model
Note that this is a simple tutorial meant to give an overview of the mechanics of using Snorkel -- we'll note places where more careful fine-tuning could be done!
This tutorial requires PySpark. Please see the official installation instructions!
In this tutorial we focus on the Weather sentiment task from CrowdFlower.
In this task, contributors were asked to grade the sentiment of a particular tweet relating to the weather. Contributors could choose among the following categories:
Positive
Negative
I can't tell
Neutral / author is just sharing information
Tweet not related to weather condition
The catch is that 20 contributors graded each tweet. Thus, in many cases contributors assigned conflicting sentiment labels to the same tweet.
The task comes with two data files (found in the data directory of the tutorial):
weather-non-agg-DFE.csv: the raw, per-contributor answers for each tweet
weather-evaluated-agg-DFE.csv: the evaluated ground-truth answers used for scoring
In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import os
import numpy as np
from snorkel import SnorkelSession
session = SnorkelSession()
First, we initialize a SparkSession, which manages a connection to a local Spark master and allows us to preprocess the raw data and convert it to the format Snorkel requires:
In [2]:
# Initialize Spark Environment and Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Snorkel Crowdsourcing Demo") \
    .getOrCreate()
We can now load the raw data for our crowdsourcing task (stored in a local CSV file) into a Spark DataFrame.
In [3]:
# Load raw crowdsourcing data
raw_crowd_answers = spark.read.option("header", "true").csv("data/weather-non-agg-DFE.csv")
raw_crowd_answers.printSchema()

# Load ground-truth crowdsourcing data
gold_crowd_answers = spark.read.option("header", "true").csv("data/weather-evaluated-agg-DFE.csv")
gold_crowd_answers.createOrReplaceTempView("gold_crowd_answers")

# Filter out low-confidence answers
gold_answers = spark.sql("""
    SELECT tweet_id, sentiment, tweet_body
    FROM gold_crowd_answers
    WHERE correct_category = 'Yes' AND correct_category_conf = 1
""").orderBy("tweet_id")

# Keep only the tweets with available ground truth
candidate_labeled_tweets = raw_crowd_answers \
    .join(gold_answers, raw_crowd_answers.tweet_id == gold_answers.tweet_id) \
    .select(
        raw_crowd_answers.tweet_id,
        raw_crowd_answers.tweet_body,
        raw_crowd_answers.worker_id,
        raw_crowd_answers.emotion
    )
As mentioned above, contributors can provide conflicting labels for the same tweet:
In [4]:
candidate_labeled_tweets.select("worker_id", "emotion", "tweet_body").orderBy("tweet_id").show()
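To quantify the disagreement, we can count how many distinct sentiment labels each tweet received. This is an optional sanity check on the DataFrame built above, not part of the pipeline:
from pyspark.sql.functions import countDistinct
# Count the number of distinct sentiment labels assigned to each tweet,
# showing the most contested tweets first
conflicts = candidate_labeled_tweets \
    .groupBy("tweet_id") \
    .agg(countDistinct("emotion").alias("n_distinct_labels")) \
    .orderBy("n_distinct_labels", ascending=False)
conflicts.show(5)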
Candidates
Candidate objects are the items we want to make predictions about; here, each candidate is a tweet. We define a Tweet candidate subclass whose label values are the distinct sentiment categories found in the crowd answers:
In [5]:
from snorkel.models import candidate_subclass
values = list(map(
lambda r: r.emotion,
candidate_labeled_tweets.select("emotion").distinct().collect()
))
Tweet = candidate_subclass('Tweet', ['tweet'], values=values)
Contexts
All Candidate objects point to one or more Context objects, which represent the raw data that they are rooted in. In this case, our candidates will each point to a single Context object representing the raw text of the tweet.
Once we have defined the Context for each Candidate, we can commit them to the database. Note that we also split the candidates into two sets while doing this:
Training set (split=0): The tweets for which we have noisy, conflicting crowd labels; we will resolve these conflicts using the GenerativeModel and then use them as training data for the LSTM
Test set (split=1): We will pretend that we do not have any crowd labels for this split of the data, and use these to test the LSTM's performance on unseen data
In [6]:
from snorkel.models import Context, Candidate
from snorkel.contrib.models.text import RawText
# Make sure DB is cleared
session.query(Context).delete()
session.query(Candidate).delete()
# Now we create the candidates with a simple loop
tweet_bodies = candidate_labeled_tweets \
.select("tweet_id", "tweet_body") \
.orderBy("tweet_id") \
.distinct()
# Generate and store the tweet candidates to be classified
# Note: We split the tweets into two sets: one for which the crowd
# labels are not available to Snorkel (test, ~10%) and one for which we
# assume crowd labels are obtained (to be used for training, ~90%)
total_tweets = tweet_bodies.count()
test_split = total_tweets * 0.1
for i, t in enumerate(tweet_bodies.collect()):
split = 1 if i <= test_split else 0
raw_text = RawText(stable_id=t.tweet_id, name=t.tweet_id, text=t.tweet_body)
tweet = Tweet(tweet=raw_text, split=split)
session.add(tweet)
session.commit()
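As a quick optional sanity check, we can confirm the split sizes directly from the database using the same query pattern we'll rely on later:
# Confirm the number of candidates committed to each split
n_train = session.query(Tweet).filter(Tweet.split == 0).count()
n_test = session.query(Tweet).filter(Tweet.split == 1).count()
print("Train candidates: {}, Test candidates: {}".format(n_train, n_test))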
Labels
Next, we load the crowd workers' answers as Snorkel labels: the LabelAnnotator builds a sparse label matrix with one row per training tweet and one column per worker:
In [7]:
from snorkel.annotations import LabelAnnotator
from collections import defaultdict
# Extract worker votes
# Cache locally to speed up for this small set
worker_labels = candidate_labeled_tweets.select("tweet_id", "worker_id", "emotion").collect()
wls = defaultdict(list)
for row in worker_labels:
wls[row.tweet_id].append((row.worker_id, row.emotion))
# Create a label generator
def worker_label_generator(t):
"""A generator over the different (worker_id, label_id) pairs for a Tweet."""
for worker_id, label in wls[t.tweet.name]:
yield worker_id, label
labeler = LabelAnnotator(label_generator=worker_label_generator)
%time L_train = labeler.apply(split=0)
L_train
Out[7]:
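Since L_train is a sparse matrix with one row per tweet and one column per worker, a quick optional check is that each tweet received roughly 20 votes:
# Inspect the dimensions and density of the label matrix
print("Label matrix shape: {}".format(L_train.shape))
print("Average votes per tweet: {:.1f}".format(L_train.nnz / float(L_train.shape[0])))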
Finally, we load the ground truth ("gold") labels for both the training and test sets, and store them as NumPy arrays:
In [8]:
gold_labels = defaultdict(list)
# Get gold labels in verbose form
verbose_labels = dict([(t.tweet_id, t.sentiment)
for t in gold_answers.select("tweet_id", "sentiment").collect()])
# Iterate over splits, align with Candidate ordering
for split in range(2):
cands = session.query(Tweet).filter(Tweet.split == split).order_by(Tweet.id).all()
for c in cands:
gold_labels[split].append(values.index(verbose_labels[c.tweet.name]) + 1)
train_cand_labels = np.array(gold_labels[0])
test_cand_labels = np.array(gold_labels[1])
Until now, we have converted the raw crowdsourced data into a labeling matrix that can be provided as input to Snorkel. We will now show how to use Snorkel's generative model to learn the accuracy of each crowd contributor and resolve the conflicting labels.
In [9]:
# Imports
from snorkel.learning.gen_learning import GenerativeModel
# Initialize Snorkel's generative model for
# learning the different worker accuracies.
gen_model = GenerativeModel(lf_propensity=True)
In [10]:
# Train the generative model
gen_model.train(
L_train,
reg_type=2,
reg_param=0.1,
epochs=30
)
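We can also inspect what the model learned about individual workers. In Snorkel v0.6.x the GenerativeModel exposes per-labeling-function (here, per-worker) statistics via learned_lf_stats(); check the API of your version:
# Summarize the learned per-worker statistics (one row per worker)
worker_stats = gen_model.learned_lf_stats()
print(worker_stats.describe())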
In [11]:
accuracy = gen_model.score(L_train, train_cand_labels)
print("Accuracy: {:.10f}".format(accuracy))
In [12]:
from collections import Counter
# Collect the majority vote answer for each tweet
mv = []
for i in range(L_train.shape[0]):
c = Counter([L_train[i,j] for j in L_train[i].nonzero()[1]])
mv.append(c.most_common(1)[0][0])
mv = np.array(mv)
# Count the number correct by majority vote
n_correct = np.sum([1 for i in range(L_train.shape[0]) if mv[i] == train_cand_labels[i]])
print("Accuracy: {:.10f}".format(n_correct / float(L_train.shape[0])))
print("Number incorrect: {:.0f}".format(L_train.shape[0] - n_correct))
We see that while majority vote makes 9 errors, the Snorkel model makes only 2! What about an average crowd worker?
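To see exactly where the two approaches diverge, we can compare the generative model's predicted labels against the majority vote. This sketch assumes marginals() returns an (n_tweets, n_classes) array in the multiclass setting, with labels 1-indexed as above:
# Compare generative-model predictions to majority vote
gen_preds = np.argmax(gen_model.marginals(L_train), axis=1) + 1
disagreements = np.where(gen_preds != mv)[0]
print("Tweets where the two methods disagree: {}".format(disagreements))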
In [13]:
accs = []
for j in range(L_train.shape[1]):
n_correct = np.sum([1 for i in range(L_train.shape[0]) if L_train[i,j] == train_cand_labels[i]])
acc = n_correct / float(L_train[:,j].nnz)
accs.append(acc)
print("Mean Accuracy: {:.10f}".format(np.mean(accs)))
In the previous step, we saw that Snorkel's generative model can help to denoise crowd labels automatically. However, what happens when we don't have noisy crowd labels for a tweet?
In this step, we'll use the estimates of the generative model as probabilistic training labels to train a simple LSTM sentiment analysis model, which takes as input a tweet for which no crowd labels are available and predicts its sentiment.
First, we get the probabilistic training labels (training marginals) which are just the marginal estimates of the generative model:
In [14]:
train_marginals = gen_model.marginals(L_train)
In [15]:
from snorkel.annotations import save_marginals
save_marginals(session, L_train, train_marginals)
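Since the notebook already loads %matplotlib inline, we can optionally visualize how confident the generative model is by histogramming the maximum marginal probability per tweet (again assuming an (n_tweets, n_classes) marginals array):
# Plot the model's confidence in its most likely label for each tweet
import matplotlib.pyplot as plt
plt.hist(np.max(train_marginals, axis=1), bins=20)
plt.xlabel("Max marginal probability per tweet")
plt.ylabel("Number of tweets")
plt.show()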
Next, we'll train a simple LSTM:
In [16]:
# from snorkel.learning import TextRNN - v0.6.3
from snorkel.learning.tensorflow import TextRNN # v0.7-beta
train_kwargs = {
'lr': 0.01,
'dim': 100,
'n_epochs': 200,
'dropout': 0.2,
'print_freq': 5
}
lstm = TextRNN(seed=1701, cardinality=Tweet.cardinality)
train_cands = session.query(Tweet).filter(Tweet.split == 0).order_by(Tweet.id).all()
lstm.train(train_cands, train_marginals, **train_kwargs)
In [17]:
test_cands = session.query(Tweet).filter(Tweet.split == 1).order_by(Tweet.id).all()
accuracy = lstm.score(test_cands, test_cand_labels)
print("Accuracy: {:.10f}".format(accuracy))
We see that we're already close to the accuracy of an average crowd worker! If we wanted to improve the score, we could tune the LSTM model using grid search (see the Intro tutorial), use pre-trained word embeddings, or apply many other common techniques for getting state-of-the-art scores. Notably, we're doing this without using gold labels, but rather noisy crowd labels!
For more, check out the other tutorials!