Improving speed from zip to json

I have some financial data organised as zip files, one zip file for each trading day since 2007. Each file contains between 150'000 and 550'000 records/rows and 20 columns/features or so. I need to re-organize the data following this logic: /DataFolder/{Year}/{Column1}/{Column2}/{filename}.json

For clarity, Column1 is the Ticker column and Column2 is the ExpiryDate column. Additionally, for each ticker-expiryDate pair in a single file I want to populate a new column (of 0 or 1) flagging the at-the-money strike (given the value in the StockPrice column, I take the value of the strike column closest to the stock price).

I then write the data (for each ticker-expiryDate pair) as a json file on my hard drive.

Initially I was going for a "brute-force" approach, i.e. load the zip file as a dataframe and do some sorting on tickers and expiry dates. Obviously this was not optimal: it took about 12 minutes to process a single zip file.

I then tried to improve my code by using the groups generated with pandas' groupby function. It did improve the performance to slightly below 10 minutes per zip file, but given I have close to 3'000 files to process I still need to improve things massively...

I use a function to get the nearest strike given the stock price:

import numpy as np

def find_nearest(array, value):
    # return the element of array closest to value
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return array[idx]
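
For example, with a stock price of 34.3 and the strikes from the sample further below:

find_nearest([32.5, 35, 37.5], 34.3)   # -> 35.0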

I use the following code for each zip file:

import os
import pathlib
import pandas as pd

def process_directory(files2proc):
    # files2proc: list of zip files to process

    for filename in files2proc:    
        df_zip = pd.read_csv(filename)
        df_grouped = df_zip.groupby(['ticker', 'expirDate'])

        for name, group in df_grouped:
            expYear = str(name[1])[-4:]
            exp_fmt = str(pd.to_datetime(name[1]).date()).replace('-', '')


            temp_path = '/Volumes/DataFolder/{}/{}/{}/{}/'.format(name[0][0], name[0], expYear, exp_fmt)
            dir_path = pathlib.Path(temp_path)
            dir_path.mkdir(parents=True, exist_ok=True)

            df_temp = df_grouped.get_group(name).reset_index(drop=True)

            strikes = df_temp['strike'].unique()
            stkprice = df_temp['stkPx'].iloc[0]
            atm_strike = find_nearest(strikes, stkprice)

            df_temp['atTheMoney'] = df_temp['strike'] == atm_strike

            file_path = temp_path + '_' + name[0] + '_' + exp_fmt + '.json'

            if not os.path.exists(file_path):
                df_temp.to_json(file_path, orient="records")
            else:
                df_orig = pd.read_json(file_path, orient="records")
                df_calls_up = df_orig.append(df_temp)
                df_calls_up.to_json(file_path, orient="records")

I fear the .reset_index and .iloc statements are time-consuming. How can I bypass the call to find_nearest in order to improve speed? If not, is there a vectorized way of populating the at-the-money column? More generally, I think I create too many temporary dataframes, which is not helping the overall speed (?)
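
For what it's worth, I suspect the vectorized version I'm after would look something like the untested sketch below (using the same column names as in my code above), but I'm not sure it is actually faster in practice:

import pandas as pd

def flag_at_the_money(df):
    # distance of every strike from the stock price
    delta = (df['strike'] - df['stkPx']).abs()
    # smallest distance within each (ticker, expirDate) group
    min_delta = delta.groupby([df['ticker'], df['expirDate']]).transform('min')
    # flag the row(s) whose strike is closest to the stock price
    df['atTheMoney'] = delta.eq(min_delta)
    return df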

Edit: the zip contains only one csv file. I have reproduced part of a csv example below:

ticker stkPx expirDate  strike (other columns) trade_date
AAPL   34.3  1/20/2007  32.5                   1/3/2007 
AAPL   34.3  1/20/2007  35                     1/3/2007
AAPL   34.3  1/20/2007  37.5                   1/3/2007
AAPL   34.3  2/17/2007  30                     1/3/2007
AAPL   34.3  2/17/2007  35                     1/3/2007
AAPL   34.3  2/17/2007  40                     1/3/2007
(...)

I need to re-organize this file into several json files as follows: Datafolder/A/AAPL/2007/20070120/_AAPL_20070120.json and Datafolder/A/AAPL/2007/20070217/_AAPL_20070217.json

using this format: Datafolder/{TickerInitial}/{Ticker}/{year of expirDate}/{expirDate as yyyymmdd}/_{Ticker}_{expirDate as yyyymmdd}.json
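
In code, building one of these destination paths from a ticker and an expirDate string would look roughly like this (a small sketch using the sample values above):

from datetime import datetime

ticker = "AAPL"
exp_fmt = datetime.strptime("1/20/2007", "%m/%d/%Y").strftime("%Y%m%d")  # "20070120"
path = f"Datafolder/{ticker[0]}/{ticker}/{exp_fmt[:4]}/{exp_fmt}/_{ticker}_{exp_fmt}.json"
# -> "Datafolder/A/AAPL/2007/20070120/_AAPL_20070120.json"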

Each json file should have an extra column (on top of the columns from the original zip file) called atTheMoney to flag the at the money options ie for _AAPL_20070120.json:

ticker stkPx expirDate  strike (other columns) trade_date atTheMoney
AAPL   34.3  1/20/2007  32.5                   1/3/2007   False
AAPL   34.3  1/20/2007  35                     1/3/2007   True
AAPL   34.3  1/20/2007  37.5                   1/3/2007   False

and for _AAPL_20070217.json:

ticker stkPx expirDate  strike (other columns) trade_date  atTheMoney
AAPL   34.3  2/17/2007  30                     1/3/2007    False
AAPL   34.3  2/17/2007  35                     1/3/2007    True
AAPL   34.3  2/17/2007  40                     1/3/2007    False

The at-the-money flag corresponds to the strike value closest to the stkPx value (i.e. the strike 35 is the closest value to 34.3 among [32.5, 35, 37.5] or among [30, 35, 40]).

Edit 2: here is my amended version of JonSG's answer:

import collections
import csv
import json
import pathlib
import zipfile
from io import TextIOWrapper

import pandas as pd

def process_file(zip_path, ifilename):

    stage_1 = collections.defaultdict(list)
    with zipfile.ZipFile(zip_path) as zf:
        with zf.open(ifilename + ".csv", "r") as file_in:
            reader = csv.reader(TextIOWrapper(file_in, 'utf-8'))
            colnames = {v: i for i, v in enumerate(next(reader))}
            for row in reader:
                key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"
                stage_1[key].append(row)

    for key, value in stage_1.items():
        min_strike_delta = float("inf")
        for row in value:
            row.append(abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']])))
            if min_strike_delta > row[-1]:
                min_strike_delta = row[-1]

        # everything below stays inside the for-loop so each key gets written out
        value = [
            {
                **{cname: row[cvalue] for cname, cvalue in colnames.items()},
                **{"atTheMoney": row[-1] == min_strike_delta}
            }
            for row in value
        ]

        ticker, expirDate = key.split("_")
        exp_fmt = str(pd.to_datetime(expirDate).date()).replace('-', '')
        temp_path = f"/Volumes/DataCenter/{ticker[0]}/{ticker}/{exp_fmt[:4]}/{exp_fmt}/"
        filename = f"{temp_path}_{ticker}_{exp_fmt}.json"
        dir_path = pathlib.Path(temp_path)
        dir_path.mkdir(parents=True, exist_ok=True)

        with open(filename, "a+") as file_out:
            for row in value:
                file_out.write(json.dumps(row) + ",\n")

Solution 1:

Since we know that the data is clustered by the key, we can leverage this fact to reduce the amount of work we need to do. We only need to read through the data once now.

import csv
import json

## ---------------------
## Function to handle writing data.
## this way we don't repeat ourselves
## ---------------------
def append_data(key, rows, min_strike_delta, colnames):
    if not rows:
        return

    ticker, expirDate = key.split("_")
    filename = f"{ticker[0]}_{ticker}_{expirDate.replace('/', '')}.json"
    with open(filename, "a+") as file_out:
        for row in rows:
            row[-1] = row[-1] == min_strike_delta
            row_json = {cname: row[cvalue] for cname, cvalue in colnames.items()}
            file_out.write(json.dumps(row_json) + "\n")
## ---------------------

## ---------------------
## Let's use the fact that rows are already grouped by key to our advantage:
## ---------------------
with open("strike.csv", "r") as file_in:
    reader = csv.reader(file_in)
    colnames = {v:i for i,v in enumerate(next(reader) + ["atTheMoney"])}

    current_key = ""
    current_rows = []
    current_min_strike_delta = float("inf")

    for row in reader:
        key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"

        ## ---------------------
        ## If the keys are different try to write out the prior batch of rows
        ## ---------------------
        if key != current_key:
            append_data(current_key, current_rows, current_min_strike_delta, colnames)
            current_key = key
            current_rows = []
            current_min_strike_delta = float("inf")
        ## ---------------------

        strike_delta = abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']]))
        current_min_strike_delta = min(current_min_strike_delta, strike_delta)
        row.append(strike_delta)
        current_rows.append(row)

## ---------------------
## If there are any remaining rows write them out
## ---------------------
append_data(current_key, current_rows, current_min_strike_delta, colnames)
## ---------------------

When run it should give us two files:

A_AAPL_1202007.json:

{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "32.5", "trade_date": "1/3/2007", "atTheMoney": false}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "37.5", "trade_date": "1/3/2007", "atTheMoney": false}

A_AAPL_2172007.json:

{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "30", "trade_date": "1/3/2007", "atTheMoney": false}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true}
{"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "40", "trade_date": "1/3/2007", "atTheMoney": false}

At this point, the slowdown might be due to repeatedly opening files and seeking to the end of them to append.
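
One way to mitigate that would be to keep the output files open for the whole run instead of re-opening them for every batch. A rough sketch (a hypothetical helper, not part of the solutions here):

import contextlib
import json

def write_batches(batches):
    # batches: iterable of (filename, rows) pairs, where rows is a list of dicts
    handles = {}
    with contextlib.ExitStack() as stack:
        for filename, rows in batches:
            if filename not in handles:
                # open each output file only once per run
                handles[filename] = stack.enter_context(open(filename, "a"))
            for row in rows:
                handles[filename].write(json.dumps(row) + "\n")

(With many ticker/expiry pairs you may bump into the OS limit on open files, so treat this only as a sketch.)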

Solution 2:

I think I might look at doing this in a more manual way.

Start with our modules:

import collections
import csv
import json

Then we can:

## ---------------------
## Gather the rows of data by key
## ---------------------
stage_1 = collections.defaultdict(list)
with open("strike.csv", "r") as file_in:
    reader = csv.reader(file_in)
    colnames = {v:i for i,v in enumerate(next(reader))}
    for row in reader:
        key = f"{row[colnames['ticker']]}_{row[colnames['expirDate']]}"
        stage_1[key].append(row)
## ---------------------

Now we have a dictionary with a key of your grouping id and values of the individual rows for the given key.
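
For the sample data above, stage_1 would hold something like this (other columns omitted):

{
    "AAPL_1/20/2007": [
        ["AAPL", "34.3", "1/20/2007", "32.5", "1/3/2007"],
        ["AAPL", "34.3", "1/20/2007", "35", "1/3/2007"],
        ["AAPL", "34.3", "1/20/2007", "37.5", "1/3/2007"],
    ],
    "AAPL_2/17/2007": [
        ["AAPL", "34.3", "2/17/2007", "30", "1/3/2007"],
        ["AAPL", "34.3", "2/17/2007", "35", "1/3/2007"],
        ["AAPL", "34.3", "2/17/2007", "40", "1/3/2007"],
    ],
}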

Now for each key in the above dictionary we will determine the atTheMoney status of all of the rows for that key, then write out a json file.

## ---------------------
## For each key find the minimum strike delta and update the values
## based on it. then write to output file
## ---------------------
for key, value in stage_1.items():
    ## --------------------
    ## find the min_strike_delta 
    ## --------------------
    min_strike_delta = float("inf")
    for row in value:
        row.append(abs(float(row[colnames['stkPx']]) - float(row[colnames['strike']])))
        if min_strike_delta > row[-1]:
            min_strike_delta = row[-1]
    ## --------------------

    ## --------------------
    ## Cast value to a list of dictionaries rather than a list of lists.
    ## This is a little heavier so let's just do it part by part rather than as part of stage 1
    ## --------------------
    value = [
        {
            **{cname: row[cvalue] for cname, cvalue in colnames.items()},
            **{"atTheMoney": row[-1] == min_strike_delta}
        }
        for row in value
    ]
    ## --------------------

    ## --------------------
    ## Create a result file name.
    ## This is just a simple one for testing and yours is more complicated
    ## --------------------
    ticker, expirDate = key.split("_")
    filename = f"{ticker[0]}_{ticker}_{expirDate.replace('/', '')}.json"
    ## --------------------

    ## --------------------
    ## Overwrite the list to output file.
    ## --------------------
    #with open(filename, "w", newline="") as file_out:
    #    json.dump(value, file_out)
    ## --------------------

    ## --------------------
    ## To append rows to a prior file
    ## You probably don't want an actual json array
    ## you probably want rows of json records...
    ## --------------------
    with open(filename, "a+") as file_out:
        for row in value:
            file_out.write(json.dumps(row) + ",\n")
    ## --------------------
## ---------------------

With your test data, the overwrite version (the commented-out json.dump) produces a couple of files along these lines:

A_AAPL_1202007.json:

[
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "32.5", "trade_date": "1/3/2007", "atTheMoney": false},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "1/20/2007", "strike": "37.5", "trade_date": "1/3/2007", "atTheMoney": false}
]

A_AAPL_2172007.json:

[
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "30", "trade_date": "1/3/2007", "atTheMoney": false},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "35", "trade_date": "1/3/2007", "atTheMoney": true},
    {"ticker": "AAPL", "stkPx": "34.3", "expirDate": "2/17/2007", "strike": "40", "trade_date": "1/3/2007", "atTheMoney": false}
]

Hopefully this is faster than what you have now.