
Loading database data using Spark 2.0 Data Sources API

Update: this blog has migrated to Medium https://medium.com/spark-experts. To keep getting new content, please subscribe there.

Last year, I blogged about loading database data using Spark 1.3. Over the releases since then, Spark and its Data Sources API have evolved considerably. With the new 2.0 major release, I thought it was worth revisiting.

In this post, I’m using Spark 2.0.0 to rewrite the example of reading the employees table from a MySQL database and generating a full name from the first name and last name columns.

The new DataFrameReader class (introduced in version 1.4) defines the methods used for loading data from external storage such as files and databases. A DataFrameReader is obtained by calling the read() method on SparkSession (the new entry point introduced in 2.0 that subsumes SQLContext and HiveContext). Once we have a reference to a DataFrameReader, the database can be accessed using one of the following approaches.

  1. Using format() and options() methods

    This is the approach shown in the official Spark SQL documentation. The format() method indicates the type of data source, and the options() method passes the JDBC connection properties as a Map. Alternatively, the option() method can be used to define a single property at a time; by chaining multiple option() calls, multiple properties can be passed. The option() method also has overloads for different value types such as Double, String, etc. A minimal sketch of this approach appears after this list.
    The list of supported properties can be viewed here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

     

  2. Using the convenient jdbc() method
    In this blog post, I’m focusing on this approach as it is more convenient and easier to read.
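For illustration, here is a minimal Java sketch of approach 1 (format() and option()). The connection URL, table name and credentials are placeholders I made up for this sketch, not values from the original application.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
            .appName("jdbc-format-options-example")
            .master("local[*]")
            .getOrCreate();

    // format() selects the data source type; each option() call adds one JDBC property
    Dataset<Row> employees = spark.read()
            .format("jdbc")
            .option("url", "jdbc:mysql://localhost:3306/employees")   // placeholder URL
            .option("dbtable", "employees")
            .option("user", "root")                                   // placeholder credentials
            .option("password", "secret")
            .load();

    employees.show();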

Steps

  1. MySQL connection parameters are defined as constants

  2. Initializing SparkSession

  3. JDBC connection properties

    “user” and “password” are mandatory connection properties and need to be added here. All other mandatory properties are defined as method parameters of the jdbc() method.
    If you’ve used older versions of Spark, another thing you’ll notice is that user and password no longer need to be defined as part of the connection URL.

  4. Loading MySQL query result as DataFrame

    Here we invoke the jdbc() method with the connection URL, db query, partitionColumn, lowerBound, upperBound, numPartitions and connection properties (see the consolidated sketch after these steps).

    Note: There is another variation of the jdbc() method that takes only the connection URL, db query and connection properties. In that case, partitionColumn, lowerBound, upperBound and numPartitions have to be defined as connection properties along with user and password.

  5. Executing DataFrame by invoking an action

    Complete Java application is available at my GitHub repo https://github.com/sujee81/SparkApps.
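Pulling steps 1–5 together, a minimal Java sketch of the application might look like the following. The connection URL, credentials, query and partition bounds are placeholder assumptions (the bounds roughly match the emp_no range of the MySQL sample database); the exact code is in the repo above.

    import java.util.Properties;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SparkJdbcReadExample {

        // 1. MySQL connection parameters defined as constants (placeholder values)
        private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/employees";
        private static final String MYSQL_USERNAME = "root";
        private static final String MYSQL_PASSWORD = "secret";

        public static void main(String[] args) {
            // 2. Initializing SparkSession
            SparkSession spark = SparkSession.builder()
                    .appName("spark-mysql-example")
                    .master("local[*]")
                    .getOrCreate();

            // 3. JDBC connection properties ("user" and "password" are mandatory)
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", MYSQL_USERNAME);
            connectionProperties.put("password", MYSQL_PASSWORD);

            // 4. Loading a MySQL query result as a DataFrame; the derived table
            //    concatenates first_name and last_name into full_name inside MySQL
            String dbTable = "(select emp_no, concat_ws(' ', first_name, last_name) as full_name "
                    + "from employees) as employees_name";
            Dataset<Row> employees = spark.read().jdbc(
                    MYSQL_URL,
                    dbTable,
                    "emp_no",   // partitionColumn
                    10001L,     // lowerBound (min emp_no in the sample database)
                    499999L,    // upperBound (max emp_no in the sample database)
                    10,         // numPartitions
                    connectionProperties);

            // 5. Executing the DataFrame by invoking an action
            employees.show();

            spark.stop();
        }
    }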

Save Apache Spark dataframe to database


Some of my readers asked about saving a Spark dataframe to a database. You’d be surprised if I said it can be done in a single line with the new Spark JDBC data source API, but it is true. Let’s look at it in detail.

In this example, I’m going to save a sample dataframe to my local MySQL database.

  1. Define MySQL connection properties.
  2. Create the Spark context and SQL context.
  3. Now we’ll create a sample dataframe which will later be saved to a MySQL table. In this example, I’m going to load a sample JSON file from the local filesystem as a dataframe, but any dataframe can be used here. Refer to the Spark SQL documentation to learn about other ways of creating dataframes.
  4. The Spark JDBC data source API provides two options to save a dataframe to a database (a sketch of both appears after these steps).
    1. Option 1: Create a new table and insert all records using the “createJDBCTable” function.

      This function creates a new table with the given name. The table’s schema is based on the schema of the dataframe. Once the table is created, all records are inserted.

      parameters
      url: connection URL
      table: table name
      allowExisting: If true, any existing table with the same name is dropped first. If false and a table with the same name already exists, an exception is thrown


    2. Option 2: Insert all records into an existing table using the “insertIntoJDBC” function.

      This function is useful if you already have a table and want to insert records into it. Please note that the dataframe’s schema and the existing table’s schema should match.

      parameters
      url: connection URL
      table: table name
      overwrite: If true, the table is truncated before the records are inserted.


  5. Now run the program and verify that the “users” table has been created and all records are there.
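The two save calls are small enough to show in one sketch. This assumes Spark 1.3.x, where createJDBCTable and insertIntoJDBC are methods on DataFrame (later releases deprecate them in favour of the DataFrameWriter API); the JSON file name, database, table name and credentials are placeholders of my own.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class SparkDataFrameSaveExample {

        // 1. MySQL connection properties (placeholder values; user and password go in the URL)
        private static final String MYSQL_CONNECTION_URL =
                "jdbc:mysql://localhost:3306/test?user=root&password=secret";

        public static void main(String[] args) {
            // 2. Create Spark context and SQL context
            SparkConf conf = new SparkConf().setAppName("spark-save-to-db").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // 3. Create a sample dataframe from a local JSON file (any dataframe would do)
            DataFrame usersDf = sqlContext.jsonFile("users.json");

            // 4. Option 1: create the "users" table and insert all records;
            //    allowExisting = true drops any existing table with the same name first
            usersDf.createJDBCTable(MYSQL_CONNECTION_URL, "users", true);

            // 4. Option 2: insert all records into an existing "users" table;
            //    overwrite = false appends without truncating the table
            // usersDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "users", false);

            sc.stop();
        }
    }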

 

Complete application is available at my GitHub repo https://github.com/sujee81/SparkApps.

Loading database data into Spark using Data Sources API


Update: Read this new post for a Spark 2.0 example.

With the Spark 1.3 release, it is easy to load database data into Spark using the Spark SQL Data Sources API. In my last blog post, I explained how to do the same using JdbcRDD. With this new release, JdbcRDD is no longer the preferred approach; I will explain the differences in another blog post. For now, let’s look at the new API.

The Data Sources API, which provides a unified interface for querying external data sources from Spark SQL, was introduced in Spark 1.2. However, an official JDBC data source was released only in version 1.3. This release also introduced a very important Spark feature – the DataFrame API, a higher-level abstraction over RDD that provides a simpler API for data processing.

There are two ways to call the Data Sources API:

  1. Programmatically, using the SQLContext load function
  2. Using SQL – we’ll look at this in another blog post

SQLContext load

The load function defined in SQLContext takes two parameters.

The first parameter, ‘source’, specifies the type of data source; in our case it is ‘jdbc’. The second parameter is a map of options required by the data source implementation named in the first parameter, and it varies from data source to data source. The JDBC data source requires the following options (reproduced from the Spark SQL documentation).

  1. url:
    The JDBC URL to connect to.
  2. dbtable:
    The JDBC table that should be read. Note that anything that is valid in a ‘FROM’ clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
  3. driver:
    The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running any JDBC commands to allow the driver to register itself with the JDBC subsystem.
  4. partitionColumn, lowerBound, upperBound, numPartitions:
    These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question.

I’m going to explain this using the same MySQL example application used in the JdbcRDD example. It uses the MySQL sample Employees database https://dev.mysql.com/doc/employee/en/. First it loads part of the MySQL data into Spark using the Data Sources API and then prints it in the log.

  1. MySQL connection parameters are defined as constants
  2. Initializing SQLContext from SparkContext
  3. JDBC data source options for MySQL database

    As of writing this, it is not possible to specify the username and password separately, so they are specified as part of the connection URL.

    The most interesting part here is the ‘dbtable’ option. I have used a derived table created from a subquery. The subquery selects only the required columns from the ‘employees’ table. It also concatenates first name and last name and returns it as full name. Since this transformation happens inside MySQL, the overall performance of this app is improved. Please note that a similar optimization is also possible in JdbcRDD; I’m doing it this way here to demonstrate that ‘dbtable’ is not limited to the name of an existing table.

    The last four options are similar to JdbcRDD parameters. The only difference is that ‘partitionColumn’ is an explicit option here; in JdbcRDD, it has to be part of the SQL query.

  4. Loading MySQL query result as DataFrame

    Like an RDD, a DataFrame is lazily evaluated, so calling the load function will not immediately load the data from MySQL. It waits until an action is executed on the returned DataFrame (see the consolidated sketch after these steps).

  5. Executing DataFrame by invoking an action
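A minimal Java sketch consolidating steps 1–5 could look like the following. The connection URL, credentials and partition bounds are placeholders of my own; the dbtable value is the kind of derived-table subquery described in step 3.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class SparkDataSourcesExample {

        // 1. MySQL connection parameters (placeholders; user and password are part of the URL)
        private static final String MYSQL_CONNECTION_URL =
                "jdbc:mysql://localhost:3306/employees?user=root&password=secret";
        private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";

        public static void main(String[] args) {
            // 2. Initializing SQLContext from SparkContext
            SparkConf conf = new SparkConf().setAppName("spark-load-from-db").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // 3. JDBC data source options; 'dbtable' is a derived table built from a subquery
            Map<String, String> options = new HashMap<String, String>();
            options.put("url", MYSQL_CONNECTION_URL);
            options.put("driver", MYSQL_DRIVER);
            options.put("dbtable",
                    "(select emp_no, concat(first_name, ' ', last_name) as full_name from employees) as employees_name");
            options.put("partitionColumn", "emp_no");
            options.put("lowerBound", "10001");
            options.put("upperBound", "499999");
            options.put("numPartitions", "10");

            // 4. Loading the MySQL query result as a DataFrame (lazy: nothing runs yet)
            DataFrame employees = sqlContext.load("jdbc", options);

            // 5. Executing the DataFrame by invoking an action
            employees.show();

            sc.stop();
        }
    }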

Complete Java application is available at my GitHub repo https://github.com/sujee81/SparkApps.

If you’d like to know about saving a dataframe to a database, read this post.

Load database data into Spark using JdbcRDD in Java


Update: As of Spark 1.3, the Spark SQL Data Sources API is the preferred way to load data from external data sources. Head over to my new blog post to learn more about it. Additionally, if you’d like to know about saving a dataframe to a database, read this post.

When I started looking for a way to load data from a relational database into Spark for analytics purposes, I couldn’t find many helpful resources on the internet. Most examples use a hardcoded Scala/Java collection as their input or load data from Hadoop or Cassandra. Then I discovered JdbcRDD, which can be used to load data from any JDBC-compatible database. JdbcRDD is a Scala class, and as of writing this (Spark v1.2) there is no Java equivalent API. While it is technically possible to use a Scala class from Java, it is still a difficult job for programmers who don’t have any experience with Scala, so this tutorial explains everything in detail.

The example application I wrote to explain the concepts uses the MySQL sample Employees database https://dev.mysql.com/doc/employee/en/. It loads the entire “employees” table into Spark using JdbcRDD, then applies a simple Spark transformation and finally prints the results. Let’s discuss it step by step.

JdbcRDD is a subclass of RDD, which means that once we have loaded DB data as a JdbcRDD, all Spark operations can be applied to it. The JdbcRDD constructor has the following parameters:

JdbcRDD(SparkContext sc, scala.Function0<java.sql.Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, scala.Function1<java.sql.ResultSet,T> mapRow, scala.reflect.ClassTag<T> evidence$1)

  1. SparkContext sc:
    Spark context
  2. scala.Function0<java.sql.Connection> getConnection:
    This is the Scala interface Function0<R>, which takes no parameters and returns a result of type R (in this case, a java.sql.Connection). This is how the database connection is supplied to JdbcRDD. Please note that this is not an active DB connection; a functional programming technique is used here. This parameter supplies a function which knows how to acquire a DB connection, but the actual connection is made at a later stage, when Spark decides to (upon invoking an RDD action).
  3. String sql:
    The SQL statement used to query the data from the database. This SQL must have two binding parameters of numeric types. Spark is a distributed system; to take advantage of it, the data has to be divided into multiple partitions (in a cluster, these partitions will be spread across multiple Spark nodes). The binding parameters are used to query the data for a single partition, and the next three parameters determine the actual values of those parameters for each partition.
  4. long lowerBound
    Lower boundary of the entire data set
  5. long upperBound
    Upper boundary of the entire data set
  6. int numPartitions
    Number of partitions
  7. scala.Function1<java.sql.ResultSet,T> mapRow
    This function transforms the returned ResultSet into a type of our choice.
    It is the Scala interface Function1<T1, R>, which takes a parameter of type T1 and returns a result of type R. In this case, T1 is java.sql.ResultSet; R depends on what we want to transform it into.
  8. scala.reflect.ClassTag<T> evidence$1
    This is the parameter Java developers don’t understand; I still don’t have a good grasp of it either. Based on what I have read, it is some kind of type-system information that the Scala runtime provides internally. If we look at any JdbcRDD example in Scala, this parameter is not even visible, but when we use it from Java, we have to provide it as an instance of ClassTag.

Now we’ll see how we can use this constructor to load the “employees” table.

First we’ll define the “getConnection” function (parameter 2). AbstractFunction0 is an abstract class which implements the Scala interface Function0. The DbConnection class extends it and implements the “apply” function by returning a Connection object.
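A sketch of how such a class could look (the error handling is my own choice, and the constructor arguments are placeholders for the DB constants used later):

    import java.io.Serializable;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import scala.runtime.AbstractFunction0;

    // Knows how to open a JDBC connection; Spark calls apply() on a worker only when needed
    class DbConnection extends AbstractFunction0<Connection> implements Serializable {

        private final String driverClassName;
        private final String connectionUrl;
        private final String userName;
        private final String password;

        public DbConnection(String driverClassName, String connectionUrl,
                            String userName, String password) {
            this.driverClassName = driverClassName;
            this.connectionUrl = connectionUrl;
            this.userName = userName;
            this.password = password;
        }

        @Override
        public Connection apply() {
            try {
                Class.forName(driverClassName);
                return DriverManager.getConnection(connectionUrl, userName, password);
            } catch (Exception e) {
                throw new RuntimeException("Failed to open JDBC connection", e);
            }
        }
    }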

Next we’ll define the “mapRow” function (parameter 7). Here the ResultSet is converted to an Object array using the JdbcRDD utility function “resultSetToObjectArray”.
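A sketch of the corresponding class, assuming we simply want each row back as an Object array:

    import java.io.Serializable;
    import java.sql.ResultSet;
    import org.apache.spark.rdd.JdbcRDD;
    import scala.runtime.AbstractFunction1;

    // Maps each ResultSet row to an Object[] using JdbcRDD's helper
    class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

        @Override
        public Object[] apply(ResultSet row) {
            return JdbcRDD.resultSetToObjectArray(row);
        }
    }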

Next, a DbConnection object is instantiated with the DB parameters defined as Java constants.

Finally, JdbcRDD is instantiated as follows; the rest of the parameters are defined inline.
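Continuing the sketch, with placeholder DB constants and the DbConnection and MapResult classes from above (ClassTag$.MODULE$.apply is one way to obtain the ClassTag from Java; the bounds match the emp_no range described below):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.rdd.JdbcRDD;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    // DB parameters defined as Java constants (placeholder values)
    final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees";
    final String MYSQL_USERNAME = "root";
    final String MYSQL_PASSWORD = "secret";

    SparkConf conf = new SparkConf().setAppName("jdbcrdd-example").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Supplies the "how to connect" function; no connection is opened here
    DbConnection dbConnection = new DbConnection(MYSQL_DRIVER, MYSQL_CONNECTION_URL,
            MYSQL_USERNAME, MYSQL_PASSWORD);

    // The "magic" last parameter: a ClassTag for the element type
    ClassTag<Object[]> classTag = ClassTag$.MODULE$.apply(Object[].class);

    // The query has exactly two numeric binding parameters used for partitioning
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(
            sc.sc(),                                                    // SparkContext
            dbConnection,                                               // getConnection
            "select * from employees where emp_no >= ? and emp_no <= ?",
            10001,                                                      // lowerBound (min emp_no)
            499999,                                                     // upperBound (max emp_no)
            10,                                                         // numPartitions
            new MapResult(),                                            // mapRow
            classTag);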

As you can see, the primary key “emp_no” is used for partitioning the “employees” table, and the binding parameters define the lower and upper boundaries of each partition. The minimum and maximum values of emp_no are used as lowerBound and upperBound respectively. The data will be loaded into 10 partitions, which means the first partition will contain records whose primary keys fall roughly within 10001 to 59000. JdbcRDD takes care of all these calculations automatically.
The last parameter is the magic code which returns the ClassTag.

Now that we have the JdbcRDD, we can use all Spark operations on it. Remember it is a Scala class, which means we may have to handle the same ClassTag in those operations as well. Luckily, Spark provides a pure Java API for RDDs through the JavaRDD class, so the following code converts the JdbcRDD to a JavaRDD.
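A one-line sketch, reusing the classTag from the previous step:

    import org.apache.spark.api.java.JavaRDD;

    // Wrap the Scala RDD so the rest of the code can use the Java-friendly API
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, classTag);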

Now let’s try a simple transformation. The following code snippet joins the “first_name” column with “last_name” using the “map” transformation and prints the full name as a log message.
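A sketch of that transformation; the record indexes assume the column order of the sample employees table (emp_no, birth_date, first_name, last_name, …), and I print to standard output instead of a logger:

    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    // first_name and last_name are the 3rd and 4th columns of the employees table
    JavaRDD<String> fullNames = javaRDD.map(new Function<Object[], String>() {
        @Override
        public String call(Object[] record) {
            return record[2] + " " + record[3];
        }
    });

    // collect() triggers the actual load from MySQL and the transformation
    List<String> names = fullNames.collect();
    for (String fullName : names) {
        System.out.println(fullName);
    }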

Complete Java application is available at my GitHub repo https://github.com/sujee81/SparkApps.
