How can I connect to a PostgreSQL database from Apache Spark using Scala?
Solution 1:
Our goal is to run parallel SQL queries from the Spark workers.
Build setup
Add the connector and JDBC to the libraryDependencies in build.sbt. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.
libraryDependencies ++= Seq(
  jdbc, // This alias comes from the Play sbt plugin; drop it in a plain sbt project.
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)
Code
When you create the SparkContext you tell it which jars to copy to the executors. Include the connector jar. A good-looking way to do this:
import org.apache.spark.SparkConf

val classes = Seq(
  getClass, // To get the jar with our own code.
  classOf[com.mysql.jdbc.Driver] // To get the connector.
)
// Resolve each class to the path of the jar it was loaded from.
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
Now Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.
There are two options for this. The older approach is to use org.apache.spark.rdd.JdbcRDD:
val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)
Check out the documentation for the parameters. Briefly:
- You have the SparkContext.
- Then a function that creates the connection. This will be called on each worker to connect to the database.
- Then the SQL query. This has to be similar to the example, and contain placeholders for the starting and ending key.
- Then you specify the range of keys (0 to 1000 in my example) and the number of partitions. The range will be divided among the partitions, with both bounds inclusive. So one executor thread will end up executing SELECT * FROM BOOKS WHERE 0 <= KEY AND KEY <= 99 in the example.
- And at last we have a function that converts the ResultSet into something. In the example we convert it into a String, so you end up with an RDD[String].
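To make the range splitting concrete, here is a small sketch of how I understand JdbcRDD to divide the key range. splitRange and RangeSplit are hypothetical names of my own, not part of Spark; the arithmetic follows my reading of the JdbcRDD source, so treat it as an illustration rather than a specification.

```scala
// Illustrative sketch only: RangeSplit.splitRange is a hypothetical helper, not a Spark API.
object RangeSplit {
  /** Splits the inclusive range [lower, upper] into numPartitions inclusive sub-ranges. */
  def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val lo = BigInt(lower)
    // Both bounds are inclusive, hence the + 1 here and the - 1 on each end.
    val length = BigInt(upper) - lo + 1
    (0 until numPartitions).map { i =>
      val start = lo + (length * i) / numPartitions
      val end = lo + (length * (i + 1)) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }
}
```

With the values from the example (0, 1000, 10), the first pair is (0, 99) and the last is (900, 1000). Each pair is what gets bound to the two ? placeholders in the query for one partition.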
Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the JdbcRDD you would create an org.apache.spark.sql.DataFrame:
val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))
See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with JdbcRDD).
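As a rough sketch of what setting the key range looks like with the DataFrame source: the option names partitionColumn, lowerBound, upperBound and numPartitions are the ones listed in the linked guide, while JdbcOptions.partitioned is a hypothetical helper of mine that only builds the options map. I'm assuming KEY is a numeric column, as in the JdbcRDD example.

```scala
// Illustrative sketch only: JdbcOptions.partitioned is a hypothetical helper, not a Spark API.
object JdbcOptions {
  /** Builds the options map for a partitioned read through the "jdbc" data source. */
  def partitioned(url: String, table: String, partitionColumn: String,
                  lowerBound: Long, upperBound: Long, numPartitions: Int): Map[String, String] =
    Map(
      "url" -> url,
      "dbtable" -> table,
      // The four partitioning options have to be given together.
      "partitionColumn" -> partitionColumn,
      "lowerBound" -> lowerBound.toString,
      "upperBound" -> upperBound.toString,
      "numPartitions" -> numPartitions.toString
    )
}

// Usage, assuming an existing sqlContext (Spark 1.3.x):
// val df = sqlContext.load("jdbc", JdbcOptions.partitioned(
//   "jdbc:mysql://mysql.example.com/?user=batman&password=alfred", "BOOKS", "KEY", 0, 1000, 10))
```

One difference from JdbcRDD, as far as I understand it: here the bounds only decide how the partitions are cut, they do not filter out rows that fall outside the range.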
Updates
JdbcRDD does not support updates. But you can simply do them in a foreachPartition.
rdd.foreachPartition { it =>
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  try {
    val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
    for (bookTitle <- it) {
      del.setString(1, bookTitle)
      del.executeUpdate()
    }
  } finally {
    conn.close() // Release the connection once the partition is done.
  }
}
(This creates one connection per partition. If that is a concern, use a connection pool!)
DataFrames support updates through the createJDBCTable and insertIntoJDBC methods.