Writing different values to different BigQuery tables in Apache Beam

Solution 1:

This is possible using a feature recently added to BigQueryIO in Apache Beam.

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

Depending on whether the input PCollection<Foo> is bounded or unbounded, under the hood this will either create multiple BigQuery import jobs (one or more per table depending on amount of data), or it will use the BigQuery streaming inserts API.

The most flexible version of the API uses DynamicDestinations, which allows you to write different values to different tables with different schemas, and even allows you to use side inputs from the rest of the pipeline in all of these computations.

Additionally, BigQueryIO has been refactored into a number of reusable transforms that you can yourself combine to implement more complex use cases - see files in the source directory.

This feature will be included in the first stable release of Apache Beam and into the next release of Dataflow SDK (which will be based on the first stable release of Apache Beam). Right now you can use this by running your pipeline against a snapshot of Beam at HEAD from github.

Solution 2:

As of Beam 2.12.0, this feature is available in the Python SDK as well. It is marked as experimental, so you will have to pass --experiments use_beam_bq_sink to enable it. You'd do something like so:

def get_table_name(element):
  if meets_some_condition(element):
    return 'mytablename1'
  else:
    return 'mytablename2'


p = beam.Pipeline(...)

my_input_pcoll = p | ReadInMyPCollection()

my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)

The new sink supports a number of other options, which you can review in the pydoc