Archive for : January, 2015

Load database data into Spark using JdbcRDD in Java

Update: This blog has migrated to Medium. To keep receiving new posts, please subscribe there.

Update: As of Spark 1.3, the Spark SQL Data Sources API is the preferred way to load data from external data sources. Head over to my new blog post to learn more about it. Additionally, if you’d like to know about saving a DataFrame to a database, read this post.

When I started looking for a way to load data from a relational database into Spark for analytics, I couldn’t find many helpful resources on the internet. Most examples use a hardcoded Scala/Java collection as their input, or load data from Hadoop or Cassandra. Then I discovered JdbcRDD, which can be used to load data from any JDBC-compatible database. JdbcRDD is a Scala class, and as of this writing (Spark v1.2) there is no equivalent Java API. While it is technically possible to use a Scala class from Java, it is still a difficult job for programmers who don’t have any experience in Scala. So this tutorial explains everything in detail.

The example application I wrote to explain the concepts uses the MySQL sample Employees database. It loads the entire “employees” table into Spark using JdbcRDD, then does a simple Spark transformation, and finally prints the results. Let’s discuss it step by step.

JdbcRDD is a subclass of RDD, which means that once we have loaded the DB data as a JdbcRDD, all Spark operations can be applied to it. The JdbcRDD constructor has the following parameters:

JdbcRDD(SparkContext sc, scala.Function0<java.sql.Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, scala.Function1<java.sql.ResultSet,T> mapRow, scala.reflect.ClassTag<T> evidence$1)

  1. SparkContext sc:
    Spark context
  2. scala.Function0<java.sql.Connection> getConnection:
    This is the Scala interface Function0<R>, which doesn’t take any parameters and returns a result of type R (in this case, a java.sql.Connection). This is how the database connection is supplied to JdbcRDD. Please note that this is not an active DB connection. A functional programming technique is used here: this parameter supplies a function which knows how to acquire a DB connection, but the actual connection is made at a later stage, when Spark decides to (upon invoking an RDD action).
  3. String sql:
    The SQL statement used to query the data from the database. This SQL must have two binding parameters (“?” placeholders) of numeric type. Spark is a distributed system, and to take advantage of that the data has to be divided into multiple partitions (in a cluster, these partitions will be spread across multiple Spark nodes). The binding parameters are used to query the data for a single partition. The next three parameters determine the actual values of these parameters for each partition.
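  4. long lowerBound:
    Lower bound of the entire data set, i.e. the minimum value of the first binding parameter (inclusive)
  5. long upperBound:
    Upper bound of the entire data set, i.e. the maximum value of the second binding parameter (inclusive)
  6. int numPartitions:
    Number of partitions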
  7. scala.Function1<java.sql.ResultSet,T> mapRow:
    This function is used to transform the returned ResultSet into a type of our choice.
    This is the Scala interface Function1<T1, R>, which takes a parameter of type T1 and returns a result of type R. In this case, T1 is java.sql.ResultSet. R depends on what we want to transform it into.
  8. scala.reflect.ClassTag<T> evidence$1:
    This is the parameter that puzzles most Java developers, and I still don’t fully understand it myself. From what I have read, a ClassTag carries runtime type information about T, which the Scala compiler normally supplies implicitly. That is why this parameter is not even visible in JdbcRDD examples written in Scala. But when we call the constructor from Java, we have to provide it explicitly as an instance of ClassTag.
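
One way to build some intuition for that last parameter: Java erases generic types at runtime, so generic code that needs to create a T[] must be handed a runtime token for T. As far as I can tell, a ClassTag plays a similar role for Scala code. The sketch below is a plain-Java analogy only, with Class<T> standing in for ClassTag<T>; the class and method names are made up for illustration and none of this is Spark’s own code.

```java
import java.lang.reflect.Array;

// Plain-Java analogy only: Class<T> stands in for Scala's ClassTag<T>.
class TypeTokenDemo {

    // Generic code cannot write "new T[size]" because T is erased at runtime,
    // so the caller passes the element class explicitly as a runtime token.
    @SuppressWarnings("unchecked")
    static <T> T[] newTypedArray(Class<T> elementType, int size) {
        return (T[]) Array.newInstance(elementType, size);
    }

    public static void main(String[] args) {
        String[] names = newTypedArray(String.class, 3);
        System.out.println(names.getClass().getSimpleName()); // prints "String[]"
    }
}
```

With the Scala library on the classpath, I believe a ClassTag for Object[] can be obtained from Java with an expression such as scala.reflect.ClassTag$.MODULE$.apply(Object[].class), but treat that as an assumption to verify against your Scala version.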

Now we’ll see how we can use this constructor to load the “employees” table.

First we’ll define the “getConnection” function of parameter 2. AbstractFunction0 is an abstract class which implements the Scala interface Function0. The DbConnection class extends it and implements the “apply” function by returning a Connection object.
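
A minimal sketch of such a class could look like the following. To keep it compilable without Scala on the classpath, it declares a tiny local stand-in for scala.runtime.AbstractFunction0; in a real Spark application you would delete the stand-in and import the Scala class instead. The constructor arguments, field names, and the choice to wrap SQLException in an unchecked exception are my assumptions for illustration.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Stand-in so this sketch compiles on its own. In the real application,
// remove it and import scala.runtime.AbstractFunction0 instead.
abstract class AbstractFunction0<R> {
    public abstract R apply();
}

// Serializable because Spark has to ship this object to the worker nodes.
class DbConnection extends AbstractFunction0<Connection> implements Serializable {
    private final String connectionUrl;
    private final String userName;
    private final String password;

    DbConnection(String connectionUrl, String userName, String password) {
        // Only the settings are stored here; no connection is opened yet.
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    // Called later, when a partition is actually computed.
    @Override
    public Connection apply() {
        try {
            return DriverManager.getConnection(connectionUrl, userName, password);
        } catch (SQLException e) {
            // apply() declares no checked exceptions, so the failure is wrapped.
            throw new IllegalStateException("Could not connect to " + connectionUrl, e);
        }
    }
}
```

Constructing a DbConnection is cheap and side-effect free, which matches the point made earlier: the function only knows how to get a connection, and nothing touches the database until apply() is invoked.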

Next we’ll define the “mapRow” function of parameter 7. Here the ResultSet is converted to an Object array using the JdbcRDD utility function “resultSetToObjectArray”.
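
As a rough sketch, the row mapper could be written like this. Again a local stand-in replaces scala.runtime.AbstractFunction1 so the snippet compiles on its own, and the loop is a plain-JDBC equivalent of what I understand resultSetToObjectArray to do (one array slot per column). The class name MapResult and the exception handling are assumptions.

```java
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;

// Stand-in so this sketch compiles on its own. In the real application,
// remove it and import scala.runtime.AbstractFunction1 instead.
abstract class AbstractFunction1<T1, R> {
    public abstract R apply(T1 value);
}

// Converts the current row of a ResultSet into an Object[] with one slot per column.
class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

    @Override
    public Object[] apply(ResultSet row) {
        // With Spark on the classpath, the body could simply delegate to
        // JdbcRDD.resultSetToObjectArray(row); the loop below is a plain-JDBC equivalent.
        try {
            int columnCount = row.getMetaData().getColumnCount();
            Object[] values = new Object[columnCount];
            for (int i = 0; i < columnCount; i++) {
                values[i] = row.getObject(i + 1); // JDBC column indexes are 1-based
            }
            return values;
        } catch (SQLException e) {
            // apply() declares no checked exceptions, so the failure is wrapped.
            throw new IllegalStateException("Could not read row", e);
        }
    }
}
```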

Next, a DbConnection object is instantiated with the DB parameters defined as Java constants.

Finally, the JdbcRDD is instantiated. The rest of the parameters are defined inline.

The primary key “emp_no” is used for partitioning the “employees” table, and the binding parameters define the lower and upper boundaries of each partition. The minimum and maximum values of emp_no are used as lowerBound and upperBound respectively. The data will be loaded into 10 partitions, which means the first partition will hold the records whose primary keys fall between 10001 and roughly 59000. JdbcRDD takes care of all these calculations automatically.
The last parameter is the magic code which returns the ClassTag.
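
To make the partition arithmetic concrete, here is a small plain-Java sketch that re-implements the range split as I understand JdbcRDD to perform it (inclusive bounds, evenly divided). It is not Spark’s code. The upper bound 499999 is my assumption for the maximum emp_no in the sample database, and the query string is an assumed shape with the two binding parameters.

```java
// Plain-Java re-implementation of the range split, for illustration only.
class PartitionBounds {

    // Returns {start, end}, both inclusive, for the partition at the given index.
    static long[] bounds(long lowerBound, long upperBound, int numPartitions, int index) {
        long length = 1 + upperBound - lowerBound; // both bounds are inclusive
        long start = lowerBound + (index * length) / numPartitions;
        long end = lowerBound + ((index + 1) * length) / numPartitions - 1;
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        // Assumed values: emp_no runs from 10001 to 499999, split into 10 partitions.
        String sql = "select * from employees where emp_no >= ? and emp_no <= ?";
        for (int i = 0; i < 10; i++) {
            long[] b = bounds(10001L, 499999L, 10, i);
            // Each partition runs the same query with its own pair of values.
            System.out.println("partition " + i + ": " + sql + " -> (" + b[0] + ", " + b[1] + ")");
        }
    }
}
```

Under these assumptions the first partition covers 10001 to 58999, which is the “roughly 59000” mentioned above, and the last one ends exactly at the upper bound.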

Now we have the JdbcRDD, which means we can use all Spark operations. Remember that it is a Scala class, which means we may have to handle the same ClassTag in those operations as well. Luckily, Spark provides a pure Java API for RDDs with the JavaRDD class, so the next step is to convert the JdbcRDD to a JavaRDD.

Now let’s try a simple transformation: join the “first_name” column with “last_name” using a “map” transformation and print the full name as a log message.
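
The mapping logic itself is tiny. The sketch below shows it in plain Java, with a java.util stream standing in for JavaRDD.map so that it runs without Spark. The column positions (first_name at index 2, last_name at index 3) assume the usual column order of the sample “employees” table, and the rows are illustrative values rather than loaded data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class FullNameDemo {

    // Assumed column order: emp_no, birth_date, first_name, last_name, gender, hire_date.
    static String fullName(Object[] row) {
        return row[2] + " " + row[3];
    }

    public static void main(String[] args) {
        // Illustrative rows standing in for the Object[] records of the RDD.
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {10001, "1953-09-02", "Georgi", "Facello", "M", "1986-06-26"});
        rows.add(new Object[] {10002, "1964-06-02", "Bezalel", "Simmel", "F", "1985-11-21"});

        // stream().map(...) plays the role that JavaRDD.map(...) has in the Spark job.
        List<String> names = rows.stream()
                .map(FullNameDemo::fullName)
                .collect(Collectors.toList());

        names.forEach(System.out::println);
    }
}
```

In the Spark application the same fullName logic would sit inside an implementation of org.apache.spark.api.java.function.Function<Object[], String> passed to map.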

The complete Java application is available at my GitHub repo.
