import csv
import glob
import multiprocessing as mp
import os
from datetime import datetime

from tqdm import tqdm
class ReadOnlyDict(dict):
    """
    Dictionary whose write access can be switched off and on at runtime. This applies to the settings dictionary
    most likely.

    A new instance is writable. After ``set_read_state(True)`` every mutating operation (item assignment, item
    deletion, ``update``, ``pop``, ``popitem``, ``clear``, ``|=`` and ``setdefault`` on a missing key) raises
    ``RuntimeError`` until ``set_read_state(False)`` is called. Reading is never restricted.
    """
    # Class-level default: start with a read/write dict. The name is mangled to ``_ReadOnlyDict__readonly`` and is
    # shadowed by a per-instance attribute as soon as set_read_state() is called.
    __readonly = False

    def set_read_state(self, read_only=True):
        """
        Allow or deny modifying dictionary.
        :param read_only: bool to set the state of the dictionary
        :return: void
        """
        self.__readonly = bool(read_only)

    def _ensure_writable(self):
        """
        Single place where the read-only state is enforced.
        :return: void
        :raises RuntimeError: if the dictionary is currently read only
        """
        if self.__readonly:
            raise RuntimeError('This dictionary is currently read only!')

    def __setitem__(self, key, value):
        """
        Prevents modification of an item when read only.
        :param key: A key
        :param value: A value
        :return: void
        """
        self._ensure_writable()
        return dict.__setitem__(self, key, value)

    def __delitem__(self, key):
        """
        Prevents deletion of an item when read only.
        :param key: A key
        :return: void
        """
        self._ensure_writable()
        return dict.__delitem__(self, key)

    # The methods below do not go through __setitem__/__delitem__ in CPython, so each one has to be guarded
    # explicitly; otherwise they silently bypass the read-only state.

    def update(self, *args, **kwargs):
        """Same as dict.update, but refused when read only."""
        self._ensure_writable()
        return dict.update(self, *args, **kwargs)

    def pop(self, *args):
        """Same as dict.pop, but refused when read only."""
        self._ensure_writable()
        return dict.pop(self, *args)

    def popitem(self):
        """Same as dict.popitem, but refused when read only."""
        self._ensure_writable()
        return dict.popitem(self)

    def clear(self):
        """Same as dict.clear, but refused when read only."""
        self._ensure_writable()
        return dict.clear(self)

    def setdefault(self, key, default=None):
        """
        Same as dict.setdefault. Looking up an existing key is a pure read and is always allowed; inserting a
        missing key is refused when read only.
        """
        if key in self:
            return self[key]
        self._ensure_writable()
        return dict.setdefault(self, key, default)

    def __ior__(self, other):
        """In-place merge (``d |= other``); routed through update() so that it is refused when read only."""
        self.update(other)
        return self
def unique_cik(path_list):
    """
    Identify all unique CIK in a path list.

    The CIK is read from the file name (last path component): it is the fifth field once the name is split on '_'.
    :param path_list: list of path, most likely obtain from a recursive glob.glob
    :return: set of unique CIK found, as ints
    :raises IndexError: if a file name has fewer than five '_'-separated fields
    :raises ValueError: if the fifth field is not an integer
    """
    # os.path.basename instead of split('/') so that paths using the platform separator are handled as well.
    return {int(os.path.basename(path).split('_')[4]) for path in path_list}
def paths_to_cik_dict(file_list, unique_sec_cik):
    """
    Organizes a list of file paths into a dictionary, the keys being the CIKs. unique_sec_cik is used to initialize
    the cik_dict, so a CIK without any path still appears with an empty list.

    The CIK of a path is the fifth '_'-separated field of its file name, cast to an int.
    :param file_list: unorganized list of paths
    :param unique_sec_cik: set of all unique CIK found
    :return: a dictionary containing all the paths, organized by CIKs
    :raises KeyError: if a path belongs to a CIK that is not in unique_sec_cik
    """
    cik_dict = {k: [] for k in unique_sec_cik}
    for path in tqdm(file_list):
        # os.path.basename instead of split('/') so that the platform separator is handled as well.
        cik = int(os.path.basename(path).split('_')[4])  # Cast to an int
        cik_dict[cik].append(path)
    return cik_dict
def load_cik_path(s):
    """
    Find all the file paths and organize them by CIK.

    Every '.txt' file found recursively under s['path_stage_1_data'] is considered, then only the reports whose
    type is in s['report_type'] are kept. Progress is reported on stdout.
    :param s: Settings dictionary (reads 'path_stage_1_data' and 'report_type')
    :return: Dictionary of paths with the keys being the CIK.
    """
    # os.path.join keeps the pattern valid whether or not the configured folder ends with a separator; plain
    # concatenation silently produced 'folder**/*.txt' when it did not.
    pattern = os.path.join(s['path_stage_1_data'], '**', '*.txt')
    file_list = glob.glob(pattern, recursive=True)
    print("[INFO] Loaded {:,} 10-X".format(len(file_list)))
    file_list = filter_cik_path(file_list, s)
    print("[INFO] Shrunk to {:,} {}".format(len(file_list), s['report_type']))
    unique_sec_cik = unique_cik(file_list)  # Get unique CIKs
    print("[INFO] Found {:,} unique CIK in master index".format(len(unique_sec_cik)))
    cik_path = paths_to_cik_dict(file_list, unique_sec_cik)  # Create a dict based on that
    print("[INFO] cik_path contains data on {:,} CIK numbers".format(len(cik_path)))
    return cik_path
def filter_cik_path(file_list, s):
    """
    Filter out all the reports that are not of the considered type. The considered type is available in the settings
    dictionary.

    The type of a report is the second '_'-separated field of its file name.
    :param file_list: list of report paths
    :param s: Settings dictionary (reads 'report_type', the container of accepted report types)
    :return: new list with only the paths whose report type is in s['report_type'], in the original order
    :raises IndexError: if a file name contains no '_'
    """
    accepted_types = s['report_type']  # Loop invariant, looked up once
    # os.path.basename instead of split('/') so that the platform separator is handled as well.
    return [f for f in file_list if os.path.basename(f).split('_')[1] in accepted_types]
def load_lookup(s):
    """
    Load the CIK -> ticker lookup table.

    The file at s['path_lookup'] is read as '|'-delimited text. The first row is a header and is skipped; in every
    other row the first column is the CIK (cast to an int) and the second one the ticker. Blank lines are ignored.
    If a CIK appears several times, the last row wins.
    :param s: Settings dictionary (reads 'path_lookup')
    :return: Lookup table in the form of a dictionary {CIK (int): ticker (str)}.
    """
    cik_to_ticker = dict()
    # newline='' is what the csv module expects so that it can handle line endings itself.
    with open(s['path_lookup'], newline='') as f:
        reader = csv.reader(f, delimiter='|')
        next(reader, None)  # Skip header; the default avoids StopIteration on an empty file
        for row in reader:
            if not row:  # csv yields [] for a blank line
                continue
            cik_to_ticker[int(row[0])] = row[1]
    return cik_to_ticker
def intersection_sec_lookup(cik_path, lookup):
    """
    Restrict both dictionaries to the CIKs they have in common.

    This is one of the steps taken to ensure that the sets of CIKs of all the external databases match.
    :param cik_path: Dictionary of paths organized by CIKs
    :param lookup: lookup table CIK -> ticker
    :return: tuple (cik_path, lookup) rebuilt with only the shared CIKs as keys.
    """
    # CIKs present on both sides
    shared = set(cik_path) & set(lookup)
    # Rebuild each dictionary from the shared keys only
    paths_kept = {}
    tickers_kept = {}
    for cik in shared:
        paths_kept[cik] = cik_path[cik]
    for cik in shared:
        tickers_kept[cik] = lookup[cik]
    return paths_kept, tickers_kept
def load_stock_data(s, penny_limit=0, verbose=True, total_lines=30563446):
    """
    Load all the stock data and pre-processes it.

    The file at s['path_stock_database'] is read line by line as comma-separated text. The header must contain the
    columns "date" (formatted YYYYMMDD), "TICKER", "ASK" and "SHROUT". For every data line:
    - lines whose quarter is outside [s['time_range'][0], s['time_range'][-1]] are skipped (out of range);
    - lines with an empty ticker, price or number of shares (or with too few fields) are skipped (incomplete);
    - market cap is computed as 1000 * ASK * SHROUT; if it is below penny_limit the ticker is flagged and *all*
      its entries are dropped from the result.
    WARNING: this is a single pass over a large file and still takes a while (the original author reports ~60 s).
    NOTE(review): the "closing" value is read from the "ASK" column, and the 1000 factor presumably reflects
    SHROUT being expressed in thousands of shares - confirm against the data source.
    :param s: Settings dictionary (reads 'path_stock_database' and 'time_range', quarters being (year, qtr) tuples)
    :param penny_limit: minimum market cap; a ticker with at least one in-range entry below it is removed
    :param verbose: print load statistics when True
    :param total_lines: expected number of data lines, only used to size the progress bar
    :return: dict stock_data[ticker][date] = (closing, market cap)
    """
    stock_data = dict()
    penny_stocks = set()  # A set: the same ticker is typically flagged on many lines
    counter_incomplete_line = 0
    counter_line_out_of_range = 0
    nb_lines = 0
    with open(s['path_stock_database']) as f:
        header = next(f).split(',')
        header[-1] = header[-1].strip()
        idx_date = header.index("date")
        idx_ticker = header.index("TICKER")
        idx_closing = header.index("ASK")
        idx_outstanding_shares = header.index("SHROUT")
        min_fields = max(idx_date, idx_ticker, idx_closing, idx_outstanding_shares) + 1
        start = s['time_range'][0]
        finish = s['time_range'][-1]
        print("[INFO] Loading data from {} to {}".format(start, finish))
        print("[INFO] Starting the mapping")
        for line in tqdm(f, total=total_lines):
            nb_lines += 1
            row = line.split(',')
            # Strip the end of line right away so that no field (the date included) carries it
            row[-1] = row[-1].strip()
            if len(row) < min_fields:  # Blank or truncated line
                counter_incomplete_line += 1
                continue
            date = row[idx_date]
            # Months 1-3 -> Q1, ..., 10-12 -> Q4. The former "month // 3 + 1" put March in Q2 and December
            # in a non-existent Q5.
            qtr = (int(date[:4]), (int(date[4:6]) - 1) // 3 + 1)
            if not start <= qtr <= finish:  # Only data in time range
                counter_line_out_of_range += 1
                continue
            ticker = row[idx_ticker]
            closing_price = row[idx_closing]
            outstanding_shares = row[idx_outstanding_shares]
            if ticker == '' or closing_price == '' or outstanding_shares == '':
                counter_incomplete_line += 1
                continue
            closing_price = float(closing_price)
            market_cap = 1000 * closing_price * int(outstanding_shares)
            if market_cap < penny_limit:
                penny_stocks.add(ticker)  # The whole ticker is removed once the file has been read
                continue
            day = datetime.strptime(date, '%Y%m%d').date()
            stock_data.setdefault(ticker, dict())[day] = (closing_price, market_cap)
    # Remove all the penny stocks, including the entries stored before they were flagged
    stock_data = {k: v for k, v in stock_data.items() if k not in penny_stocks}
    if verbose:
        print("[INFO] stock_data load statistics:")
        print("Incomplete lines: {:,}/{:,}".format(counter_incomplete_line, nb_lines))
        print("Penny stocks found (at least one entry below threshold): {}/{}"
              .format(len(penny_stocks), len(penny_stocks) + len(stock_data.keys())))
        print("Lines out of range: {:,}/{:,}".format(counter_line_out_of_range, nb_lines))
    return stock_data
def load_index_data(s):
    """
    Loads the csv files containing the daily historical data for the stock market indexes that were selected in s.

    Every '.csv' file found recursively under s['path_stock_indexes'] is loaded, except the ones named
    'filtered_index_data.csv'. Each file must have a header with the columns "Date" (formatted YYYY-MM-DD) and
    "Close". The index name is the file name without its first 14 characters and without its 4-character
    extension.
    NOTE(review): the 14-character prefix is presumably a fixed naming convention of the data files - confirm.
    :param s: Settings dictionary (reads 'path_stock_indexes')
    :return: dictionary of the index data: index_data[index name][date] = closing value (float).
    """
    # 1. Find all the indexes in the folder. os.path.join keeps the pattern valid with or without a trailing
    # separator in the settings.
    pattern = os.path.join(s['path_stock_indexes'], '**', '*.csv')
    index_data = {}
    for path in glob.glob(pattern, recursive=True):
        file_name = os.path.basename(path)
        if file_name == 'filtered_index_data.csv':
            continue
        index_name = file_name[14:-4]
        closings = index_data.setdefault(index_name, {})
        # 2. Open the file and add its data to the dictionary
        with open(path, newline='') as f:
            reader = csv.reader(f)
            header = next(reader)
            idx_date = header.index("Date")
            idx_closing = header.index("Close")
            for row in reader:
                if not row:  # csv yields [] for a blank line
                    continue
                date = datetime.strptime(row[idx_date], '%Y-%m-%d').date()
                # Read the "Close" column located from the header; column 1 used to be hard-coded here.
                closings[date] = float(row[idx_closing])
    return index_data
def intersection_lookup_stock(lookup, stock):
    """
    Finds the intersection of the set of tickers contained in the lookup dictionary (its values) and the tickers
    contained in the stock database (its keys). This is part of the steps taken to ensure that the sets of
    companies match across all external databases.

    The number of shared tickers is printed on stdout.
    :param lookup: lookup dictionary CIK -> ticker
    :param stock: stock data, organized in a dictionary with tickers as keys.
    :return: both dictionaries with only the entries whose ticker is in the intersection.
    """
    # Kept as a set: the membership tests below are then O(1) instead of scanning a list for every entry.
    intersection_tickers = set(lookup.values()) & set(stock)
    print(len(intersection_tickers))
    inter_lookup = {k: v for k, v in lookup.items() if v in intersection_tickers}
    inter_stock = {k: v for k, v in stock.items() if k in intersection_tickers}
    return inter_lookup, inter_stock
def review_cik_publications(cik_path, s):
    """Filter the CIK based on how many publications there are per quarter

    A CIK is rejected when it has no more reports than s['lag'], or when check_report_continuity rejects its
    reports once grouped by quarter. The number of rejected CIKs is printed on stdout.

    Each path is expected to end with <year>/<QTRn>/<file name>, the file name being
    <YYYYMMDD publication date>_<report type>_... Only quarters listed in s['list_qtr'] and report types listed
    in s['report_type'] are considered.
    :param cik_path: Dictionary of paths organized by CIKs
    :param s: Settings dictionary (reads 'lag', 'list_qtr' and 'report_type')
    :return: A filtered version of the cik_path dictionary - only has the keys that passed the test.
    """
    cik_to_delete = set()  # A set so that the final filtering is not quadratic
    for cik, paths in tqdm(cik_path.items()):
        # Make sure there are enough reports to enable diff calculations
        if not len(paths) > s['lag']:  # You need more reports than the lag
            cik_to_delete.add(cik)
            continue
        quarterly_submissions = {key: [] for key in s['list_qtr']}
        for path_report in paths:  # For each report for that CIK
            # os.path instead of split('/') so that the platform separator is handled as well
            directory, file_name = os.path.split(path_report)
            year_directory, qtr_directory = os.path.split(directory)
            # Fourth character of the quarter folder, i.e. the digit of 'QTRn'. Ex: (2016, 3)
            qtr = (int(os.path.basename(year_directory)), int(qtr_directory[3]))
            if qtr not in quarterly_submissions:
                continue
            fields = file_name.split('_')
            published = datetime.strptime(fields[0], '%Y%m%d').date()
            type_report = fields[1]
            if type_report in s['report_type']:  # Add to the dict
                metadata = {'type': type_report, 'published': published, 'qtr': qtr}
                quarterly_submissions[qtr].append(metadata)
        # Check for continuity of the reports
        if not check_report_continuity(quarterly_submissions, s):  # Will ignore that CIK
            cik_to_delete.add(cik)
    # Create a subset of cik_dict based on the cik not faulty
    print()
    print("[INFO] {} CIKs caused trouble".format(len(cik_to_delete)))
    cik_dict = {k: v for k, v in cik_path.items() if k not in cik_to_delete}
    return cik_dict
def check_report_type(quarterly_submissions, qtr):
    """
    Tell whether the first report of a quarter was published in a quarter that matches its type: a 10-K is only
    accepted in Q1, a 10-Q only in Q2, Q3 or Q4.
    :param quarterly_submissions: dictionary of reports published, by qtr. Only the first report of qtr is checked
    :param qtr: A given qtr; its second element is the quarter number
    :return: True if the report type matches the quarter, False otherwise
    :raises ValueError: if the report is neither a 10-K nor a 10-Q
    """
    first_report = quarterly_submissions[qtr][0]
    if first_report['type'] == '10-K':
        return qtr[1] in (1,)
    if first_report['type'] == '10-Q':
        return qtr[1] in (2, 3, 4)
    raise ValueError('[ERROR] Only 10-K and 10-Q supported.')
def check_report_continuity(quarterly_submissions, s, verbose=False):
    """
    Verify that the number of reports per quarter follows the sequence 0-...0-1-...-1-0-...-0: starting from the
    first quarter with a report there must be exactly one report per quarter, of a type matching the quarter,
    until the quarter from which there are no reports at all anymore.
    :param quarterly_submissions: dictionary of reports published, by qtr
    :param s: Settings dictionary (reads 'list_qtr', the ordered list of quarters)
    :param verbose: print the reason of the decision when True
    :return: True if the sequence is valid, False otherwise
    """
    found, first_qtr = find_first_listed_qtr(quarterly_submissions, s)
    if verbose:
        print("First quarter is", first_qtr)
    if not found:
        if verbose:
            print('Returned False. Could not find the first quarter, they seem all empty.')
        return False

    # Walk through the quarters from the first listed one. There shall only be one report in each.
    first_position = s['list_qtr'].index(first_qtr)
    for current_qtr in s['list_qtr'][first_position:]:
        nb_reports = len(quarterly_submissions[current_qtr])
        if nb_reports == 0:
            # A gap is only acceptable if nothing is published afterwards either
            delisted = is_permanently_delisted(quarterly_submissions, current_qtr, s)
            if verbose:
                print("Returned {} because flag_is_delisted is {}".format(delisted, delisted))
            return bool(delisted)
        if nb_reports != 1:  # More than one report -> failed
            if verbose:
                print("Returned False because there is more than one report for {}".format(current_qtr))
            return False
        # Exactly one report: 10-K are expected in Q1 only and 10-Q in Q2-3-4
        if not check_report_type(quarterly_submissions, current_qtr):
            return False

    if verbose:
        print("Returned True and everything is good")
    return True
def find_first_listed_qtr(quarterly_submissions, s):
    """
    Finds the first qtr for which the company published at least one report.
    :param quarterly_submissions: dictionary of submissions indexes by qtr
    :param s: Settings dictionary (reads 'list_qtr', the ordered list of quarters)
    :return: tuple (success, qtr). On success qtr is the first quarter with at least one report. Otherwise
             success is False and qtr is the last quarter of s['list_qtr'], or None if that list is empty.
    """
    qtr = None  # Keeps the return value defined when s['list_qtr'] is empty
    for qtr in s['list_qtr']:
        if quarterly_submissions[qtr]:
            return True, qtr
    return False, qtr
def is_permanently_delisted(quarterly_submissions, qtr, s):
    """
    Check if a company is permanently delisted starting from a given qtr, i.e. if neither that quarter nor any
    later one of s['list_qtr'] has a report.
    :param quarterly_submissions: dictionary of reports published, by qtr
    :param qtr: a given qtr, which must be part of s['list_qtr']
    :param s: Settings dictionary (reads 'list_qtr', the ordered list of quarters)
    :return: bool assessing whether or not it is permanently delisted from the given qtr onwards
    """
    position = s['list_qtr'].index(qtr)  # Index of the quarter that is empty
    remaining_quarters = s['list_qtr'][position:]  # The given quarter is checked again, then the rest
    return not any(len(quarterly_submissions[quarter]) for quarter in remaining_quarters)
def dump_tickers_crsp(path_dump_file, tickers):
    """
    Dump all tickers to a file, one ticker per row - should not be useful anymore.

    An existing file at path_dump_file is overwritten.
    :param path_dump_file: path for csv dump
    :param tickers: all the tickers to dump.
    :return: void
    """
    # newline='' lets the csv module control the line endings; without it an extra blank row shows up between
    # records on platforms that translate '\n' on write.
    with open(path_dump_file, 'w', newline='') as f:
        out = csv.writer(f)
        out.writerows([ticker] for ticker in tickers)