How to read records in JSON format from Kafka using Structured Streaming?
I am trying to use structured streaming approach using Spark-Streaming based on DataFrame/Dataset API to load a stream of data from Kafka.
I use:
- Spark 2.10
- Kafka 0.10
- spark-sql-kafka-0-10
Spark Kafka DataSource has defined underlying schema:
|key|value|topic|partition|offset|timestamp|timestampType|
My data come in json format and they are stored in the value column. I am looking for a way how to extract underlying schema from value column and update received dataframe to columns stored in value? I tried the approach below but it does not work:
val columns = Array("column1", "column2") // column names
val rawKafkaDF = sparkSession.sqlContext.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic)
.load()
val columnsToSelect = columns.map( x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)
// some analytics using stream dataframe kafkaDF
val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Here I am getting Exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337;
because in time of creation of the stream, values inside are not known...
Do you have any suggestions?
Solution 1:
From the Spark perspective value
is just a byte sequence. It has no knowledge about the serialization format or content. To be able to extract the filed you have to parse it first.
If data is serialized as a JSON string you have two options. You can cast
value
to StringType
and use from_json
and provide a schema:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
val schema: StructType = StructType(Seq(
StructField("column1", ???),
StructField("column2", ???)
))
rawKafkaDF.select(from_json($"value".cast(StringType), schema))
or cast
to StringType
, extract fields by path using get_json_object
:
import org.apache.spark.sql.functions.get_json_object
val columns: Seq[String] = ???
val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))
rawKafkaDF.select(exprs: _*)
and cast
later to the desired types.