Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)?

Solution 1:

kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that's just the very first step to have it in your Spark application.


With that dependency you have to decide whether you want to create a so-called uber-jar that would have all the dependencies bundled altogether (that results in a fairly big jar file and makes the submission time longer) or use --packages (or less flexible --jars) option to add the dependency at spark-submit time.

(There are other options like storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple)

I'd recommend using --packages first and only when it works consider the other options.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For kafka data source to work (and other data sources in general) you have to ensure that META-INF/services/org.apache.spark.sql.sources.DataSourceRegister of all the data sources are merged (not replace or first or whatever strategy you use).

kafka data sources uses its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister that registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for kafka format.

Solution 2:

The top answer is correct this solved the issue for me:

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first

Solution 3:

Even I had similar issue, issue started when we upgraded the Cloudera-Spark version from 2.2 --> 2.3.

Issue was: my uber jar META-INF/serives/org.apache.spark.sql.sources.DataSourceRegister was getting overwritten by similar file which is present in some other jars. Hence it was not able to find the KafkaConsumer entry in 'DataSourceRegister' file.

Resolution: modifying the POM.xml helped me.


Solution 4:

For uber-jar, adding ServicesResourceTransformer to shade-plugin works for me.

    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>