Writing data from BigQuery to CSV is slow

I wrote code that behaves strangely and runs slowly, and I can't understand why. I'm trying to download data from BigQuery (using a query as input) to a CSV file, then create a URL for that CSV so people can download it as a report. I'm trying to optimize the CSV writing step, as it takes a long time and shows some odd behavior.

The code iterates over the BigQuery results and passes each row to a channel for formatting and writing with Go's encoding/csv package. These are the relevant parts, with some debugging output added:

func (s *Service) generateReportWorker(ctx context.Context, query, reportName string) error {
    it, err := s.bigqueryClient.Read(ctx, query)
    if err != nil {
        return err
    }
    filename := generateReportFilename(reportName)
    gcsObj := s.gcsClient.Bucket(s.config.GcsBucket).Object(filename)
    wc := gcsObj.NewWriter(ctx)
    wc.ContentType = "text/csv"
    wc.ContentDisposition = "attachment"

    csvWriter := csv.NewWriter(wc)

    var doneCount uint64

    go backgroundTimer(ctx, it.TotalRows, &doneCount)

    rowJobs := make(chan []bigquery.Value, it.TotalRows)
    workers := 10
    wg := sync.WaitGroup{}
    wg.Add(workers)

    // start worker pool
    for i := 0; i < workers; i++ {
        go func(c context.Context, num int) {
            defer wg.Done()
            for row := range rowJobs {
                records := make([]string, len(row))
                for j, r := range row {
                    records[j] = fmt.Sprintf("%v", r)
                }
                s.mu.Lock()
                start := time.Now()
                if err := csvWriter.Write(records); err != nil {
                    log.Errorf("Error writing row: %v", err)
                }
                if time.Since(start) > time.Second {
                    fmt.Printf("worker %d took %v\n", num, time.Since(start))
                }
                s.mu.Unlock()
                atomic.AddUint64(&doneCount, 1)
            }
        }(ctx, i)
    }

    // read results from bigquery and add to the pool
    for {
        var row []bigquery.Value
        if err := it.Next(&row); err != nil {
            if err == iterator.Done || err == context.DeadlineExceeded {
                break
            }
            log.Errorf("Error loading next row from BQ: %v", err)
        }
        rowJobs <- row
    }

    fmt.Println("***done loop!***")

    close(rowJobs)

    wg.Wait()

    csvWriter.Flush()
    wc.Close()

    url := fmt.Sprintf("%s/%s/%s", s.config.BaseURL, s.config.GcsBucket, filename)

    /// ....

}

func backgroundTimer(ctx context.Context, total uint64, done *uint64) {
    ticker := time.NewTicker(10 * time.Second)
    go func() {
        for {
            select {
            case <-ctx.Done():
                ticker.Stop()
                return
            case <-ticker.C:
                fmt.Printf("progress (%d,%d)\n", atomic.LoadUint64(done), total)
            }
        }
    }()
}

The BigQuery Read function:

func (c *Client) Read(ctx context.Context, query string) (*bigquery.RowIterator, error)  {
    job, err := c.bigqueryClient.Query(query).Run(ctx)
    if err != nil {
        return nil, err
    }
    it, err := job.Read(ctx)
    if err != nil {
        return nil, err
    }
    return it, nil
}

I ran this code with a query that returns about 400,000 rows. The query itself takes around 10 seconds, but the whole process takes around 2 minutes. The output:

progress (112346,392565)
progress (123631,392565)
***done loop!***
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
progress (123631,392565)
worker 3 took 1m16.728143875s
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
progress (247525,392565)
worker 3 took 1m13.525662666s
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
progress (370737,392565)
worker 4 took 1m17.576536375s
progress (392565,392565)

You can see that writing the first 112,346 rows was fast. Then, for some reason, worker 3 took 1m16s (!!!) to write a single row, which made the other workers wait for the mutex to be released. This happened two more times, which caused the whole process to take more than 2 minutes to finish.

I'm not sure what's going on or how to debug this further. Why do I get these stalls in the execution?


Solution 1:

As suggested by @serge-v, you can write all the records to a local file and then transfer the file to GCS as a whole. To shorten the transfer, you can split the file into multiple chunks and upload them with this command: gsutil -m cp -j, where

gsutil is used to access cloud storage from command line

-m is used to perform a parallel multi-threaded/multi-processing copy

cp is used to copy files

-j <ext,...> applies gzip transport encoding to uploads of files whose extension matches the given list (for example, -j csv). This saves network bandwidth while leaving the data uncompressed in Cloud Storage.

To run this command from your Go program, you can refer to this GitHub link.

You could also try profiling your Go program. Profiling shows where the program spends its time, which helps you locate the stalls.

Since you are reading a large number of rows from BigQuery, you can try the BigQuery Storage API. It provides faster access to BigQuery-managed storage than bulk data export. Using the BigQuery Storage API instead of the row iterator in your Go program can make the process faster.

For further reference, you can also look into the query optimization techniques documented by BigQuery.