Error trying to write CSV file to Google Cloud Storage from Dataflow pipeline

I'm working on building a Dataflow pipeline that reads a CSV file (containing 250,000 rows) from my Cloud Storage bucket, modifies the value of each row, and then writes the modified contents to a new CSV in the same bucket. With the code below I'm able to read and modify the contents of the original file, but when I attempt to write the new file to GCS I get the following error:

google.api_core.exceptions.TooManyRequests: 429 POST https://storage.googleapis.com/upload/storage/v1/b/my-bucket/o?uploadType=multipart: {
  "error": {
    "code": 429,
    "message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
    "errors": [
      {
        "message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ]
  }
}
: ('Request failed with status code', 429, 'Expected one of', <HTTPStatus.OK: 200>) [while running 'Store Output File']

My code in Dataflow:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import traceback
import sys
import pandas as pd
from cryptography.fernet import Fernet
import google.auth
from google.cloud import storage

fernet_secret = 'aD4t9MlsHLdHyuFKhoyhy9_eLKDfe8eyVSD3tu8KzoP='
bucket = 'my-bucket'
inputFile = f'gs://{bucket}/product-codes/test_codes.csv'
outputFile = 'product-codes/URL_test_codes.csv'

#Pipeline Logic
def product_codes_pipeline(project, env, region='us-central1'):
    options = PipelineOptions(
        streaming=False,
        project=project,
        region=region,
        staging_location="gs://my-bucket-dataflows/Templates/staging",
        temp_location="gs://my-bucket-dataflows/Templates/temp",
        template_location="gs://my-bucket-dataflows/Templates/Generate_Product_Codes.py",
        subnetwork='https://www.googleapis.com/compute/v1/projects/{}/regions/us-central1/subnetworks/{}-private'.format(project, env)
    )
    
    # Transform function
    def genURLs(code):
        f = Fernet(fernet_secret)
        encoded = code.encode()
        encrypted = f.encrypt(encoded)
        decrypted = f.decrypt(encrypted.decode().encode())
        decoded = decrypted.decode()
        if code != decoded:
            print(f'Error: Code {code} and decoded code {decoded} do not match')
            sys.exit(1)
        url = 'https://some-url.com/redeem/product-code=' + encrypted.decode()
        return url
    
    class WriteCSVFIle(beam.DoFn):
        def __init__(self, bucket_name):
            self.bucket_name = bucket_name

        def start_bundle(self):
            self.client = storage.Client()

        def process(self, urls):
            df = pd.DataFrame([urls], columns=['URL'])

            bucket = self.client.get_bucket(self.bucket_name)
            bucket.blob(f'{outputFile}').upload_from_string(df.to_csv(index=False), 'text/csv')
    
    
    # End function
    p = beam.Pipeline(options=options)
    (p | 'Read Input CSV' >> beam.io.ReadFromText(inputFile, skip_header_lines=1)
       | 'Map Codes' >> beam.Map(genURLs)
       | 'Store Output File' >> beam.ParDo(WriteCSVFIle(bucket)))

    p.run()

The code produces URL_test_codes.csv in my bucket, but the file only contains one row (not including the 'URL' header), which tells me that my code is overwriting the file as it processes each row. Is there a way to bulk write the contents of the entire file instead of making a series of requests to update it? I'm new to Python/Dataflow, so any help is greatly appreciated.


Solution 1:

Let's point out the issues: the evident one is a quota issue on the GCS side, reflected by the 429 error code. But as you noted, this is only a symptom of the underlying issue, which is how you are writing your data to the blob.

A Beam pipeline operates on a parallel collection of elements (a PCollection), and each pipeline step is executed once per element. In other words, your ParDo writes to the output file once for every element in the PCollection, so each of your 250,000 rows triggers a separate upload that overwrites the same object, which is what trips the per-object rate limit.

So there are some issues with your WriteCSVFIle function. To write a PCollection to GCS, it is better to use a separate pipeline step dedicated to writing the whole PCollection, as follows:

First, import the transform that is already included in Apache Beam:

from apache_beam.io import WriteToText

Then, you use it at the end of your pipeline:

| 'Write PCollection to Bucket' >> WriteToText('gs://{0}/{1}'.format(bucket, outputFile))

With this option you don't need to create a storage client or reference a blob; the transform just needs the GCS URI where it should write the final result, and you can tune its behavior with the parameters described in the documentation.
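For instance, if you want a single CSV with a 'URL' header instead of Beam's default sharded output, you could pass optional arguments along these lines (an illustrative sketch, not a requirement; header, num_shards, and shard_name_template are standard WriteToText parameters, and the values shown are assumptions for your case):

| 'Write PCollection to Bucket' >> WriteToText(
      'gs://{0}/{1}'.format(bucket, outputFile),
      header='URL',               # write the column header once at the top of the file
      num_shards=1,               # collapse the output into a single file
      shard_name_template='')     # drop the default -00000-of-00001 shard suffix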

With this, you only need to address the DataFrame created in your WriteCSVFIle function. Each pipeline step produces a new PCollection, so with your current logic a DataFrame-creating step would yield one DataFrame per URL. Since 'URL' is the only column in your DataFrame and you really just want to write the results of genURLs, going directly from genURLs to WriteToText should give you the output you're looking for.
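For reference, here is a minimal sketch of how the end of the pipeline could look with that change, reusing your existing options, genURLs, inputFile, bucket, and outputFile; the header argument is an optional assumption so the output keeps its 'URL' column:

from apache_beam.io import WriteToText

p = beam.Pipeline(options=options)
(p | 'Read Input CSV' >> beam.io.ReadFromText(inputFile, skip_header_lines=1)
   | 'Map Codes' >> beam.Map(genURLs)
   # each element of the PCollection becomes one line in the output file
   | 'Write PCollection to Bucket' >> WriteToText('gs://{0}/{1}'.format(bucket, outputFile), header='URL'))

p.run()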

Either way, you can adjust your pipeline accordingly, but the WriteToText transform will at least take care of writing your whole final PCollection to your Cloud Storage bucket in a single step.