# Source code for secScraper.post_processing

import pandas as pd
from datetime import datetime
from secScraper import qtrs
import csv
from tqdm import tqdm
from scipy.stats.mstats import winsorize
import numpy as np
import copy


"""[TBR] Legacy version that did not work so well
def make_quintiles(x, s, winsorize=0.01):
    # x is (cik, score, nb_share_unbalanced, nb_share_balanced)
    # Create labels and bins of the same size
    # labels = ['Q1', 'Q2', 'Q3', 'Q4', 'Q5']  # Not using that anymore
    quintiles = {l: [] for l in s['bin_labels']}
    
    # _, input_data, _, _ = zip(*x)
    input_data = x
    input_data = pd.Series(input_data)
    mapping = pd.qcut(input_data.rank(method='first'), s['bin_count'], labels=False)
    print(mapping)
    for idx_input, idx_output in enumerate(mapping):
        quintiles[s['bin_labels'][idx_output]].append(x[idx_input])
    return quintiles
"""
def make_quintiles(qtr_data, s, winsorize=0.01, verbose=False):
    """
    Rank the CIKs of one quarter by their 'total' score and split them into bins.

    CIKs whose entry is an empty dict are ignored. The remaining CIKs are sorted
    by increasing 'total', a fraction `winsorize` of them is dropped at each end
    of the ranking (they are removed, not clamped), and what is left is cut into
    s['bin_count'] contiguous slices of (almost) equal size.

    :param qtr_data: dict {cik: scores}; scores is {} or a dict with a 'total' key
    :param s: Settings dictionary. Reads 'bin_count' and 'bin_labels'. A label is
        one character followed by an integer (e.g. 'Q1'), in increasing order.
    :param winsorize: fraction of the ranked CIKs dropped at each end
    :param verbose: print how many CIKs survive each step
    :return: dict {bin_label: {cik: qtr_data[cik]}}, lowest scores in the first bin
    :raises AssertionError: if the labels are not in increasing order or if two
        consecutive bins overlap in score
    """
    # 1. Isolate the non zero CIKs
    non_zero_ciks = {cik: v for cik, v in qtr_data.items() if v != {}}
    if verbose:
        print("[INFO] Non zero ciks: {}/{}".format(len(non_zero_ciks), len(qtr_data)))
    sorted_ciks = sorted(non_zero_ciks, key=lambda cik: non_zero_ciks[cik]['total'])

    # 2. Winsorize to remove outliers
    # int() guards against round() handing back a NumPy float, which cannot slice a list.
    start = int(round(len(sorted_ciks)*winsorize))
    end = len(sorted_ciks) - start
    sorted_ciks = sorted_ciks[start:end]
    if verbose:
        print("[INFO] Left with {}/{} elements after winsorizing".format(len(sorted_ciks), len(qtr_data)))

    # 3. Make quintiles/deciles
    # The builtin int replaces np.int, an alias that was removed from NumPy.
    splits = np.linspace(0, len(sorted_ciks), s['bin_count']+1, endpoint=True, dtype=int)
    labels = s['bin_labels']
    # Make sure bins are in increasing order: Q1 -> Q5. Otherwise, sorted_ciks' order needs to be reversed.
    assert int(labels[-1][1:]) > int(labels[0][1:])
    quintiles = {}
    for idx, label in enumerate(labels):
        quintiles[label] = {cik: qtr_data[cik] for cik in sorted_ciks[splits[idx]:splits[idx+1]]}

    # 4. Sanity check: the lowest score of a bin must not be below the highest
    # score of the previous bin. Comparing the two extremes is enough, so this is
    # linear in the number of CIKs.
    for previous_label, label in zip(labels, labels[1:]):
        previous_bin = quintiles[previous_label]
        current_bin = quintiles[label]
        if not previous_bin or not current_bin:
            continue
        cik_previous = max(previous_bin, key=lambda c: qtr_data[c]['total'])
        cik = min(current_bin, key=lambda c: qtr_data[c]['total'])
        if not qtr_data[cik]['total'] >= qtr_data[cik_previous]['total']:
            print(cik, qtr_data[cik])
            print(cik_previous, qtr_data[cik_previous])
            raise AssertionError("Bins {} and {} overlap".format(previous_label, label))
    return quintiles
def metrics_correlation(metric_scores, s):
    """
    Lay out the 'total' scores of several metrics side by side in a DataFrame.

    For each metric of s['diff_metrics'], the 'total' score of every populated
    (quarter, cik) entry is collected, quarter after quarter, into one flat list.
    The lists become the columns of the returned frame; rows are paired by
    position and the frame is as long as the shortest list.

    :param metric_scores: dict {metric: {qtr: {cik: scores}}}; scores is {} or a
        dict with a 'total' key
    :param s: Settings dictionary. Reads 'diff_metrics', 'list_qtr' and 'lag'
    :return: pandas DataFrame with one column per metric of s['diff_metrics']
    """
    columns = []
    for metric in s['diff_metrics']:
        columns.append([
            scores['total']
            for qtr in s['list_qtr'][s['lag']:]
            for scores in metric_scores[metric][qtr].values()
            if scores != {}
        ])
    return pd.DataFrame(zip(*columns), columns=s['diff_metrics'])
def create_metric_scores(cik_scores, lookup, stock_data, s):
    """
    Re-index the scores from cik -> qtr -> section -> metric to
    metric -> qtr -> cik -> section.

    Every (metric, quarter, cik) slot starts as an empty dict. The quarters of a
    CIK are then visited in order; as soon as one quarter has no share price the
    CIK is reported and its remaining quarters are left empty (quarters already
    copied stay populated). Section '0' is never copied.

    :param cik_scores: dict {cik: {qtr: {section: {metric: score}}}}, with a
        'total' section next to the regular ones
    :param lookup: lookup dict, passed on to get_share_price
    :param stock_data: dict of the stock data, passed on to get_share_price
    :param s: Settings dictionary. Reads 'metrics', 'list_qtr' and 'lag'
    :return: dict {metric: {qtr: {cik: {section: score, 'total': score}}}}
    """
    quarters = s['list_qtr'][s['lag']:]
    metric_scores = {}
    for metric in s['metrics']:
        metric_scores[metric] = {qtr: {cik: {} for cik in cik_scores} for qtr in quarters}

    ciks_without_price = []
    for cik in tqdm(cik_scores):
        for qtr in cik_scores[cik]:
            price_found = get_share_price(cik, qtr, lookup, stock_data)[2]
            if not price_found:
                print("[WARNING] There is no stock data for {} during {}".format(cik, qtr))
                ciks_without_price.append(cik)
                # Stop here: the later quarters of that CIK stay unpopulated.
                break
            qtr_scores = cik_scores[cik][qtr]
            regular_sections = [name for name in qtr_scores if name not in ('0', 'total')]
            for section in regular_sections:
                for metric in s['metrics']:
                    slot = metric_scores[metric][qtr][cik]
                    slot[section] = qtr_scores[section][metric]
                    slot['total'] = qtr_scores['total'][metric]
    print("Unique cik", set(ciks_without_price))
    return metric_scores
def get_share_price(cik, qtr, lookup, stock_data, verbose=False):
    """
    Get the price of a share at the beginning of a quarter.

    The quote is searched on the first calendar day of the quarter and, if that
    day has no quote, on each following day up to the 7th of the month.

    :param cik: CIK, must be a key of lookup
    :param qtr: quarter as a (year, quarter number) pair, quarter number in 1..4
    :param lookup: lookup dict, CIK -> ticker
    :param stock_data: dict of the stock data,
        {ticker: {datetime.date: (share_price, market_cap)}}
    :param verbose: print the covered date range and the day that was retained
    :return: share_price, market_cap, flag_price_found. When the ticker has no
        data or no quote exists in the first 7 days, (1, 1, False) is returned.
    :raises KeyError: if cik is not in lookup
    """
    ticker = lookup[cik]
    first_day = datetime(int(qtr[0]), (qtr[1] - 1) * 3 + 1, 1).date()

    # Sanity check: is there any price history for that ticker?
    try:
        quotes = stock_data[ticker]
        time_range = list(quotes.keys())
    except (KeyError, TypeError, AttributeError):
        return 1, 1, False
    if verbose and time_range:
        print("Prices cover {} to {} and we are looking for {}".format(time_range[0], time_range[-1], first_day))

    # Find the first trading day after the beginning of the quarter.
    for day in range(1, 8):
        candidate = first_day.replace(day=day)
        try:
            share_price, market_cap = quotes[candidate]
        except KeyError:
            continue
        if verbose:
            print("[INFO] Settled for", candidate)
        # The flag reflects an actual hit, so a genuine quote of (1, 1) is not
        # mistaken for the "not found" placeholder.
        return share_price, market_cap, True
    return 1, 1, False
def buy_all_pf(qtr, funds, pf, lookup, stock_data, method):
    """
    Allocate a given amount of money to a quarterly portfolio.

    Every line of pf is overwritten in place with
    [ticker, share_price, market_cap, share_count, $, $/funds], where $ is the
    amount invested in that stock and $/funds its share of the portfolio.
    The "price found" flag returned by get_share_price is not checked here.

    :param qtr: quarter at which the shares are bought
    :param funds: amount of money to invest, must be a float
    :param pf: dict {cik: anything}; only the keys are read, values are replaced
    :param lookup: lookup dict, CIK -> ticker
    :param stock_data: dict of the stock data
    :param method: 'balanced' (weighted by market cap) or 'unbalanced' (each
        stock gets the same amount of money)
    :return: pf, the same dict object that was passed in
    :raises ValueError: if method is neither 'balanced' nor 'unbalanced'
    """
    # isinstance also lets float subclasses through (e.g. NumPy floats).
    assert isinstance(funds, float)
    # Reject an unknown method up front, before pf is modified.
    if method not in ('balanced', 'unbalanced'):
        raise ValueError('[ERROR] Balancing method unknown.')

    nb_cik = len(pf)  # Nb of CIK in that bin
    sum_market_caps = 0

    # 1. Update the share price/market_cap for everyone
    for cik in pf:
        share_price, market_cap, _ = get_share_price(cik, qtr, lookup, stock_data)
        pf[cik] = [lookup[cik], share_price, market_cap, 0, 0, 0]
        sum_market_caps += market_cap  # Needed to balance pf

    # 2. Second pass where we split the funds accordingly
    for line in pf.values():
        share_price, market_cap = line[1], line[2]
        if method == 'balanced':
            value = funds*(market_cap/sum_market_caps)  # $ amount depends on the market cap
        else:
            value = funds*(1/nb_cik)  # $ amount is equal for all stocks
        line[3:] = value/share_price, value, value/funds
    return pf
def sell_all_pf(qtr, pf, lookup, stock_data):
    """
    Sell all the stocks in a portfolio.

    Nothing is actually sold: each line of pf is re-priced in place with the
    share price of qtr while the number of shares is left unchanged, and the
    resulting values are summed up.

    :param qtr: quarter at which the portfolio is valued
    :param pf: dict {cik: [ticker, share_price, market_cap, share_count, $, $/funds]}
    :param lookup: lookup dict, passed on to get_share_price
    :param stock_data: dict of the stock data, passed on to get_share_price
    :return: pf (same object, updated) and the total value of the portfolio
    """
    total_value = 0

    # I. Re-price every line; the share count (index 3) is invariant here.
    for cik, line in pf.items():
        price, cap, _ = get_share_price(cik, qtr, lookup, stock_data)
        line_value = price*line[3]
        line[1] = price
        line[2] = cap
        line[4] = line_value
        total_value += line_value

    # II. Refresh the weight of each line in the portfolio. It may have drifted
    # over the quarter, which shows where the biggest exposure now is.
    for line in pf.values():
        line[5] = line[4]/total_value
    return pf, total_value
def initialize_portfolio(metric_scores, s):
    """
    Create the empty portfolio structure for every metric and quarter.

    Each (metric, quarter) slot receives four entries:
    'incoming_compo' (empty dict), 'incoming_value' and 'new_value' (one amount
    per bin) and 'new_compo' ({bin: {cik: []}} listing the CIKs of that bin).
    The amounts are s['pf_init_value'] for the first quarter and 0 afterwards.

    :param metric_scores: dict {metric: {qtr: {bin_label: iterable of cik}}}
    :param s: Settings dictionary. Reads 'metrics', 'list_qtr', 'lag',
        'bin_labels' and 'pf_init_value'
    :return: dict pf_values {metric: {qtr: {...}}}
    """
    pf_values = {m: {qtr: {} for qtr in metric_scores[m]} for m in metric_scores}
    labels = s['bin_labels']
    quarters = s['list_qtr'][s['lag']:]

    for m in s['metrics']:
        for position, qtr in enumerate(quarters):
            composition = {l: {cik: [] for cik in metric_scores[m][qtr][l]} for l in labels}
            # Only the very first quarter starts with money in the bins.
            amount = s['pf_init_value'] if position == 0 else 0
            slot = pf_values[m][qtr]
            slot['incoming_compo'] = {}
            slot['incoming_value'] = dict.fromkeys(labels, amount)
            slot['new_value'] = dict.fromkeys(labels, amount)
            slot['new_compo'] = composition
    return pf_values
def build_portfolio(pf_values, lookup, stock_data, s):
    """
    Populate the portfolios quarter after quarter and track their value.

    First quarter: each bin is bought with its 'new_value' and the resulting
    composition is also copied to 'incoming_compo'.
    Other quarters: the previous quarter's 'new_compo' is copied to
    'incoming_compo' and re-priced (sell_all_pf); the proceeds are stored in
    'incoming_value' and, after applying s['tax_rate'], in 'new_value'. That
    amount then buys this quarter's 'new_compo' (buy_all_pf).

    :param pf_values: dict {metric: {qtr: {'incoming_compo', 'incoming_value',
        'new_value', 'new_compo'}}}, updated in place
    :param lookup: lookup dict
    :param stock_data: dict of the stock data
    :param s: Settings dictionary. Reads 'metrics', 'list_qtr', 'lag',
        'bin_labels', 'pf_balancing', 'tax_rate' and, if present, 'pf_init_value'
    :return: pf_values (same object)
    :raises AssertionError: if a first-quarter bin does not hold the initial value
    """
    quarters = s['list_qtr'][s['lag']:]
    labels = s['bin_labels']
    # The starting amount comes from the settings; 100.0 is the historical default.
    initial_value = s.get('pf_init_value', 100.0)

    for m in s['metrics']:
        for idx, qtr in enumerate(quarters):
            slot = pf_values[m][qtr]
            if qtr == s['list_qtr'][s['lag']]:  # First quarter
                for l in labels:
                    quintile_funds = slot['new_value'][l]
                    assert quintile_funds == initial_value
                    pf = buy_all_pf(qtr, quintile_funds, slot['new_compo'][l], lookup, stock_data, s['pf_balancing'])
                    slot['new_compo'][l] = pf
                    slot['incoming_compo'][l] = copy.deepcopy(pf)
                continue

            # Stage 1: value what was bought last quarter at today's prices.
            previous_qtr = quarters[idx-1]
            for l in labels:
                incoming = copy.deepcopy(pf_values[m][previous_qtr]['new_compo'][l])
                incoming, quintile_funds = sell_all_pf(qtr, incoming, lookup, stock_data)
                slot['incoming_compo'][l] = incoming
                slot['incoming_value'][l] = quintile_funds
                slot['new_value'][l] = quintile_funds*(1-s['tax_rate'])

            # Stage 2: spend the post-tax proceeds on this quarter's composition.
            for l in labels:
                slot['new_compo'][l] = buy_all_pf(qtr, slot['new_value'][l], slot['new_compo'][l],
                                                  lookup, stock_data, s['pf_balancing'])
    return pf_values
def check_pf_value(pf_values, s):
    """
    Sanity check: the money invested in each portfolio must match its declared value.

    For every metric, quarter and bin, the values of the lines (index 4 of each
    line) of 'incoming_compo' and 'new_compo' are summed and compared with
    'incoming_value' and 'new_value' respectively. The two must differ by less
    than s['epsilon'].

    :param pf_values: dict {metric: {qtr: {'incoming_compo', 'incoming_value',
        'new_value', 'new_compo'}}}
    :param s: Settings dictionary. Reads 'metrics', 'list_qtr', 'lag',
        'bin_labels' and 'epsilon'
    :return: True when every portfolio is consistent
    :raises AssertionError: on the first mismatch, after printing its details
    """
    stages = (('incoming_compo', 'incoming_value'), ('new_compo', 'new_value'))
    for m in s['metrics']:
        for qtr in s['list_qtr'][s['lag']:]:
            for stage, value_key in stages:
                for l in s['bin_labels']:
                    declared_value = pf_values[m][qtr][value_key][l]
                    calculated_pf_value = sum(line[4] for line in pf_values[m][qtr][stage][l].values())
                    lower = declared_value - s['epsilon']
                    upper = declared_value + s['epsilon']
                    # Raised explicitly so that the check is not compiled away under -O.
                    if not lower < calculated_pf_value < upper:
                        print([m], [qtr], [stage], [l])
                        print(pf_values[m][qtr]['incoming_value'])
                        print(declared_value, calculated_pf_value)
                        raise AssertionError(
                            "Declared value {} does not match calculated value {}".format(
                                declared_value, calculated_pf_value))
    return True
def remove_cik_without_price(pf_scores, lookup, stock_data, s, verbose=False):
    """
    Remove from every bin the CIKs for which no share price is available.

    So far, we have not checked whether a stock price is available for each CIK.
    Every entry whose CIK (entry[0]) has no price at the given quarter is
    dropped from pf_scores[metric][bin][qtr].

    NOTE(review): the last element of s['metrics'] is skipped; presumably it is
    not a score-based metric -- confirm against the caller.

    :param pf_scores: dict {metric: {bin_label: {qtr: [entry, ...]}}}, entry[0]
        being the CIK. Updated in place.
    :param lookup: lookup dict
    :param stock_data: dict of the stock data
    :param s: Settings dictionary. Reads 'metrics', 'bin_labels', 'list_qtr', 'lag'
    :param verbose: print how many entries were removed from each bin
    :return: pf_scores (same object)
    :raises ValueError: if a bin ends up empty
    """
    for m in s['metrics'][:-1]:
        for mod_bin in s['bin_labels']:
            for qtr in s['list_qtr'][s['lag']:]:
                entries = pf_scores[m][mod_bin][qtr]
                # One price lookup per entry, keeping those with a price.
                kept = [e for e in entries if get_share_price(e[0], qtr, lookup, stock_data)[2]]
                pf_scores[m][mod_bin][qtr] = kept
                if verbose:
                    # Report against the size of the bin before the removal.
                    print("[INFO] Removed {}/{} CIK".format(len(entries) - len(kept), len(entries)))
                if not kept:
                    raise ValueError("[ERROR] Nothing is left!")
    return pf_scores
def get_pf_value(pf_scores, m, mod_bin, qtr, lookup, stock_data, s):
    """
    Get the value of a portfolio.

    The portfolio considered is the one held during the quarter preceding qtr
    (as given by qtrs.previous_qtr); it is valued with the share prices of qtr.

    :param pf_scores: dict containing all the scores for all companies,
        {metric: {bin: {qtr: [[cik, score, nb_shares_unbalanced, nb_shares_balanced], ...]}}}
    :param m: metric
    :param mod_bin: bin considered
    :param qtr: qtr at which the shares are priced
    :param lookup: lookup dict
    :param stock_data: dict of the stock data
    :param s: Settings dictionary
    :return: unbalanced_value, balanced_value
    """
    previous_holdings = pf_scores[m][mod_bin][qtrs.previous_qtr(qtr, s)]
    # Pair each holding with its current price; the market cap and the
    # "price found" flag are not needed here.
    priced = [
        (get_share_price(holding[0], qtr, lookup, stock_data)[0], holding)
        for holding in previous_holdings
    ]
    unbalanced_value = sum(price*holding[2] for price, holding in priced)
    balanced_value = sum(price*holding[3] for price, holding in priced)
    return unbalanced_value, balanced_value
def calculate_portfolio_value(pf_scores, pf_values, lookup, stock_data, s, balancing='balanced', verbose=False):
    """
    Calculate the value of a portfolio, in equal weight and balanced weight (by
    market cap) mode. The number of shares is written to pf_scores and the
    portfolio value to pf_values (both in the inputs).

    For each metric (except the last one of s['metrics']), bin and quarter:
    1. The portfolio value is s['pf_init_value'] for the first quarter, otherwise
       the value of the previous quarter's holdings at today's prices.
    2. The tax rate stored at pf_values[m][bin][qtr][1] is applied.
    3. The post-tax amount buys every entry that has a share price. Entries
       without a price are left untouched.

    :param pf_scores: dict containing all the scores for all companies,
        {metric: {bin: {qtr: [[cik, score, nb_shares_unbalanced, nb_shares_balanced], ...]}}}
    :param pf_values: dict containing the value of a portfolio,
        {metric: {bin: {qtr: [pf_value, tax_rate, pf_value_post_tax]}}}
    :param lookup: lookup dict
    :param stock_data: dict of the stock data
    :param s: Settings dictionary
    :param balancing: 'balanced' or 'unbalanced', selects which of the two
        values is carried over from the previous quarter
    :param verbose: report the entries for which no price was found
    :return: dict pf_scores
    :raises ValueError: if balancing is unknown, or if a non-empty bin has no
        entry left with a share price
    """
    for m in s['metrics'][:-1]:
        for mod_bin in s['bin_labels']:
            for qtr in s['list_qtr'][s['lag']:]:
                # Array of arrays [cik, score, nb_shares_unbalanced, nb_shares_balanced]
                entries = pf_scores[m][mod_bin][qtr]

                # 1. Update pf value!
                if qtr == s['list_qtr'][s['lag']]:
                    pf_value = s['pf_init_value']
                else:
                    pf_value_unbalanced, pf_value_balanced = get_pf_value(pf_scores, m, mod_bin, qtr,
                                                                          lookup, stock_data, s)
                    if balancing == 'balanced':
                        pf_value = pf_value_balanced
                    elif balancing == 'unbalanced':
                        pf_value = pf_value_unbalanced
                    else:
                        raise ValueError('[ERROR] Balancing method unknown.')
                value_slot = pf_values[m][mod_bin][qtr]
                value_slot[0] = pf_value
                pf_value *= (1 - value_slot[1])  # Apply a tax rate
                value_slot[2] = pf_value  # This is what will be used to buy new shares

                # 2. Keep the entries that have a price, wherever they sit in the
                # list. Each price is fetched once.
                priced = []
                for entry in entries:
                    price, market_cap, flag_price_found = get_share_price(entry[0], qtr, lookup, stock_data)
                    if flag_price_found:
                        priced.append((entry, price, market_cap))
                nb_errors = len(entries) - len(priced)
                if nb_errors:
                    if verbose:
                        print("Found", nb_errors, "errors out of", len(entries))
                    if not priced:
                        raise ValueError("WTF - No CIK left after checking for the stock data availability?")

                # 3. With that amount, re-populate the pf with the new recommendation
                # (including last qtr even if useless)
                nb_cik = len(priced)
                total_mc = 0
                for entry, price, market_cap in priced:
                    total_mc += market_cap
                    entry[2] = (pf_value/nb_cik)/price  # Unbalanced nb of shares
                    entry[3] = (pf_value*market_cap)/price  # Balanced nb shares, not normalized yet

                # 4. Normalize the balanced value by the total market cap
                for entry, _, _ in priced:
                    entry[3] /= total_mc
    return pf_scores
def dump_master_dict(master_dict, s):
    """
    Write the score of every CIK, per metric, quarter and bin, to a CSV file.

    The file is ';'-delimited with the columns METRIC, QUARTER, QUINTILE, CIK,
    SCORE. The last element of s['metrics'] is not written.

    :param master_dict: dict {metric: {qtr: {bin_label: [entry, ...]}}}, with
        entry[0] the CIK and entry[1] its score
    :param s: Settings dictionary. Reads 'path_dump_master_dict', 'metrics',
        'list_qtr', 'lag' and 'bin_labels'
    :return: None
    """
    # newline='' lets the csv module control line endings (no blank rows on Windows).
    with open(s['path_dump_master_dict'], 'w', newline='') as f:
        out = csv.writer(f, delimiter=';')
        out.writerow(['METRIC', 'QUARTER', 'QUINTILE', 'CIK', 'SCORE'])

        # Main writing loop
        for m in tqdm(s['metrics'][:-1]):
            for qtr in s['list_qtr'][s['lag']:]:
                for l in s['bin_labels']:
                    for entry in master_dict[m][qtr][l]:
                        out.writerow([m, qtr, l, entry[0], entry[1]])
def dump_pf_values(pf_values, s):
    """
    Write the value of every portfolio, per metric, bin and quarter, to a CSV file.

    The file is ';'-delimited with the columns METRIC, QUARTER, QUINTILE,
    PF_VALUE, TAX_RATE, PF_VALUE_POST_TAX. The last element of s['metrics'] is
    not written.

    :param pf_values: dict {metric: {bin_label: {qtr: [pf_value, tax_rate,
        pf_value_post_tax]}}}
    :param s: Settings dictionary. Reads 'path_dump_pf_values', 'metrics',
        'list_qtr', 'lag' and 'bin_labels'
    :return: None
    """
    # newline='' lets the csv module control line endings (no blank rows on Windows).
    with open(s['path_dump_pf_values'], 'w', newline='') as f:
        out = csv.writer(f, delimiter=';')
        # The header follows the order in which the rows are written: the
        # quarter comes before the bin label.
        out.writerow(['METRIC', 'QUARTER', 'QUINTILE', 'PF_VALUE', 'TAX_RATE', 'PF_VALUE_POST_TAX'])

        # Main writing loop
        for m in tqdm(s['metrics'][:-1]):
            for l in s['bin_labels']:
                for qtr in s['list_qtr'][s['lag']:]:
                    out.writerow([m, qtr, l, *pf_values[m][l][qtr]])
def dump_cik_scores(cik_scores, s):
    """
    Write the score of every CIK, per quarter and metric, to a CSV file.

    The file is ';'-delimited with the columns CIK, QTR, METRIC, SCORE.
    Combinations for which cik_scores[cik][qtr][metric] does not exist are
    skipped silently.

    :param cik_scores: dict read as cik_scores[cik][qtr][metric]
    :param s: Settings dictionary. Reads 'path_dump_cik_scores', 'metrics',
        'list_qtr' and 'lag'
    :return: None
    """
    # newline='' lets the csv module control line endings (no blank rows on Windows).
    with open(s['path_dump_cik_scores'], 'w', newline='') as f:
        out = csv.writer(f, delimiter=';')
        out.writerow(['CIK', 'QTR', 'METRIC', 'SCORE'])

        # Main writing loop
        for cik in tqdm(cik_scores.keys()):
            for qtr in s['list_qtr'][s['lag']:]:
                for m in s['metrics']:
                    try:
                        score = cik_scores[cik][qtr][m]
                    except KeyError:
                        # There is no data for this qtr, CIK not listed/delisted
                        continue
                    out.writerow([cik, qtr, m, score])