Read compressed JSON file from s3 in chunks and write each chunk to parquet
I'm trying to get a 20GB gzipped JSON file from S3 in chunks, decompress each chunk, convert it to parquet, and then save it to another bucket.
I have the following code that works well with smaller files; however, when I run it against the 20GB file I get the traceback below, and I'm not entirely sure how to resolve it.
Traceback (most recent call last):
  File "/Users/samlambert/Desktop/Repos/hdns/src/collection/data_converter.py", line 33, in <module>
    data = gzip.decompress(chunk)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 548, in decompress
    return f.read()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 292, in read
    return self._buffer.read(size)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/gzip.py", line 498, in read
    raise EOFError("Compressed file ended before the "
EOFError: Compressed file ended before the end-of-stream marker was reached
import gzip
import json

import pandas as pd

with S3() as s3:
    # Get the s3 object (20GB gzipped JSON file)
    obj = s3.s3_client.get_object(Bucket=input_bucket, Key=object_key)
    # Separate the file into chunks
    for chunk in obj['Body'].iter_chunks():
        # Decompress and decode
        data = gzip.decompress(chunk)
        text = data.decode('utf-8')
        # At this point chunk is one string with multiple lines of JSON
        # We convert each line into its own JSON object, then append it to json_data
        data_in_list = text.splitlines()
        json_data = []
        for data in data_in_list:
            string_to_json = json.loads(data)
            json_data.append(string_to_json)
        # Convert the list of JSON objects into one dataframe
        df = pd.DataFrame(json_data)
        # Convert df to parquet and save to s3
        parquet_filename = 'df.parquet.gzip'
        df.to_parquet(parquet_filename, index=False)
        s3_url = 's3://mybucket/parquet_test/bucket.parquet.gzip'
        df.to_parquet(s3_url, compression='gzip')
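For context on why this fails: gzip.decompress expects each chunk to be a complete gzip file, but iter_chunks() yields arbitrary slices of one large compressed stream. Truly incremental decompression needs a decompressor that keeps state across chunks, such as zlib.decompressobj. A minimal sketch of that idea (reusing input_bucket and object_key from above; this is an alternative approach, not the code the question uses):

import json
import zlib

# Sketch only: wbits=31 tells zlib to expect gzip framing, and the
# decompressor object carries its state from one chunk to the next,
# so partial chunks don't raise EOFError. Partial lines at chunk
# boundaries are buffered in `tail` until the next chunk arrives.
decompressor = zlib.decompressobj(wbits=31)
tail = b''
with S3() as s3:
    obj = s3.s3_client.get_object(Bucket=input_bucket, Key=object_key)
    for chunk in obj['Body'].iter_chunks():
        data = tail + decompressor.decompress(chunk)
        lines = data.split(b'\n')
        tail = lines.pop()  # last element may be an incomplete line
        for line in lines:
            if line:
                record = json.loads(line)
                # ...handle/accumulate record here...
    # any bytes left in `tail` after the loop are the final (unterminated) line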
Edit:
So, I think with Pandas I can do this a lot more directly:
with S3Connect() as s3:
    obj = s3.s3_client.get_object(Bucket=input_bucket, Key=object_key)
    count = 0
    for df in pd.read_json("s3://path/to/file.json.gz", lines=True, chunksize=50000000):
        count += 1
        parquet_filename = f'df_{str(count)}.parquet.gzip'
        df.to_parquet(parquet_filename, index=False)
        s3_url = f's3://parquet_test/{parquet_filename}'
        df.to_parquet(s3_url, compression='gzip')
        # The file is also being saved locally, which I don't want,
        # so I'm just doing this to remove it
        os.remove(parquet_filename)
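One way to avoid writing the local file at all is to serialize the parquet into an in-memory buffer and upload it with put_object. A sketch, assuming the same boto3 client is in scope and using a placeholder output bucket name:

import io

buffer = io.BytesIO()
df.to_parquet(buffer, index=False, compression='gzip')  # write parquet bytes into memory
s3.s3_client.put_object(
    Bucket='my-output-bucket',                # placeholder bucket name
    Key=f'parquet_test/{parquet_filename}',
    Body=buffer.getvalue(),
)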
Solution 1:
Gzip needs to know when the end of the file occurs, so by passing it small chunks of data at a time, you're in effect telling it these are small gzip files, which it's failing on since they end early. gzip.open, on the other hand, can be passed a file, or file-like object, and it will use the returns from read to know when the file ends.
So, you can simply pass it the output object from get_object, and let it request data from the S3 object as it needs it, and understand when the gzip file ends.
import gzip
import json

import boto3

s3 = boto3.client('s3')
s3_object = s3.get_object(Bucket=input_bucket, Key=object_key)['Body']

# gzip.open wraps the streaming body and pulls more data from S3 as it
# needs it, so it only sees end-of-stream when the object is exhausted
with gzip.open(s3_object, "r") as f:
    for row in f:
        row = json.loads(row)
        # TODO: Handle each row as it comes in...
        print(row)
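To connect this back to the parquet goal, the rows can be accumulated into fixed-size batches, with each batch written out as its own parquet part. A rough sketch, assuming s3fs is installed so pandas can write directly to an s3:// URL; the batch size and output path are placeholders:

import gzip
import json

import boto3
import pandas as pd

BATCH_SIZE = 500_000  # placeholder: tune to fit memory

s3 = boto3.client('s3')
s3_object = s3.get_object(Bucket=input_bucket, Key=object_key)['Body']

batch, part = [], 0
with gzip.open(s3_object, "r") as f:
    for row in f:
        batch.append(json.loads(row))
        if len(batch) >= BATCH_SIZE:
            part += 1
            pd.DataFrame(batch).to_parquet(
                f's3://mybucket/parquet_test/part_{part}.parquet.gzip',
                compression='gzip', index=False)
            batch = []
    if batch:  # flush whatever is left after the last full batch
        part += 1
        pd.DataFrame(batch).to_parquet(
            f's3://mybucket/parquet_test/part_{part}.parquet.gzip',
            compression='gzip', index=False)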