In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
%load_ext autotime

import warnings
warnings.filterwarnings('ignore')
import pandas as pd
from db_utils import query_hive_ssh
import re
import copy
from diff_utils import *
import time
import numpy as np
import os
import multiprocessing as mp

In [3]:
cols = ['rev_comment', 'insertion', 'insert_only', 'rev_id', 'page_id', 'page_title', 'rev_timestamp', 'user_id', 'user_text']
nss = ['user', 'article']
datasets = []

for ns in nss:
    
    
    d1 = {
        'table':'blocked_talk_diff_no_admin',
        'partition':'ns=%s' % ns, 
        'ns': ns,
        'name': 'all_blocked_user',
        'cols' : cols
        }
    datasets.append(d1)
    
    
    d2 = {
        'table':'%s_talk_diff_no_admin_sample' % ns,
        'partition':'', 
        'ns': ns,
        'name': 'talk_diff_no_admin_sample',
        'cols' : cols,
        }
    datasets.append(d2)
    
    d3 = {
        'table':'talk_diff_no_admin',
        'partition':'ns=%s/year=2015' % ns, 
        'ns': ns,
        'name': 'talk_diff_no_admin_2015',
        'cols' : cols,
        }
    datasets.append(d3)
        
        
# Rebuild the dataset list from scratch: the runs below use the full talk_diff
# table, partitioned by namespace and year, instead of the tables defined above.
cols2 = ['rev_comment', 'insertion', 'insert_only', 'rev_id', 'page_id', 'page_title', 'rev_timestamp', 'user_id', 'user_text', 'bot', 'admin']
datasets = []

for year in range(2001, 2016):
    for ns in nss:    
        d = {
            'table':'talk_diff',
            'partition':'ns=%s/year=%d' % (ns, year), 
            'ns': '%s' % ns,
            'name': 'talk_diff_%d' % year,
            'cols' : cols2
            }
        datasets.append(d)


time: 13.4 ms

In [4]:
for d in datasets:
    d['hdfs_path'] = '/user/hive/warehouse/enwiki.db/%(table)s/%(partition)s' % d
    d['stat2_path'] = '/home/ellery/detox/data/samples/%(ns)s/raw/%(name)s' % d
    d['raw_local_path'] = '/Users/ellerywulczyn/detox/data/samples/%(ns)s/raw/%(name)s/' % d
    d['clean_local_path'] = '/Users/ellerywulczyn/detox/data/samples/%(ns)s/clean/%(name)s/' % d


time: 1.76 ms
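The path templates above use printf-style interpolation against the dataset dict ('%(table)s' % d). A minimal standalone illustration with the first dataset's values:

d = {'table': 'talk_diff', 'partition': 'ns=user/year=2001'}
hdfs_path = '/user/hive/warehouse/enwiki.db/%(table)s/%(partition)s' % d
# hdfs_path == '/user/hive/warehouse/enwiki.db/talk_diff/ns=user/year=2001'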

In [5]:
def transfer_partition(d, dry = False):

    if not dry:
        # transfer from HDFS to stat2
        cmd = "ssh stat1002.eqiad.wmnet 'rm -rf %s'" % d['stat2_path']
        print(os.system(cmd))
        cmd = "ssh stat1002.eqiad.wmnet 'hadoop fs -copyToLocal %s %s'" % (d['hdfs_path'], d['stat2_path'])
        print(os.system(cmd))
        # transfer from stat2 to local
        cmd = 'rm -rf %s' % d['raw_local_path']
        print(os.system(cmd))
        cmd = 'rsync -avz stat1002.eqiad.wmnet:%s/* %s' % (d['stat2_path'], d['raw_local_path'])
        os.system(cmd)


time: 3.97 ms
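transfer_partition only prints exit codes, so a failed ssh or hadoop step is easy to miss in the output below. A possible fail-fast variant (a sketch, not what this notebook ran) using subprocess:

import subprocess

def run_checked(cmd):
    # raise CalledProcessError on a non-zero exit instead of printing the status
    subprocess.run(cmd, shell=True, check=True)

# e.g. run_checked("ssh stat1002.eqiad.wmnet 'rm -rf %s'" % d['stat2_path'])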

In [7]:
for d in datasets:
    transfer_partition(d, dry = False)


0
(exit status 0, printed 90 times: three shell commands per dataset × 30 datasets)
time: 5h 59min 59s

In [8]:
[d['raw_local_path'] for d in datasets]


Out[8]:
['/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2001/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2001/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2002/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2002/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2003/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2003/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2004/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2004/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2005/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2005/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2006/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2006/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2007/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2007/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2008/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2008/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2009/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2009/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2010/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2010/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2011/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2011/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2012/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2012/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2013/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2013/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2014/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2014/',
 '/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2015/',
 '/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2015/']
time: 5.76 ms

Cleaning and Filtering


In [9]:
def cf_helper(path, cols, k = 5):
    df = pd.read_csv(path, sep = '\t', quoting = 3, encoding = 'utf-8', header = None, usecols=range(len(cols)))
    if df.shape[0] == 0:
        return pd.DataFrame(columns = cols)
    if df.shape[1] != len(cols):
        # malformed chunk; report it and skip
        print(path)
        print(df.shape)
        return pd.DataFrame(columns = cols)
    df.columns = cols
    # tag each row with a random integer key and split on it, giving 5*k
    # roughly equal chunks for a pool of k worker processes
    df = df.assign(key = lambda x: np.random.randint(0, high=5*k, size=x.shape[0]))
    dfs = [e[1] for e in df.groupby('key')]
    p = mp.Pool(k)
    dfs = p.map(clean_and_filter, dfs)
    p.close()
    p.join()
    return pd.concat(dfs)


time: 10.1 ms
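cf_helper parallelizes by tagging each row with a random integer key and splitting on it. The same pattern in isolation, with a toy stand-in for clean_and_filter (which here comes from diff_utils):

import multiprocessing as mp
import numpy as np
import pandas as pd

def toy_clean(chunk):
    # stands in for clean_and_filter; any per-chunk transform works
    return chunk

df = pd.DataFrame({'x': range(100)})
df = df.assign(key = np.random.randint(0, high=10, size=df.shape[0]))
chunks = [g for _, g in df.groupby('key')]
with mp.Pool(2) as p:
    out = pd.concat(p.map(toy_clean, chunks))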

In [10]:
def clean_and_filter_parallel(d, k = 7):

    indir = d['raw_local_path']
    outdir = d['clean_local_path']
    os.system("rm -rf %s" % outdir)
    os.system("mkdir %s" % outdir)
    cols = d['cols']
    # collect the Hive part files (named like 000000_0) under the raw dir
    files = []
    for root, dirnames, filenames in os.walk(indir):
        for filename in filenames:
            if '_0' in filename:
                files.append(os.path.join(root, filename))

    for i, file in enumerate(files):
        df = cf_helper(file, cols, k = k)
        if 'key' in df.columns:  # empty chunks come back without the key column
            del df['key']
        df.to_csv(os.path.join(outdir, "chunk_%d.tsv" % i), sep = '\t', index = False)


time: 6.8 ms
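The '_0' substring match above picks the Hive part files (typically named 000000_0, 000001_0, ...) out of the copied partition directory. An equivalent discovery step with glob, assuming the same layout:

import glob, os

indir = d['raw_local_path']  # as in clean_and_filter_parallel
files = sorted(glob.glob(os.path.join(indir, '**', '*_0'), recursive=True))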

In [11]:
for d in datasets:
    print(d['raw_local_path'])
    clean_and_filter_parallel(d)


/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2001/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2001/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2002/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2002/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2003/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2003/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2004/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2004/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2005/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2005/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2006/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2006/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2007/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2007/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2008/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2008/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2009/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2009/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2010/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2010/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2011/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2011/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2012/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2012/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2013/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2013/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2014/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2014/
/Users/ellerywulczyn/detox/data/samples/user/raw/talk_diff_2015/
/Users/ellerywulczyn/detox/data/samples/article/raw/talk_diff_2015/
time: 4h 2min 57s

Download block_events and blocked_users


In [19]:
query = """
SELECT 
      *
FROM
    enwiki.block_events
"""

block_events_df = query_hive_ssh(query, '../../data/block_events.tsv', priority = True, quoting=3, delete=False)
# Hive prefixes each column with the table name (e.g. 'block_events.user_text'); strip it
block_events_df.columns = [c.split('.')[1] for c in block_events_df.columns]

In [20]:
query = """
SELECT 
      *
FROM
    enwiki.blocked_user
"""

blocked_user_df = query_hive_ssh(query, '../../data/blocked_user.tsv', priority = True, quoting=3, delete=False)
blocked_user_df.columns = [c.split('.')[1] for c in blocked_user_df.columns]

Download NPA warnings


In [7]:
query = """
SELECT 
      *
FROM
    enwiki.npa_warnings
"""

npa_warnings_df = query_hive_ssh(query, '../../data/npa_warnings.tsv', priority = True, quoting=3, delete=False)
npa_warnings_df.columns = [c.split('.')[1] for c in npa_warnings_df.columns]

Download Long-term Users


In [3]:
query = """
SELECT
    user_text,
    COUNT(*) AS num_days
FROM
    (SELECT
        user_text,
        day
    FROM
        (SELECT 
            rev_user_text AS user_text,
            SUBSTR(rev_timestamp,0,8) AS day
        FROM
            enwiki.revision
        WHERE
            rev_user != 0
            AND rev_timestamp <= '2015-01-01'
        ) a
    GROUP BY
        user_text,
        day ) b
GROUP BY
    user_text
HAVING
    COUNT(*) > 7 
"""

long_term_users_df = query_hive_ssh(query, '../../data/long_term_users.tsv', priority = True, quoting=3, delete=False)
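
The nested query simply counts distinct edit days per registered user and keeps those with more than 7. The same logic in pandas, for a hypothetical TSV dump of revisions with rev_user, rev_user_text, and rev_timestamp columns:

import pandas as pd

revs = pd.read_csv('revisions.tsv', sep = '\t')  # hypothetical revision dump
revs = revs[revs['rev_user'] != 0]  # registered users only
revs['day'] = revs['rev_timestamp'].astype(str).str[:8]  # YYYYMMDD
days_per_user = revs.groupby('rev_user_text')['day'].nunique()
long_term_users = days_per_user[days_per_user > 7]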

Download Gender


In [ ]:
## Annotate users by gender
query = """
SELECT
    user_id,
    user_name as user_text,
    up_value as gender
FROM
    enwiki.user_properties p,
    enwiki.user u
WHERE 
    p.up_user = u.user_id
    AND up_property = 'gender'
"""
d_gender = query_analytics_store(query, {})
d_gender.to_csv('../../data/genders.tsv', sep = '\t', index = False)

Onionize all_blocked_user

We want the k posts immediately before and after each block event, for an increasing sequence of cutoffs [k_1, k_2, ..., k_n]. So that we can grow k later without labeling headaches, for each k_i we write a file containing only the posts between ranks k_(i-1) and k_i around each block event, i.e. the posts not already captured by a smaller cutoff and hence not yet labeled.

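A toy illustration of the layering with hypothetical data (one user with posts 1-12, a single block event after post 10, and ks trimmed to [5, 10]):

import pandas as pd

posts = pd.DataFrame({'rev_id': range(1, 13), 'rev_timestamp': range(1, 13)})
block_ts = 10
seen_ids, layers = set(), {}
for k in [5, 10]:
    pre = posts[posts['rev_timestamp'] <= block_ts][-k:]  # k posts before the block
    post = posts[posts['rev_timestamp'] > block_ts][:k]   # k posts after it
    layer = pd.concat([pre, post])
    layer = layer[~layer['rev_id'].isin(seen_ids)]        # drop already-captured posts
    seen_ids.update(layer['rev_id'])
    layers[k] = sorted(layer['rev_id'])

# layers == {5: [6, 7, 8, 9, 10, 11, 12], 10: [1, 2, 3, 4, 5]}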

In [16]:
block_events_df = pd.read_csv('../../data/block_events.tsv', sep = "\t")
block_events_df.columns = [c.split('.')[1] for c in block_events_df.columns]


time: 98.8 ms

In [17]:
nss = ['user', 'article']
rel_path = '../../data/samples'


time: 5.49 ms

In [18]:
for ns in nss:
    infile = os.path.join(rel_path, ns, 'clean', 'all_blocked_user.tsv')
    out_dir = os.path.join(rel_path, ns, 'clean', 'blocked_user_onion')
    df = pd.read_csv(infile, sep = '\t')
    users = list(set(df['user_text']))
    print(len(users))

    k_prev = 0
    ks = [5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 150, 200, 250, 300, 500, 1000]
    dfs = {k:[] for k in ks}

    t1 = time.time()
    for i, user in enumerate(users):
        if i % 1000 == 0:
            print(i)
            print(time.time() - t1)
            t1 = time.time()
        df_user = df[df['user_text'] == user].sort_values(by='rev_timestamp')
        if df_user.shape[0] == 0:
            continue

        block_events_df_user = block_events_df[block_events_df['user_text']==user]
        seen_ids = set()


        for _, r in block_events_df_user.iterrows():  # _ avoids shadowing the outer counter i
            ts = r['timestamp']
            for k in ks:
                # up to k posts before the block event, minus anything already
                # captured by a smaller k or an earlier block event
                df_user_pre = df_user[df_user['rev_timestamp'] <= ts][-k:]

                if df_user_pre.shape[0] > 0:
                    df_user_pre = df_user_pre[df_user_pre['rev_id'].apply(lambda x: x not in seen_ids)]
                    if df_user_pre.shape[0] > 0:
                        seen_ids.update(tuple(df_user_pre['rev_id']))
                        dfs[k].append(df_user_pre)

                # and up to k posts after it, deduplicated the same way
                df_user_post = df_user[df_user['rev_timestamp'] > ts][:k]
                if df_user_post.shape[0] > 0:
                    df_user_post = df_user_post[df_user_post['rev_id'].apply(lambda x: x not in seen_ids)]
                    if df_user_post.shape[0] > 0:
                        seen_ids.update(tuple(df_user_post['rev_id']))
                        dfs[k].append(df_user_post)

    dfs = {k: pd.concat(v) for k,v in dfs.items()}

    sizes = [(k, len(v)) for k,v in dfs.items()]
    sizes.sort(key=lambda x: x[0])
    print(sizes)


    os.system('rm -rf %s' % out_dir)
    os.system('mkdir %s' % out_dir)

    for k, v in dfs.items():
        # shuffle rows before writing each onion layer
        v.iloc[np.random.permutation(len(v))].to_csv(out_dir + '/%d.tsv' % k, sep = '\t', index=False)


12190
0
1.0013580322265625e-05
1000
146.72958707809448
2000
145.94880199432373
3000
146.34610795974731
4000
147.22265601158142
5000
146.68694591522217
6000
149.18577790260315
7000
146.5544879436493
8000
147.50978899002075
9000
147.3107430934906
10000
148.7514340877533
11000
147.40962505340576
12000
150.8360137939453
[(5, 50919), (10, 25188), (20, 35633), (30, 27717), (40, 24094), (50, 21628), (60, 19779), (70, 18399), (80, 17203), (90, 16166), (100, 15309), (150, 67494), (200, 58173), (250, 51180), (300, 45063), (500, 145860), (1000, 248597)]
6456
0
8.821487426757812e-06
1000
142.303564786911
2000
142.61061596870422
3000
142.2872679233551
4000
143.64273118972778
5000
141.68254113197327
6000
145.03327083587646
[(5, 33115), (10, 22105), (20, 34353), (30, 28441), (40, 24606), (50, 22164), (60, 20420), (70, 18997), (80, 17832), (90, 16820), (100, 16007), (150, 71386), (200, 60493), (250, 52953), (300, 46564), (500, 146053), (1000, 229950)]
time: 47min 34s
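
Because seen_ids is maintained per user and rev_ids are globally unique, the onion layers written above should be pairwise disjoint in rev_id. A quick sanity check (a sketch, shown for the user namespace):

import os
import pandas as pd

out_dir = '../../data/samples/user/clean/blocked_user_onion'
all_ids = set()
for f in sorted(os.listdir(out_dir)):
    ids = set(pd.read_csv(os.path.join(out_dir, f), sep = '\t')['rev_id'])
    assert not (ids & all_ids), 'layer %s overlaps an earlier one' % f
    all_ids |= ids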

In [ ]: