Loading database data using Spark 2.0 Data Sources API

Update: this blog has migrated to Medium: https://medium.com/spark-experts. To continue getting good content, please subscribe there.

Last year, I blogged about loading database data using Spark 1.3. Over the multiple releases since then, Spark and its Data Sources API have evolved considerably. With the new 2.0 major release, I thought it was worth revisiting the topic.

In this post, I’m using Spark 2.0.0 to rewrite the example of reading the employees table from a MySQL database and generating a full name from the first name and last name columns.

The new DataFrameReader class (introduced in version 1.4) defines the methods used for loading external storage such as files and databases. A DataFrameReader can be obtained by calling the read() method on SparkSession (the new entry point introduced in 2.0 that subsumes SQLContext and HiveContext). Once we have a reference to a DataFrameReader, the database can be accessed using one of the following approaches.
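As a minimal sketch of obtaining that entry point (the app name and local master are placeholder choices for experimenting, not part of the original post):

```java
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SparkSession;

public class EntryPoint {
    public static void main(String[] args) {
        // SparkSession subsumes SQLContext and HiveContext in 2.0
        SparkSession session = SparkSession.builder()
                .appName("spark-load-from-db")  // placeholder app name
                .master("local[*]")             // local mode, for experimenting only
                .getOrCreate();

        // read() returns a DataFrameReader for loading external data sources
        DataFrameReader reader = session.read();

        session.stop();
    }
}
```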

  1. Using format() and options() methods

    This is the example shown in the official Spark SQL documentation. The format() method indicates the type of data source, and the options() method passes the JDBC connection properties as a Map. Alternatively, the option() method can be used to define a single property at a time; by chaining multiple option() calls, multiple properties can be passed. option() also has overloads for different value types such as Double, String, etc.
    The list of supported properties can be viewed here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

  2. Using the convenient jdbc() method
    In this blog post, I’m focusing on this approach as it is more convenient and easier to read.
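For completeness, the first approach can be sketched as follows. The connection URL, table name, and credentials are placeholder values, and the block assumes a reachable MySQL instance plus the MySQL JDBC driver on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FormatOptionsExample {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("format-options-example")
                .master("local[*]")
                .getOrCreate();

        // All JDBC connection properties passed at once as a Map
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://localhost:3306/employees"); // placeholder URL
        options.put("dbtable", "employees");
        options.put("user", "expertuser");        // placeholder credentials
        options.put("password", "expertuser123");

        Dataset<Row> employees = session.read()
                .format("jdbc")   // type of data source
                .options(options)
                .load();

        // Equivalent form: chain option() calls, one property at a time
        Dataset<Row> employees2 = session.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://localhost:3306/employees")
                .option("dbtable", "employees")
                .option("user", "expertuser")
                .option("password", "expertuser123")
                .load();

        employees.printSchema();
        session.stop();
    }
}
```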

Steps

  1. Defining MySQL connection parameters as constants

  2. Initializing SparkSession

  3. Setting JDBC connection properties

    “user” and “password” are mandatory connection properties that need to be added here. All other mandatory settings are defined as method parameters of the jdbc() method.
    If you’ve used older versions of Spark, you’ll also notice that user and password no longer need to be defined as part of the connection URL.

  4. Loading the MySQL query result as a DataFrame

    Here we invoke the jdbc() method with the connection URL, the database query, partitionColumn, lowerBound, upperBound, numPartitions, and the connection properties.

    Note: there is another variation of the jdbc() method that takes only the connection URL, the database query, and the connection properties. In that case, partitionColumn, lowerBound, upperBound, and numPartitions have to be defined as connection properties along with user and password.

  5. Executing the DataFrame by invoking an action

    The complete Java application is available in my GitHub repo: https://github.com/sujee81/SparkApps.
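Putting the steps above together, a sketch of the whole flow might look like the following. The connection URL and credentials are placeholders, the partitioning bounds are illustrative values, and a running MySQL instance with the sample employees database (plus the MySQL JDBC driver on the classpath) is assumed:

```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {
    // 1. MySQL connection parameters defined as constants (placeholder values)
    private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees";
    private static final String MYSQL_USERNAME = "expertuser";
    private static final String MYSQL_PWD = "expertuser123";

    public static void main(String[] args) {
        // 2. Initializing SparkSession
        SparkSession session = SparkSession.builder()
                .appName("spark-load-from-db")
                .master("local[*]")
                .getOrCreate();

        // 3. JDBC connection properties: "user" and "password" are mandatory here;
        //    they are no longer embedded in the connection URL
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", MYSQL_USERNAME);
        connectionProperties.put("password", MYSQL_PWD);

        // The query is wrapped in parentheses and aliased so it can be used
        // where a table name is expected
        String dbTable = "(select emp_no, concat_ws(' ', first_name, last_name)"
                + " as full_name from employees) as employees_name";

        // 4. Loading the MySQL query result as a DataFrame, partitioned on emp_no
        //    (bounds and partition count are illustrative)
        Dataset<Row> employees = session.read().jdbc(
                MYSQL_CONNECTION_URL,
                dbTable,
                "emp_no",   // partitionColumn
                10001L,     // lowerBound
                499999L,    // upperBound
                10,         // numPartitions
                connectionProperties);

        // Alternative: the jdbc(url, table, properties) overload, with the
        // partitioning settings moved into connectionProperties instead

        // 5. Executing the DataFrame by invoking an action
        employees.show();

        session.stop();
    }
}
```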

7 comments

  • wasp

    Hi,
    when I read Oracle data using jdbcDF.collectAsList(), there is an error: cannot cast Integer to String.
    I think maybe some field data in my table has some error? When I use JdbcRDD there is no error,
    but I do not know how to find it out. Can you give me some help?

  • sumanth nag

    I’m facing this error:
    java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:54)
    at scala.Option.getOrElse(Option.scala:121)

    Any clue on this? In the previous API we used to mention the driver param. Is it not needed in 2.0? Then how will Spark understand which driver to use?

  • Sreeharsha Eedupuganti

    This is my code :

    DataFrame df = hiveql.sql("select * from consumer.complaints");
    df.registerTempTable("consumer");

    DataFrame result = df.groupBy("company").agg(functions.count("company").as("count"));
    result.select("company", "count").agg(functions.max("count").as("ascount")).show();

    Output:

    +-------+
    |ascount|
    +-------+
    | 237629|
    +-------+

    Question : Lets say i had a table called sample

    +--------+-------+
    |ascount |company|
    +--------+-------+
    | 237629 | XYZ   |
    +--------+-------+

    I am getting the ascount value from this table, but what I need here is the “company” name for the “ascount” value we get. Any suggestions please? Thanks

  • JaeMyoung, Oh

    Running this source with Eclipse on Windows 10, I’ve got the error below. How can I resolve this problem?
    …………………………………………………………………………………………………………………………………………………………..
    2017-02-16 10:00:58[main] INFO SharedState:54 - Warehouse path is 'file:C:\workspace\SparkApps-master\spark-2_0_0-load-from-db/spark-warehouse'.
    Exception in thread "main" java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:/workspace/SparkApps-master/spark-2_0_0-load-from-db/spark-warehouse
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.(Path.java:172)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:114)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.(SessionCatalog.scala:89)
    at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95)
    at org.apache.spark.sql.internal.SessionState.catalog(SessionState.scala:95)
    at org.apache.spark.sql.internal.SessionState$$anon$1.(SessionState.scala:112)
    at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:112)
    at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:238)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:194)
    at com.sparkexpert.Main.main(Main.java:34)
    Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:C:/workspace/SparkApps-master/spark-2_0_0-load-from-db/spark-warehouse
    at java.net.URI.checkPath(Unknown Source)
    at java.net.URI.(Unknown Source)
    at org.apache.hadoop.fs.Path.initialize(Path.java:203)
    … 15 more
    …………………………………………………………………………………………………………………………………………………………..

  • Panshul Gupta

    Hey, Thanks for the awesome post. It helped a lot.
    I am trying to implement this in Java.

    I am trying to do something similar to :
    final String dbTable =
    "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name";

    as:

    String trxLoadQueryString = "SELECT * FROM trx_rec";
    jdbcOptions.put("dbtable", trxLoadQueryString);

    Dataset trxDataFrame = session.read().format("jdbc").options(jdbcOptions).load();

    On execution of the above, I get an SQL error:
    You have an error in your SQL syntax: near 'SELECT * FROM trx_rec WHERE 1=0'

    I do not understand why there is a WHERE 1=0 at the end of my query.
