Load database data into Spark using JdbcRDD in Java

Update: This blog has migrated to Medium at https://medium.com/spark-experts. To continue getting new content, please subscribe there.

Update: As of Spark 1.3, the Spark SQL Data Sources API is the preferred way to load data from external data sources. Head over to my new blog post to learn more about it. Additionally, if you’d like to know about saving a DataFrame to a database, read this post.

When I started looking for a way to load data from a relational database into Spark for analytics purposes, I couldn’t find many helpful resources on the internet. Most examples use a hardcoded Scala/Java collection as their input, or load data from Hadoop or Cassandra. Then I discovered JdbcRDD, which can be used to load data from any JDBC-compatible database. JdbcRDD is a Scala class; as of this writing (Spark v1.2), there is no Java equivalent API. While it is technically possible to use a Scala class from Java, it is still a difficult job for programmers who don’t have any experience with Scala. So this tutorial explains everything in detail.

The example application I wrote to explain the concepts uses the MySQL sample Employees database https://dev.mysql.com/doc/employee/en/. It loads the entire “employees” table into Spark using JdbcRDD, then applies a simple Spark transformation and finally prints the results. Let’s discuss it step by step.

JdbcRDD is a subclass of RDD, which means that once we have loaded DB data as a JdbcRDD, all Spark operations can be applied to it. The JdbcRDD constructor has the following parameters:

JdbcRDD(SparkContext sc, scala.Function0<java.sql.Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, scala.Function1<java.sql.ResultSet,T> mapRow, scala.reflect.ClassTag<T> evidence$1)

  1. SparkContext sc:
    The Spark context.
  2. scala.Function0<java.sql.Connection> getConnection:
    This is the Scala interface Function0<R>, which takes no parameters and returns a result of type R (in this case, a java.sql.Connection). This is how the database connection is supplied to JdbcRDD. Please note that this is not an active DB connection. A functional programming technique is used here: this parameter supplies a function which knows how to acquire a DB connection, but the actual connection is made at a later stage, when Spark decides to (upon invoking an RDD action).
  3. String sql:
    The SQL statement used to query the data from the database. This SQL must have 2 binding parameters of numeric types. Spark is a distributed system; to take advantage of it, the data has to be divided into multiple partitions (in a cluster, these partitions will be on multiple Spark nodes). The binding parameters are used to query the data for a single partition. The next 3 parameters determine the actual values of these parameters for each partition.
  4. long lowerBound
    Lower boundary of the entire dataset.
  5. long upperBound
    Upper boundary of the entire dataset.
  6. int numPartitions
    Number of partitions.
  7. scala.Function1<java.sql.ResultSet,T> mapRow
    This function is used to transform each returned ResultSet row into a type of our choice.
    It is the Scala interface Function1<T1, R>, which takes a parameter of type T1 and returns a result of type R. In this case, T1 is java.sql.ResultSet; R depends on what we want to transform the row into.
  8. scala.reflect.ClassTag<T> evidence$1
    This is the parameter Java developers usually don’t understand, and I still don’t have a complete picture of it myself. Based on what I have read, it is type information that the Scala compiler and runtime provide internally. If we look at any JdbcRDD example in Scala, this parameter is not even visible. But when we use the class from Java, we have to provide it as an instance of ClassTag.

Now we’ll see how we can use this constructor to load the “employees” table.

First, we’ll define the “getConnection” function of parameter 2. AbstractFunction0 is an abstract class that implements the Scala interface Function0. The DbConnection class extends it and implements the “apply” function by returning a Connection object.
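Since the original code listing is hosted separately (see the GitHub repo linked at the end), here is a minimal sketch of such a class. The class and field names follow the description above, but the details are illustrative:

    import java.io.Serializable;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import scala.runtime.AbstractFunction0;

    public class DbConnection extends AbstractFunction0<Connection> implements Serializable {

        private final String driverClassName;
        private final String connectionUrl;
        private final String userName;
        private final String password;

        public DbConnection(String driverClassName, String connectionUrl, String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        @Override
        public Connection apply() {
            // This runs on the Spark workers when an RDD action is invoked,
            // which is why the class is Serializable and holds only the connection settings.
            try {
                Class.forName(driverClassName);
                return DriverManager.getConnection(connectionUrl, userName, password);
            } catch (Exception e) {
                throw new RuntimeException("Failed to create a DB connection", e);
            }
        }
    }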

Next, we’ll define the “mapRow” function of parameter 7. Here each ResultSet row is converted to an Object array using the JdbcRDD utility function “resultSetToObjectArray”.
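A minimal sketch of this class, assuming it is called MapResult (the name that also appears in the comments below):

    import java.io.Serializable;
    import java.sql.ResultSet;
    import org.apache.spark.rdd.JdbcRDD;
    import scala.runtime.AbstractFunction1;

    public class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

        @Override
        public Object[] apply(ResultSet row) {
            // JdbcRDD ships a helper that copies the current row into an Object[]
            return JdbcRDD.resultSetToObjectArray(row);
        }
    }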

Next, a DbConnection object is instantiated with the DB parameters defined as Java constants.
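For example (the constant values are illustrative and depend on your MySQL setup; this fragment lives in the driver program):

    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PWD = "password";

    DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL, MYSQL_USERNAME, MYSQL_PWD);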

Finally, the JdbcRDD is instantiated as follows; the rest of the parameters are defined inline.
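A sketch of that call, assuming “sc” is the JavaSparkContext. The lower bound 10001 is the minimum emp_no discussed below; 499999 is assumed to be the maximum emp_no in the sample data (adjust both for your own data):

    // needs org.apache.spark.rdd.JdbcRDD and scala.reflect.ClassManifestFactory$
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(
            sc.sc(),                              // the underlying Scala SparkContext
            dbConnection,                         // Function0 that knows how to open a connection
            "select * from employees where emp_no >= ? and emp_no <= ?",
            10001,                                // lowerBound: minimum emp_no
            499999,                               // upperBound: maximum emp_no (assumed)
            10,                                   // numPartitions
            new MapResult(),                      // Function1 mapping each row to Object[]
            ClassManifestFactory$.MODULE$.fromClass(Object[].class));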

As you can see, the primary key “emp_no” is used to partition the “employees” table, and the binding parameters define the lower and upper boundaries of each partition. The minimum and maximum values of emp_no are used as lowerBound and upperBound respectively. The data will be loaded into 10 partitions, which means the first partition will hold records whose primary keys fall between 10001 and roughly 59000. JdbcRDD takes care of all these calculations automatically.
The last parameter is the “magic” code which returns the ClassTag.

Now we have the JdbcRDD, which means we can use all Spark operations on it. Remember that it is a Scala class, which means we may have to handle the same ClassTag in those operations as well. Luckily, Spark provides a pure Java API for RDDs through the JavaRDD class, so the following code converts the JdbcRDD into a JavaRDD.
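A one-line sketch of the conversion, reusing the same ClassTag expression:

    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));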

Now let’s try a simple transformation. The following code snippet joins the “first_name” column with “last_name” using the “map” transformation and prints the full names as log messages.
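A sketch of that snippet. The column indexes assume the default “employees” schema (emp_no, birth_date, first_name, last_name, gender, hire_date), and “LOG” stands for whatever logger the application uses:

    // needs org.apache.spark.api.java.function.Function and java.util.List
    List<String> employeeFullNames = javaRDD.map(new Function<Object[], String>() {
        @Override
        public String call(Object[] record) throws Exception {
            // first_name is at index 2 and last_name at index 3 of the Object[] row
            return record[2] + " " + record[3];
        }
    }).collect();

    for (String fullName : employeeFullNames) {
        LOG.info(fullName);
    }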

The complete Java application is available in my GitHub repo: https://github.com/sujee81/SparkApps.

44 comments

  • spark_1

    Why do you need to load relational database data into Spark? If your data is already in a relational DB, I think you can do analytic processing much faster (if you design your relational DB correctly) and more cheaply inside the relational DB, without moving your data around to Spark.
    In my opinion, Spark is very useful for some cases, but applying Spark to every situation is not effective. Like the old quote: “If your only tool is a hammer, then every problem looks like a nail.”

    • Sujee

      This is not a new use-case. Traditionally, data from OLTP databases is replicated into OLAP databases for similar analytics. I agree with your point that doing analytics in the relational DB is better when it is possible. However, there are use-cases where a database is not sufficient, like the examples below.
      a) Machine learning algorithms: most algorithms load the data from the DB into memory, process it and save the results back to the DB. Spark does the same.
      b) Joining multiple data sources for analytics. This is a common scenario in big companies where data is spread across several systems.

  • Navin V

    Thanks, this was very useful. I did the same with a SQL Server database but ran into issues with Microsoft’s JDBC driver. Turns out that this works with version 3.0 of the driver but not version 4.0.

    • Sujee

      Thank you. Glad you found a solution. Probably Spark is using a JDBC version that is not compatible with the MS SQL v4.0 driver.

  • ramakanth

    Thanks, I have been waiting for this type of connectivity. Great job. It is very useful in our migration from existing ETL tools to Hadoop and Spark.

  • pef

    Hi,

    Very interesting post, thanks a lot. Just a small question: what if I don’t know the “upperBound” parameter? For instance, if I want to load the entire table from my RDBMS into Spark, without knowing the number of rows in it.

    Thx

    pef.

  • Donald fossouo

    Hi !
    Sorry for my question, but I am not really getting the article:

    1. Is it Spark SQL that then performs the request inside MySQL?
    2. Does Spark load a part of the table and run the request on it?

    Thanks for the article; I was able to reproduce the same without any change 🙂

    • Sujee

      Sorry if my blog post is not clear to you. I haven’t covered the fundamental Spark concepts which are required to understand JdbcRDD.

      1. Is it Spark SQL that then performs the request inside MySQL?
      JdbcRDD is part of Spark core (not Spark SQL). It is a special type of RDD used to load DB data into Spark as an RDD. First it invokes the given SQL query (multiple times with different parameters, once for each partition) to bring the data into Spark as an RDD. Once loaded as an RDD, the data is part of Spark and you can apply any Spark operations to it.

      Note: once loaded into Spark, it is recommended to cache the RDD. Otherwise, every time an action is called on the RDD, it will load from the DB again, because Spark uses a lazy execution model (see the small sketch at the end of this reply).

      2. Does Spark load a part of the table and run the request on it?
      That depends on us. The SQL query passed as a parameter to the JdbcRDD constructor determines it.
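      As a rough sketch of the caching point above (variable names are illustrative):

        javaRDD.cache();                      // or javaRDD.persist(StorageLevel.MEMORY_AND_DISK()) if it may not fit in memory
        long firstCount = javaRDD.count();    // triggers the JDBC load
        long secondCount = javaRDD.count();   // served from the cached partitions, no second DB hit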

  • Jitendra Kumar

    @sujee sir, when I use JdbcRDD for transferring my data it takes a lot of time, around 12 hrs+. Can we not improve this performance when using JdbcRDD?

  • Jitendra Kumar

    Why is JdbcRDD not recommended? That is my question. And I want to do a live chat with you for more knowledge about Spark.

    • Sujee

      JdbcRDD is just an RDD, which means it only holds the data. When it extracts the data from the database, it doesn’t extract the schema of that data. Because of this, some Spark operations cannot be executed, because they require a schema. Here are a few such operations:
      1. Querying data using Spark SQL or the DataFrame high-level API
      2. Saving back to the database

      To fix this, a schema can be applied to the RDD to convert it into a DataFrame. But this is a manual process: we have to define the schema and apply it ourselves (a rough sketch follows this reply).

      In contrast, the JDBC data source API extracts the database data plus its schema. That is why it returns a DataFrame.
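      A rough sketch of that manual schema step, assuming the Object[] rows from the article’s example and illustrative column names/types (Spark 1.3+ Java API):

        // needs org.apache.spark.api.java.function.Function, org.apache.spark.sql.*,
        // org.apache.spark.sql.types.* and java.util.Arrays
        JavaRDD<Row> rowRDD = javaRDD.map(new Function<Object[], Row>() {
            @Override
            public Row call(Object[] record) throws Exception {
                return RowFactory.create(record[0], record[2], record[3]);   // emp_no, first_name, last_name
            }
        });

        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("emp_no", DataTypes.IntegerType, false),
                DataTypes.createStructField("first_name", DataTypes.StringType, false),
                DataTypes.createStructField("last_name", DataTypes.StringType, false)));

        DataFrame employeesDF = sqlContext.createDataFrame(rowRDD, schema);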

  • jitendra kumar

    First of all, thanks Sujee for helping me so much. Now my next question is that my Spark app executes only 4 tasks at a time, but I want to increase that. How is it possible?

    • Sujee

      It is my pleasure. I blog here so that I can share what I have learnt and also learn from you all.
      If I understand your question correctly, you are talking about the number of partitions. If you’d like to increase it, you can try one of the following.
      1. Most Spark APIs have overloaded methods with an additional parameter to specify the number of partitions.

      2. If you don’t have control over how your RDD is created, the alternative is to repartition it. But that is an expensive operation and may introduce performance issues.

      It is very hard to explain without seeing your app.

  • sateesh

    hi @sujeee….
    have you any idea about apache spark-odbc connection ? If possible please share

  • Gabriel

    Hello Sujee,

    I’m attempting to use the JdbcRDD approach for fetching data from an Oracle DB [the new Data Source method does not work for my purpose]. The only thing is that I’m encountering an ‘invalid column index’ SQLException. See the code and error below; does anything look off?
    CODE:
    // Load data from MySQL
    JdbcRDD jdbcRDD = new JdbcRDD(sc.sc(), dbConnection, "SELECT SYSDATE FROM DUAL", 1, 5, 1, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

    System.out.println("Number of rows:" + jdbcRDD.count()); // Error thrown here

    ERROR STACK:
    15/04/27 18:25:10 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (JdbcRDD[0] at JdbcRDD at Main.java:262)
    15/04/27 18:25:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    15/04/27 18:25:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1069 bytes)
    15/04/27 18:25:10 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    15/04/27 18:25:44 INFO JdbcRDD: closed connection
    15/04/27 18:25:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.sql.SQLException: Invalid column index
    at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:70)
    at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:271)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:445)
    at oracle.jdbc.driver.OraclePreparedStatement.setLongInternal(OraclePreparedStatement.java:4643)
    at oracle.jdbc.driver.OraclePreparedStatement.setLong(OraclePreparedStatement.java:4632)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.setLong(OraclePreparedStatementWrapper.java:206)
    at org.apache.spark.rdd.JdbcRDD$$anon$1.<init>(JdbcRDD.scala:87)

    Thanks!

    • Sujee

      Hi Gabriel,
      Sorry for late response. Your comment somehow went to spam folder.

      The issue is in the SQL query, which should have 2 binding parameters to specify the lower and upper bound of each partition.
      A Spark RDD is internally stored as multiple partitions. Each partition contains a subset of the entire data, and the data in each partition is stored and processed as a single unit. Different partitions can be on different nodes in the Spark cluster; this is what makes an RDD a distributed dataset.
      Now, if we want to load database data into Spark, it has to be divided into multiple subsets, with each subset stored as a partition. For this purpose, JdbcRDD provides lowerBound, upperBound and numPartitions, which allow us to control how the query result is divided into multiple partitions.

      For example, let’s say the employees table has 30 rows. If I’d like to load all rows into 3 partitions, I’d specify it as:

      new JdbcRDD<>(sc, dbConnection, "select * from employees where emp_no >= ? and emp_no <= ?", 1, 30, 3, new MapResult(), ClassManifestFactory$.MODULE$.fromClass(Object[].class));

      When this executes, the Spark workers will call the MySQL DB three times, once for each partition. The queries will roughly look like this:
      1st partition: "select * from employees where emp_no >= 1 and emp_no <= 10"
      2nd partition: "select * from employees where emp_no >= 11 and emp_no <= 20"
      3rd partition: "select * from employees where emp_no >= 21 and emp_no <= 30"

  • Chaitra

    Hello Sujee,

    I have a requirement where I need to join two tables and get data.
    Is there any join functionality in Spark 1.3.1 JdbcRDD?
    If you have any solution, please share sample code or a reference.

    Thanks in Advance
    –Chaitra

    • Sujee

      Hi Chaitra,
      Please have a look at Spark SQL. https://spark.apache.org/docs/latest/sql-programming-guide.html

      I presume that those 2 tables are from different data sources. It is easy to join them if they are loaded into Spark as DataFrames (JdbcRDD is not recommended anymore). For more info about loading as a DataFrame, refer to http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/

      Once loaded, there are 2 ways to join:
      a) Programmatically, using the DataFrame API

      DataFrame df1 = sqlContext.load("jdbc", options1);
      DataFrame df2 = sqlContext.load("jdbc", options2);

      DataFrame result = df1.join(df2, df1.col("emp_no").equalTo(df2.col("emp_no")), "inner");

      Here tables 1 and 2 are joined (inner join) using the common column “emp_no”.

      b) Join using SQL
      This approach is for those who prefer SQL. Internally both approaches do the same.

      First register Dataframes as temp tables, then query using SQL.

      DataFrame df1 = sqlContext.load("jdbc", options1);
      df1.registerTempTable("table1");

      DataFrame df2 = sqlContext.load("jdbc", options2);
      df2.registerTempTable("table2");

      DataFrame result = sqlContext.sql("select * from table1 t1 inner join table2 t2 where t1.emp_no = t2.emp_no");

  • Satish Chandra

    HI Sujee,
    I am providing the PostgreSQL JDBC driver while submitting a spark-submit job in client mode with the --jars option, but I am still getting “No suitable driver found for postgresql”.
    I am currently using Spark 1.4.1, and as I understand it, --jars is the option to pass JDBC drivers to the executors.

    Please let me know if any further inputs required to analyze this further

  • Chandra Sekhar

    Hi, I tried to connect to a HANA DB using Spark SQL as below.
    df = sqlctx.read.format('jdbc').options(driver='com.sap.db.jdbc.Driver', url=urlname, dbtable='xyz').load()
    I am able to connect to the database and df.printSchema() shows the schema correctly, but if I try to run df.show() it gives a serialization error on the connection. How do I make the connection serializable in Python? Please help me.

  • Deepanshu

    hi,
    Can you help me? I need to join multiple tables present in Hive using Spark with Java and create a view from the result.

    Regards
    Deepanshu

  • Guru

    Is it possible to load data from multiple jdbc data sources and perform join?
    i.e.

    load and registerTempTable(msSqlTable)
    load and registerTempTable(teradataTable)
    sqlContext.sql("select * from msSqlTable a join teradataTable b where a.id = b.id")

  • Maniya

    I have tried to use this JdbcRDD program for querying an Oracle DB and it worked fine. Now I need to know the number of records that got fetched from the DB. If I use a single partition, I am able to get the count using jdbcRDD.count(), but if I use multiple partitions, I never get the right count. Can someone help me with this?

  • raghava

    Hi Sujee, where can I find the processed data of a Spark RDD? As per my understanding, it processes the data in memory and then stores the processed data back in the DB.

  • Rajkumar

    Hi Sujee,

    I have a question.
    I want to execute the query without the lower bound and upper bound.
    Ex: “select * from Table_Name”
    Is it possible? If possible, please let me know.
