How to make good reproducible Apache Spark examples

Solution 1:

Provide small sample data that can be easily recreated.

At the very least, posters should provide a couple of rows and columns from their DataFrame and code that can be used to easily create it. By easy, I mean cut and paste. Make it as small as possible while still demonstrating your problem.


I have the following dataframe:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

which can be created with this code:

df = spark.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

Show the desired output.

Ask your specific question and show us your desired output.


How can I create a new column 'is_divisible' that has the value 'yes' if the day of month of the 'date' plus 7 days is divisible by the value in column 'X', and 'no' otherwise?

Desired output:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Explain how to get your output.

Explain, in great detail, how you get your desired output. It helps to show an example calculation.


For instance in row 1, the X = 1 and date = 2017-01-01. Adding 7 days to date yields 2017-01-08. The day of the month is 8 and since 8 is divisible by 1, the answer is 'yes'.

Likewise, for the last row X = 7 and the date = 2017-01-04. Adding 7 to the date yields 11 as the day of the month. Since 11 % 7 is not 0, the answer is 'no'.
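
Before posting, it can help to double-check the desired output by hand or with a few lines of plain Python. The sketch below is only an illustration that reuses the sample rows above; the helper name is_divisible is hypothetical, and nothing here depends on Spark.

```python
from datetime import date, timedelta

# Same sample rows as the DataFrame above: (index, X, label, date)
rows = [
    (1, 1, "A", "2017-01-01"),
    (2, 3, "B", "2017-01-02"),
    (3, 5, "A", "2017-01-03"),
    (4, 7, "B", "2017-01-04"),
]


def is_divisible(x, date_str, days=7):
    """Return 'yes' if the day of month of (date + days) is divisible by x."""
    shifted = date.fromisoformat(date_str) + timedelta(days=days)
    return "yes" if shifted.day % x == 0 else "no"


expected = [is_divisible(x, d) for _, x, _, d in rows]
print(expected)  # ['yes', 'yes', 'yes', 'no']
```

The result matches the is_divisible column in the desired output, so the example in the question is internally consistent.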


Share your existing code.

Show us what you have done or tried, including all* of the code, even if it does not work. Tell us where you are getting stuck, and if you receive an error, include the full error message.

(*You can leave out the code to create the spark context, but you should include all imports.)


I know how to add a new column that is date plus 7 days but I'm having trouble getting the day of the month as an integer.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Include versions, imports, and use syntax highlighting

  • Full details in this answer written by desertnaut.

For performance tuning posts, include the execution plan

  • Full details in this answer written by Alper t. Turker.
  • It helps to use standardized names for contexts.

Parsing spark output files

  • MaxU provided useful code in this answer to help parse Spark output files into a DataFrame.

Other notes.

  • Be sure to read How to Ask and How to create a Minimal, Complete, and Verifiable example first.
  • Read the other answers to this question, which are linked above.
  • Have a good, descriptive title.
  • Be polite. People on SO are volunteers, so ask nicely.

Solution 2:

Performance tuning

If the question is related to performance tuning, please include the following information.

Execution Plan

It is best to include the extended execution plan. In Python:

df.explain(True) 

In Scala:

df.explain(true)

or the extended execution plan with statistics. In Python:

print(df._jdf.queryExecution().stringWithStats())

In Scala:

df.queryExecution.stringWithStats

Mode and cluster information

  • Mode - local, client, cluster.
  • Cluster manager (if applicable) - none (local mode), standalone, YARN, Mesos, Kubernetes.
  • Basic configuration information (number of cores, executor memory).
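
Most of these details can be read from a running session. The following is a minimal sketch, assuming an active SparkSession passed in as spark; describe_session is a hypothetical helper name, and configuration keys that were never set explicitly are simply reported as "not set".

```python
def describe_session(spark):
    """Collect basic mode and cluster details from an active SparkSession."""
    sc = spark.sparkContext
    conf = sc.getConf()
    return {
        "spark_version": spark.version,
        "master": sc.master,  # e.g. local[*], yarn, spark://host:7077
        "deploy_mode": conf.get("spark.submit.deployMode", "not set"),
        "executor_memory": conf.get("spark.executor.memory", "not set"),
        "executor_cores": conf.get("spark.executor.cores", "not set"),
        "default_parallelism": sc.defaultParallelism,
    }


# Usage in a PySpark shell or notebook, where `spark` is already defined:
# for key, value in describe_session(spark).items():
#     print(key, "=", value)
```

Pasting the printed key/value pairs into the question is usually enough; there is no need to dump the entire configuration.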

Timing information

"Slow" is relative, especially when you port a non-distributed application or expect low latency. Exact timings for individual tasks and stages can be retrieved from the Jobs tab of the Spark UI (sc.uiWebUrl) or from the Spark REST API.

Use standardized names for contexts

Using established names for each context allows us to quickly reproduce the problem.

  • sc - for SparkContext.
  • sqlContext - for SQLContext.
  • spark - for SparkSession.

Provide type information (Scala)

Powerful type inference is one of the most useful features of Scala, but it makes it hard to analyze code taken out of context. Even if the type is obvious from the context, it is better to annotate the variables. Prefer

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

over

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

Commonly used tools can assist you:

  • spark-shell / Scala shell

    use :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • IntelliJ IDEA

    Use Alt + =

Solution 3:

Some additional suggestions to what has been already offered:

Include your Spark version

Spark is still evolving, although not as rapidly as in the days of 1.x. It is always a good idea to include the version you are working with, especially if it is a somewhat older one. Personally, I always start my answers with:

spark.version
# u'2.2.0'

or

sc.version
# u'2.2.0'

Including your Python version, too, is never a bad idea.
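
A small sketch for collecting both versions in one go, assuming only the standard library plus an optional local pyspark installation. Note that pyspark.__version__ is the version of the installed Python package, which is not necessarily the version of the cluster you connect to, so spark.version from a live session remains the more reliable value.

```python
import platform

# Version of the Python interpreter running this script or shell
python_version = platform.python_version()

# Version of the locally installed pyspark package, if there is one
try:
    import pyspark
    spark_version = pyspark.__version__
except ImportError:
    spark_version = "pyspark not installed"

print("Python:", python_version)
print("PySpark:", spark_version)
```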


Include all your imports

If your question is not strictly about Spark SQL and DataFrames, e.g. if you intend to use your DataFrame in some machine learning operation, be explicit about your imports. See this question, where the imports were added to the original post only after an extensive exchange in the (now removed) comments, and it turned out that the wrong imports were the root cause of the problem.

Why is this necessary? Because, for example, this LDA

from pyspark.mllib.clustering import LDA

is different from this LDA:

from pyspark.ml.clustering import LDA

The first comes from the old, RDD-based API (formerly Spark MLlib), while the second comes from the new, DataFrame-based API (Spark ML).


Include code highlighting

OK, I'll confess this is subjective: I believe that PySpark questions should not be tagged as python by default. The thing is, the python tag automatically enables code highlighting (and I believe this is a main reason people use it for PySpark questions). Anyway, if you happen to agree and would still like nicely highlighted code, simply include the relevant markdown directive:

<!-- language-all: lang-python -->

somewhere in your post, before your first code snippet.

[UPDATE: I have requested automatic syntax highlighting for the pyspark and sparkr tags, and it has since been implemented.]

Solution 4:

This small helper function might help to parse Spark output files into a DataFrame:

PySpark:

from pyspark.sql.functions import col, when

def read_spark_output(file_path):
    # read the |-delimited table; "+---+" border lines are skipped as comments
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv(file_path)
    # drop the unnamed columns (_c0, ...) produced by the leading and trailing "|"
    step2 = step1.select([c for c in step1.columns if not c.startswith("_c")])
    # replace the literal string 'null' with a real null
    return step2.select(*[
        when(~col(c).eqNullSafe("null"), col(c)).alias(c)
        for c in step2.columns
    ])

Scala:

// Read a Spark output fixed-width table:
import org.apache.spark.sql.functions.{col, when}

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}

Usage:

df = read_spark_output("file:///tmp/spark.out")

PS: For PySpark, eqNullSafe is available from Spark 2.3.
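
If no Spark session is at hand, or the table was simply copied out of a question, the same text can be parsed without Spark. The sketch below is a plain-Python alternative, not the helper above: parse_show_output is a hypothetical name, all values are kept as strings (no schema inference), and cell values that themselves contain "|" are not handled.

```python
def parse_show_output(text):
    """Parse text printed by DataFrame.show() into a list of dicts.

    All values are returned as strings; no schema inference is attempted.
    """
    table = []
    for line in text.strip().splitlines():
        line = line.strip()
        # Keep only |...| rows; "+---+" borders and other lines are skipped
        if not (line.startswith("|") and line.endswith("|")):
            continue
        table.append([cell.strip() for cell in line[1:-1].split("|")])
    if not table:
        return []
    header, *data = table
    return [dict(zip(header, row)) for row in data]


sample = """
+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
+-----+---+-----+----------+
"""

records = parse_show_output(sample)
print(records[0])
# {'index': '1', 'X': '1', 'label': 'A', 'date': '2017-01-01'}
```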