Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)?

Solution 1:

The kafka data source is an external module and is not available to Spark applications by default.

You have to define it as a dependency in your pom.xml (as you have done), but that is only the very first step towards having it in your Spark application.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
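
As a side note (this describes a typical setup and is an assumption, not something stated in the question): the Spark artifacts that the cluster already provides are usually marked provided so they stay out of the uber-jar, while spark-sql-kafka-0-10 has to ship with your application:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
        <!-- supplied by the cluster at runtime; keep it out of the uber-jar -->
        <scope>provided</scope>
    </dependency>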

With that dependency in place, you have to decide whether you want to create a so-called uber-jar that bundles all the dependencies together (which results in a fairly big jar file and makes the submission time longer), or use the --packages option (or the less flexible --jars) to add the dependency at spark-submit time.

(There are other options, like storing the required jars on HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple.)

I'd recommend using --packages first, and only once that works should you consider the other options.

Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Include the other command-line options as you wish.
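
For reference, a complete invocation might look like this (the main class and jar names are placeholders, not taken from the question):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
  --class com.example.MyStreamingApp \
  target/my-streaming-app-1.0.jar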

Uber-Jar Approach

Including all the dependencies in a so-called uber-jar may not always work due to how META-INF directories are handled.

For the kafka data source to work (and for other data sources in general), you have to ensure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged (not resolved with replace, first, or whatever other strategy your build happens to use).
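
A quick way to verify that the merge worked (myapp.jar is a placeholder for your uber-jar):

unzip -p myapp.jar META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

The merged file should list one provider class per line; if the Kafka provider's line is missing, your merge strategy dropped it.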

The kafka data source ships its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister that registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.
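
Under the hood, Spark discovers these providers through Java's ServiceLoader. Here is a minimal sketch (not Spark's exact lookup code) that you can paste into spark-shell to see which providers are actually visible on the classpath:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Every provider registered via META-INF/services shows up here;
// "kafka" has to be among the short names for format("kafka") to resolve.
ServiceLoader.load(classOf[DataSourceRegister])
  .asScala
  .map(_.shortName())
  .foreach(println)

If kafka is missing from the output while the built-in sources (csv, json, parquet, ...) are present, the service file was lost while the uber-jar was being assembled.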

Solution 2:

The top answer is correct; this sbt-assembly merge strategy solved the issue for me:

assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  // Concatenate the service files so every data source's registration survives
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  // Must come after the DataSourceRegister case, or the service file is discarded
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Solution 3:

I had a similar issue; it started when we upgraded the Cloudera Spark version from 2.2 to 2.3.

The issue was that my uber-jar's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister was getting overwritten by the file of the same name from other jars, so Spark could not find the KafkaSourceProvider entry in the DataSourceRegister file.

Resolution: modifying the maven-shade-plugin configuration in pom.xml helped me:

<configuration>
    <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
        </transformer>
    </transformers>
</configuration>

Solution 4:

For an uber-jar, adding the ServicesResourceTransformer to the maven-shade-plugin works for me; it merges all META-INF/services files (the DataSourceRegister file included) instead of letting one overwrite the others.

<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
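
For context, here is a sketch of where that transformer sits in a typical maven-shade-plugin setup (the plugin version here is an assumption; use whatever your build already has):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <!-- version is an assumption, not from the answer -->
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>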