Google BQ - how to upsert existing data in tables?
I'm using the Python client library to load data into BigQuery tables. I need to update some changed rows in those tables, but I couldn't figure out how to update them correctly. I want something similar to an UPSERT
function: insert a row only if it doesn't exist, otherwise update the existing row.
Would it be right to use a special checksum field in the tables (and compare the sums during the loading process)? If there is a better approach, how can I solve this with the Python client? (As far as I know, it can't update existing data.)
Please explain what the best practice is.
Solution 1:
BigQuery now supports MERGE, which can combine both an INSERT and an UPDATE in one atomic operation, i.e. an UPSERT.
Using Mikhail's example tables, it would look like:
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (S.id, S.value)
See the MERGE statement in the BigQuery DML documentation for details.
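Since the question is specifically about the Python client library, here is a minimal sketch of running that MERGE with google-cloud-bigquery (the dataset and table names follow the example above; adapt them to your project):
from google.cloud import bigquery

client = bigquery.Client()  # uses your default project and credentials

merge_sql = """
MERGE merge_example.table_data T
USING merge_example.table_changes S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET value = S.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (S.id, S.value)
"""

job = client.query(merge_sql)  # submits the DML job
job.result()                   # waits for it to complete
print(f"Upsert done, {job.num_dml_affected_rows} rows affected")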
Solution 2:
BigQuery is append-only by design, meaning appends are preferred. You are better off letting duplicate rows from the same entity accumulate in the table and writing your queries to always read the most recent row.
Updating rows, as you would in transactional tables, is possible but limited: your project can make up to 1,500 table operations per table per day. That is very limited, and the purpose of those operations is quite different. One operation can touch multiple rows, but it is still 1,500 operations per table per day, so if you want to update rows individually, that won't work out.
Since BQ is used as a data lake, you should simply stream a new row every time the user, for example, updates their profile. After 20 saves you will end up with 20 rows for the same user. Later you can rematerialize your table to keep only unique rows by removing the duplicate data; a sketch of both steps follows below.
See this question for the deduplication step: BigQuery - DELETE statement to remove duplicates
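A rough sketch of that append-then-rematerialize pattern with the Python client; the table name, the user_id/updated_at columns, and the CREATE OR REPLACE approach are my assumptions, not part of the original answer:
from google.cloud import bigquery

client = bigquery.Client()
table_id = "yourproject.yourdataset.user_profiles"  # hypothetical table

# 1) Always append: stream every profile save as a new row.
rows = [{"user_id": 42, "name": "Alice", "updated_at": "2023-01-01T12:00:00"}]
errors = client.insert_rows_json(table_id, rows)
assert not errors, errors

# 2) Periodically rematerialize the table, keeping only the latest row per user.
dedup_sql = f"""
CREATE OR REPLACE TABLE `{table_id}` AS
SELECT * EXCEPT(rn)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY updated_at DESC) AS rn
  FROM `{table_id}`
)
WHERE rn = 1
"""
client.query(dedup_sql).result()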
Solution 3:
BigQuery does not support UPSERT directly, but if you really need it, you can run an UPDATE and an INSERT one after another to achieve the same result. See the simplified example below.
Assume you have two tables: one that holds your data (yourproject.yourdataset.table_data) and another (yourproject.yourdataset.table_changes) that contains the changes you want to apply to the first table. Both have id and value columns (the original answer shows their sample contents as screenshots).
Now the two queries below, run one after another, do the trick.
Update Query:
#standardSQL
UPDATE `yourproject.yourdataset.table_data` t
SET t.value = s.value
FROM `yourproject.yourdataset.table_changes` s
WHERE t.id = s.id
The result will be table_data with the matching rows updated from table_changes (shown as a screenshot in the original answer).
And now the INSERT query:
#standardSQL
INSERT `yourproject.yourdataset.table_data` (id, value)
SELECT id, value
FROM `yourproject.yourdataset.table_changes`
WHERE id NOT IN (SELECT id FROM `yourproject.yourdataset.table_data`)
The result is table_data containing both the updated rows and the newly inserted ones, and we are done here (again shown as a screenshot in the original answer).
Hopefully the above example is simple and clear enough for you to apply it to your case; a Python-client version of the same two steps follows below.
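Driving this from the Python client library, as the question asks, is a matter of submitting the two DML statements in order and waiting for each job to finish. A minimal sketch, assuming the table names above and the google-cloud-bigquery package:
from google.cloud import bigquery

client = bigquery.Client()

update_sql = """
UPDATE `yourproject.yourdataset.table_data` t
SET t.value = s.value
FROM `yourproject.yourdataset.table_changes` s
WHERE t.id = s.id
"""

insert_sql = """
INSERT `yourproject.yourdataset.table_data` (id, value)
SELECT id, value
FROM `yourproject.yourdataset.table_changes`
WHERE id NOT IN (SELECT id FROM `yourproject.yourdataset.table_data`)
"""

# Run sequentially: the INSERT must only start after the UPDATE job has finished,
# otherwise the two statements race and you lose the upsert semantics.
for sql in (update_sql, insert_sql):
    client.query(sql).result()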
Solution 4:
I may be late to this, but you can perform an upsert in BigQuery using Dataflow/Apache Beam. You can do a CoGroupByKey to get the values sharing a common key from both data sources (one of them being the destination table) and update the data read from the destination BQ table. Finally, load the data back in truncate mode. Hope this helps.
This way you avoid the DML quota limits in BigQuery and do all of the updating in Dataflow.
Here is an example of it using Java; you should be able to convert it to Python easily:
// Each input is keyed by a common key type "K".
PCollection<KV<K, V1>> source = p.apply(...Read and key the source...);
// Read the destination table and key its rows by the same K (keying step omitted here).
PCollection<KV<K, V2>> bigQuery = p.apply(BigQueryIO.readTableRows().from(...table-id...))
    .apply(...Key rows by K...);
// You can also use read() instead of readTableRows() and fromQuery() instead of from(), depending on your use case.

// Create tuple tags for the value types in each collection.
final TupleTag<V1> t1 = new TupleTag<V1>();
final TupleTag<V2> t2 = new TupleTag<V2>();

// Merge the collection values into a CoGbkResult collection.
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, source)
        .and(t2, bigQuery)
        .apply(CoGroupByKey.<K>create());
// Access the joined results and decide, per key, which row survives.
PCollection<TableRow> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
        new DoFn<KV<K, CoGbkResult>, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<K, CoGbkResult> e = c.element();
            // All incoming values for this key from the source.
            Iterable<V1> pt1Vals = e.getValue().getAll(t1);
            // The existing destination row for this key, if any. It must be unique
            // because you are upserting the table, hence getOnly() (null if the key is new).
            V2 pt2Val = e.getValue().getOnly(t2, null);
            if (!pt1Vals.iterator().hasNext()) {
              // No incoming change for this key: keep the existing BigQuery row as-is.
              c.output(toTableRow(pt2Val));            // toTableRow(): your own conversion helper
            } else {
              // There is incoming data for this key: it wins over the existing row
              // (whether or not one exists), so output only the latest/distinct incoming value.
              c.output(toTableRow(latestOf(pt1Vals))); // latestOf(): your own "pick newest" helper
            }
          }
        }));
finalResultCollection.apply(BigQueryIO.writeTableRows()
.to("my-project:output.output_table")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
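The answer says this converts to Python easily; below is a rough Beam Python sketch of the same CoGroupByKey-then-truncate idea. The table names, the id/value schema, and the use of ReadFromBigQuery/WriteToBigQuery are my assumptions; check them against your Beam version:
import apache_beam as beam

def pick_row(element):
    # Per key, prefer the incoming change; otherwise keep the existing destination row.
    _id, grouped = element
    changes = list(grouped['changes'])    # incoming rows for this id
    existing = list(grouped['existing'])  # rows already in the destination table
    return changes[-1] if changes else existing[0]

with beam.Pipeline() as p:
    changes = (p
        | 'ReadChanges' >> beam.io.ReadFromBigQuery(
              table='yourproject:yourdataset.table_changes')
        | 'KeyChanges' >> beam.Map(lambda row: (row['id'], row)))
    existing = (p
        | 'ReadExisting' >> beam.io.ReadFromBigQuery(
              table='yourproject:yourdataset.table_data')
        | 'KeyExisting' >> beam.Map(lambda row: (row['id'], row)))

    ({'changes': changes, 'existing': existing}
        | 'JoinOnId' >> beam.CoGroupByKey()
        | 'PickLatest' >> beam.Map(pick_row)
        | 'RewriteTable' >> beam.io.WriteToBigQuery(
              'yourproject:yourdataset.table_data',
              schema='id:INTEGER,value:STRING',
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))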