Read compressed JSON file from s3 in chunks and write each chunk to parquet

I'm trying to get a 20GB gzipped JSON file from s3 in chunks, decompress each chunk, convert the chunk to parquet, and then save it to another bucket.

I have the following code, which works well with smaller files; however, when I try to run it against the 20GB file I get the traceback below. I'm not entirely sure how to resolve this.

Traceback (most recent call last):
  File "/Users/samlambert/Desktop/Repos/hdns/src/collection/data_converter.py", line 33, in <module>
    data = gzip.decompress(chunk)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 548, in decompress
    return f.read()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 292, in read
    return self._buffer.read(size)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 498, in read
    raise EOFError("Compressed file ended before the "
EOFError: Compressed file ended before the end-of-stream marker was reached

import gzip
import json

import pandas as pd

with S3() as s3:
    # get s3 object (20GB gzipped JSON file)
    obj = s3.s3_client.get_object(Bucket=input_bucket, Key=object_key)

    # Separate the file into chunks
    for chunk in obj['Body'].iter_chunks():
        
        # decompress and decode
        data = gzip.decompress(chunk)
        text = data.decode('utf-8')

        # At this point chunk is one string with multiple lines of JSON
        # We convert each line into its own JSON object, then append it to json_data
        data_in_list = text.splitlines()
        json_data = []
        for data in data_in_list:
            string_to_json = json.loads(data)
            json_data.append(string_to_json)

        # Convert list of JSON objs into one dataframe
        df = pd.DataFrame(json_data)
        
        # convert df to parquet and save to s3
        parquet_filename = 'df.parquet.gzip'
        df.to_parquet(parquet_filename, index=False)
        s3_url = 's3://mybucket/parquet_test/bucket.parquet.gzip'
        df.to_parquet(s3_url, compression='gzip')

Edit:

So, I think with Pandas I can do this a lot more directly:

import os

import pandas as pd

with S3Connect() as s3:
    obj = s3.s3_client.get_object(Bucket=input_bucket, Key=object_key)
    count = 0
    for df in pd.read_json("s3://path/to/file.json.gz", lines=True, chunksize=50000000):
        count += 1
        parquet_filename = f'df_{str(count)}.parquet.gzip'
        df.to_parquet(parquet_filename, index=False)
        s3_url = f's3://parquet_test/{parquet_filename}'
        df.to_parquet(s3_url, compression='gzip')
        # The file is also being saved locally, which I don't want
        # so I'm just doing this to remove it
        os.remove(parquet_filename)
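
If I understand it correctly, the local file only exists because of the first to_parquet call; since to_parquet can write straight to an s3:// path (pandas hands that off to s3fs/fsspec), I should be able to drop the local write and the os.remove entirely. A minimal sketch of that, assuming s3fs is installed and keeping the placeholder paths from above:

import pandas as pd

count = 0
# read_json streams the gzipped JSON straight from S3, and to_parquet writes
# straight back to S3, so nothing ever touches the local disk
for df in pd.read_json("s3://path/to/file.json.gz", lines=True, chunksize=50000000):
    count += 1
    df.to_parquet(f's3://parquet_test/df_{count}.parquet.gzip',
                  compression='gzip', index=False)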

Solution 1:

gzip.decompress needs to see a complete gzip stream, so by passing it small chunks of data one at a time you're in effect telling it that each chunk is its own small gzip file, and it fails because each chunk ends before the end-of-stream marker. gzip.open, on the other hand, can be passed a file or file-like object, and it uses the return values of read to know when the file ends.

So you can simply pass it the Body object returned by get_object, let it request data from the S3 object as it needs it, and it will know when the gzip stream actually ends.

import gzip
import json

import boto3

s3 = boto3.client('s3')
s3_object = s3.get_object(Bucket=input_bucket, Key=object_key)['Body']
with gzip.open(s3_object, "r") as f:
    for row in f:
        row = json.loads(row)
        # TODO: Handle each row as it comes in...
        print(row)
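
To get all the way back to the original goal (parquet files in another bucket), one rough way to extend that loop is to buffer the parsed rows and, every N rows, turn the buffer into a DataFrame and write it out as a parquet part file. This is a sketch only, assuming s3fs is installed for the s3:// writes, with input_bucket/object_key as above and the batch size and output path as placeholders:

import gzip
import json

import boto3
import pandas as pd

BATCH_SIZE = 500_000  # rows per output file; tune to whatever fits in memory

def flush(batch, part):
    # one parquet "part" file per batch of rows
    pd.DataFrame(batch).to_parquet(
        f's3://mybucket/parquet_test/part_{part}.parquet.gzip',
        compression='gzip', index=False)

s3 = boto3.client('s3')
s3_object = s3.get_object(Bucket=input_bucket, Key=object_key)['Body']

with gzip.open(s3_object, 'r') as f:
    batch, part = [], 0
    for row in f:
        batch.append(json.loads(row))
        if len(batch) >= BATCH_SIZE:
            part += 1
            flush(batch, part)
            batch = []
    if batch:  # don't drop the final partial batch
        part += 1
        flush(batch, part)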