In [1]:
import pandas as pd
import requests
from bs4 import BeautifulSoup as BSoup
from datetime import datetime
OK = True
BAD = False
In [2]:
def check_response_ok (response):
    """Check the validity of the response object and return OK (True) or BAD (False).
    If the response object is bad, write a message to 'IBD_errors.log'."""
    import sys
    try:
        response.raise_for_status ()
        return OK
    except Exception:
        with open ("IBD_errors.log", "a") as f:
            f.write ('{:%m/%d/%Y %H:%M:%S}'.format (datetime.now ()))
            f.write (" -- {}:\n\t\t{}\n".format (sys.exc_info ()[0], response.url))
        return BAD
In [3]:
def get_list_sec_filings ():
    """Generate the list of index files archived in EDGAR from start_year (EDGAR's earliest index year is 1993)
    up to the most recent quarter.
    Note: this does not download the filings themselves, just enough information to generate the filing urls.
    """
    import datetime
    current_year = datetime.date.today().year
    current_quarter = (datetime.date.today().month - 1) // 3 + 1
    # go back the last four years so we get the last ten 10-Q's and the last three 10-K's
    start_year = current_year - 4
    with open("logfile.txt", "a+") as logfile:
        logfile.write('Start year for downloading SEC data is {:d}\n'.format(start_year))
    ## Generate a list of quarter-year combinations for which to get urls
    years = list(range(start_year, current_year))
    quarters = ['QTR1', 'QTR2', 'QTR3', 'QTR4']
    history = [(y, q) for y in years for q in quarters]
    for i in range(1, current_quarter + 1):
        history.append((current_year, 'QTR%d' % i))
    urls = ['https://www.sec.gov/Archives/edgar/full-index/%d/%s/master.idx' % (x[0], x[1]) for x in history]
    urls.sort()
    ## Update the database with these urls
    update_index_files_db (urls)
    #return urls
In [4]:
def update_index_files_db (urls):
    """Download the EDGAR index files and write their content into a SQLite database."""
    import sqlite3
    import requests
    con = sqlite3.connect('edgar_idx.db')
    cur = con.cursor()
    # to do: check if the table exists; if yes, update it instead of erasing it
    cur.execute('DROP TABLE IF EXISTS idx')
    cur.execute('CREATE TABLE IF NOT EXISTS idx (cik TEXT, conm TEXT, type TEXT, date TEXT, path TEXT)')
    with open("logfile.txt", "a+") as logfile:
        for url in urls:
            #to do: modify to only download missing entries
            #get the data located at this url
            lines = requests.get(url).text.splitlines()
            #parse each line into cik, company name, filing type, date, and remote path
            records = [tuple(line.split('|')) for line in lines[11:]]
            #put this into the database (the filings themselves are downloaded later)
            cur.executemany('INSERT INTO idx VALUES (?, ?, ?, ?, ?)', records)
            logfile.write('{:s} - downloaded info and wrote to SQLite DB\n'.format(url))
    con.commit()
    con.close()
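In [ ]:
#a minimal verification sketch (not part of the pipeline above): count the indexed filings
#per form type for one CIK; the CIK value '320193' (Apple) is only an illustration
import sqlite3
con = sqlite3.connect ('edgar_idx.db')
cur = con.cursor ()
cur.execute ('SELECT type, COUNT(*) FROM idx WHERE cik = ? GROUP BY type', ('320193',))
for form_type, count in cur.fetchall ():
    print (form_type, count)
con.close ()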
In [19]:
def download_sec_filing (cik, co_name, filing_type, filing_date, filing_link):
    """Download the specified SEC filing and save it as a text file. Returns the file name the filing was saved to."""
    # inspired by http://kaikaichen.com/?p=681
    import requests
    import os
    saveas = '_'.join([cik, co_name, filing_type, filing_date])
    saveDir = os.path.join("SECDATA", co_name)
    # to do: reorganize to rename the output file name
    url = 'https://www.sec.gov/Archives/' + filing_link.strip()
    if not os.path.exists(saveDir):
        os.makedirs(saveDir)
    with open("logfile.txt", "a+") as logfile:
        with open(os.path.join (saveDir, saveas), 'wb') as f:
            f.write(requests.get(url).content)
        logfile.write('{:s} - downloaded and saved as {:s}\n'.format(url, os.path.join (saveDir, saveas)))
    return saveas
In [17]:
def get_all_filings_ticker (ticker):
    """Download the last ten 10-Q and 10-Q/A filings, and the last three 10-K filings, for the given ticker."""
    import pandas
    from sqlalchemy import create_engine
    #open the database of index file names to generate the urls
    engine = create_engine('sqlite:///edgar_idx.db')
    with engine.connect() as conn, conn.begin():
        #load the table with the index files info
        idx = pandas.read_sql_table('idx', conn)
        #load the look-up table for ticker symbol to CIK translation
        cik_ticker_name = pandas.read_sql_table ('cik_ticker_name', conn)
    #to do: handle the case where multiple stocks share the same ticker; for now just select the first
    cik = cik_ticker_name.cik[cik_ticker_name.ticker == ticker]
    all_links = idx[idx.cik == cik.iloc[0]]
    #select the 10-Q's, the amended 10-Q's (10-Q/A), and the 10-K's
    all_filings = all_links[all_links.type.isin (['10-Q', '10-Q/A', '10-K'])]
    for q in range (0, all_filings.cik.size):
        cik_no = all_filings.cik.iloc[q]
        co_name = all_filings.conm.iloc[q].replace (' ', '-')
        filing_type = all_filings.type.iloc[q]
        filing_date = all_filings.date.iloc[q]
        filing_url = all_filings.path.iloc[q]
        #only download the filings from the last three years
        print(all_filings.iloc[q])
        download_sec_filing(cik_no, co_name, filing_type, filing_date, filing_url)
        #to do: check whether we already have this file, and only download it if not
        #to do: update a database record to reflect that a new file was downloaded
        #idx["local_file"][...] = saveas
        #to do: extract sales data and eps data from this file
        #to do: update screener_results with this data
In [7]:
def get_cik_ticker_lookup_db ():
    """Create the look-up table that translates a ticker symbol into its CIK identifier.
    WARNING! This destroys the existing table!"""
    import sqlite3
    import pandas as pd
    #read in the cik-ticker-company name lookup table
    df = pd.read_csv ("cik_ticker.csv", sep='|')
    #select only the columns we need
    lookup_table_df = df.loc[:, ['CIK', 'Ticker', 'Name']]
    #write this as a second table in edgar_idx
    con = sqlite3.connect('edgar_idx.db')
    cur = con.cursor()
    cur.execute('DROP TABLE IF EXISTS cik_ticker_name')
    cur.execute('CREATE TABLE cik_ticker_name (cik TEXT, ticker TEXT, name TEXT)')
    #loop over the dataframe and collect the rows to insert
    records = []
    for i in range (0, lookup_table_df.CIK.size):
        records.append (("{}".format (lookup_table_df.CIK.iloc[i]),
                         "{}".format (lookup_table_df.Ticker.iloc[i]),
                         "{}".format (lookup_table_df.Name.iloc[i])))
    #insert the data into the table
    cur.executemany ('INSERT INTO cik_ticker_name VALUES (?, ?, ?)', records)
    con.commit ()
    con.close ()
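In [ ]:
#alternative sketch (an assumption, not the notebook's approach): let pandas write the look-up
#table directly via DataFrame.to_sql instead of the manual executemany loop above; note that
#to_sql infers column types, so cik would be stored as an integer rather than TEXT
import sqlite3
import pandas as pd
df_lookup = pd.read_csv ("cik_ticker.csv", sep='|')
df_lookup = df_lookup.loc[:, ['CIK', 'Ticker', 'Name']].rename (
    columns={'CIK': 'cik', 'Ticker': 'ticker', 'Name': 'name'})
con = sqlite3.connect ('edgar_idx.db')
#if_exists='replace' mirrors the DROP TABLE behaviour of get_cik_ticker_lookup_db
df_lookup.to_sql ('cik_ticker_name', con, if_exists='replace', index=False)
con.close ()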
In [8]:
def get_Ibd_rank_and_group (ticker):
    """Return the result as a dict containing values for the keys 'group', 'rank', and 'leader'.
    If the urls cannot be accessed, returns an empty dict.
    If not all values are found, returns None/NaN for the missing values."""
    import numpy as np
    result = {}
    #first, search for the ticker symbol
    url_name1 = "http://myibd.investors.com/search/searchresults.aspx?Ntt={}".format (ticker.lower ())
    print ("url1: {}".format (url_name1))
    response1 = requests.get(url_name1)
    if check_response_ok (response1):
        #then, find the link to the stock's page by following down the chain of links
        #not sure if there is a more direct path to the stock's page
        soup1 = BSoup (response1.content, "lxml")
        #if not a direct hit, a second step (hop) is necessary
        url_name2 = soup1.find ("a", {"id": "ctl00_ctl00_secondaryContent_leftContent_SearchResultsMgrCtrl_didYouMeanCompanyRepeater_ctl00_didYouMeanLink"})
        print ("url2: {}".format (url_name2))
        if url_name2:
            response2 = requests.get (url_name2.get('href'))
            if check_response_ok (response2):
                soup2 = BSoup (response2.content, "lxml")
                #<span id="ctl00_ctl00_secondaryContent_leftContent_CustomContentCtrl_StockTwitMiniChart1_lblCompany" onclick="javascript:window.open('http://research.investors.com/stock-quotes/nyse-thor-industries-inc-tho.htm')">Thor Industries Inc</span>
                url_name3 = soup2.find ("span", {"id": "ctl00_ctl00_secondaryContent_leftContent_CustomContentCtrl_StockTwitMiniChart1_lblCompany"})
                print ("url3a: {}".format (url_name3))
            else:
                #could not access the second page; return the (empty) result
                return result
        else:
            url_name3 = soup1.find ("span", {"id": "ctl00_ctl00_secondaryContent_leftContent_CustomContentCtrl_StockTwitMiniChart1_lblCompany"})
            print ("url3b: {}".format (url_name3))
        if not url_name3:
            return result
        #the link to the stock's page is embedded in the onclick attribute; check that it contains valid data
        tokens = url_name3.get ('onclick').split ("'")
        if not tokens:
            return result
        stock_link = None
        for t in tokens:
            if "http://" in t:
                stock_link = t
                break
        if not stock_link:
            return result
        response3 = requests.get (stock_link)
        if check_response_ok (response3):
            soup3 = BSoup (response3.content, "lxml")
            #finally got the page; now find the ticker's group, its ranking, and the no. 1 stock in that group
            market_group = soup3.find ("div", {"class": "spansubHead"})
            if market_group:
                result["group"] = market_group.text
            else:
                result["group"] = None
            #this span occurs only once and always contains the rank of the current ticker
            ticker_rank = soup3.find ("span", {"id": "ctl00_ctl00_secondaryContent_leftContent_GrpLeaders_ltlSymbolRank"})
            if ticker_rank:
                result["rank"] = ticker_rank.text
            else:
                result["rank"] = np.nan
            #if the ticker is not no. 1, the leader can be found in another anchor tag inside the stockRoll div;
            #this will return None if the current ticker is no. 1
            group_no1 = soup3.find ("a", {"class": "stockRoll"})
            if group_no1:
                result["leader"] = group_no1.text
            else:
                result["leader"] = None
    #if no valid website was found on IBD for this ticker symbol, result is still the empty dict
    return result
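In [ ]:
#illustrative call only (THO is the ticker from the example span in the comment above);
#the keys may be missing, None, or NaN depending on what the IBD page returns
ibd_info = get_Ibd_rank_and_group ("THO")
print (ibd_info.get ("group"), ibd_info.get ("rank"), ibd_info.get ("leader"))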
In [9]:
def add_Ibd_data (df):
    """Add IBD group and rank columns to the screener dataframe, one ticker at a time."""
    df["IBD_group"] = ""
    df["IBD_rank"] = 0
    for ticker in df.Symbol:
        ibd_data = get_Ibd_rank_and_group (ticker)
        if "group" in ibd_data:
            df.loc[df.Symbol == ticker, "IBD_group"] = ibd_data["group"]
        if "rank" in ibd_data:
            df.loc[df.Symbol == ticker, "IBD_rank"] = ibd_data["rank"]
    return df
In [10]:
def get_annual_sales_data (ticker):
    response = requests.get("https://eresearch.fidelity.com/eresearch/evaluate/fundamentals/financials.jhtml?stockspage=incomestatement&symbols={}&period=annual".format (ticker))
    print (response.url)
    #the html code is now in the requests object
    #BeautifulSoup can parse html code and convert it into something usable
    if check_response_ok (response):
        soup = BSoup (response.content, "lxml")
        #<th class="top-bottom-border col-96PX lft-border" scope="col"><span class="bold">2017</span> (06/30 - 03/31) </th>
        dt = soup.find ("table", {"class": "datatable-component"})
        #the summary attribute contains a short description of the table; check that this is the Net Income table
        #print ("net income" in dt.get("summary").lower ())
        #find all rows in this table; the first row contains the header with the dates
        allrows = dt.findChildren (name = "tr")
        firstrow = allrows[0]
        #get the dates on the table
        cols = []
        for f in firstrow.find_all ("th"):
            cols.append (f.text[6:-2])
        #the row labels come from the th cell of each data row; "0" is a placeholder for the header row
        idxs = ["0",]
        for row in allrows[1:]:
            f = row.find ("th")
            if f:
                idxs.append (f.text)
        new_table = pd.DataFrame (columns = cols, index = idxs)
        row_marker = 0
        for row in dt.find_all ('tr'):
            column_marker = 0
            columns = row.find_all ('td')
            for column in columns:
                if "Log in" not in column.get_text ():
                    new_table.iat[row_marker, column_marker] = column.get_text ()
                else:
                    new_table.iat[row_marker, column_marker] = 0.0
                column_marker += 1
            row_marker += 1
        return new_table
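In [ ]:
#illustrative call only (NVDA is the example ticker used elsewhere in this notebook):
#the returned DataFrame is indexed by the income-statement row labels, one column per fiscal year
annual = get_annual_sales_data ("NVDA")
if annual is not None:
    print (annual.columns)
    print (annual.loc["Sales/Turnover (Net)"])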
In [11]:
def get_quarterly_sales_data (ticker):
    response = requests.get("https://eresearch.fidelity.com/eresearch/evaluate/fundamentals/financials.jhtml?stockspage=incomestatement&symbols={}&period=quarterly".format (ticker))
    print (response.url)
    #the html code is now in the requests object
    #BeautifulSoup can parse html code and convert it into something usable
    if check_response_ok (response):
        soup = BSoup (response.content, "lxml")
        #<th class="top-bottom-border col-96PX lft-border" scope="col"><span class="bold">2017</span> (06/30 - 03/31) </th>
        dt = soup.find ("table", {"class": "datatable-component"})
        #the summary attribute contains a short description of the table; check that this is the Net Income table
        #print ("net income" in dt.get("summary").lower ())
        #find all rows in this table; the first row contains the header with the dates
        allrows = dt.findChildren (name = "tr")
        firstrow = allrows[0]
        #get the dates on the table, formatted as month-day/year
        cols = []
        for f in firstrow.find_all ("th"):
            cols.append ("{}/{}".format (f.text[6:11], f.text[:4]))
        #the row labels come from the th cell of each data row; "0" is a placeholder for the header row
        idxs = ["0",]
        for row in allrows[1:]:
            f = row.find ("th")
            if f:
                idxs.append (f.text)
        new_table = pd.DataFrame (columns = cols, index = idxs)
        row_marker = 0
        for row in dt.find_all ('tr'):
            column_marker = 0
            columns = row.find_all ('td')
            for column in columns:
                if "Log in" not in column.get_text ():
                    new_table.iat[row_marker, column_marker] = column.get_text ()
                else:
                    new_table.iat[row_marker, column_marker] = 0.0
                column_marker += 1
            row_marker += 1
        return new_table
In [12]:
def add_initial_sales_data (df):
    """Add the most recent annual and quarterly sales figures to the screener dataframe."""
    df["current_year"] = 0.0
    df["last_year"] = 0.0
    df["last_last_year"] = 0.0
    df["current_Q"] = 0.0
    df["last_Q"] = 0.0
    df["last_last_Q"] = 0.0
    df["current_Y_date"] = ""
    df["current_Q_date"] = ""
    for symbol in df.Symbol:
        annual = get_annual_sales_data (symbol)
        quarterly = get_quarterly_sales_data (symbol)
        #select rows by symbol and columns by name
        df.loc[df.Symbol == symbol, "current_year"] = annual.loc["Sales/Turnover (Net)"].iloc[0]
        print (annual.loc["Sales/Turnover (Net)"])
        df.loc[df.Symbol == symbol, "last_year"] = annual.loc["Sales/Turnover (Net)"].iloc[1]
        df.loc[df.Symbol == symbol, "last_last_year"] = annual.loc["Sales/Turnover (Net)"].iloc[2]
        df.loc[df.Symbol == symbol, "current_Q"] = quarterly.loc["Sales/Turnover (Net)"].iloc[0]
        df.loc[df.Symbol == symbol, "last_Q"] = quarterly.loc["Sales/Turnover (Net)"].iloc[1]
        df.loc[df.Symbol == symbol, "last_last_Q"] = quarterly.loc["Sales/Turnover (Net)"].iloc[2]
        df.loc[df.Symbol == symbol, "current_Y_date"] = annual.columns[0]
        df.loc[df.Symbol == symbol, "current_Q_date"] = quarterly.columns[0]
    return df
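In [ ]:
#caveat (an assumption, not handled in this notebook yet): the values scraped from the Fidelity
#tables are strings, often with thousands separators, so the ratios in the next cell may need a
#numeric conversion first; a minimal sketch:
def convert_sales_to_numeric (df):
    sales_cols = ["current_year", "last_year", "last_last_year",
                  "current_Q", "last_Q", "last_last_Q"]
    for col in sales_cols:
        #strip separators such as "1,234.5"; anything unparseable becomes NaN instead of raising
        df[col] = pd.to_numeric (df[col].astype (str).str.replace (",", ""), errors="coerce")
    return df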
In [13]:
def sales_data_initial_analysis (df):
    """Compute year-over-year and quarter-over-quarter sales growth in percent."""
    #to do: add a sanity check that the sales columns are present in the dataframe
    df["Sales_pct_current_Y"] = df["current_year"] / df["last_year"] * 100.0 - 100.0
    df["Sales_pct_last_Y"] = df["last_year"] / df["last_last_year"] * 100.0 - 100.0
    df["Sales_pct_current_Q"] = df["current_Q"] / df["last_Q"] * 100.0 - 100.0
    df["Sales_pct_last_Q"] = df["last_Q"] / df["last_last_Q"] * 100.0 - 100.0
    return df
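In [ ]:
#worked example of the growth formula above (made-up numbers): with current_year = 130.0 and
#last_year = 100.0, Sales_pct_current_Y = 130.0 / 100.0 * 100.0 - 100.0 = 30.0, i.e. 30% growth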
In [18]:
if __name__ == "__main__":
    #df = pd.read_excel ("screener_results_IBD.xls")
    #print (df[df.Symbol == "NVDA"])
    #print (get_annual_sales_data ("NVDA"))
    #df = add_Ibd_data (df)
    ## TODO: KEEP TRACK OF WHICH TICKERS FAILED AND WHICH ONES WERE SUCCESSFULLY PROCESSED
    ### REMEMBER TO REMOVE TRAILING JUNK FROM screener_results.xls FIRST! ###
    df_ibd = pd.read_excel ("screener_results.xls")
    #df_ibd = add_Ibd_data (df_ibd)
    #df_ibd.to_excel ("screener_results_IBD.xls")
    df_sales0 = add_initial_sales_data (df_ibd)
    df_sales0.to_excel ("screener_results_IBD_sales1.xls")
    #df_sales0 = pd.read_excel ("screener_results_IBD_sales1.xls")
    df_sales1 = sales_data_initial_analysis (df_sales0)
    #df_sales1.to_excel ("screener_results_IBD_sales.xls")
    #update the database of available SEC filings
    #get_list_sec_filings ()
    #get_cik_ticker_lookup_db ()
In [20]:
get_all_filings_ticker('AAPL')
In [14]:
#for testing the functions in here:
df_screenerresults = pd.read_excel ("screener_results.xls")
print (df_screenerresults.iloc[7])
#get_list_sec_filings ()
#ibd_df = get_ibd_data (df_screenerresults.iloc[7])
In [23]:
print (df_screenerresults.iloc[7].Symbol)
ibd_df = add_Ibd_data (df_screenerresults.iloc[5:7].copy())
print (ibd_df)
In [27]:
df = df_screenerresults.iloc[7:9].copy()
df_is = add_initial_sales_data (df)
print (df_is)
In [30]:
asd = get_annual_sales_data (df_screenerresults.iloc[7].Symbol)
print (type(asd))
print (asd.columns)
print (asd.index)
In [16]:
print (get_quarterly_sales_data (df_screenerresults.iloc[7].Symbol))