Saving a Dataset to Cassandra using Java Spark
I'm trying to save a Dataset to a Cassandra database using Spark's Java API. I can read data into a Dataset successfully with the code below:
Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();
But when I try to write the Dataset, I get an IOException: "Could not load or find table, found similar tables in keyspace":
readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();
I'm setting the host and port in the SparkSession. The thing is, I'm able to write to an existing table in overwrite and append modes, but I can't get the write to create a new table.
Versions I'm using: Spark 2.0 (Java API) and Spark Cassandra Connector 2.3.
I've tried different jar versions, but nothing worked. I've also gone through various Stack Overflow and GitHub links.
Any help is greatly appreciated.
The write operation in Spark doesn't have a mode that automatically creates a table for you, and there are multiple reasons for that. One of them is that every Cassandra table needs a primary key, and if the wrong columns are chosen, rows that share the same key values silently overwrite each other. Because of this, the Spark Cassandra Connector provides a separate method that creates a table based on your DataFrame's structure, but you need to supply the lists of partition key and clustering key columns. In Java it looks as follows (full code is here):
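import java.util.Arrays;
import com.datastax.spark.connector.DataFrameFunctions;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.cql.CassandraConnectorConf;
import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.Seq;
DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
    Arrays.asList("part")).seq());  // partition key column(s)
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
    Arrays.asList("clust", "col2")).seq());  // clustering columns, in order
CassandraConnector connector = new CassandraConnector(
    CassandraConnectorConf.apply(spark.sparkContext().getConf()));
// the connector is passed explicitly because it is an implicit parameter on the Scala side
dfFunctions.createCassandraTable("test", "widerows6",
    partitionSeqlist, clusteringSeqlist, connector);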
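If you prefer to create the table yourself (for example in cqlsh) before writing, the two key lists map onto the CQL PRIMARY KEY clause: partition key columns go inside the inner parentheses and clustering columns follow in order. The sketch below is a hypothetical pure-Java helper (buildCreateTable is not part of the connector) that assembles such a statement; the extra column val and all CQL types are assumptions for illustration, and the DDL the connector itself generates may differ in details such as quoting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CqlTableSketch {

    // Hypothetical helper, not a connector API: builds a CREATE TABLE statement
    // from an ordered column -> CQL type map and the two key-column lists.
    static String buildCreateTable(String keyspace, String table,
                                   Map<String, String> columns,
                                   List<String> partitionKeys,
                                   List<String> clusteringKeys) {
        if (partitionKeys.isEmpty()) {
            throw new IllegalArgumentException("at least one partition key column is required");
        }
        // Every key column must be one of the declared columns.
        List<String> keyColumns = new ArrayList<>(partitionKeys);
        keyColumns.addAll(clusteringKeys);
        for (String k : keyColumns) {
            if (!columns.containsKey(k)) {
                throw new IllegalArgumentException("unknown key column: " + k);
            }
        }
        List<String> defs = new ArrayList<>();
        for (Map.Entry<String, String> e : columns.entrySet()) {
            defs.add(e.getKey() + " " + e.getValue());
        }
        // Inner parentheses hold the partition key; clustering columns follow in order.
        StringBuilder pk = new StringBuilder("PRIMARY KEY ((")
                .append(String.join(", ", partitionKeys)).append(")");
        for (String c : clusteringKeys) {
            pk.append(", ").append(c);
        }
        defs.add(pk.append(")").toString());
        return "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table
                + " (" + String.join(", ", defs) + ")";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>(); // keeps declaration order
        columns.put("part", "int");
        columns.put("clust", "int");
        columns.put("col2", "text");
        columns.put("val", "text");
        // Prints: CREATE TABLE IF NOT EXISTS test.widerows6 (part int, clust int, col2 text, val text, PRIMARY KEY ((part), clust, col2))
        System.out.println(buildCreateTable("test", "widerows6", columns,
                Arrays.asList("part"), Arrays.asList("clust", "col2")));
    }
}
```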
Once the table exists, you can write the data as usual:
import com.google.common.collect.ImmutableMap;
dataset.write()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
.save();
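To make the primary-key point concrete: Cassandra treats every write as an upsert, so rows that end up with the same primary key silently replace one another instead of raising an error. The following pure-Java sketch models a table as a map keyed by primary key (the Reading class, the sensor/ts columns and the data are hypothetical) and counts how many rows survive with a key that is too coarse versus one that includes a clustering column.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertSketch {

    // Hypothetical row type: a sensor id, a timestamp and a measured value.
    static final class Reading {
        final String sensor;
        final int ts;
        final double value;

        Reading(String sensor, int ts, double value) {
            this.sensor = sensor;
            this.ts = ts;
            this.value = value;
        }
    }

    // Simulates writing rows to a table: the map key plays the role of the
    // primary key, and put() replaces an existing entry, like an upsert.
    static Map<String, Reading> writeAll(List<Reading> rows, boolean tsInKey) {
        Map<String, Reading> table = new LinkedHashMap<>();
        for (Reading r : rows) {
            String primaryKey = tsInKey ? r.sensor + "|" + r.ts : r.sensor;
            table.put(primaryKey, r);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Reading> rows = Arrays.asList(
                new Reading("s1", 1, 10.0),
                new Reading("s1", 2, 11.5),
                new Reading("s2", 1, 7.0));
        // PRIMARY KEY (sensor): the second s1 reading replaces the first -> 2 rows
        System.out.println("PRIMARY KEY (sensor): " + writeAll(rows, false).size() + " rows");
        // PRIMARY KEY ((sensor), ts): every reading keeps its own row -> 3 rows
        System.out.println("PRIMARY KEY ((sensor), ts): " + writeAll(rows, true).size() + " rows");
    }
}
```

In this model the last write in arrival order wins; real Cassandra resolves conflicts by write timestamp, but the earlier row is lost either way.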