In [1]:
"""
Initialization
"""
'''
Standard modules
'''
import os
import sqlite3
import csv
import time
import codecs
from pprint import pprint
'''
Analysis modules
'''
import pandas as pd
'''
Custom modules
'''
import config
import utilities
'''
Misc
'''
nb_name = '20171002-daheng-load_and_prepare_data'
In [2]:
"""
Register
MENG_NEWS_TWEETS_DIR = os.path.join(DATA_DIR, 'raw-news_tweets-meng')  # DEPRECATED
ORIGINAL_NEWS_TWEETS_DIR = os.path.join(DATA_DIR, 'raw-news_tweets-original')
in config.
"""
Out[2]:
Raw data files are located inside the ORIGINAL_NEWS_TWEETS_DIR folder.
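For reference, a minimal sketch of what these config entries might look like (the DATA_DIR value is an assumption inferred from the shell commands below; the authoritative definitions live in the custom config module):

import os

DATA_DIR = './data'  # assumption: data root, matching the paths used in the shell commands below
MENG_NEWS_TWEETS_DIR = os.path.join(DATA_DIR, 'raw-news_tweets-meng')          # DEPRECATED
ORIGINAL_NEWS_TWEETS_DIR = os.path.join(DATA_DIR, 'raw-news_tweets-original')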
In [3]:
! ls -1 ./data/raw-news_tweets-original/
In [4]:
! ls ./data/raw-news_tweets-original/dataset1/news
In [5]:
! ls ./data/raw-news_tweets-original/dataset1/tweets/2014-11-18/
In [6]:
! head -1 ./data/raw-news_tweets-original/dataset1/news/2014-11-18.txt
In [7]:
! head -3 ./data/raw-news_tweets-original/dataset1/tweets/2014-11-18/2685_Missouris_Nixon_Declares_State_of_Emergency_Awaiting_Grand_Jury-Businessweek
In [8]:
"""
Register
NEWS_TWEETS_DDL_FILE = os.path.join(DATA_DIR, 'original-news_tweets.schema.sql')
in config.
"""
Out[8]:
In [9]:
! cat ./data/original-news_tweets.schema.sql
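The output of the cat command is not reproduced here. A plausible sketch of the DDL, reconstructed from the insert statements and index definitions used later in this notebook (column types and constraints are assumptions; the actual schema file is authoritative):

create table if not exists news (
    news_id              integer primary key autoincrement,
    news_native_id       integer,
    news_url             text,
    news_title           text,
    news_source          text,
    news_post_time       text,
    news_collected_time  text,
    news_keywords        text,
    news_doc             text,
    news_entities        text
);

create table if not exists tweets (
    tweet_id              integer primary key autoincrement,
    tweet_native_id       integer,
    tweet_text            text,
    tweet_post_time       text,
    tweet_collected_time  text,
    news_native_id        integer
);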
In [10]:
"""
Register
NEWS_TWEETS_DB_FILE = os.path.join(DATA_DIR, 'news_tweets-meng.db')
in config.
"""
def check_db():
    db_exists = os.path.exists(config.NEWS_TWEETS_DB_FILE)
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        if not db_exists:
            print('No db exists. Creating new db and tables...')
            with open(config.NEWS_TWEETS_DDL_FILE, 'r') as f:
                schema = f.read()
            conn.executescript(schema)
        else:
            print('db already exists!')
In [11]:
if 1 == 1:
    check_db()
In [13]:
%%time
"""
Parse and load original news data
Note: should be executed twice, once for each of
    - config.ORIGINAL_NEWS_TWEETS_DIR/dataset1/news/
    - config.ORIGINAL_NEWS_TWEETS_DIR/dataset2/news/
"""
if 0 == 1:
    '''
    Get all [date].txt news files
    '''
    news_dir = os.path.join(config.ORIGINAL_NEWS_TWEETS_DIR, 'dataset1', 'news')
    news_files = os.listdir(news_dir)
    # sort files by date
    news_files.sort(key=lambda x: time.strptime(x, '%Y-%m-%d.txt'))

    '''
    Define news file line format information
    '''
    csv.register_dialect('original_news_line', delimiter='\t', doublequote=False, quoting=csv.QUOTE_NONE)
    # NOTE: line_index 7 is news_abstract; news_doc is line_index 8
    # line_index 9 holds the entities of news_doc in the format [entity_name]:[entity_type]:[frequency]
    news_line_index_dict = {'news_native_id': 0, 'news_url': 1, 'news_title': 2, 'news_source': 3,
                            'news_post_time': 4, 'news_keywords': 6, 'news_doc': 8, 'news_entities': 9}

    query = '''
        insert into news (news_native_id, news_url, news_title, news_source, news_post_time, news_collected_time, news_keywords, news_doc, news_entities)
        values (?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        '''
        For each news file: read in, parse, and load into the db news table
        '''
        for news_file in news_files:
            # parse the news collected date out of the file name
            news_collected_time = os.path.split(news_file)[1].split('.')[0]
            with open(os.path.join(news_dir, news_file), 'r') as f:
                news_tpl_lst = []
                news_lines = csv.reader(f, dialect='original_news_line')
                for news_line in news_lines:
                    # check format of news article
                    if len(news_line) == 10:
                        news_tpl = (news_line[news_line_index_dict['news_native_id']],
                                    news_line[news_line_index_dict['news_url']],
                                    news_line[news_line_index_dict['news_title']],
                                    news_line[news_line_index_dict['news_source']],
                                    utilities.parse_news_post_time(news_line[news_line_index_dict['news_post_time']]),
                                    news_collected_time,
                                    news_line[news_line_index_dict['news_keywords']],
                                    news_line[news_line_index_dict['news_doc']],
                                    news_line[news_line_index_dict['news_entities']])
                        news_tpl_lst.append(news_tpl)
                cursor.executemany(query, news_tpl_lst)
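The custom utilities module is not shown in this notebook. Below is a hypothetical sketch of the two timestamp parsers used here and in the next cell; the raw input formats are assumptions, and the real implementations may differ:

from datetime import datetime

def parse_news_post_time(raw_time):
    # assumption: news post times arrive as e.g. '2014-11-18 10:23:45'
    # and are normalized to 'YYYY-MM-DD HH:MM:SS'
    return datetime.strptime(raw_time.strip(), '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d %H:%M:%S')

def parse_tweet_post_time(raw_time):
    # assumption: tweet post times arrive in Twitter's created_at style,
    # e.g. 'Tue Nov 18 18:54:12 +0000 2014'
    return datetime.strptime(raw_time.strip(), '%a %b %d %H:%M:%S %z %Y').strftime('%Y-%m-%d %H:%M:%S')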
In [12]:
%%time
"""
Parse and load original tweets data
"""
if 0 == 1:
    '''
    Retrieve all valid dates for news
    '''
    news_valid_dates_lst = []
    query_news_valid_dates = '''
        select distinct news_collected_time from news
        order by news_collected_time;'''
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute(query_news_valid_dates)
        news_valid_dates_lst = [item[0] for item in cursor.fetchall()]

    '''
    Temporary list used to drop dates known to cause errors when building the db.
    Also see the Notes section.
    '''
    tmp_lst = []
    for news_valid_date in news_valid_dates_lst:
        if news_valid_date not in config.ORIGINAL_TWEETS_ERROR_DATES_LST:
            tmp_lst.append(news_valid_date)
    news_valid_dates_lst = tmp_lst

    '''
    Establish a db connection for each news date and commit the inserts
    '''
    for news_valid_date in news_valid_dates_lst:
        localtime = time.asctime(time.localtime(time.time()))
        print('Processing tweets associated with news on {} ({})...'.format(news_valid_date, localtime))

        '''
        Retrieve news_native_id from db
        '''
        news_native_ids_lst = []
        query_select_news = '''
            select news_native_id from news
            where news_collected_time = :news_valid_date
            order by news_native_id asc;'''
        with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
            cursor = conn.cursor()
            cursor.execute(query_select_news, {'news_valid_date': news_valid_date})
            news_native_ids_lst = [item[0] for item in cursor.fetchall()]

        '''
        Define tweets file line format information
        '''
        csv.register_dialect('original_tweet_line', delimiter='\t', doublequote=False, quoting=csv.QUOTE_NONE)
        tweet_line_index_dict = {'tweet_native_id': 0, 'tweet_text': 1, 'tweet_post_time': 2}

        query_insert_tweets = '''insert into tweets (tweet_native_id, tweet_text, tweet_post_time, tweet_collected_time, news_native_id) values (?, ?, ?, ?, ?)'''
        with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
            cursor = conn.cursor()
            '''
            For each news:
                - locate the corresponding tweets file
                - read in the file lines
                - parse the content
                - load into the tweets table
            '''
            for news_ind, news_native_id in enumerate(news_native_ids_lst):
                news_collected_time = news_valid_date

                # print out progress info every 100 news articles
                # if not news_ind % 100:
                #     print('Processing tweets associated with news {} on {} ({}/{})...'.format(news_native_id, news_collected_time, news_ind+1, len(news_native_ids_lst)))

                # determine whether the news_tweets file is in the dataset1 folder or the dataset2 folder
                if news_collected_time <= '2015-06-05':
                    news_tweets_file_dir = os.path.join(config.ORIGINAL_NEWS_TWEETS_DIR, 'dataset1', 'tweets', news_collected_time)
                else:
                    news_tweets_file_dir = os.path.join(config.ORIGINAL_NEWS_TWEETS_DIR, 'dataset2', 'tweets', news_collected_time)

                # find the name of the news_tweets file
                tweets_files_lst = [file for file in os.listdir(news_tweets_file_dir) if file.startswith(str(news_native_id))]

                # proceed only if the news_tweets file exists and its size is larger than 0
                if tweets_files_lst and os.stat(os.path.join(news_tweets_file_dir, tweets_files_lst[0])).st_size > 0:
                    news_tweets_file = tweets_files_lst[0]
                    with open(os.path.join(news_tweets_file_dir, news_tweets_file), 'r') as f:
                        tweets_tpl_lst = []
                        # strip '\0' characters (csv otherwise raises "line contains NULL byte")
                        tweet_lines = csv.reader((line.replace('\0', '') for line in f), dialect='original_tweet_line')
                        for tweet_line in tweet_lines:
                            # filter out incomplete tweet lines and retweets
                            if len(tweet_line) >= 3 and not tweet_line[tweet_line_index_dict['tweet_text']].startswith('RT'):
                                # build tpl for tweets table
                                tweet_tpl = (tweet_line[tweet_line_index_dict['tweet_native_id']],
                                             tweet_line[tweet_line_index_dict['tweet_text']],
                                             utilities.parse_tweet_post_time(tweet_line[tweet_line_index_dict['tweet_post_time']]),
                                             news_collected_time,
                                             news_native_id)
                                tweets_tpl_lst.append(tweet_tpl)
                        if tweets_tpl_lst:
                            cursor.executemany(query_insert_tweets, tweets_tpl_lst)
In [14]:
"""
The Twitter internal server was unstable on some dates.
Tweets data collected on these dates contain errors and cannot be parsed.
Delete the corresponding news data on these dates.
Also see the Notes section.
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        query_delete_news = '''
            delete from news where news_collected_time = :news_collected_time
        '''
        for original_tweets_error_date in config.ORIGINAL_TWEETS_ERROR_DATES_LST:
            print('Delete news on {} ...'.format(original_tweets_error_date))
            cursor.execute(query_delete_news, {'news_collected_time': original_tweets_error_date})
In [15]:
%%time
"""
Build indexes on news table news_native_id, news_title, news_post_time, and news_collected_time fields.
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        queries_lst = ['create index news_news_native_id on news(news_native_id);',
                       'create index news_news_title on news(news_title);',
                       'create index news_news_post_time on news(news_post_time);',
                       'create index news_news_collected_time on news(news_collected_time);']
        for query in queries_lst:
            cursor.execute(query)
In [12]:
%%time
"""
Build indexes on tweets table tweet_native_id, tweet_post_time, tweet_collected_time, and news_native_id fields.
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        queries_lst = ['create index tweets_tweet_native_id on tweets(tweet_native_id);',
                       'create index tweets_tweet_post_time on tweets(tweet_post_time);',
                       'create index tweets_tweet_collected_time on tweets(tweet_collected_time);',
                       'create index tweets_news_native_id on tweets(news_native_id);']
        for query_ind, query in enumerate(queries_lst):
            print('Building index {}/{} ...'.format(query_ind+1, len(queries_lst)))
            cursor.execute(query)
In [2]:
"""
List out tables and indices
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()

        query = """
            select name from sqlite_master
            where type = 'table';
        """
        cursor.execute(query)
        print('TABLES:')
        print(cursor.fetchall())

        query = """
            select name from sqlite_master
            where type = 'index';
        """
        cursor.execute(query)
        print('INDICES:')
        print(cursor.fetchall())
In [ ]:
%%time
"""
DEPRECATED
Load news data by Meng
Note: should be executed twice, once for each of
    - data/raw-news_tweets-meng/dataset1/news.txt
    - data/raw-news_tweets-meng/dataset2/news.txt
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        with open(os.path.join(config.DATA_DIR, 'raw-news_tweets-meng/dataset2/news.txt'), 'r') as f:
            news_df_chunksize = 10000
            for df_chunk in pd.read_csv(f, sep='\t', chunksize=news_df_chunksize, iterator=True):
                column_names = {
                    'POST_TIME': 'post_time',
                    'NEWS_SOURCE': 'news_source',
                    'NEWS_TITLE': 'news_title',
                    'NEWS_DOC': 'news_doc'
                }
                df_chunk = df_chunk.rename(columns=column_names)
                df_chunk.to_sql(name='news', con=conn, if_exists='append', index=False)
In [ ]:
%%time
"""
DEPRECATED
Load tweets data by Meng
Note: should be executed twice, once for each of
    - data/raw-news_tweets-meng/dataset1/tweets.txt
    - data/raw-news_tweets-meng/dataset2/tweets.txt
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        with open(os.path.join(config.DATA_DIR, 'raw-news_tweets-meng/dataset2/tweets.txt'), 'r') as f:
            tweets_df_chunksize = 100000
            for df_chunk in pd.read_csv(f, sep='\t', chunksize=tweets_df_chunksize, iterator=True):
                column_names = {
                    'POST_TIME': 'post_time',
                    'TWEET_TEXT': 'tweet_text'
                }
                df_chunk = df_chunk.rename(columns=column_names)
                df_chunk.to_sql(name='tweets', con=conn, if_exists='append', index=False)
In [18]:
%%time
"""
Check number of news per day
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        query = '''
            select date(news_collected_time) as news_date, count(news_id) as news_num
            from news
            group by date(news_collected_time);
        '''
        cursor.execute(query)
        for row in cursor.fetchall():
            print('{}: {}'.format(row['news_date'], row['news_num']))
In [38]:
"""
Check any single news article
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        '''
        Note:
            - News collected until 2015-03-17 have an empty news_entities field.
            - Starting from news_native_id = '34583' on 2015-03-18, most news have news_entities information (with a few exceptions each day).
        '''
        query = """select * from news where news_native_id = '34583';"""
        cursor.execute(query)
        result = cursor.fetchone()
        row_keys_lst = [item[0] for item in cursor.description]
        for row_ind, row in enumerate(result):
            print('({}/{}) {}: {}'.format(row_ind+1, len(result), row_keys_lst[row_ind], row))
        # for row in cursor.fetchall():
        #     print('{}: {}'.format(row['news_native_id'], row['news_entities']))
In [16]:
%%time
"""
Check number of tweets on a given date
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        tweet_collected_time = '2015-04-03'
        query = """
            select tweet_collected_time, count(tweet_id) as tweets_num
            from tweets
            where tweet_collected_time = :tweet_collected_time;
        """
        cursor.execute(query, {'tweet_collected_time': tweet_collected_time})
        for row in cursor.fetchall():
            print('{}: {}'.format(row['tweet_collected_time'], row['tweets_num']))
In [17]:
%%time
"""
Check number of tweets per day
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        query = """
            select tweet_collected_time, count(tweet_id) as tweets_num
            from tweets
            group by tweet_collected_time;
        """
        cursor.execute(query)
        for row in cursor.fetchall():
            print('{}: {}'.format(row['tweet_collected_time'], row['tweets_num']))
In [18]:
%%time
"""
Check number of tweets per news
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        query = """
            select news_native_id, count(tweet_id) as tweets_num
            from tweets
            where tweet_collected_time < '2014-11-20'
            group by news_native_id;
        """
        cursor.execute(query)
        for row in cursor.fetchall():
            print('{}: {}'.format(row['news_native_id'], row['tweets_num']))
In [19]:
%%time
"""
Check tweets for a given news
"""
if 1 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        cursor = conn.cursor()
        query = """
            select tweet_native_id, tweet_text, tweet_post_time, tweet_collected_time from tweets
            where tweets.news_native_id = :news_native_id;
        """
        cursor.execute(query, {'news_native_id': 3098})
        for row in cursor.fetchall():
            print(row)
In [2]:
%%time
"""
Build pickle for news data over the selected period.
Register
NEWS_PERIOD_DF_PKL = os.path.join(DATA_DIR, 'news-period.df.pkl')
in config.
"""
if 0 == 1:
    with sqlite3.connect(config.NEWS_TWEETS_DB_FILE) as conn:
        query = """
            select news_id, news_native_id, news_title, news_post_time, news_collected_time, news_keywords, news_doc, news_entities from news
            where news_collected_time <= '2015-04-14';
        """
        news_period_df = pd.read_sql_query(sql=query, con=conn, parse_dates=['news_post_time', 'news_collected_time'])

        '''
        Remove news with an empty news_title or news_doc field
        '''
        news_period_df = news_period_df[news_period_df['news_title'] != '']
        news_period_df = news_period_df[news_period_df['news_doc'] != '']

        '''
        Make pickle
        '''
        news_period_df.to_pickle(config.NEWS_PERIOD_DF_PKL)
In [3]:
%%time
"""
Test recovering the df pickle
"""
if 1 == 1:
    news_period_df = pd.read_pickle(config.NEWS_PERIOD_DF_PKL)
In [4]:
news_period_df
Out[4]:
In [5]:
"""
Check number of news per day
"""
with pd.option_context('display.max_rows', 150):
    print(news_period_df.groupby(['news_collected_time']).size())
In [6]:
"""
Check number of news with an empty 'news_entities' field per day
"""
with pd.option_context('display.max_rows', 150):
    print(news_period_df[news_period_df['news_entities'] == ''].groupby(['news_collected_time']).size())
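As noted in the single-article check above, news_entities is largely empty before 2015-03-18. If a downstream step needs populated entities, one possible (hedged) filter over the recovered dataframe would be:

# keep only news collected on or after 2015-03-18 and with a non-empty news_entities field;
# whether this cutoff is appropriate depends on the downstream analysis
news_with_entities_df = news_period_df[(news_period_df['news_collected_time'] >= '2015-03-18') &
                                       (news_period_df['news_entities'] != '')]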
In [7]:
news_period_df.dtypes
Out[7]: