Using the Snowflake Connector for Spark

The connector adheres to the standard Spark API with the addition of Snowflake-specific configuration options, which are covered in this topic.

Using the Connector in Scala

Specifying the Data Source Class Name

To use Snowflake as a data source in Spark, use the format() method to provide the Snowflake connector class name that defines the data source.

Note that the class name is different depending on the version of the connector you are using:

  • Version 2.0 and higher: net.snowflake.spark.snowflake
  • Version 1.x and lower (deprecated): com.snowflakedb.spark.snowflakedb

To ensure a compile-time check of the class name, we highly recommend defining a variable for the class name, e.g.:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

Also, for convenience, the Utils class provides the SNOWFLAKE_SOURCE_NAME variable, which can be imported as follows:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

Note

All examples in this topic use SNOWFLAKE_SOURCE_NAME as the class definition.

Enabling Pushdown in a Session

Version 2.1.0 (and higher) of the connector supports query pushdown, which can significantly improve performance by pushing query processing to Snowflake when Snowflake is the Spark data source.

By default, pushdown is not enabled. To enable it within a Spark session, after instantiating a SparkSession object, invoke the following method:

SnowflakeConnectorUtils.enablePushdownSession(spark)

Where spark is your SparkSession object.

You can also disable it at any time by invoking the following method:

SnowflakeConnectorUtils.disablePushdownSession(spark)
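
For example, the following is a minimal Scala sketch, assuming the connector and the Snowflake JDBC driver are on the classpath (the master URL and application name shown are placeholders):

import org.apache.spark.sql.SparkSession
import net.snowflake.spark.snowflake.SnowflakeConnectorUtils

// Instantiate (or retrieve) a SparkSession, then enable query pushdown for it.
val spark = SparkSession.builder()
    .master("local")
    .appName("SnowflakePushdownExample")
    .getOrCreate()

SnowflakeConnectorUtils.enablePushdownSession(spark)

// ... read from / write to Snowflake ...

// Disable pushdown for the session again if needed.
SnowflakeConnectorUtils.disablePushdownSession(spark)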

Moving Data from Snowflake to Spark

Note

When using DataFrames, the Snowflake connector supports SELECT queries only.

To read data from Snowflake into a Spark DataFrame:

  1. Use the read() method of the SQLContext object to construct a DataFrameReader.

  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the Data Source Class Name (in this topic).

  3. Specify the connector options using either the option() or options() method. For more information, see Setting Configuration Options for the Connector (in this topic).

  4. Specify one of the following options for the table data to be read:

    • dbtable: The name of the table to be read. All columns and records are retrieved; i.e. it is equivalent to SELECT * FROM <dbtable>.
    • query: The exact query (SELECT statement) to run.

Usage Notes

  • Currently, the connector does not support other types of queries (e.g. SHOW or DESC, or DML statements).

Performance Considerations

When transferring data between Snowflake and Spark, use the following methods to analyze/improve performance:

  • Use the net.snowflake.spark.snowflake.Utils.getLastSelect() method to see the actual query issued when moving data from Snowflake to Spark (a usage sketch follows this list).

  • If you use the filter or where functionality of the Spark DataFrame, check that the respective filters are present in the issued SQL query. The Snowflake connector will try to translate all the filters requested by Spark to SQL.

    However, there are forms of filters that the Spark infrastructure today does not pass to the Snowflake connector. As a result, in some situations, a large number of unnecessary records are requested from Snowflake.

  • If you need only a subset of columns, make sure to reflect that subset in the SQL query.

  • In general, if the SQL query issued does not match what you expect based on the DataFrame operations, use the query option to provide the exact SQL syntax you want.
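
For example, the following is a minimal sketch, assuming SNOWFLAKE_SOURCE_NAME and sfOptions are defined as in the other examples in this topic:

import net.snowflake.spark.snowflake.Utils

// Read t1 with a filter applied in Spark.
val df = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
    .filter("c1 > 10")

// Trigger execution, then print the SELECT statement that was actually issued
// to Snowflake and check that the filter appears in its WHERE clause.
df.show()
println(Utils.getLastSelect())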

Examples

Read an entire table:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

Read the results of a query:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

Moving Data from Spark to Snowflake

The steps for saving the contents of a DataFrame to a Snowflake table are similar to the steps for reading data from Snowflake into Spark:

  1. Use the write() method of the DataFrame to construct a DataFrameWriter.

  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the Data Source Class Name (in this topic).

  3. Specify the connector options using either the option() or options() method. For more information, see Setting Configuration Options for the Connector (in this topic).

  4. Use the dbtable option to specify the table to which data is written.

  5. Use the mode() method to specify the save mode for the content.

    For more information, see SaveMode (in the Spark documentation).

Examples

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Exporting JSON from Spark to Snowflake

Spark DataFrames can contain JSON objects, serialized as strings. The following code provides an example of converting a regular DataFrame to a DataFrame containing JSON data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

Note that the resulting jsonDataFrame contains a single column of type StringType. As a result, when this DataFrame is exported to Snowflake with the common SaveMode.Overwrite mode, a new table in Snowflake is created with a single column of type VARCHAR.

To load jsonDataFrame into a VARIANT column:

  1. Create a Snowflake table by connecting to Snowflake with the Snowflake JDBC Driver (the following example is Java). See Configuring and Using the JDBC Driver for descriptions of the JDBC connection parameters:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://abc123.snowflakecomputing.com/";

        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "abc123");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");
    
        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");
        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");
        // create a table
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");
    
        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. Instead of using SaveMode.Overwrite, use SaveMode.Append to reuse the existing table. Because the target column is of type VARIANT, the string value representing JSON is parsed as JSON when it is loaded into Snowflake. For example:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

Working with Timestamps and Time Zones

Spark provides only one type of timestamp, equivalent to the Scala/Java Timestamp type. It is almost identical in behavior to the TIMESTAMP_LTZ (local time zone) data type in Snowflake. As such, when transferring data between Spark and Snowflake, we recommend using the following approaches to preserve time correctly, relative to time zones:

  • Use only the TIMESTAMP_LTZ data type in Snowflake.

    Note

    The default timestamp data type mapping is TIMESTAMP_NTZ (no time zone), so you must explicitly choose to use TIMESTAMP_LTZ instead using the TIMESTAMP_TYPE_MAPPING parameter.

  • Set the Spark time zone to UTC and use this time zone in Snowflake (i.e. don’t set the sfTimezone option for the connector, and don’t explicitly set a time zone in Snowflake). In this scenario, TIMESTAMP_LTZ and TIMESTAMP_NTZ are effectively equivalent.

If you don’t implement either of these approaches, undesired time modifications may occur. For example, consider the following scenario:

  • The time zone in Spark is set to America/New_York.
  • The time zone in Snowflake is set to Europe/Warsaw, which can happen by either:
    • Setting sfTimezone to Europe/Warsaw for the connector.
    • Setting sfTimezone to snowflake for the connector and setting the TIMEZONE session parameter in Snowflake to Europe/Warsaw.
  • Both TIMESTAMP_NTZ and TIMESTAMP_LTZ are in use in Snowflake.

In this scenario:

  1. If a value representing 12:00:00 in a TIMESTAMP_NTZ column in Snowflake is sent to Spark, this value doesn’t carry any time zone information. Spark treats the value as 12:00:00 in New York.
  2. If Spark sends this value 12:00:00 (in New York) back to Snowflake to be loaded into a TIMESTAMP_LTZ column, it is automatically converted and loaded as 18:00:00 (for the Warsaw time zone).
  3. If this value is then converted to TIMESTAMP_NTZ in Snowflake, the user sees 18:00:00, which is different from the original value, 12:00:00.

To summarize, we recommend strictly following at least one of these rules:

  • Use the same time zone, ideally UTC, for both Spark and Snowflake.
  • Use only the TIMESTAMP_LTZ data type for transferring data between Spark and Snowflake.
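
For example, the following is a minimal Scala sketch of applying these rules, assuming sfOptions is declared as a var Map, as in the sample Scala program in this topic:

import java.util.TimeZone

// Rule: use UTC as the Spark time zone (set before any timestamp processing,
// and do not set the sfTimezone connector option or a Snowflake TIMEZONE).
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))

// Rule: use only TIMESTAMP_LTZ in Snowflake; one way to switch the default
// mapping from TIMESTAMP_NTZ is to pass the TIMESTAMP_TYPE_MAPPING session
// parameter as a connector option (see Passing Snowflake Session Parameters
// as Options for the Connector in this topic).
sfOptions += ("TIMESTAMP_TYPE_MAPPING" -> "TIMESTAMP_LTZ")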

Sample Scala Program

Important

This sample program assumes you are using version 2.2.0 (or later) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for this data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS S3 Options — Required only for Version 2.1.x (or lower) (in this topic).

The following Scala program provides a full use case for the Snowflake Connector for Spark. Before using the code, replace the following strings with the appropriate values, as described in Setting Configuration Options for the Connector (in this topic):

  • <account_name>: Name of your account (supplied by Snowflake).
  • <region_id>: Snowflake Region in which your account is located (use only if your account is not in US West).
  • <user_name>, <password>: Login credentials for the Snowflake user.
  • <database>, <schema>, <warehouse>: Defaults for the Snowflake session.

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
// For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
// For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.<region_id>.snowflakecomputing.com",
    "sfAccount" -> "<account_name>",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Using the Connector with Python

Using the connector with Python is very similar to the Scala usage.

We recommend using the bin/pyspark script included in the Spark distribution.

Configuring the pyspark Script

The pyspark script must be configured similarly to the spark-shell script, using the --packages or --jars options. For example:

bin/pyspark --packages net.snowflake:snowflake-jdbc:2.8.1,net.snowflake:spark-snowflake_2.10:2.0.0

For more information about configuring the spark-shell script, see Step 3: Configure the Local Spark Cluster or Amazon EMR-hosted Spark Environment.

Enabling Pushdown in a Session

Version 2.1.0 (and higher) of the connector supports query pushdown, which can significantly improve performance by pushing query processing to Snowflake when Snowflake is the Spark data source.

By default, pushdown is not enabled. To enable it within a Spark session, after instantiating a SparkSession object, invoke the following method:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

In this example, sc is your SparkContext object.

You can also disable it at any time by invoking the following method:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Sample Python Script

Important

This sample script assumes you are using version 2.2.0 (or later) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for this data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS S3 Options — Required only for Version 2.1.x (or lower) (in this topic).

Once the pyspark script has been configured, you can perform SQL queries and other operations. The following example Python script performs a simple SQL query and illustrates basic connector usage. Most of the Scala examples in this topic can be adapted with minimal changes for use with Python.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
# For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
# For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
sfOptions = {
  "sfURL" : "<account_name>.<region_id>.snowflakecomputing.com",
  "sfAccount" : "<account_name>",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Tip

Note the usage of sfOptions and SNOWFLAKE_SOURCE_NAME. This simplifies the code and reduces the chance of errors.

For details about the supported options for sfOptions, see Setting Configuration Options for the Connector (in this topic).

Setting Configuration Options for the Connector

The following options configure the behavior of the connector. They can be specified using .option(<key>, <value>) or .options(<map>) on a Spark DataFrameReader or DataFrameWriter.

Tip

To facilitate using the options, we recommend saving them in a single Map variable and using the .options() API.

Required Connection Options

The following options are required for connecting to Snowflake:

sfUrl

URL for your Snowflake account.

The format of the URL domain is different depending on the Snowflake Region where your account is located:

US West: <account_name>.snowflakecomputing.com
Other regions: <account_name>.<region_id>.snowflakecomputing.com

Where:

  • <account_name> is the name of your account (provided by Snowflake).

  • <region_id> is:

    Snowflake Region          Region ID
    US East                   us-east-1
    EU (Frankfurt)            eu-central-1
    Asia Pacific (Sydney)     ap-southeast-2

For example, if your account name is abc123:

  • In US West, the URL would be abc123.snowflakecomputing.com.
  • In EU (Frankfurt), the URL would be abc123.eu-central-1.snowflakecomputing.com.

sfUser
Login name for the Snowflake user.
sfPassword
Password for the Snowflake user.

Additional Options

The remaining options are optional:

sfAccount
Account name, e.g. abc123. This option is no longer required because the account name is specified in sfUrl. It is documented here only for backward compatibility.
sfDatabase
The default database to use for the session after connecting.
sfSchema
The default schema to use for the session after connecting.
sfWarehouse
The default virtual warehouse to use for the session after connecting.
sfRole
The default security role to use for the session after connecting.
sfTimezone

The time zone to be used by Snowflake when working with Spark. Note that the parameter only sets the time zone in Snowflake; the Spark environment remains unmodified. The supported values are:

  • spark: Use the time zone from Spark (default).

  • snowflake: Use the current time zone for Snowflake.

  • sf_default: Use the default time zone for the Snowflake user who is connecting.

  • <time_zone>: Use a specific time zone (e.g. America/New_York), if valid.

    For more information about the impact of setting this option, see Working with Timestamps and Time Zones (in this topic).

sfCompress
If set to on (default), the data passed between Snowflake and Spark is compressed.
s3MaxFileSize
The size of the file used when moving data from Snowflake to Spark. The default is 10MB.
preactions

A semicolon-separated list of SQL commands that are executed before data is transferred between Spark and Snowflake.

If a SQL command contains %s, it is replaced with the table name referenced for the operation.

postactions

A semicolon-separated list of SQL commands that are executed after data is transferred between Spark and Snowflake.

If a SQL command contains %s, it is replaced with the table name referenced for the operation.
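
For example, the following is a hedged Scala sketch, assuming a DataFrame df plus the sfOptions map and SNOWFLAKE_SOURCE_NAME constant from the earlier examples in this topic (the audit_log table is a hypothetical placeholder):

// %s is replaced with the table referenced by the operation ("t2" here).
sfOptions += ("preactions" -> "TRUNCATE TABLE IF EXISTS %s")
sfOptions += ("postactions" -> "INSERT INTO audit_log (tbl) SELECT '%s'")  // audit_log is hypothetical

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Append)
    .save()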

AWS S3 Options — Required only for Version 2.1.x (or lower)

These options are used to specify the AWS S3 location where temporary data is stored and provide authentication details for accessing the location. They are required only if using version 2.1.x (or lower) of the connector:

tempDir
The S3 location where intermediate data will be stored, e.g. s3n://abc123-bucket/spark-snowflake-tmp/.
awsAccessKey, awsSecretKey

Standard AWS credentials that allow access to the location specified in tempDir. These credentials should also be set for the Hadoop cluster.

If they are set, they can be retrieved from the existing SparkContext object.

temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

Temporary AWS credentials that allow access to the location specified in tempDir. Note that all three of these options must be set together.

Also, if these options are set, they take precedence over the awsAccessKey and awsSecretKey options.

check_bucket_configuration

If set to on (default), the connector checks if the bucket used for data transfer has a lifecycle policy configured (see Preparing an S3 Location — Required for Version 2.1.x (or Lower) for more information). If there is no lifecycle policy present, a warning is logged.

Disabling this option (by setting it to off) skips the check. This can be useful if a user has access to the bucket's data operations but not to its lifecycle configuration. Disabling the option can also slightly speed up query execution.

For details, see Authenticating S3 for Data Exchange — Required only for Version 2.1.x (or lower) (in this topic).

Passing Snowflake Session Parameters as Options for the Connector

The Snowflake Connector for Spark supports sending arbitrary session-level parameters to Snowflake (see Session Parameters for more info). This can be achieved by adding a ("<key>" -> "<value>") pair to the options object, where <key> is the session parameter name and <value> is the value.

Note

The <value> should be a string enclosed in double quotes, even for parameters that accept numbers or Boolean values, e.g. "1" or "true".

For example, the following code sample passes the USE_CACHED_RESULT session parameter with a value of "false", which disables using the results of previously-executed queries:

// ... assuming sfOptions contains Snowflake connector options

// Disable the use of cached query results
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

Authenticating S3 for Data Exchange — Required only for Version 2.1.x (or lower)

Note

This task is required only for version 2.1.x (or lower) of the connector. Starting with v2.2.0, the connector uses Snowflake internal stages for all data exchange, which eliminates the requirement for an S3 location. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.

If you are using an older version of the connector, you need to prepare an S3 location that the connector can use to exchange data between Snowflake and Spark.

To allow access to the S3 bucket/directory used to exchange data between Spark and Snowflake (as specified for tempDir), two authentication methods are supported:

  • Permanent AWS credentials (also used to configure Hadoop/Spark authentication for accessing S3)
  • Temporary AWS credentials

Using Permanent AWS Credentials

This is the standard AWS authentication method. It requires a pair of awsAccessKey and awsSecretKey values.

Note

These values should also be used to configure Hadoop/Spark for accessing S3. For more information, including examples, see Authenticating Hadoop/Spark Using S3A or S3N (in this topic).

For example:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
// For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
// For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.<region_id>.snowflakecomputing.com",
    "sfAccount" -> "<account_name>",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

For details about the options supported by sfOptions, see AWS S3 Options — Required only for Version 2.1.x (or lower) (in this topic).

Authenticating Hadoop/Spark Using S3A or S3N

The Hadoop/Spark ecosystem supports two URI schemes for accessing S3:

s3a://

New, recommended method (for Hadoop 2.7 and higher)

To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

Make sure the tempdir option uses s3a:// as well.

s3n://

Older method (for Hadoop 2.6 and lower)

On some systems, it is necessary to configure this scheme explicitly, as shown in the following Scala example:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

Using Temporary AWS Credentials

This method uses the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration options for the connector.

This method allows additional security by providing Snowflake with only temporary access to the S3 bucket/directory used for data exchange.

Note

Temporary credentials can only be used to configure the S3 authentication for the connector; they cannot be used to configure Hadoop/Spark authentication.

Also, if you provide temporary credentials, they take precedence over any permanent credentials that have been provided.

The following Scala code sample provides an example of authenticating using temporary credentials:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as "temporary_aws_access_key_id",
// are also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2 can now be used with the options() method of the DataFrameReader or DataFrameWriter.
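
For example, the following is a minimal sketch of a write that uses the temporary credentials (the table name is a placeholder):

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions2.toMap)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()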