In [ ]:
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
#
# Notebook to closely examine the workers marked as spammers.
In [ ]:
%matplotlib notebook
%load_ext autoreload
%autoreload 2
In [ ]:
from __future__ import division
from IPython.core.display import display, HTML
import argparse
import collections
import copy
import csv
import itertools
import math
import os
import os.path
import sys
import bs4
import matplotlib.pyplot as plt
import numpy as np
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname('__file__'), os.path.pardir)))
import third_party.krippendorff_alpha.krippendorff_alpha as krippendorff_alpha
import logs_processing.cohen_kappa as cohen_kappa
from logs_processing.fields import free_text_fields, non_english, orig_query, rel_column
In [ ]:
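# Analysis knobs (as used further down in this notebook):
# - USE_CF_TRUST: weight each judgement by the CrowdFlower _trust column
#   instead of counting every judgement as 1.
# - DUMP_AGGREGATED_SCORES: also build per-item aggregates of the labels.
# - MALICIOUS_WORKER_THRESHOLD / SUSPICIOUS_WORKER_THRESHOLD: maximum allowed
#   fraction of a worker's judgements flagged as malicious / suspicious before
#   the worker is skipped.
# - MIN_JUDGEMENTS_PER_WORKER: workers with fewer judgements are ignored.
# - SKIP_UNCLEAR_ITEMS: exclude low-clarity items from the agreement scores.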
USE_CF_TRUST = False
DUMP_AGGREGATED_SCORES = False
PRINT_PER_WORKER_SCORES = False
SKIP_INCONSISTENT_WORKERS = False
MALICIOUS_WORKER_THRESHOLD = 0.3
SUSPICIOUS_WORKER_THRESHOLD = 0.66
MIN_JUDGEMENTS_PER_WORKER = 3
SKIP_UNCLEAR_ITEMS = False
In [ ]:
CF = '<DIRECTORY_WITH_EXPORTED_CROWD_FLOWER_RESULT_CSVs>'
INPUTS = ['f842336_A+R.part1.csv', 'f845369_A+R.part2.csv', 'f845808_A+R.part3.csv', 'f846814_A+R.part4.csv']
MODE = 'R' # 'D', 'A' or 'R'
DICTIONARY = '<DICTIONARY_OF_ENGLISH_WORDS.txt>'
DAYS = None  # optionally restrict to a set of days (matched against the date part of _started_at); None keeps all days
In [ ]:
def is_inconsistent(row):
    bad_abd = 'bad_abandonment' in row['query'].split()
    if bad_abd and row['main'] in ['D1', 'D2']:
        return True
    if bad_abd and row['no_detailed'] == 'D0.2':
        return True
    return False

def is_malicious(row, mode, snippet_text):
    if row['main'] in ['D1', 'D2']:
        # TODO: fetch the doc and check the detailed answer for R.
        for token in row['yes_detailed'].split():
            if token in snippet_text:
                return False
        return True
    elif row['main'] == ('D-1' if mode == 'D' else 'A-1'):
        return row[non_english[mode]] not in row[orig_query[mode]]
    elif row['main'] == ('D-2' if mode == 'D' else 'A-2'):
        for token in row[non_english[mode]].split():
            if token in snippet_text:
                return False
        return True
    else:
        return False

dictionary_contents = set()
with open(DICTIONARY) as f:
    dictionary_contents = set(line.rstrip() for line in f)

def is_suspicious(row):
    for f in free_text_fields:
        text = row.get(f, '')
        if len(text) > 0 and all(t.upper() not in dictionary_contents for t in text.split()):
            return True
    return False
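In [ ]:
# Optional sanity check of the heuristics above on a hand-made row. All field
# values below are invented purely for illustration; they do not come from the
# CrowdFlower exports.
_toy_row = {
    'main': 'D1',
    'yes_detailed': 'weather forecast',
    'query': 'weather london',
    'no_detailed': '',
}
# The detailed answer reuses snippet words, so the row should not be flagged:
print is_malicious(_toy_row, 'D', 'weather forecast for london')  # False
# An answer with no overlap with the snippet should be flagged:
print is_malicious(dict(_toy_row, yes_detailed='qwertyuiop'), 'D', 'weather forecast for london')  # True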
In [ ]:
rows = []
for input_f in INPUTS:
    with open(os.path.join(CF, input_f)) as f:
        rows += list(csv.DictReader(f))
In [ ]:
workers = collections.defaultdict(lambda: 0)
inconsistent_workers = collections.defaultdict(lambda: 0)
malicious_workers = collections.defaultdict(lambda: 0)
suspicious_workers = collections.defaultdict(lambda: 0)
log_id_to_snippet_text = {}
for row in rows:
    if DAYS is not None and row['_started_at'].split()[0] not in DAYS:
        continue
    q = row[orig_query[MODE]]
    worker_id = row['_worker_id']
    workers[worker_id] += 1
    if MODE == 'D' and is_inconsistent(row):
        inconsistent_workers[worker_id] += 1
    snippet = bs4.BeautifulSoup(row['snippet'], 'lxml')
    snippet_text = snippet.get_text().encode('utf-8')
    log_id_to_snippet_text[row['log_id']] = snippet_text
    if is_malicious(row, MODE, snippet_text):
        malicious_workers[worker_id] += 1
    if is_suspicious(row):
        suspicious_workers[worker_id] += 1
In [ ]:
def normalized(d):
    return collections.defaultdict(
        lambda: 0,
        {k: (v / workers[k]) for k, v in d.iteritems()
         if workers[k] >= MIN_JUDGEMENTS_PER_WORKER})
inconsistent_workers = normalized(inconsistent_workers)
malicious_workers = normalized(malicious_workers)
suspicious_workers = normalized(suspicious_workers)
In [ ]:
def format_xyz_workers(xyz_workers, reverse=True):
    return '\n'.join('worker_id: %s; value: %.2f' % (k, v) for k, v in sorted(
        xyz_workers.iteritems(), key=lambda p: p[1], reverse=reverse)[:10])
print 'Inconsistent:'
print format_xyz_workers(inconsistent_workers)
print '-' * 80
print 'Malicious:'
print format_xyz_workers(malicious_workers)
print '-' * 80
print 'Both:', set(inconsistent_workers.iterkeys()) & set(malicious_workers.iterkeys())
print '-' * 80
print 'Suspicious:'
print format_xyz_workers(suspicious_workers)
In [ ]:
labels = collections.defaultdict(lambda: {})
item_scores = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
potential_bad_abandonments = collections.defaultdict(lambda: {'num': 0, 'denom': 0})
aggregated = {}
num_skipped_labels = 0
skipped_workers = set()
worker_trust = {}
for row in rows:
    worker_id = row['_worker_id']
    log_id = row['log_id']
    if workers[worker_id] < MIN_JUDGEMENTS_PER_WORKER \
            or SKIP_INCONSISTENT_WORKERS and worker_id in inconsistent_workers \
            or malicious_workers[worker_id] > MALICIOUS_WORKER_THRESHOLD \
            or suspicious_workers[worker_id] > SUSPICIOUS_WORKER_THRESHOLD:
        num_skipped_labels += 1
        skipped_workers.add(worker_id)
        continue
    try:
        relevance = int(row[rel_column[MODE]][1:])
    except ValueError:
        relevance = -4
    if relevance == 0:
        potential_bad_abandonments[row[orig_query[MODE]]]['denom'] += 1
        if row.get('no_detailed') == 'D0.1' or 'bad_abandonment' in row['query'].split():
            potential_bad_abandonments[row[orig_query[MODE]]]['num'] += 1
    trust = float(row['_trust'])
    worker_trust[worker_id] = trust
    item_scores[log_id][relevance] += trust if USE_CF_TRUST else 1
    labels[worker_id][log_id] = relevance
    if DUMP_AGGREGATED_SCORES:
        aggregated.setdefault(
            log_id,
            {'query': row[orig_query[MODE]], 'url': row['link'],
             'text': log_id_to_snippet_text[log_id],
             'r': {}, 'ambiguous': 0, 'bad_abandonment': 0})
        aggregated[log_id]['r'].setdefault(relevance, 0)
        aggregated[log_id]['r'][relevance] += 1
        aggregated[log_id]['ambiguous'] += 'ambiguous' in row['query'].split()
        aggregated[log_id]['bad_abandonment'] += 'bad_abandonment' in row['query'].split()
In [ ]:
print '%d workers should be skipped' % len(skipped_workers)
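# NOTE: `spammers` is not defined in this notebook; it is assumed to be a set
# of worker ids flagged as spammers globally (e.g. computed in a separate
# analysis) and has to be defined before running this cell.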
global_spammers_here = set(workers.iterkeys()) & spammers
print '%d global spammers found' % len(global_spammers_here)
print '%d of the global spammers were not skipped by the local filters' % len(global_spammers_here - skipped_workers)
In [ ]:
clarity_scores = {k: max(v.itervalues()) / sum(v.itervalues())
                  for k, v in item_scores.iteritems()}
min_score = np.mean(clarity_scores.values()) - np.std(clarity_scores.values())
unclear_items = set(
    log_id for log_id, score in clarity_scores.iteritems() if score < min_score)
if not USE_CF_TRUST:
    worker_item_scores = {}
    worker_worker_scores = {}
    for worker_id, w_labels in labels.iteritems():
        worker_cosines = []
        for log_id, r in w_labels.iteritems():
            if SKIP_UNCLEAR_ITEMS and log_id in unclear_items:
                continue
            item_score = copy.copy(item_scores[log_id])
            # exclude the current worker:
            item_score[r] -= 1
            # normalize
            denom = sum(item_score.itervalues())
            if denom > 0:
                worker_cosines.append(item_score[r] / denom)
        worker_item_scores[worker_id] = np.mean(worker_cosines)
        # Now, to worker-worker scores.
        worker_worker_cosines = []
        for worker_id2, w_labels2 in labels.iteritems():
            if worker_id2 == worker_id:
                continue
            cosine_w_w2 = []
            for log_id, r in w_labels.iteritems():
                if (SKIP_UNCLEAR_ITEMS and log_id in unclear_items) or log_id not in w_labels2:
                    continue
                cosine_w_w2.append(1.0 if w_labels2[log_id] == r else 0.0)
            if len(cosine_w_w2) > 0:
                worker_worker_cosines.append(np.mean(cosine_w_w2))
        worker_worker_scores[worker_id] = np.mean(worker_worker_cosines)
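In [ ]:
# Toy illustration (not part of the analysis) of the two quantities computed
# above, using made-up vote counts. For an item with votes {2: 3, 1: 1} the
# clarity score is max/sum = 3/4, and a worker who chose label 2 gets the
# leave-one-out fraction 2/3 for that item.
_toy_votes = collections.defaultdict(lambda: 0, {2: 3, 1: 1})
print max(_toy_votes.itervalues()) / sum(_toy_votes.itervalues())  # 0.75
_loo = copy.copy(_toy_votes)
_loo[2] -= 1  # exclude the current worker's own vote
print _loo[2] / sum(_loo.itervalues())  # 0.666...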
In [ ]:
print 'Lowest worker-item scores:'
print format_xyz_workers(worker_item_scores, reverse=False)
print '-' * 80
print 'Lowest worker-worker scores:'
print format_xyz_workers(worker_worker_scores, reverse=False)
In [ ]:
X = np.array([worker_item_scores.get(w, 0) for w in workers])
Y = np.array([worker_worker_scores.get(w, 0) for w in workers])
colors = np.array([('red' if w in skipped_workers | global_spammers_here else 'green') for w in workers])
fig, ax = plt.subplots()
ax.scatter(X, Y, c=colors)
plt.show()
In [ ]:
MIN_W_I_SCORE = 0.0
MIN_W_W_SCORE = 0.0
In [ ]:
l_values = []
filtered_workers = set()
w_filtered = len(skipped_workers)
l_filtered = num_skipped_labels
num_labels = num_skipped_labels
for w in workers:
    if w in skipped_workers:
        continue
    num_labels += len(labels[w])
    if (w in global_spammers_here or
            worker_item_scores[w] < MIN_W_I_SCORE or
            worker_worker_scores[w] < MIN_W_W_SCORE):
        l_filtered += len(labels[w])
        w_filtered += 1
        filtered_workers.add(w)
    else:
        l_values.append(labels[w])
print '%d workers filtered out of %d (%.1f%%)' % (
    w_filtered, len(workers), w_filtered / len(workers) * 100)
print '%d labels filtered out of %d (%.1f%%)' % (
    l_filtered, num_labels, l_filtered / num_labels * 100)
In [ ]:
print 'Average Cohen\'s kappa: %f' % cohen_kappa.cohen_kappa(l_values, missing_functor=lambda x: x < 0)
In [ ]:
print 'Krippendorf\'s alpha: %f' % krippendorff_alpha.krippendorff_alpha(l_values, missing_functor=lambda x: x < 0)