How to make good reproducible Apache Spark examples
Solution 1:
Provide small sample data that can be easily recreated.
At the very least, posters should provide a couple of rows and columns of their dataframe, along with code that can be used to easily create it. By easy, I mean cut and paste. Make it as small as possible to demonstrate your problem.
I have the following dataframe:
+-----+---+-----+----------+
|index| X|label| date|
+-----+---+-----+----------+
| 1| 1| A|2017-01-01|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 4| 7| B|2017-01-04|
+-----+---+-----+----------+
which can be created with this code:
df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)
Show the desired output.
Ask your specific question and show us your desired output.
How can I create a new column 'is_divisible' that has the value 'yes' if the day of month of the 'date' plus 7 days is divisible by the value in column 'X', and 'no' otherwise?
Desired output:
+-----+---+-----+----------+------------+
|index| X|label| date|is_divisible|
+-----+---+-----+----------+------------+
| 1| 1| A|2017-01-01| yes|
| 2| 3| B|2017-01-02| yes|
| 3| 5| A|2017-01-03| yes|
| 4| 7| B|2017-01-04| no|
+-----+---+-----+----------+------------+
Explain how to get your output.
Explain, in great detail, how you get your desired output. It helps to show an example calculation.
For instance, in row 1, X = 1 and date = 2017-01-01. Adding 7 days to the date yields 2017-01-08. The day of the month is 8, and since 8 is divisible by 1, the answer is 'yes'.
Likewise, for the last row, X = 7 and date = 2017-01-04. Adding 7 days to the date yields 11 as the day of the month. Since 11 % 7 is not 0, the answer is 'no'.
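Before writing any Spark code, the row-level logic above can be sanity-checked in plain Python (a minimal sketch; the `is_divisible` helper name is made up for illustration, it is not part of any Spark API):

```python
from datetime import date, timedelta

def is_divisible(d, x):
    # day of month of (d + 7 days), tested for divisibility by x
    day = (d + timedelta(days=7)).day
    return "yes" if day % x == 0 else "no"

print(is_divisible(date(2017, 1, 1), 1))  # row 1: day 8, 8 % 1 == 0 -> yes
print(is_divisible(date(2017, 1, 4), 7))  # row 4: day 11, 11 % 7 != 0 -> no
```

Checking your expected output this way also catches mistakes in the desired-output table itself before anyone tries to reproduce it.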
Share your existing code.
Show us what you have done or tried, including all* of the code, even if it does not work. Tell us where you are getting stuck, and if you receive an error, please include the error message.
(*You can leave out the code to create the spark context, but you should include all imports.)
I know how to add a new column that is the date plus 7 days, but I'm having trouble getting the day of the month as an integer.
from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
Include versions, imports, and use syntax highlighting
- Full details in this answer written by desertnaut.
For performance tuning posts, include the execution plan
- Full details in this answer written by Alper t. Turker.
- It helps to use standardized names for contexts.
Parsing spark output files
- MaxU provided useful code in this answer to help parse Spark output files into a DataFrame.
Other notes.
- Be sure to read How to Ask and How to create a Minimal, Complete, and Verifiable example first.
- Read the other answers to this question, which are linked above.
- Have a good, descriptive title.
- Be polite. People on SO are volunteers, so ask nicely.
Solution 2:
Performance tuning
If the question is related to performance tuning, please include the following information.
Execution Plan
It is best to include the extended execution plan. In Python:
df.explain(True)
In Scala:
df.explain(true)
or the extended execution plan with statistics. In Python:
print(df._jdf.queryExecution().stringWithStats())
In Scala:
df.queryExecution.stringWithStats
Mode and cluster information
- mode - local, client, cluster.
- Cluster manager (if applicable) - none (local mode), standalone, YARN, Mesos, Kubernetes.
- Basic configuration information (number of cores, executor memory).
Timing information
Slow is relative, especially when you port a non-distributed application or expect low latency. Exact timings for different tasks and stages can be retrieved from the Spark UI (sc.uiWebUrl) jobs page or from the Spark REST UI.
Use standardized names for contexts
Using established names for each context allows us to quickly reproduce the problem.
- sc - for SparkContext.
- sqlContext - for SQLContext.
- spark - for SparkSession.
Provide type information (Scala)
Powerful type inference is one of the most useful features of Scala, but it makes it hard to analyze code taken out of context. Even if the type is obvious from the context, it is better to annotate the variables. Prefer
val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
over
val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
Commonly used tools can assist you:
- spark-shell / Scala shell: use :t

scala> val rdd = sc.textFile("README.md")
rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> :t rdd
org.apache.spark.rdd.RDD[String]

- IntelliJ IDEA: use Alt + =
Solution 3:
Some additional suggestions to what has been already offered:
Include your Spark version
Spark is still evolving, although not so rapidly as in the days of 1.x. It is always (but especially if you are using a somewhat older version) a good idea to include your working version. Personally, I always start my answers with:
spark.version
# u'2.2.0'
or
sc.version
# u'2.2.0'
Including your Python version, too, is never a bad idea.
Include all your imports
If your question is not strictly about Spark SQL & dataframes, e.g. if you intend to use your dataframe in some machine learning operation, be explicit about your imports - see this question, where the imports were added in the OP only after extensive exchange in the (now removed) comments (and it turned out that these wrong imports were the root cause of the problem).
Why is this necessary? Because, for example, this LDA
from pyspark.mllib.clustering import LDA
is different from this LDA:
from pyspark.ml.clustering import LDA
the first coming from the old, RDD-based API (formerly Spark MLlib), while the second comes from the new, dataframe-based API (Spark ML).
Include code highlighting
OK, I'll confess this is subjective: I believe that PySpark questions should not be tagged as python by default; the thing is, the python tag automatically gives code highlighting (and I believe this is the main reason why some use it for PySpark questions). Anyway, if you happen to agree, and you would still like nice, highlighted code, simply include the relevant markdown directive:
<!-- language-all: lang-python -->
somewhere in your post, before your first code snippet.
[UPDATE: I have requested automatic syntax highlighting for the pyspark and sparkr tags, which has indeed been implemented]
Solution 4:
This small helper function might help to parse Spark output files into DataFrame:
PySpark:
from pyspark.sql.functions import col, when

def read_spark_output(file_path):
    step1 = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", "|") \
        .option("parserLib", "UNIVOCITY") \
        .option("ignoreLeadingWhiteSpace", "true") \
        .option("ignoreTrailingWhiteSpace", "true") \
        .option("comment", "+") \
        .csv("file://{}".format(file_path))
    # drop the unnamed padding columns produced by the leading/trailing '|'
    step2 = step1.select([c for c in step1.columns if not c.startswith("_")])
    # replace the literal string 'null' with real nulls
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name)
                          for col_name in step2.columns])
Scala:
// read a Spark output fixed-width table:
import org.apache.spark.sql.functions.{col, when}

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)
  // drop the unnamed padding columns produced by the leading/trailing '|'
  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)
  // replace the literal string "null" with real nulls
  step2.columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Usage:
df = read_spark_output("/tmp/spark.out")  # the PySpark version prepends file:// itself
PS: For PySpark, eqNullSafe is available from Spark 2.3.