Loading database data into Spark using Data Sources API

Update: this blog has migrated to Medium: https://medium.com/spark-experts. To keep getting good content, please subscribe there.

Update: Read this new post for a Spark 2.0 example.

With the Spark 1.3 release, it is easy to load database data into Spark using the Spark SQL Data Sources API. In my last blog post, I explained how to do the same using JdbcRDD. With this new release, JdbcRDD is no longer the preferred approach; I will explain the differences in another blog post. For now, let's look at the new API.

The Data Sources API, which provides a unified interface for querying external data sources from Spark SQL, was introduced in Spark 1.2. However, an official JDBC data source was released only in version 1.3. That release also introduced a very important Spark feature: the DataFrame API, a higher-level abstraction over RDD that provides a simpler API for data processing.

There are two ways to call the Data Sources API:

  1. Programmatically, using the SQLContext load function
  2. Using SQL – we'll look at this in another blog post

SQLContext load

The load function is defined in Scala as follows (Spark 1.3):
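
def load(source: String, options: Map[String, String]): DataFrame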

The first parameter, 'source', specifies the type of data source; in our case, it is 'jdbc'. The second parameter is a map of options required by the data source implementation named in the first parameter, and it varies from data source to data source. For the JDBC data source, the following parameters are required (reproduced from the Spark SQL documentation); a minimal options map is sketched after the list.

  1. url:
    The JDBC URL to connect to.
  2. dbtable:
    The JDBC table that should be read. Note that anything that is valid in a ‘FROM’ clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
  3. driver:
    The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running any JDBC commands, to allow the driver to register itself with the JDBC subsystem.
  4. partitionColumn, lowerBound, upperBound, numPartitions:
    These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question.
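
To make the shape of this options map concrete, here is a minimal sketch for a MySQL source (all values are placeholders rather than values from the example application; a condensed sketch of the complete application appears after the step list below):

  Map<String, String> options = new HashMap<String, String>();
  options.put("url", "jdbc:mysql://<host>:3306/<database>?user=<user>&password=<password>");
  options.put("driver", "com.mysql.jdbc.Driver");
  // anything valid in a FROM clause works here, e.g. a table name or "(select ...) as alias"
  options.put("dbtable", "employees");
  // the next four options must be specified together, or not at all
  options.put("partitionColumn", "emp_no");
  options.put("lowerBound", "10001");
  options.put("upperBound", "499999");
  options.put("numPartitions", "10");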

I'm going to explain this using the same MySQL example application used in the JdbcRDD example. It uses the MySQL sample Employees database (https://dev.mysql.com/doc/employee/en/). The application loads part of the MySQL data into Spark using the Data Sources API and prints it in the log. The steps are outlined below, followed by a condensed sketch of the whole flow.

  1. MySQL connection parameters are defined as constants
  2. Initializing SQLContext from SparkContext
  3. JDBC data source options for MySQL database

    As of writing this, it is not possible to specify the username and password separately, so they have been specified as part of the connection URL.

    The most interesting part here is the 'dbtable' option. I have used a derived table created from a subquery. The subquery selects only the required columns from the 'employees' table. It also concatenates the first name and last name and returns the result as a full name. Since this transformation happens inside MySQL, the overall performance of the application is improved. Please note that a similar optimization is also possible with JdbcRDD; I'm doing it this way here to demonstrate that 'dbtable' is not limited to the name of an existing table.

    The last four parameters are similar to JdbcRDD. The only difference is that 'partitionColumn' is an option here; in JdbcRDD, it has to be part of the SQL query.

  4. Loading MySQL query result as DataFrame

    Like an RDD, a DataFrame is lazily executed, so calling the load function will not immediately load the data from MySQL. It will wait until an action is executed on the returned DataFrame.

  5. Executing DataFrame by invoking an action
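
Putting the five steps together, here is a condensed sketch of the application (assuming the Spark 1.3 Java API; the class name, partition bounds and printing via System.out are illustrative rather than copied from the repo):

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.DataFrame;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SQLContext;

  public class SparkJdbcDataSourceExample {

      // 1. MySQL connection parameters defined as constants
      private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
      private static final String MYSQL_CONNECTION_URL =
              "jdbc:mysql://localhost:3306/employees?user=<user>&password=<password>";

      public static void main(String[] args) {
          // 2. Initialize SQLContext from SparkContext
          JavaSparkContext sc = new JavaSparkContext(
                  new SparkConf().setAppName("SparkJdbcDataSourceExample").setMaster("local[*]"));
          SQLContext sqlContext = new SQLContext(sc);

          // 3. JDBC data source options; 'dbtable' is a derived table built from a subquery
          Map<String, String> options = new HashMap<String, String>();
          options.put("driver", MYSQL_DRIVER);
          options.put("url", MYSQL_CONNECTION_URL);
          options.put("dbtable",
                  "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name");
          options.put("partitionColumn", "emp_no");
          options.put("lowerBound", "10001");
          options.put("upperBound", "499999");
          options.put("numPartitions", "10");

          // 4. Load the MySQL query result as a DataFrame (lazy; nothing is read yet)
          DataFrame jdbcDF = sqlContext.load("jdbc", options);

          // 5. Execute the DataFrame by invoking an action and print each row
          List<Row> employeeRows = jdbcDF.collectAsList();
          for (Row employeeRow : employeeRows) {
              System.out.println(employeeRow);
          }

          sc.stop();
      }
  }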

The complete Java application is available in my GitHub repo: https://github.com/sujee81/SparkApps.

If you'd like to know about saving a DataFrame to a database, read this post.

41 comments

  • Sreeharsha

    I had run the SELECT statement from the code above, but I can't figure out how to insert values into the MySQL database using Spark. Do we have to pass any arguments using lowerBound and upperBound to insert values into the database? Can we use the same MySQL INSERT statement or something else? Any suggestions please!

  • Sreeharsha Eedupuganti

    Hi Sujee…
    I am using Spark 1.2.1; will it support DataFrames, or do we need to upgrade to the latest version of Spark (1.3.0)?

    • Sujee

      No. Both DataFrame and the JDBC data source were introduced in version 1.3, so you'll have to upgrade. That release introduced many important features as well as improved performance.

  • SatyaMurthy

    Hi Sujee, can you please write a post on how to insert values into a database without using DataFrames?

    • Sujee

      Hi SatyaMurthy,
      DataFrame is the preferred approach because it carries both the schema and the data. That means it has all the information necessary to create the table (if required) as well as to insert records into the database. Also, the data source APIs (including JDBC) only work with DataFrames. I'm not sure why you would go for an option without DataFrames. If your data is an RDD, it can easily be converted to a DataFrame using the "SQLContext.applySchema()" function (see the sketch at the end of this reply).

      If for some reason none of the above options work for you, you can always write your own logic along these lines:

      1. Prepare a JDBC connection (on the workers, e.g. one connection per partition).
      2. yourRDD.foreachPartition(rows -> {
             // execute an INSERT statement for each row, then close the connection
         });
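
      And for reference, here is a minimal sketch of the RDD-to-DataFrame conversion mentioned above (assuming the Spark 1.3 Java API; employeesRdd, a JavaRDD<String[]> with [emp_no, full_name] elements, is hypothetical, and createDataFrame() is the newer name of applySchema()):

      // imports needed: java.util.Arrays, org.apache.spark.api.java.JavaRDD,
      // org.apache.spark.sql.DataFrame, Row, RowFactory, org.apache.spark.sql.types.DataTypes, StructType
      StructType schema = DataTypes.createStructType(Arrays.asList(
              DataTypes.createStructField("emp_no", DataTypes.IntegerType, false),
              DataTypes.createStructField("full_name", DataTypes.StringType, true)));

      JavaRDD<Row> rowRdd = employeesRdd.map(
              fields -> RowFactory.create(Integer.parseInt(fields[0]), fields[1]));

      DataFrame employeesDf = sqlContext.createDataFrame(rowRdd, schema);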

    • Sujee

      Hi Mujadid,
      Technically it should work with all JDBC-supported databases. I haven't tested HANA, though. You could test it by adding the HANA JDBC driver as a Maven dependency and changing the connection properties accordingly.

  • Gabriel

    Hello Sujee,

    First of all, great post! I do have one question: I am setting up a Maven project using your approach to read from an Oracle DB. Do you know which Scala version works best with Spark 1.3.1? I'm running into errors where Scala collections are not found despite having scala-lib in my pom.xml dependencies (I think it's a compatibility issue). The error comes at runtime during the load function call and looks like this: "java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class"
    code:
    DataFrame jdbcDF = sqlContext.load("jdbc", options);

    Thanks in advance!

    • Sujee

      Hi Gabriel,
      My example code is written in Java, and I'm currently learning Scala, so I'm sorry that I'm not able to help you with Scala at this time. I suggest that you post this question on the Spark mailing list or Stack Overflow. Both are very active, and someone will be able to help you.

      Thank you.

  • Gabriel

    Hello Sujee,

    I took your suggestion and posted on Stackoverflow and fixed the issue from my last post; others can reference the following for the fix: http://stackoverflow.com/questions/30084047/spark-cassandra-maven-project-with-java-source-making-scala-lib-calls

    Now would it be possible for you to show how to [using your approach from this post] save the said DataFrame(s) to Cassandra? I’ve been referencing the following Datastax post for help on this: http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java

    The problem is that their example only illustrates taking a list of items, converting it to a JavaRDD, and saving it to Cassandra. I'm attempting to do the same but with DataFrames instead. They are using the 'javaFunctions()' method, which needs data of the 'org.apache.spark.SparkContext' type. Not sure how to convert my DataFrame into such. Your help would be greatly appreciated. Many thanks!

    -Gabriel

    • Sujee

      Hi Gabriel,
      Were you able to solve your issue?

      I've no prior experience with Cassandra. Based on what I understand from the blog you mentioned above, you could try the following:

      //Load from Oracle DB
      DataFrame productDf = sqlContext.load("jdbc", options);

      //Convert Dataframe to JavaRDD
      JavaRDD productsRDD = productDf.toJavaRDD();

      //Save to Cassandra
      javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

  • Satish

    Hello Sujee,
    As you insist on upgrading to the newer Spark 1.3 version to make use of DataFrames: if somebody is using Spark 1.2, he can use SchemaRDD, right? Please correct me if I am wrong. I have a couple of questions; could you please provide your inputs on them.
    The Spark 1.2 documentation refers to a JDBC data source API, but in your prior comments you mentioned it is available only from Spark 1.3?
    The scenario is to load a huge volume of historical data. I understand the JDBC Data Sources API is a better approach than JdbcRDD. Can you please provide more inputs on this?

    • Sujee

      Hi Satish,
      Please see my reply below.

      As you insist on upgrading to the newer Spark 1.3 version to make use of DataFrames: if somebody is using Spark 1.2, he can use SchemaRDD, right? Please correct me if I am wrong.
      [Sujee] It is true that SchemaRDD was renamed to DataFrame, but DataFrame has evolved a lot since then. Also, unlike SchemaRDD, DataFrame is the basis for all Spark modules, not just Spark SQL. IMO, SchemaRDD can't be a replacement for DataFrame.

      I have a couple of questions; could you please provide your inputs on them.
      The Spark 1.2 documentation refers to a JDBC data source API, but in your prior comments you mentioned it is available only from Spark 1.3?
      [Sujee] I don’t see it here. http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#data-sources Am I missing something?

      The scenario is to load a huge volume of historical data. I understand the JDBC Data Sources API is a better approach than JdbcRDD. Can you please provide more inputs on this?
      [Sujee] I haven't personally done any performance testing. However, I have read the source code of both, and the methodology is almost the same except for what each returns.
      JdbcRDD is an RDD. If there is a need to convert it to a SchemaRDD to do SQL operations, then an additional step is required, as described here: http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#inferring-the-schema-using-reflection or http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#programmatically-specifying-the-schema
      This step can be expensive for huge data because the conversion function is executed for each row.
      Another issue with this step is that the schema of the RDD has to be defined manually. For data sources that already have a schema (e.g. databases), why should the schema have to be defined by hand?

      The Data Sources API solves this problem. When it loads a JDBC data source, it loads the data and its schema and returns them as a DataFrame. Thus, SQL operations can be executed without any additional steps.
      The biggest reason to use the Data Sources API is to take advantage of its support for predicate pushdown. Basically, it pushes some of the predicates down to the source system. Not only can the source system execute these predicates faster, it also returns less data than JdbcRDD would.

      For example, let's take a look at the following code, modified from the example in this blog.

      DataFrame jdbcDF = sqlContext.load("jdbc", options);

      //filter only employees with full name "Domenick Erman"
      DataFrame filteredDf = jdbcDF.filter(jdbcDF.col("full_name").equalTo("Domenick Erman"));

      List<Row> employeeFullNameRows = filteredDf.collectAsList();

      for (Row employeeFullNameRow : employeeFullNameRows) {
      LOGGER.info(employeeFullNameRow);
      }

      There are only 2 records with the full name "Domenick Erman", so why should we load all rows into Spark and then filter out just those 2 records?
      Since Spark executes lazily, it can optimize the lineage. For this example, take a look at the actual SQL query that is executed:

      SELECT emp_no, full_name FROM (select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name WHERE full_name = 'Domenick Erman' AND emp_no >= ? AND emp_no < ?

      Our predicate full_name = 'Domenick Erman' is automatically added, so the database will return only 2 rows. This is called predicate pushdown.

  • Mandar Vaidya

    Hi,

    I am running a SQL query on a DataFrame to perform an aggregation operation, but I am getting: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to org.apache.spark.sql.types.Decimal at org.apache.spark.sql.types.Decimal$DecimalIsFractional$.plus(Decimal.scala:330)
    I am using an Oracle DB. My database schema is:
    EMPNAME VARCHAR2(50)
    EMPAGE NUMBER(38)
    EMPSALARY LONG
    EMPCOUNTRY VARCHAR2(50)

    When I run the query as follows:

    DataFrame aggregateCountDF = sqlContext.sql("select EMPCOUNTRY, count(EMPAGE), avg(EMPAGE), Sum(EMPAGE) from tempTable group by EMPCOUNTRY");

    displayDataFrame( aggregateCountDF.collectAsList() );

    where tempTable is a registered table in Spark.

    I am not able to figure out why it is giving this error.

    Thanks in advance,
    Mandar.

  • Mandar Vaidya

    Hi Sujee,

    Thanks for your reply. I have tried both Spark 1.3.1 and 1.4.0 but get the same error.

    Regards,
    Mandar Vaidya.

  • Mujadid khalid

    Hi !

    I am a bit confused about lowerBound, upperBound and partitionColumn.
    Can you explain these parameters?

    I am trying to read 10 records, from offset 100 to 110, but I am getting the whole table instead of only 10 records. Here is my code:

    val sc = new SparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    val options = new HashMap[String, String]()
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("url", "jdbc:mysql://*******:3306/temp?user=****&password=****")
    options.put("dbtable", "tempTable")
    options.put("lowerBound", "100")
    options.put("upperBound", "110")
    options.put("numPartitions", "1")
    sqlContext.load("jdbc", options)

    Anything wrong?

    • Sujee

      Hi Mujadid,
      The following is taken from the documentation of the org.apache.spark.sql.jdbc.JDBCRelation#columnPartition method:

      “Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.”

      So to get only 10 records, a filter has to be added manually in the query:

      options.put("dbtable", "(select * from tempTable where id >= 100 and id <= 110) as tempTable2")
      options.put("partitionColumn", "id")
      options.put("numPartitions", "3")

      With the above options, the 10 records will be loaded into 3 partitions, so roughly each partition will have 3 records. The Spark queries will look like below.

      query for partition 1: select * from (select * from tempTable where id >= 100 and id <= 110) as tempTable2 where id < 103
      query for partition 2: select * from (select * from tempTable where id >= 100 and id <= 110) as tempTable2 where id >= 103 and id < 106
      query for partition 3: select * from (select * from tempTable where id >= 100 and id <= 110) as tempTable2 where id >= 106

      Now if we look at the options you have used.

      options.put(“dbtable”, “tempTable”)
      options.put(“partitionColumn”, “id”)
      options.put(“numPartitions”, “3”)

      Spark queries will look like below.
      query for partition 1: select * from tempTable where id < 103
      query for partition 2: select * from tempTable where id >= 103 and id < 106
      query for partition 3: select * from tempTable where id >= 106

      Look at the 1st and 3rd queries. Now you know why it loads all the records in your table.

      For more info. https://issues.apache.org/jira/browse/SPARK-6800

      • Mujadid khalid

        Hi Sujee!

        I mean that if we run the application the first time, we fetch all records. Then more records are added, and when we run the application again, we should get only the newly added records. Can we use the LIMIT and OFFSET concept with Spark SQL?

        And how?

  • Guangyao

    Great post! I've got a deprecation warning on sqlContext.load("jdbc", options) after upgrading my Spark to 1.4.0. Which method should I use now?

  • M.

    Hi,

    Thank you for your post.

    I don’t understand the “lowerBound/upperBound” parameters. For example, I’m running the following:
    connectionOptions.put("url", "jdbc:postgresql://*****:++32/u?user=user&password=");
    connectionOptions.put("dbtable", "(SELECT ID, DATA FROM data limit 100) AS data");
    connectionOptions.put("driver", "org.postgresql.Driver");
    connectionOptions.put("partitionColumn", "ID");
    connectionOptions.put("lowerBound", "100");
    connectionOptions.put("upperBound", "300");
    connectionOptions.put("numPartitions", "2");

    And for some reason everything goes into a single partition. The docs say that no filter is applied to the ID column and that these parameters are only used for computing the partition stride... but does that mean that each partition will contain only 200 records? What if my DB has millions of records?

    Could you please shed some light into the matter?

    Thanks for your time

  • Satish Chandra

    HI Sujee,
    I have seen in a couple of blogs that once we have the SQL data extracted via JDBC, we can apply saveToCassandra to it to store it in a Cassandra table.

    Please find my code below
    val myRDD27 = new JdbcRDD( sc, ()=> DriverManager.getConnection(url,user,pass),"select * from wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?",5,0,1,(r: ResultSet) => {(r.getInt("alarm_type_code"),r.getString("language_code"),r.getString("alrm_type_cd_desc"))})
    myRDD27.saveToCassandra("keyspace","arm_typ_txt",SomeColumns("alarm_type_code","language_code","alrm_type_cd_desc"))

    But I am getting an error:
    Exception in thread “main” java.lang.NoSuchMethodError: com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions

    Could you please provide your inputs? That would be a great help.

    Thanks for your support
    Satish Chandra

  • Balapoorna

    Hi,

    Is there any way that I can use Oracle instead of MySQL? Do I have to do anything other than changing the driver and URL?

  • Jinfan Huang

    With the URL format url = "jdbc:oracle:thin:@dburl:port:sid" for Oracle and url = "jdbc:mysql://dburl:port/sid" for MySQL, both worked properly under Scala.
    However, under Java,
    MYSQL_CONNECTION_URL =
    "jdbc:mysql://dburl:port/sid?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;
    MySQL works fine.
    However, for Oracle neither of these works:
    DB_CONNECTION_URL = DB_URL+":"+DB_PORT+":"+DB_NAME+","+DB_USERNAME + "," + DB_PWD;
    DB_CONNECTION_URL = DB_URL+":"+DB_PORT+"/"+DB_NAME+"?user="+DB_USERNAME + "&password=" + DB_PWD;
    The complaint is: ORA-12505, TNS:listener does not currently know of SID given in connect descriptor.

  • Satish Chandra J

    Please provide your inputs on the partition column to be used with the Data Sources API or JdbcRDD in a scenario where the source table does not have a numeric column that is sequential and unique, so that proper partitioning can take place.

  • Sivakumar

    Sujee,

    Is it mandatory to specify lowerBound and upperBound? If I want to load the contents of the entire table, is there a way to do that? Thanks.

  • Ruchira

    sqlContext.load("jdbc", options);

    // As per Spark 1.3, the load method is deprecated. I would like to know the best way with the latest Spark version.

  • Jorge Machado

    Just use :

    val jdbcDF = sqlContext.read.format("jdbc").options(
      Map("url" -> "jdbc:postgresql://dbhost:3307/db_name",
        "dbtable" -> "(select * from some_table) as some_alias",
        "driver" -> "org.postgresql.Driver",
        "user" -> "dbuser",
        "partitionColumn" -> "id",
        "lowerBound" -> "0",
        "upperBound" -> "1000",
        "numPartitions" -> "2",
        "fetchSize" -> "100",
        "password" -> "some_secret")).load()

  • arun

    SQLContext.load(source: String, options: Map[String, String]): DataFrame

    …how do I know what values I need to send in the second argument? The documentation says nothing in detail.

  • Ramesh

    Hi,

    Can you please answer the below query.

    I have an Oracle table 'A'. I don't know the lower bound and upper bound values of the partition column.

    In your blog you hard-code those values, but in production it's not good practice to hard-code them.

    How can I do that??

  • Ramesh

    I know that I can do it in one way.
    By reading the table once using sqlContext.read(select max(partition_col), min(partition_col) from A), creating a DataFrame with these 2 columns, and getting the max and min bound values.

    Then assign to 2 variables(variable1 and variable2).

    Then read the table again:
    SQLContext.read(select * from A)
    .option(JDBC…)
    .option(partitionColumn xx)
    .option(lowerBound variable1)
    .option(upperBound variable2)
    .numPartitions(xx)

    This way I can do it, but the problem is that we are hitting the Oracle box twice, which should not be required.
    Can you reply on the same?

  • Niral Patel

    Hi Sujee,

    Could you please let me know which JDBC driver to use for a Spark 2.0 Dataset with SQL Server? I am using this code but need to specify the JDBC driver, and that I don't know.
