Archive for: April, 2015

Basic descriptive statistics in Apache Spark

Update: this blog has migrated to Medium at https://medium.com/spark-experts. To keep getting good content, please subscribe there.

The Spark core module provides basic descriptive statistics operations for RDDs of numeric data. More advanced statistics operations are available in the MLlib module, which is beyond the scope of this post.

The descriptive statistics operations are only available on a specialised version of the Spark RDD called DoubleRDD (JavaDoubleRDD in the Java API). The following example application calculates the mean, max, min, etc. of a sample dataset.

First, we’ll generate a sample test dataset. The following code generates the numbers from 1 to 99999 using the Java 8 Stream API.
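
A minimal sketch of that step, producing a List<Double> (the variable name is just illustrative):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    // Generate 1.0, 2.0, ..., 99999.0 using the Java 8 Stream API
    List<Double> data = IntStream.rangeClosed(1, 99999)
            .asDoubleStream()
            .boxed()
            .collect(Collectors.toList());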

Next, create a DoubleRDD from the sample data. Note the use of the “parallelizeDoubles” method instead of the usual “parallelize” method: it accepts a list of Doubles and produces a JavaDoubleRDD.
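
Continuing from the previous snippet, a sketch of this step (the app name and the local master are placeholders for this example):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf()
            .setAppName("descriptive-statistics")  // placeholder app name
            .setMaster("local[*]");                // run locally for this example
    JavaSparkContext sc = new JavaSparkContext(conf);

    // parallelizeDoubles() takes a List<Double> and returns a JavaDoubleRDD
    JavaDoubleRDD rdd = sc.parallelizeDoubles(data);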

Now we’ll calculate the mean of our dataset.
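
A sketch of the mean calculation:

    // mean() triggers a Spark job over the whole RDD
    double mean = rdd.mean();
    System.out.println("Mean: " + mean);  // 50000.0 for the numbers 1..99999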

There are similar methods for other statistics operations such as max, min, sum, standard deviation, etc.
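
For example, sketched on the same RDD:

    double max = rdd.max();            // 99999.0
    double min = rdd.min();            // 1.0
    double sum = rdd.sum();            // 4999950000.0
    double stdev = rdd.stdev();        // population standard deviation
    double variance = rdd.variance();  // population variance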

Every time one of these methods is invoked, Spark runs the computation over the entire RDD. If more than one statistic is needed, that work is repeated again and again, which is very inefficient. To solve this, Spark provides the “StatCounter” class, which makes a single pass over the data and provides the results of all the basic statistics operations at the same time.
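
A sketch of obtaining a StatCounter from the RDD via its stats() method:

    import org.apache.spark.util.StatCounter;

    // stats() computes count, mean, min, max, sum, variance and stdev in one pass
    StatCounter statCounter = rdd.stats();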

Now the results can be accessed as follows:
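
For example:

    System.out.println("Count: " + statCounter.count());
    System.out.println("Mean: " + statCounter.mean());
    System.out.println("Min: " + statCounter.min());
    System.out.println("Max: " + statCounter.max());
    System.out.println("Sum: " + statCounter.sum());
    System.out.println("Stdev: " + statCounter.stdev());
    System.out.println("Variance: " + statCounter.variance());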

When this application runs, these statistics will appear in the log output.

The complete application is available in my GitHub repo: https://github.com/sujee81/SparkApps.

Save Apache Spark dataframe to database

Update: this blog has migrated to Medium at https://medium.com/spark-experts. To keep getting good content, please subscribe there.

Some of my readers asked about saving a Spark dataframe to a database. You might be surprised if I say that it can be done in a single line with the new Spark JDBC datasource API, but it is true. Let’s look at it in detail.

In this example, I’m going to save a sample dataframe to my local MySQL database. The steps are listed below, and a combined sketch of all of them follows the list.

  1. Define MySQL connection properties.
  2. Create the Spark context and SQL context.
  3. Now we’ll create a sample dataframe, which will later be saved to a MySQL table. In this example, I’m going to load a sample JSON file from the local filesystem as a dataframe, but any dataframe can be used here. Refer to the Spark SQL documentation to learn about other ways of creating dataframes.
  4. The Spark JDBC datasource API provides two options to save a dataframe to a database.
    1. Option 1: Create a new table and insert all records using the “createJDBCTable” function.

      This function will create a new table with the given name. The table’s schema will be based on the schema of the dataframe. Once the table is created, it will insert all records.

      Parameters:
      url: JDBC connection URL
      table: table name
      allowExisting: if true, any existing table with the same name is dropped first; if false and a table with the same name already exists, an exception is thrown

      An example call for this option is included in the combined sketch after this list.

    2. Option 2: Insert all records into an existing table using the “insertIntoJDBC” function.

      This function is useful if you already have a table and you want to insert records into it. Please note that the dataframe’s schema and the existing table’s schema should match.

      Parameters:
      url: JDBC connection URL
      table: table name
      overwrite: if true, the table is truncated before the records are inserted

      An example call for this option is included in the combined sketch after this list.

  5. Now run the program and verify that the “users” table has been created and all records are there.
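
Below is a combined sketch of these steps against Spark 1.3’s Java API. The database name, credentials, file path, table name and class name are all placeholders for this example, and the MySQL JDBC driver is assumed to be on the classpath.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class SaveDataFrameToMySql {
        public static void main(String[] args) {
            // Step 1: MySQL connection properties (database name, user and password are placeholders)
            String url = "jdbc:mysql://localhost:3306/sparkdb?user=root&password=secret";

            // Step 2: create the Spark context and SQL context
            SparkConf conf = new SparkConf().setAppName("save-dataframe-example").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(sc);

            // Step 3: load a sample JSON file from the local filesystem as a dataframe (path is a placeholder)
            DataFrame users = sqlContext.jsonFile("src/main/resources/users.json");

            // Step 4, option 1: create a new "users" table and insert all records.
            // allowExisting = true drops any existing table with the same name first.
            users.createJDBCTable(url, "users", true);

            // Step 4, option 2: insert into a table that already exists.
            // overwrite = false appends; overwrite = true truncates the table first.
            // users.insertIntoJDBC(url, "users", false);

            sc.stop();
        }
    }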

 

The complete application is available in my GitHub repo: https://github.com/sujee81/SparkApps.
