saving dataset to cassandra using java spark

I'm trying to save a dataset to cassandra db using java spark. I'm able to read data into dataset successfully using the below code

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

But when I try to write dataset I'm getting IOException: Could not load or find table, found similar tables in keyspace

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

I'm setting host and port in sparksession The thing is I'm able to write in overwrite and append modes but not able to create table

Versions which I'm using are below: spark java 2.0 spark cassandra connector 2.3

Tried with different jar versions but nothing worked I have also gone through different stack overflow and github links

Any help is greatly appreciated.


The write operation in Spark doesn't have a mode that will automatically create a table for you - there are multiple reasons for that. One of them is that you need to define a primary key for your table, otherwise, you may just overwrite data if you set incorrect primary key. Because of this, Spark Cassandra Connector provides a separate method to create a table based on your dataframe structure, but you need to provide a list of partition & clustering key columns. In Java it will look as following (full code is here):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
          CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
          partitionSeqlist, clusteringSeqlist, connector);

and then you can write data as usual:

dataset.write()
   .format("org.apache.spark.sql.cassandra")
   .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
   .save();