How to run ETL pipeline on Databricks (Python)
Solution 1:
I think you need to follow the approach for DynamoDB as a sink, described at https://docs.databricks.com/spark/latest/structured-streaming/examples.html#write-to-amazon-dynamodb-using-foreach-in-scala-and-python, using foreach.
From the Databricks manuals, to get you going - focus on foreach:
from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
    spark.readStream.format("rate").load()        # demo source: generates rows at a fixed rate
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreach(SendToDynamoDB_ForeachWriter())       # class-based ForeachWriter
    # .foreach(sendToDynamoDB_simple)              # alternative: function-based, use one or the other
    .outputMode("update")
    .start()
)
streamingDF.writeStream.foreach() allows you to write the output of a streaming query to arbitrary locations. That's the clue here.
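The query above assumes a ForeachWriter-style class named SendToDynamoDB_ForeachWriter (and, as the commented-out alternative, a plain function sendToDynamoDB_simple), which the linked docs page defines. A minimal sketch of what such a writer can look like, assuming boto3 is available on the cluster - the table name "my_table", the region, and the item attributes are placeholders based on the key/count columns produced above, so adjust them to your own table:

import boto3

def get_dynamodb():
    # Placeholder: build the boto3 DynamoDB resource. Region and credentials
    # (instance profile, env vars, etc.) are assumptions - adapt to your setup.
    return boto3.resource("dynamodb", region_name="us-west-2")

class SendToDynamoDB_ForeachWriter:
    # Class-based writer for streamingDF.writeStream.foreach().
    # Spark calls open() once per partition per epoch, process() once per row,
    # and close() when the partition is finished.

    def open(self, partition_id, epoch_id):
        self.dynamodb = get_dynamodb()
        return True  # True = go ahead and process this partition

    def process(self, row):
        # "my_table" and the attribute names are assumptions matching the
        # key/count columns of the streaming query.
        self.dynamodb.Table("my_table").put_item(
            Item={"key": int(row["key"]), "count": int(row["count"])})

    def close(self, err):
        if err:
            raise err

def sendToDynamoDB_simple(row):
    # Function-based alternative: simpler, but opens a connection per row.
    get_dynamodb().Table("my_table").put_item(
        Item={"key": int(row["key"]), "count": int(row["count"])})

Keep in mind that foreach runs on the executors, so boto3 and the AWS credentials it needs have to be available on every worker node, not just the driver.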