Does Spark predicate pushdown work with JDBC?
According to this
Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.
Spark supports push down of predicates to the data source. Is this feature also available / expected for JDBC?
(From inspecting the DB logs I can see it's not the default behavior right now: the full query is passed to the DB, even if the result is later limited by Spark filters.)
MORE DETAILS
Running Spark 1.5 with PostgreSQL 9.4
code snippet:
from pyspark import SQLContext, SparkContext, Row, SparkConf
# Project-specific mapping that supplies host, database, user and password
from data_access.data_access_db import REMOTE_CONNECTION
sc = SparkContext()
sqlContext = SQLContext(sc)
url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"  # name of the table to read
df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)  # expected to reach the database as LIMIT 1
df.show()
SQL Trace:
< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.adrelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum)
< 2015-09-15 07:11:40.936 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:40.964 EDT >LOG: execute <unnamed>: SELECT "id","name" FROM dummy
I would expect the last SELECT to include a LIMIT 1 clause, but it doesn't.
Solution 1:
Spark DataFrames support predicate push-down with JDBC sources, but the term predicate is used in its strict SQL sense: it covers only the WHERE clause. Moreover, it looks like it is limited to logical conjunction (no IN and OR, I am afraid) and simple predicates.
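One way to check whether a particular filter is pushed down is to apply it, print the plan, and compare with the database log as in the question. The following is only a minimal sketch: it assumes df is the DataFrame returned by sqlContext.read.jdbc in the question, with the id column seen in the SQL trace, and filter_and_explain is a hypothetical helper name.

```python
def filter_and_explain(df, min_id):
    """Apply a simple comparison filter and print the resulting physical plan.

    df is assumed to be a DataFrame read over JDBC that has an "id" column.
    """
    # A single comparison on one column is the kind of simple predicate
    # that is a candidate for push-down into the WHERE clause.
    filtered = df.filter(df["id"] > min_id)
    # Print the physical plan; the database log remains the direct evidence
    # of what SQL was actually sent.
    filtered.explain()
    return filtered
```

If the filter is pushed down, the SELECT in the PostgreSQL log should carry a matching WHERE clause. As far as I know, newer Spark versions also list pushed filters in the plan output, but on the Spark 1.5 setup from the question the log is the safer thing to check.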
Everything else, like limits, counts, ordering, groups and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing in Spark.
Does that mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as the table argument. It is less convenient than predicate pushdown but otherwise works pretty well:
n = ... # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
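The same subquery trick should extend to the other operations mentioned above, such as counts or OR conditions. Here is a rough sketch of a small helper for building the table argument; as_table is a hypothetical name, and the queries (including the value 'foo') are made up for illustration against the dummy table and its id and name columns from the question.

```python
import re

# Conservative check so the alias cannot smuggle extra SQL into the string.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def as_table(query, alias="tmp"):
    """Wrap a SQL query so it can be passed as the table argument of read.jdbc."""
    if not _IDENTIFIER.match(alias):
        raise ValueError("alias must be a plain SQL identifier: %r" % alias)
    # Drop surrounding whitespace and a trailing semicolon, which would
    # break the derived-table syntax.
    return "({0}) AS {1}".format(query.strip().rstrip(";"), alias)

# Let the database compute the count instead of shipping every row to Spark.
count_table = as_table("SELECT COUNT(*) AS cnt FROM dummy")

# Evaluate an OR condition on the database side.
or_table = as_table("SELECT * FROM dummy WHERE id = 1 OR name = 'foo'")

# With the sqlContext and url from the question (not executed here):
# df = sqlContext.read.jdbc(url=url, table=count_table)
```

The helper only wraps text; it does not validate or escape the query itself, so it should be fed trusted SQL only.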
Note:
This behavior may be improved in the future, once Data Source API v2 is ready:
- SPARK-15689
- SPIP: Data Source API V2