In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
%load_ext autotime
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
from db_utils import query_hive_ssh
import re
import copy
from diff_utils import *
import time
import numpy as np
import os
import multiprocessing as mp
In [3]:
cols = ['rev_comment', 'insertion', 'insert_only', 'rev_id', 'page_id', 'page_title', 'rev_timestamp', 'user_id', 'user_text']
nss = ['user', 'article']
datasets = []
for ns in nss:
    d1 = {
        'table': 'blocked_talk_diff_no_admin',
        'partition': 'ns=%s' % ns,
        'ns': ns,
        'name': 'all_blocked_user',
        'cols': cols
    }
    datasets.append(d1)
    d2 = {
        'table': '%s_talk_diff_no_admin_sample' % ns,
        'partition': '',
        'ns': ns,
        'name': 'talk_diff_no_admin_sample',
        'cols': cols,
    }
    datasets.append(d2)
    d3 = {
        'table': 'talk_diff_no_admin',
        'partition': 'ns=%s/year=2015' % ns,
        'ns': ns,
        'name': 'talk_diff_no_admin_2015',
        'cols': cols,
    }
    datasets.append(d3)
cols2 = ['rev_comment', 'insertion', 'insert_only', 'rev_id', 'page_id', 'page_title', 'rev_timestamp', 'user_id', 'user_text', 'bot', 'admin']
datasets = []  # NOTE: this replaces the datasets built above; only the per-year talk_diff partitions are used below
for year in range(2001, 2016):
    for ns in nss:
        d = {
            'table': 'talk_diff',
            'partition': 'ns=%s/year=%d' % (ns, year),
            'ns': '%s' % ns,
            'name': 'talk_diff_%d' % year,
            'cols': cols2
        }
        datasets.append(d)
In [4]:
for d in datasets:
    d['hdfs_path'] = '/user/hive/warehouse/enwiki.db/%(table)s/%(partition)s' % d
    d['stat2_path'] = '/home/ellery/detox/data/samples/%(ns)s/raw/%(name)s' % d
    d['raw_local_path'] = '/Users/ellerywulczyn/detox/data/samples/%(ns)s/raw/%(name)s/' % d
    d['clean_local_path'] = '/Users/ellerywulczyn/detox/data/samples/%(ns)s/clean/%(name)s/' % d
In [5]:
def transfer_partition(d, dry = False):
    if not dry:
        # transfer from HDFS to stat2
        cmd = "ssh stat1002.eqiad.wmnet 'rm -rf %s'" % d['stat2_path']
        print(os.system(cmd))
        cmd = "ssh stat1002.eqiad.wmnet 'hadoop fs -copyToLocal %s %s '" % (d['hdfs_path'], d['stat2_path'])
        print(os.system(cmd))
        # transfer from stat2 to local
        cmd = 'rm -rf %s' % d['raw_local_path']
        print(os.system(cmd))
        cmd = 'rsync -avz stat1002.eqiad.wmnet:%s/* %s' % (d['stat2_path'], d['raw_local_path'])
        os.system(cmd)
    return d['raw_local_path']
In [7]:
for d in datasets:
    local_path = transfer_partition(d, dry = False)
In [8]:
[d['raw_local_path'] for d in datasets]
Out[8]:
Cleaning and Filtering
In [9]:
def cf_helper(path, cols, k = 5):
    # read one raw tab-separated chunk, then clean and filter its rows in parallel
    df = pd.read_csv(path, sep = '\t', quoting = 3, encoding = 'utf-8', header = None, usecols=range(len(cols)))
    if df.shape[0] == 0:
        return pd.DataFrame(columns = cols)
    if df.shape[1] != len(cols):
        print(path)
        print(df.shape)
        return pd.DataFrame(columns = cols)
    df.columns = cols
    # split the rows into up to 5*k random groups and clean them with a pool of k workers
    df = df.assign(key = lambda x: np.random.randint(0, high=5*k, size=x.shape[0]))
    dfs = [e[1] for e in df.groupby('key')]
    p = mp.Pool(k)
    dfs = p.map(clean_and_filter, dfs)
    p.close()
    p.join()
    return pd.concat(dfs)
In [10]:
def clean_and_filter_parallel(d, k = 7):
    indir = d['raw_local_path']
    outdir = d['clean_local_path']
    os.system("rm -rf %s" % outdir)
    os.system("mkdir %s" % outdir)
    cols = d['cols']
    # collect the Hadoop part files (e.g. 000000_0) under the raw directory
    files = []
    for root, dirnames, filenames in os.walk(indir):
        for filename in filenames:
            if '_0' in filename:
                files.append(os.path.join(root, filename))
    for i, file in enumerate(files):
        df = cf_helper(file, cols, k = k)
        # drop the grouping key added by cf_helper (absent for empty or malformed chunks)
        if 'key' in df.columns:
            del df['key']
        df.to_csv(os.path.join(outdir, "chunk_%d.tsv" % i), sep = '\t', index = False)
In [11]:
for d in datasets:
    print(d['raw_local_path'])
    clean_and_filter_parallel(d)
In [19]:
query = """
SELECT
*
FROM
enwiki.block_events
"""
block_events_df = query_hive_ssh(query, '../../data/block_events.tsv', priority = True, quoting=3, delete=False)
block_events_df.columns = [c.split('.')[1] for c in block_events_df.columns]
In [20]:
query = """
SELECT
*
FROM
enwiki.blocked_user
"""
blocked_user_df = query_hive_ssh(query, '../../data/blocked_user.tsv', priority = True, quoting=3, delete=False)
blocked_user_df.columns = [c.split('.')[1] for c in blocked_user_df.columns]
In [7]:
query = """
SELECT
*
FROM
enwiki.npa_warnings
"""
npa_warnings_df = query_hive_ssh(query, '../../data/npa_warnings.tsv', priority = True, quoting=3, delete=False)
npa_warnings_df.columns = [c.split('.')[1] for c in npa_warnings_df.columns]
In [3]:
query = """
SELECT
user_text,
COUNT(*) AS num_days
FROM
(SELECT
user_text,
day
FROM
(SELECT
rev_user_text AS user_text,
SUBSTR(rev_timestamp,0,8) AS day
FROM
enwiki.revision
WHERE
rev_user != 0
AND rev_timestamp <= '2015-01-01'
) a
GROUP BY
user_text,
day ) b
GROUP BY
user_text
HAVING
COUNT(*) > 7
"""
long_term_users_df = query_hive_ssh(query, '../../data/long_term_users.tsv', priority = True, quoting=3, delete=False)
In [ ]:
## Annotate users by gender
query = """
SELECT
user_id,
user_name as user_text,
up_value as gender
FROM
enwiki.user_properties p,
enwiki.user u
WHERE
p.up_user = u.user_id
AND up_property = 'gender'
"""
d_gender = query_analytics_store(query, {})
d_gender.to_csv('../../data/genders.tsv', sep = '\t', index = False)
all_blocked_user
We want to get the k posts before and after each block event for different values of [k1, k2, ..., kn]. So that we can grow k as we please without labeling headaches, we create, for each k_i, a file containing the k_(i-1) through k_i posts around each block event that we have not yet labeled.
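The cells below implement this. As a reading aid, here is a minimal, hypothetical sketch of the layering idea (toy post ids rather than the real rev_ids and dataframes): with the cutoffs processed in increasing order and a running set of already-claimed posts, each larger cutoff only picks up posts that the smaller cutoffs have not taken, so the layers stay disjoint.
In [ ]:
# Hypothetical illustration only; not part of the pipeline below.
posts = list(range(1, 13))   # toy post ids, ordered by timestamp
ks = [5, 10]                 # cutoffs k_1 < k_2
seen = set()
layers = {}
for k in ks:
    # take the first k posts, minus anything a smaller cutoff already claimed
    layer = [p for p in posts[:k] if p not in seen]
    seen.update(layer)
    layers[k] = layer
print(layers)   # {5: [1, 2, 3, 4, 5], 10: [6, 7, 8, 9, 10]}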
In [16]:
block_events_df = pd.read_csv('../../data/block_events.tsv', sep = "\t")
block_events_df.columns = [c.split('.')[1] for c in block_events_df.columns]
In [17]:
nss = ['user', 'article']
rel_path = '../../data/samples'
In [18]:
for ns in nss:
    infile = os.path.join(rel_path, ns, 'clean', 'all_blocked_user.tsv')
    out_dir = os.path.join(rel_path, ns, 'clean', 'blocked_user_onion')
    df = pd.read_csv(infile, sep = '\t')
    users = list(set(df['user_text']))
    print(len(users))
    ks = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 200, 250, 300, 500, 1000]
    dfs = {k: [] for k in ks}
    t1 = time.time()
    for i, user in enumerate(users):
        if i % 1000 == 0:
            print(i)
            print(time.time() - t1)
            t1 = time.time()
        df_user = df[df['user_text'] == user].sort_values(by='rev_timestamp')
        if df_user.shape[0] == 0:
            continue
        block_events_df_user = block_events_df[block_events_df['user_text'] == user]
        seen_ids = set()
        for _, r in block_events_df_user.iterrows():
            ts = r['timestamp']
            for k in ks:
                # last k posts before the block, minus posts a smaller k already claimed
                df_user_pre = df_user[df_user['rev_timestamp'] <= ts][-k:]
                if df_user_pre.shape[0] > 0:
                    df_user_pre = df_user_pre[df_user_pre['rev_id'].apply(lambda x: x not in seen_ids)]
                    if df_user_pre.shape[0] > 0:
                        seen_ids.update(tuple(df_user_pre['rev_id']))
                        dfs[k].append(df_user_pre)
                # first k posts after the block, minus posts a smaller k already claimed
                df_user_post = df_user[df_user['rev_timestamp'] > ts][:k]
                if df_user_post.shape[0] > 0:
                    df_user_post = df_user_post[df_user_post['rev_id'].apply(lambda x: x not in seen_ids)]
                    if df_user_post.shape[0] > 0:
                        seen_ids.update(tuple(df_user_post['rev_id']))
                        dfs[k].append(df_user_post)
    dfs = {k: pd.concat(v) for k, v in dfs.items()}
    sizes = [(k, len(v)) for k, v in dfs.items()]
    sizes.sort(key=lambda x: x[0])
    print(sizes)
    os.system('rm -rf %s' % out_dir)
    os.system('mkdir %s' % out_dir)
    for k, v in dfs.items():
        # shuffle each onion layer and write it out as its own file
        v.iloc[np.random.permutation(len(v))].to_csv(out_dir + '/%d.tsv' % k, sep = '\t', index=False)
In [ ]: