Google BQ - how to upsert existing data in tables?

I'm using the Python client library to load data into BigQuery tables. I need to update some changed rows in those tables, but I couldn't figure out how to update them correctly. I want something like an UPSERT - insert a row only if it doesn't exist, otherwise update the existing row.

Is it the right approach to use a special checksum field in the tables (and compare checksums during the load process)? If there is a good approach, how do I implement it with the Python client? (As far as I know, it can't update existing data.)

Please explain what the best practice is.


Solution 1:

BigQuery now supports MERGE, which can combine an INSERT and an UPDATE in one atomic operation, i.e. an UPSERT.

Using Mikhail's example tables, it would look like:

MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (id, value)


See the BigQuery documentation on the MERGE statement for details.
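
Since the question is about the Python client library, here is a minimal sketch of how you could run such a MERGE from Python with the google-cloud-bigquery package. The dataset/table names are the ones from the example above, and the client is assumed to pick up default project and credentials:

from google.cloud import bigquery

client = bigquery.Client()  # assumes default project and credentials

merge_sql = """
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (id, value)
"""

# query() submits the DML job; result() waits for it to finish.
job = client.query(merge_sql)
job.result()
print("Rows affected:", job.num_dml_affected_rows)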

Solution 2:

BigQuery is append-only by design. That means you are better off leaving duplicate rows for the same entity in the table and writing your queries so they always read the most recent row.

Updating rows, as you would in transactional tables, is possible but with limitations: your project can make up to 1,500 operations per table per day. That is very limited, and the purpose of those operations is entirely different. One operation can touch multiple rows, but it is still 1,500 operations per table per day, so if you want to update rows individually, that doesn't work out - it limits you to 1,500 updates per day.

Since BQ is used as a data lake, you should just stream a new row every time the user, for example, updates their profile. You will end up with 20 rows from 20 saves for the same user. Later you can rematerialize the table to get unique rows by removing the duplicate data.

See this question for the latter: BigQuery - DELETE statement to remove duplicates
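
As a sketch of that rematerialization step: assuming a hypothetical table yourdataset.users keyed by user_id with an updated_at timestamp (these names are placeholders, not from the question), you could periodically rewrite the table keeping only the latest row per key, for example from the Python client:

from google.cloud import bigquery

client = bigquery.Client()

# Rewrite the table, keeping only the most recent row per user_id.
dedup_sql = """
CREATE OR REPLACE TABLE yourdataset.users AS
SELECT * EXCEPT(rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY updated_at DESC) AS rn
  FROM yourdataset.users
)
WHERE rn = 1
"""

client.query(dedup_sql).result()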

Solution 3:

BigQuery does not support UPSERT directly, but if you really need it, you can run an UPDATE and an INSERT one after another to achieve the same result. See the simplified example below.

Assume you have two tables as below - one that holds your data (yourproject.yourdataset.table_data) and another (yourproject.yourdataset.table_changes) that contains the changes you want to apply to the first table:

table_data (sample rows shown in a screenshot)

table_changes (sample rows shown in a screenshot)

Now the queries below, run one after another, do the trick:

Update Query:

#standardSQL
UPDATE `yourproject.yourdataset.table_data` t
SET t.value = s.value
FROM `yourproject.yourdataset.table_changes` s
WHERE t.id = s.id

The result: the matching rows in table_data now carry the values from table_changes.

And now - INSERT Query

#standardSQL
INSERT `yourproject.yourdataset.table_data` (id, value)
SELECT id, value
FROM `yourproject.yourdataset.table_changes`
WHERE id NOT IN (SELECT id FROM `yourproject.yourdataset.table_data`)

The result: table_data now also contains the rows that only existed in table_changes - and we are done here.

Hopefully the example above is simple and clear enough for you to apply it to your case.
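
To drive this UPDATE/INSERT pair from the Python client mentioned in the question, a minimal sketch (assuming the google-cloud-bigquery package and the table names from the example above) could look like this:

from google.cloud import bigquery

client = bigquery.Client()

update_sql = """
UPDATE `yourproject.yourdataset.table_data` t
SET t.value = s.value
FROM `yourproject.yourdataset.table_changes` s
WHERE t.id = s.id
"""

insert_sql = """
INSERT `yourproject.yourdataset.table_data` (id, value)
SELECT id, value
FROM `yourproject.yourdataset.table_changes`
WHERE id NOT IN (SELECT id FROM `yourproject.yourdataset.table_data`)
"""

# Run the two statements one after another; result() waits for each job.
for sql in (update_sql, insert_sql):
    client.query(sql).result()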

Solution 4:

I may be late to this, but you can perform an upsert in BigQuery using Dataflow/Apache Beam. You can do a CoGroupByKey to get the values sharing a common key from both data sources (one of them being the destination table), update the data read from the destination BQ table, and finally load the data with a truncating write. Hope this helps.

This way you avoid the BigQuery quota limits and do all the updating in Dataflow.

An example of it using Java; you should be able to convert it to Python easily:

// Each collection shares a common key ("K").
PCollection<KV<K, V1>> source = p.apply(...Read source...);

// Read the existing destination table. You can also use read() instead of
// readTableRows() and fromQuery() instead of from(), depending on your
// use case. The TableRows then need to be keyed by K (e.g. with WithKeys)
// so they can be co-grouped with the source collection.
PCollection<KV<K, V2>> bigQuery =
    p.apply(BigQueryIO.readTableRows().from(...table-id...))
     .apply(...key rows by K...);

// Create tuple tags for the value types in each collection.
final TupleTag<V1> t1 = new TupleTag<V1>();
final TupleTag<V2> t2 = new TupleTag<V2>();

// Merge collection values into a CoGbkResult collection.
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, source)
                         .and(t2, bigQuery)
                         .apply(CoGroupByKey.<K>create());

// Access the grouped results and decide, per key, which row to emit.
PCollection<TableRow> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
        new DoFn<KV<K, CoGbkResult>, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<K, CoGbkResult> e = c.element();

            // All values from the source collection for this key.
            Iterable<V1> sourceVals = e.getValue().getAll(t1);

            // The existing BigQuery row for this key. The key must be unique
            // in the destination table since you are upserting, hence
            // getOnly() with null as the default for keys that are new.
            V2 existingRow = e.getValue().getOnly(t2, null);

            // Upsert logic to fill in for your own row types:
            // - if sourceVals is empty: nothing changed for this key, so
            //   emit the existing BigQuery row unchanged;
            // - if existingRow is null: the key is new, so emit the latest/
            //   distinct value from sourceVals;
            // - if both are present: the source values are the latest, so
            //   emit the latest/distinct value from sourceVals and drop the
            //   existing row.
            // c.output(rowToWrite);
          }
        }));

finalResultCollection.apply(BigQueryIO.writeTableRows()
    .to("my-project:output.output_table")
    .withSchema(schema)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
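
For reference, here is a rough Python/Beam sketch of the same pattern - the table name, the schema, and the in-memory Create() source are placeholders standing in for your real input, not a drop-in pipeline:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import (
    ReadFromBigQuery, WriteToBigQuery, BigQueryDisposition)

def pick_row(element):
    # element is (key, {'source': [...], 'bq': [...]})
    _, grouped = element
    source_rows = list(grouped['source'])
    bq_rows = list(grouped['bq'])
    if source_rows:
        # New/changed data wins over whatever is already in BigQuery.
        yield source_rows[-1]
    elif bq_rows:
        # No change for this key - keep the existing row.
        yield bq_rows[0]

with beam.Pipeline() as p:
    source = (p
              | 'ReadSource' >> beam.Create([{'id': 1, 'value': 'new'}])
              | 'KeySource' >> beam.Map(lambda row: (row['id'], row)))

    existing = (p
                | 'ReadBQ' >> ReadFromBigQuery(
                      table='my-project:output.output_table')
                | 'KeyBQ' >> beam.Map(lambda row: (row['id'], row)))

    ({'source': source, 'bq': existing}
     | 'CoGroup' >> beam.CoGroupByKey()
     | 'PickLatest' >> beam.FlatMap(pick_row)
     | 'WriteBQ' >> WriteToBigQuery(
           'my-project:output.output_table',
           schema='id:INTEGER,value:STRING',
           write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
           create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))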