I've been searching for a while for a way to use a Scala class in PySpark, and I haven't found any documentation or guide on the subject.

Let's say I create a simple class in Scala that uses some Apache Spark libraries, something like:

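import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  // Returns a DataFrame containing only the requested column.
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}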
  • Is there any way to use this class in PySpark?
  • Is it too tough?
  • Do I have to create a .py file?
  • Is there any guide that shows how to do that?

By the way, I also looked at the Spark source code and felt a bit lost; I wasn't able to replicate its functionality for my own purpose.


Yes, it is possible, although it can be far from trivial. Typically you want a Java-friendly wrapper so you don't have to deal with Scala features that cannot be easily expressed in plain Java and, as a result, don't play well with the Py4J gateway.

Assuming your class is in the package com.example and you have a Python DataFrame called df

df = ... # Python DataFrame

you'll have to:

  1. Build a jar using your favorite build tool.

  2. Include it in the driver classpath, for example using the --driver-class-path argument for the PySpark shell / spark-submit. Depending on the exact code, you may have to pass it using --jars as well.

  3. Extract JVM instance from a Python SparkContext instance:

    jvm = sc._jvm
    
  4. Extract the Scala SQLContext from the Python SQLContext instance:

    ssqlContext = sqlContext._ssql_ctx
    
  5. Extract Java DataFrame from the df:

    jdf = df._jdf
    
  6. Create new instance of SimpleClass:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. Call the exe method and wrap the result in a Python DataFrame:

    from pyspark.sql import DataFrame
    
    DataFrame(simpleObject.exe(), sqlContext)  # pass the Python SQLContext, not the Scala one
    

The result should be a valid PySpark DataFrame. You can of course combine all the steps into a single call.
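
As a rough illustration of combining the steps, here is a minimal sketch of a helper that does everything in one call. The function name run_simple_class and its wrap parameter are made up for this example; wrap defaults to pyspark.sql.DataFrame and exists only so the helper can be tried without a running Spark. It assumes the jar with com.example.SimpleClass is already on the driver classpath.

```python
def run_simple_class(sc, sql_context, df, column, wrap=None):
    """Instantiate com.example.SimpleClass on the JVM and return its result.

    `run_simple_class` and `wrap` are hypothetical names for this sketch.
    `wrap` turns the returned Java DataFrame into a Python one and defaults
    to pyspark.sql.DataFrame.
    """
    if wrap is None:
        # Imported lazily so the module loads even where PySpark is absent.
        from pyspark.sql import DataFrame
        wrap = DataFrame

    jvm = sc._jvm                         # Py4J view of the driver JVM
    ssql_context = sql_context._ssql_ctx  # underlying Scala SQLContext
    jdf = df._jdf                         # underlying Java DataFrame

    # The constructor arguments mirror the Scala class from the question.
    simple_object = jvm.com.example.SimpleClass(ssql_context, jdf, column)

    # The Python wrapper is given the Python-side SQLContext.
    return wrap(simple_object.exe(), sql_context)
```

With sc, sqlContext and df defined as in the steps above, run_simple_class(sc, sqlContext, df, "v") should return the same DataFrame as step 7. Note that _jvm, _ssql_ctx and _jdf are private attributes, so they may change between Spark versions.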

Important: This approach works only if the Python code is executed solely on the driver, because the Py4J gateway is available only there. It cannot be used inside a Python action or transformation. See "How to use Java/Scala function from an action or a transformation?" for details.