Solution 1:

I think you need to follow the Databricks approach for DynamoDB as a sink using foreach, as described at https://docs.databricks.com/spark/latest/structured-streaming/examples.html#write-to-amazon-dynamodb-using-foreach-in-scala-and-python.

From the Databricks docs, to get you going - focus on the foreach:

from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
       .selectExpr("value % 10 as key")
       .groupBy("key")
       .count()
       .toDF("key", "count")
       .writeStream
       .foreach(SendToDynamoDB_ForeachWriter())
      #.foreach(sendToDynamoDB_simple)  # alternative; use one or the other
       .outputMode("update")
       .start()
)
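
The SendToDynamoDB_ForeachWriter used above is defined in the linked docs; it just has to implement the foreach contract of open(partition_id, epoch_id), process(row) and close(error). A minimal sketch of that pattern using boto3 (the table name "demo", its attributes, and the region are assumptions here, not from your setup):

import boto3

class SendToDynamoDB_ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Create the client in open() so it is built on the executor
        # rather than serialized from the driver.
        self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")  # assumed region
        return True

    def process(self, row):
        # Write one output row; "demo" and its attributes are assumed.
        self.dynamodb.Table("demo").put_item(
            Item={"key": int(row["key"]), "count": int(row["count"])})

    def close(self, error):
        # Re-raise any error so the streaming query fails visibly.
        if error:
            raise error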

The key point: streamingDF.writeStream.foreach() lets you write the output of a streaming query to arbitrary locations, which is exactly what you need here.
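
For completeness, foreach also accepts a plain function called once per row, which is what the commented-out sendToDynamoDB_simple alternative would be. A sketch under the same assumptions as above (simpler, but it opens a connection per row, so the ForeachWriter class is the better fit for real workloads):

import boto3

def sendToDynamoDB_simple(row):
    # One connection per row: easy to read, slower in practice.
    dynamodb = boto3.resource("dynamodb", region_name="us-west-2")  # assumed region
    dynamodb.Table("demo").put_item(
        Item={"key": int(row["key"]), "count": int(row["count"])})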