Save Apache Spark DataFrame to database

Update: this blog has migrated to Medium: https://medium.com/spark-experts. To keep getting good content, please subscribe there.

Some of my readers asked about saving a Spark dataframe to a database. You’d be surprised if I said it can be done in a single line with the new Spark JDBC data source API. It is true. Let’s look at it in detail.

In this example, I’m going to save a sample dataframe to my local MySQL database.

  1. Define the MySQL connection properties.
  2. Create the Spark context and SQL context.
  3. Now we’ll create a sample dataframe, which will later be saved to a MySQL table. In this example, I’m going to load a sample JSON file from the local filesystem as a dataframe. Any dataframe can be used here. Refer to the Spark SQL documentation to learn about other ways of creating dataframes.
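     For example, steps 1–3 might look like this in Java on Spark 1.3.x (the class name, connection details, app name and file path below are placeholder values, not the original ones):

     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.sql.DataFrame;
     import org.apache.spark.sql.SQLContext;

     public class SaveToDb {
         // Step 1: MySQL connection properties (placeholder credentials;
         // the MySQL JDBC driver must be on the classpath).
         private static final String MYSQL_USERNAME = "root";
         private static final String MYSQL_PWD = "secret";
         private static final String MYSQL_CONNECTION_URL =
             "jdbc:mysql://localhost:3306/test?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;

         public static void main(String[] args) {
             // Step 2: Spark context and SQL context (Spark 1.3.x API).
             JavaSparkContext sc = new JavaSparkContext(
                 new SparkConf().setAppName("SparkSaveToDb").setMaster("local[*]"));
             SQLContext sqlContext = new SQLContext(sc);

             // Step 3: load a sample JSON file from the local filesystem as a dataframe.
             DataFrame usersDf = sqlContext.jsonFile("src/main/resources/users.json");
         }
     }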
  4. The Spark JDBC data source API provides two options to save a dataframe to a database.
    1. Option 1: Create a new table and insert all records using the “createJDBCTable” function.

      This function will create a new table with the given name. The table’s schema will be based on the schema of the dataframe. Once the table is created, it will insert all records.

      Parameters:
      url: connection URL
      table: table name
      allowExisting: if true, drop any existing table with the same name; if false and a table with the same name already exists, it will throw an exception

      e.g.
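      A minimal sketch, continuing inside main from the setup code above:

      // Create the "users" table from the dataframe's schema and insert all records.
      // Passing true instead would first drop any existing table named "users".
      usersDf.createJDBCTable(MYSQL_CONNECTION_URL, "users", false);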

       

    2. Option 2: Insert all records into an existing table using the “insertIntoJDBC” function.

      This function is useful if you already have a table and you want to insert records into it. Please note that the dataframe’s schema and the existing table’s schema should match.

      Parameters:
      url: connection URL
      table: table name
      overwrite: if true, it will truncate the table before inserting records.

      e.g.
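      A minimal sketch, again continuing from the setup code above (the "users" table must already exist with a matching schema):

      // Append all records to the existing "users" table;
      // passing true instead would truncate the table before inserting.
      usersDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "users", false);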

       

  5. Now run the program and verify that the “users” table is created and all records are there. (Screenshot: users table.)

 

The complete application is available in my GitHub repo: https://github.com/sujee81/SparkApps.

42 comments

  • Jitendra Kumar

    Sir, I have already tried that but the problem remains the same: for 10 lakh (1 million) records it still takes 30 secs… can we not improve this performance?

    • Sujee

      Hi Jitendra Kumar,
      There is no single answer for this. It depends on your hardware and software configuration.

      It could be the network latency. You could make sure your database is in the same network subgroup as your Spark cluster.
      Spark is a distributed framework. Make sure there is as little data shuffling between cluster nodes as possible.
      Another factor is the number of partitions in an RDD.

      First identify which component is responsible for the performance issue and try to fix it.

      For spark related performance issues, I find this talk very useful. https://www.youtube.com/watch?v=kkOG_aJ9KjQ

  • Sreeharsha Eedupuganti

    Hi… without using dataframes, can we insert the data into tables in MySQL? i.e. you are just inserting the values from a text/JSON file. What I need is: can we pass the values using an INSERT statement, for example

    insert into table person(col1,col2) values (val1, val2)

    • Sujee

      Hi Sreeharsha,
      May I know more about your data? Where is it loaded from? Is it loaded into Spark as a dataframe or RDD?

      Both “createJDBCTable” and “insertIntoJDBC” internally create the “INSERT INTO …” statements for each record of the dataframe. Since the dataframe has the schema information, it is possible to create these statements automatically. If your dataframe’s schema is different from your target DB table’s schema, you can use the powerful transformation features of Spark to convert it to a new dataframe which matches the schema of your table. It will be something like below:

      DataFrame originalDataframe = ... // loaded from somewhere into Spark
      DataFrame modifiedDataframe = originalDataframe.map( /* transformation logic goes here */ );
      modifiedDataframe.insertIntoJDBC(MYSQL_CONNECTION_URL, "person", false);
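
      As a concrete (hypothetical) sketch of that idea, reshaping a dataframe with select so it matches a person(col1, col2) table before inserting; the source column names below are made up:

      // Assume originalDf has columns firstName and age, and the target table
      // person has columns col1 (text) and col2 (int). Names are illustrative only.
      DataFrame modifiedDf = originalDf.select(
          originalDf.col("firstName").as("col1"),
          originalDf.col("age").cast("int").as("col2"));
      modifiedDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "person", false);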

  • Ernst Bolt

    Nice job! Thanks for sharing. insertIntoJDBC works with SQL Server; createJDBCTable does not (yet). It throws an exception: syntax error near ‘IF’.

  • Naresh

    Hi Sujee,

    How to load a text file into an Oracle table? I coded it as DataFrame usersDf = sqlContext.load(“src/main/resources/sample.txt”), and I am getting an error; it is not supporting loading the file.
    Can anyone please help me on this.

  • Naresh

    Thanks Sujee,

    Thanks for the quick reply. I am using a CSV file, and one more doubt is how to update rows while loading into the table.
    I need to apply some transformations while transferring into the table, like updating values and changing date formats. Can you please suggest on this…

  • Naresh

    Hi Sujee,

    I am getting the below error:
    15/05/12 04:50:24 ERROR yarn.ApplicationMaster: User class threw exception: Failed to load class for data source: CSV
    java.lang.RuntimeException: Failed to load class for data source: CSV
    Please check my code (Java):
    public class SparkToDb implements Serializable {

    private static final String USERNAME = "NRS";
    private static final String PWD = "c0ntr0l8";
    private static final String MYSQL_CONNECTION_URL =
    "jdbc:oracle:thin:NRS/c0ntr0l8@//q1cnn1d1.hydc.sbc.com:1532/Q1NRS";
    private static final JavaSparkContext sc =
    new JavaSparkContext(new SparkConf().setAppName("SparkSaveToDb").setMaster("local[2]"));
    private static final SQLContext sqlContext = new SQLContext(sc);
    public static void main(String[] args) {
    DataFrame usersDf = sqlContext.load("Naresh/sample.txt", "CSV");
    System.out.println("Loading data from sample file");
    usersDf.insertIntoJDBC(MYSQL_CONNECTION_URL, "ERIL", false);
    System.out.println("Inserted into JDBC Table");
    }
    }

  • Apoorva

    Hi,
    I was trying to read from a CSV file and insert those entries into the database.
    I figured out that internally Spark created two RDDs, i.e. rdd_0_0 and rdd_0_1, that work on the same data and do all the processing.
    Please help me confirm whether this is how Spark works:

    public final class TestJavaAggregation1 implements Serializable {
    /**
    *
    */
    private static final long serialVersionUID = 1L;
    static CassandraConfig config = null;
    static PreparedStatement statement = null;

    private transient SparkConf conf;
    private PersonAggregationRowWriterFactory aggregationWriter = new PersonAggregationRowWriterFactory();

    public Session session;

    private TestJavaAggregation1(SparkConf conf) {
    this.conf = conf;
    }

    public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("REadFromCSVFile")
    .setMaster("local[1]").set("spark.executor.memory", "1g");
    conf.set("spark.cassandra.connection.host", "localhost");
    TestJavaAggregation1 app = new TestJavaAggregation1(conf);
    app.run();
    }

    private void run() {
    JavaSparkContext sc = new JavaSparkContext(conf);

    aggregateData(sc);
    sc.stop();
    }

    private JavaRDD sparkConfig(JavaSparkContext sc) {
    JavaRDD lines = sc.textFile("PersonAggregation1_500.csv", 1);
    System.out.println(lines.getCheckpointFile());
    lines.cache();

    final String heading = lines.first();
    System.out.println(heading);
    String headerValues = heading.replaceAll("\t", ",");
    System.out.println(headerValues);

    CassandraConnector connector = CassandraConnector.apply(sc.getConf());
    Session session = connector.openSession();
    try {
    session.execute("DROP KEYSPACE IF EXISTS java_api5");
    session.execute("CREATE KEYSPACE java_api5 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
    session.execute("CREATE TABLE java_api5.person (hashvalue INT, id INT, state TEXT, city TEXT, country TEXT, full_name TEXT, PRIMARY KEY((hashvalue), id, state, city, country, full_name)) WITH CLUSTERING ORDER BY (id DESC);");
    session.execute("CREATE TABLE java_api5.persontest (hashvalue INT, id INT, state TEXT, city TEXT, country TEXT, full_name TEXT, PRIMARY KEY((hashvalue), id, state, city, country, full_name)) WITH CLUSTERING ORDER BY (id DESC);");
    session.execute("CREATE TABLE java_api5.state_count (id INT PRIMARY KEY, state TEXT, count INT)");
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    return lines;
    }

    @SuppressWarnings("serial")
    public void aggregateData(JavaSparkContext sc) {
    JavaRDD lines = sparkConfig(sc);

    System.out.println(“FirstRDD” + lines.partitions().size());
    JavaRDD result = lines
    .map(new Function() {
    int i = 0;

    public PersonAggregation call(String row) {
    PersonAggregation aggregate = new PersonAggregation();
    row = row + "," + this.hashCode();
    String[] parts = row.split(",");

    aggregate.setId(Integer.valueOf(parts[0]));
    aggregate.setFull_name(parts[1] + " " + parts[2] + " "
    + parts[3]);
    aggregate.setState(parts[4]);
    aggregate.setCity(parts[5]);
    aggregate.setCountry(parts[6]);
    aggregate.setHashValue(Integer.valueOf(parts[7]));

    //below save inserts 200 entries into the database while the CSV file has only 100 records.
    saveToJavaCassandra(aggregate);
    return aggregate;
    }
    });
    System.out.println(“______________________________” + result.collect().size() + “_______________________________”);

    List personAggregationList = result.collect();

    JavaRDD aggregateRDD = sc
    .parallelize(personAggregationList);
    javaFunctions(aggregateRDD).writerBuilder("java_api5", "person",
    aggregationWriter).saveToCassandra();

    }
    }

    • Sujee

      Hi Apoorva,
      I haven’t worked with Cassandra before. I had a quick look at your code. It looks good. A few points to improve performance:

      1. Except for testing purposes, “local[1]” in the Spark config is not recommended. This will only use a single thread.

      2. System.out.println(“______________________________” + result.collect().size() + “_______________________________”);

      Be careful with debug statements like the one above, which will trigger Spark jobs.

      Worse, the next line “List personAggregationList = result.collect();” will execute the collect function again.

      This could be rewritten as

      List personAggregationList = result.collect();
      System.out.println(“______________________________” + personAggregationList.size() + “_______________________________”);

      3. I don’t see any reason to create a new RDD from personAggregationList. “result” RDD itself could be used in place of aggregateRDD.

      List personAggregationList = result.collect();

      JavaRDD aggregateRDD = sc.parallelize(personAggregationList);

  • Chaitra

    Hi Sujee,

    I am new to Spark and facing an issue with loading data from a flat file to the DB.

    We are using an Oracle DB; the flat file data is saved in a JavaRDD of a bean class.

    I am not able to create the table in the DB when trying to create it using createJDBC.

    I manually created the table and tried to insert data into it. I am able to do it using insertIntoJDBC.
    But the data is not getting inserted into the table in the right order.
    How do I specify the columns and data in the dataframe so that the data will be inserted correctly?
    Thanks in Advance
    –Chaitra

  • Naresh

    Hi Sujee,

    I have a doubt on dataframe partitions: what will be the solution if the table is not partitioned, or is partitioned on a non-numeric column? How to process it in this case?
    What is the difference between the Data Sources API vs. AddPartition? How to insert data using JdbcRDD: I am able to print the data using jdbcRDD.toJavaRDD(), but how do I insert it into an Oracle table? (We have the option using DataFrame.) I need to insert using RDDs only; can you please suggest an option?

    Which way can we get better performance?

    Thanks..

  • Naresh

    Hi sujee,

    Do you have any idea about the Group By function using Java RDDs, without using DataFrames?
    Can you please suggest if you have an idea on this?

  • Mandar Vaidya

    Hi sujee,

    I am trying to insert the dataframe into an Oracle DB using the Spark 1.3.1 SQL API, but I am getting the following exception:

    2015-06-24 17:40:15[sparkDriver-akka.actor.default-dispatcher-2] INFO MemoryStore:59 – Block broadcast_1 of size 4792 dropped from memory (free 83036733)
    2015-06-24 17:40:15[Spark Context Cleaner] INFO ContextCleaner:59 – Cleaned broadcast 1
    Exception in thread “main” java.sql.SQLException: ORA-00933: SQL command not properly ended

    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:331)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:288)
    at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:743)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:216)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:955)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1169)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3285)
    at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3368)
    at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1266)
    at com.sparkexpert.OracleDBConnect.main(OracleDBConnect.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

    But I am able to insert the dataframe successfully using the insertIntoJDBC() method on an existing table with the same schema structure as the dataframe:
    df3.insertIntoJDBC(ORACLE_CONNECTION_URL, "employee1", true);

    Thanks in Advanced.

    Regards,
    Mandar Vaidya.

  • Bluefish

    Hi, thanks for sharing. I got a question when I used
    def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit.
    I have a table with five columns, and one column is ‘id’ with auto_increment.
    In my program, I create a case class with 4 properties. It works fine in other projects (Java, C# and others I had done before), but in this one I got an error:
    Column count doesn’t match value count at row 1
    Could you help me?

  • Shahbaz Khan

    I am getting this error:

    java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/test
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at org.apache.spark.sql.jdbc.JdbcUtils$.createConnection(JdbcUtils.scala:34)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:230)
    at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1462)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
    at $iwC$$iwC$$iwC$$iwC$$iwC.(:53)
    at $iwC$$iwC$$iwC$$iwC.(:55)
    at $iwC$$iwC$$iwC.(:57)
    at $iwC$$iwC.(:59)
    at $iwC.(:61)
    at (:63)
    at .(:67)
    at .()
    at .(:7)
    at .()
    at $print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

    Do you know why? I included the driver-class-path with the jar.

  • Mujadid khalid

    Hi Sujee!

    It’s a great tutorial. I am using it for SQL Server as well, but I am facing an issue when we save a dataframe into SQL Server:
    df.createJDBCTable(url, table, true) gives an exception: incorrect syntax near IF.

    I think the issue is that SQL Server does not support the query “create table if not exists”.

    It works fine if we write the code as follows:
    df.createJDBCTable(url, table, false)

    Any idea how to resolve this issue?

  • Baljeet Singh

    It’s a very nice tutorial. I’m using SQL Server as the database. The createJDBCTable command works fine with it. But when I try to insert more records into the same table that I created in the database using insertIntoJDBC, it throws an error message:
    Exception in thread “main” com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named ‘TestTable1’ in the database.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:216)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1515)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:404)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:350)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:5696)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:1715)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:180)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:155)
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeUpdate(SQLServerPreparedStatement.java:314)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
    at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1495)
    at com.dataframe.load.db.DataFrameToObjectDBCommand.saveInDB(DataFrameToObjectDBCommand.java:156)
    at com.dataframe.load.db.DataFrameToObjectDBCommand.main(DataFrameToObjectDBCommand.java:162)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    15/09/11 12:47:23 INFO spark.SparkContext: Invoking stop() from shutdown hook
    15/09/11 12:47:23 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on 10.184.57.103:54266 in memory (size: 19.5 KB, free: 265.4 MB)
    15/09/11 12:47:23 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 10.184.57.103:54266 in memory (size: 1821.0 B, free: 265.4 MB).

    I have tried insertIntoJDBC by passing the last argument as both true and false, but it throws the same exception.
    Another thing is that I’m using Spark SQL version 1.4.1, and both methods, createJDBCTable and insertIntoJDBC, are deprecated. Can you please help with doing the same, or is there any other way to do this in version 1.4.1?
    Thanks in advance.

  • Gianluca

    I’m experiencing the same problem with JDBC (version 4.2) for SQL Server (2012). I can only create a new table, but when the table already exists, the code throws an exception. The same code works with MariaDB.

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import sys

    sc=SparkContext(appName="MySQL Query")

    sqlctx=SQLContext(sc)
    serverName="SQLIPAddress"
    serverPort="SQL Port"
    serverUsername="username"
    serverPassword="password"
    serverDatabase="database"
    #########################################################################
    connString="jdbc:sqlserver://{SERVER}:{PORT};user={USER};password={PASSWORD};databasename={DATABASENAME}"
    connString=connString.format(SERVER=serverName,PORT=serverPort,USER=serverUsername,PASSWORD=serverPassword,DATABASENAME=serverDatabase)

    df=sqlctx.read.format("jdbc").options(url=connString,dbtable="(select * from TestTable) as test_Table").load()

    df.show()

    try:
        df.write.jdbc(connString,"Test_Target","append")
        print("saving completed")
    except:
        print("Error in saving data",sys.exc_info()[0])

    sc.stop()

    This code throws CREATE TABLE FAILED. If I change connString to a MariaDB one, all is fine.

    Does anyone have any ideas?

  • vivek

    Hi, I tried to load parquet files into MySQL using the above statements and it worked. But when I tried to load them into a Vertica database, this is the error I am facing:

    Exception in thread “main” java.sql.SQLSyntaxErrorException: [Vertica][VJDBC](5108) ERROR: Type “TEXT” does not exist
    at com.vertica.util.ServerErrorData.buildException(Unknown Source)
    at com.vertica.io.ProtocolStream.readExpectedMessage(Unknown Source)
    at com.vertica.dataengine.VDataEngine.prepareImpl(Unknown Source)
    at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
    at com.vertica.dataengine.VDataEngine.prepare(Unknown Source)
    at com.vertica.jdbc.common.SPreparedStatement.(Unknown Source)
    at com.vertica.jdbc.jdbc4.S4PreparedStatement.(Unknown Source)
    at com.vertica.jdbc.VerticaJdbc4PreparedStatementImpl.(Unknown Source)
    at com.vertica.jdbc.VJDBCObjectFactory.createPreparedStatement(Unknown Source)
    at com.vertica.jdbc.common.SConnection.prepareStatement(Unknown Source)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:275)
    at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1611)
    at com.sparkread.SparkVertica.JdbctoVertica.main(JdbctoVertica.java:51)
    Caused by: com.vertica.support.exceptions.SyntaxErrorException: [Vertica][VJDBC](5108) ERROR: Type “TEXT” does not exist
    … 13 more

    This error is because Vertica DB doesn’t support the datatypes that are in the parquet file. I do not want to typecast the columns since that is going to be a performance issue; we are looking to load around 280 million rows. Could you please suggest the best way to load the data into Vertica DB?

  • Salesforce Training in Chennai

    I’d like to access a table on a MS SQL Server (Microsoft). Is it possible from Databricks? To my understanding, the syntax is something like this (in a SQL notebook):

    CREATE TEMPORARY TABLE jdbcTable
    USING org.apache.spark.sql.jdbc
    OPTIONS ( url "jdbc:sqlserver://aaaa.database.windows.net;DatabaseName=bbbb;user=xxx;password=yyy",
    dbtable "SalesLT.Product" )

    The following error occurs: Error in SQL statement: SQLException: No suitable driver found for jdbc:sqlserver

  • cloud computing Training in Chennai

    I’ve attempted to import the project into IntelliJ but it insists that RDD.toDF() ‘is not a member of org.apache.spark.rdd.RDD[Auction]’. (I started with the eBook version, and then found this version when looking for an answer.) IntelliJ has “optimized” my imports to import org.apache.spark.{SparkConf, SparkContext}. Can you suggest what I’m missing?

  • ak47

    The “insertIntoJDBC” approach (for an existing table) results in a commit per partition. What if the insert from one executor node fails? Now you have partial data in the table. This approach is not safe.

  • Suresh

    JavaSparkContext is not accepted in the SQLContext() constructor:
    private static final JavaSparkContext sc =new JavaSparkContext(conf);
    private static final SQLContext sqlContext = new SQLContext(sc);

    SQLContext(sc) is giving an error: “The constructor SQLContext(JavaSparkContext) is undefined”

    I am using Spark version 1.1.0.

  • Suresh

    I am trying to insert data into an Oracle database using Spark SQL,
    but JavaSparkContext is not accepted in the SQLContext() constructor.
    Below is the code snippet:

    private static final JavaSparkContext sc =new JavaSparkContext(conf);
    private static final SQLContext sqlContext = new SQLContext(sc);

    SQLContext(sc) is giving an error in Eclipse: “The constructor SQLContext(JavaSparkContext) is undefined”

    I am using Spark version 1.1.0.

  • arjun

    Hi,

    I am getting a performance issue while inserting a dataframe into a MySQL table. It is taking around 15 min to insert 5K records (each record contains 86 columns) into the SQL table, even though I have given 5G and 5 cores while running the Spark job.

    Can someone help with why it is taking so much time to load a small amount of data?

  • Amrit

    Hi, has anyone worked with Spark SQL using PySpark? Please help me with the problem below:

    I am actually trying to define a function and then register the function, which is working fine.
    The challenge here is: I am trying to use HiveContext, and in the function definition I am writing
    an HQL query.

    from pyspark.sql import *
    from pyspark.sql.functions import udf
    from pyspark.sql import HiveContext
    from pyspark.sql.types import StringType
    sqlCtx = HiveContext(sc)

    def Foo(x-params):
        x-params = sqlCtx.sql("select Xyz from dbname.table_name")
        return x-params  # x-params is considered a DF, not a string

    sqlCtx.registerFunction("Foo", Foo(StringType()), returnType=StringType())

    Getting issues: not sure whether I am using the correct return type.

    Really appreciate your help.

  • RAYFORD

    Some databases, such as H2, convert all names to upper case. You’ll need to use upper case to refer to those names in Spark SQL.

  • Sandeep

    Hi, I am trying to write data from Spark to the Snowflake database (cloud data warehouse). Could you please look at the error message at the end?
    Write from Spark to Snowflake
    spark-shell --jars /usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar,/root/dev/sf/spark-streaming-kafka-assembly_2.10-1.6.0.jar,/usr/hdp/2.4.0.0-169/tez/lib/hadoop-aws-2.7.1.2.4.0.0-169.jar,/root/dev/sf/spark-snowflakedb_2.10-1.1.0.jar,/root/dev/sf/snowflake_jdbc.jar,/usr/hdp/2.4.0.0-169/slider/lib/hadoop-common-2.7.1.2.4.0.0-169.jar,/usr/hdp/2.4.0.0-169/mahout/lib/hadoop/hadoop-client-2.7.1.2.4.0.0-169.jar --driver-memory 30G --num-executors 32

    in spark shell:
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.sql.SaveMode
    import com.snowflakedb.spark.snowflakedb
    import com.snowflakedb.spark.snowflakedb.Parameters
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}
    val zkQuorum = "xxxxxxxxxx.aaaaa.com:0000"
    val consumerGroup = "logstash-grp-vpn"
    val topics = "vpnlogs"
    val SNOWFLAKE_SOURCE_NAME = "com.snowflakedb.spark.snowflakedb"
    val numThreads = 31
    val sparkConf = new SparkConf().setAppName("vpn-kafka").setMaster(s"local[$numThreads]")

    val ssc = new StreamingContext(sc, Seconds(20))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AAAAAAAAAAAAAAAA")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "AAAAAAAAAAAAAAAAaa")

    //configure your Snowflake environment
    var sfOptions = Map("sfURL" -> "xxxxxx.snowflakecomputing.com",
    "sfAccount" -> "aaa", "sfUser" -> "ETL_USER",
    "sfPassword" -> "xxxxxxxxxxx",
    "sfDatabase" -> "DEV",
    "sfSchema" -> "RAWFILES",
    "sfWarehouse" -> "LOADING",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://snowflake-poc")

    kafkaStream.foreachRDD { rdd =>rdd.collect().foreach(println)
    val df = rdd.toDF("test")
    rdd.collect().foreach(println)
    df.write.format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "vpn_all")
    .mode(SaveMode.Overwrite).save()}

    ssc.start()

    ################ Below is the error message#####################

    val: ERROR kafkaStream not found

  • Venkata

    Hi,
    We are trying to insert a JSONB data type into Postgres using Spark, but I am getting an error. Here is what I am doing.

    Dataframe Schema:
    root
    |– acctID: string (nullable = true)
    |– evtDetailInfo: struct (nullable = true)
    | |– @dateEvt: string (nullable = true)
    | |– @timeEvt: string (nullable = true)
    | |– @timeID: string (nullable = true)
    | |– CICSTranCode: string (nullable = true)
    | |– custName: string (nullable = true)
    | |– evtInfo: array (nullable = true)
    | | |– element: string (containsNull = true)
    | |– evtType: string (nullable = true)
    | |– operID: string (nullable = true)
    | |– trackingNbr: string (nullable = true)

    DataBase Table Schema:
    CREATE TABLE public.test
    (
    acct_id bigint NOT NULL,
    evt_detl jsonb NOT NULL,
    evt_val bigint NOT NULL
    )

    When I use dataFrame_toSave.write.mode(SaveMode.Append).jdbc(dbUrl, "public.test", dbPropForDFtoSave) to save the data, I am seeing the below error.

    Exception in thread “main” java.lang.IllegalArgumentException: Can’t get JDBC type for struct<@dateEvt:string,@timeEvt:string,@timeID:string,CICSTranCode:string,custName:string,evtInfo:array,evtType:string,operID:string,trackingNbr:string>

    Error:
    Exception in thread “main” java.lang.IllegalArgumentException: Can’t get JDBC type for struct<@dateEvt:string,@timeEvt:string,@timeID:string,CICSTranCode:string,custName:string,evtInfo:array,evtType:string,operID:string,trackingNbr:string>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:282)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:281)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:281)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
    at com.capitalone.frauddecisionengine.processor.DataFramePr

    Can you suggest the best approach to save the dataframe into the Postgres JSONB table?

  • praveen

    Dataframe Schema:
    root
    |– acctID: string (nullable = true)
    |– evtDetailInfo: struct (nullable = true)
    | |– @dateEvt: string (nullable = true)
    | |– @timeEvt: string (nullable = true)
    | |– @timeID: string (nullable = true)
    | |– CICSTranCode: string (nullable = true)
    | |– custName: string (nullable = true)
    | |– evtInfo: array (nullable = true)
    | | |– element: string (containsNull = true)
    | |– evtType: string (nullable = true)
    | |– operID: string (nullable = true)
    | |– trackingNbr: string (nullable = true)

    DataBase Table Schema:
    CREATE TABLE public.test
    (
    acct_id bigint NOT NULL,
    evt_detl jsonb NOT NULL,
    evt_val bigint NOT NULL
    )

    When I use dataFrame_toSave.write.mode(SaveMode.Append).jdbc(dbUrl, "public.test", dbPropForDFtoSave) to save the data, I am seeing the below error.

    Exception in thread “main” java.lang.IllegalArgumentException: Can’t get JDBC type for struct<@dateEvt:string,@timeEvt:string,@timeID:string,CICSTranCode:string,custName:string,evtInfo:array,evtType:string,operID:string,trackingNbr:string>

    Can you suggest the best approach to save the dataframe into the Postgres JSONB table.

    Exception in thread “main” java.lang.IllegalArgumentException: Can’t get JDBC type for struct<@dateEvt:string,@timeEvt:string,@timeID:string,CICSTranCode:string,custName:string,evtInfo:array,evtType:string,operID:string,trackingNbr:string>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:282)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:281)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:281)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)

  • Karol

    I have one question regarding transactions. Is it possible to have the whole save operation done in a transactional way? I mean transactional in the scope of all saving cluster nodes.

  • Dissapointed Spark users

    This post is from 2015 and now, in 2017, there is still not a very basic CRUD example in the Spark SQL documentation. Sheesh!

    There are a bazillion SQL databases out there. People use them. If you want Spark to succeed, you should write a top-notch how-to (read, insert, update, delete data in/from/to a basic SQL database such as MySQL, Oracle, etc.).

    There would be no need for various blogs of opinions about how to achieve a simple task. Just read the documentation and go on your merry way.
