Using the Spark Connector

The connector adheres to the standard Spark API, but with the addition of Snowflake-specific configuration options, which are described in this topic.

Using the Connector in Scala

Specifying the Data Source Class Name

To use Snowflake as a data source in Spark, use the .format option to provide the Snowflake connector class name that defines the data source.

Note that the class name is different depending on the version of the connector you are using:

  • Version 2.0 and higher: net.snowflake.spark.snowflake
  • Version 1.x and lower (deprecated): com.snowflakedb.spark.snowflakedb

To ensure a compile-time check of the class name, we highly recommend defining a variable for the class name, e.g.:

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

Also, for convenience, the Utils class provides this variable, which can be imported as follows:

import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

Note

All examples in this topic use SNOWFLAKE_SOURCE_NAME as the class definition.

Enabling/Disabling Pushdown in a Session

Version 2.1.0 (and higher) of the connector supports query pushdown, which can significantly improve performance by pushing query processing to Snowflake when Snowflake is the Spark data source.

By default, pushdown is enabled.

To disable it within a Spark session, after instantiating a SparkSession object, call the following static method:

SnowflakeConnectorUtils.disablePushdownSession(spark)

where spark is your SparkSession object.

You can re-enable pushdown at any time by invoking the following method:

SnowflakeConnectorUtils.enablePushdownSession(spark)
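
For context, here is a minimal sketch showing where these calls fit in a Scala session (the application name is an arbitrary placeholder; the connector and JDBC driver JARs are assumed to be on the classpath):

import org.apache.spark.sql.SparkSession
import net.snowflake.spark.snowflake.SnowflakeConnectorUtils

val spark = SparkSession.builder()
    .appName("SnowflakePushdownExample")
    .getOrCreate()

// Pushdown is enabled by default; disable it for this session.
SnowflakeConnectorUtils.disablePushdownSession(spark)

// ... run Snowflake reads/writes without pushdown ...

// Re-enable pushdown for the remainder of the session.
SnowflakeConnectorUtils.enablePushdownSession(spark)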

Moving Data from Snowflake to Spark

Note

When using DataFrames, the Snowflake connector supports SELECT queries only.

To read data from Snowflake into a Spark DataFrame:

  1. Use the read() method of the SQLContext object to construct a DataFrameReader.

  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the Data Source Class Name (in this topic).

  3. Specify the connector options using either the option() or options() method. For more information, see Setting Configuration Options for the Connector (in this topic).

  4. Specify one of the following options for the table data to be read:

    • dbtable: The name of the table to be read. All columns and records are retrieved; i.e. it is equivalent to SELECT * FROM <dbtable>.
    • query: The exact query (SELECT statement) to run.

Usage Notes

  • Currently, the connector does not support other types of queries (e.g. SHOW or DESC, or DML statements) when using DataFrames.

Performance Considerations

When transferring data between Snowflake and Spark, use the following methods to analyze/improve performance:

  • Use the net.snowflake.spark.snowflake.Utils.getLastSelect() method to see the actual query issued when moving data from Snowflake to Spark (see the sketch following this list).

  • If you use the filter or where functionality of the Spark DataFrame, check that the respective filters are present in the issued SQL query. The Snowflake connector will try to translate all the filters requested by Spark to SQL.

    However, there are forms of filters that the Spark infrastructure today does not pass to the Snowflake connector. As a result, in some situations, a large number of unnecessary records are requested from Snowflake.

  • If you need only a subset of columns, make sure to reflect that subset in the SQL query.

  • In general, if the SQL query issued does not match what you expect based on the DataFrame operations, use the query option to provide the exact SQL syntax you want.
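
For example, the following sketch (assuming sfOptions and SNOWFLAKE_SOURCE_NAME are defined as in the examples below, and that table T1 has a column C1) checks whether a DataFrame filter made it into the generated SQL:

import net.snowflake.spark.snowflake.Utils

val filtered = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()
    .filter("C1 > 10")

// Trigger execution so that a query is actually issued to Snowflake.
filtered.count()

// Print the SELECT statement the connector sent to Snowflake and
// verify that the C1 > 10 filter appears in its WHERE clause.
println(Utils.getLastSelect())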

Examples

Read an entire table:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

Read the results of a query:

val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
    .load()

Moving Data from Spark to Snowflake

The steps for saving the contents of a DataFrame to a Snowflake table are similar to the steps for reading data from Snowflake into Spark:

  1. Use the write() method of the DataFrame to construct a DataFrameWriter.

  2. Specify SNOWFLAKE_SOURCE_NAME using the format() method. For the definition, see Specifying the Data Source Class Name (in this topic).

  3. Specify the connector options using either the option() or options() method. For more information, see Setting Configuration Options for the Connector (in this topic).

  4. Use the dbtable option to specify the table to which data is written.

  5. Use the mode() method to specify the save mode for the content.

    For more information, see SaveMode (Spark documentation).

Examples

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Exporting JSON from Spark to Snowflake

Spark DataFrames can contain JSON objects, serialized as strings. The following code provides an example of converting a regular DataFrame to a DataFrame containing JSON data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val rdd = myDataFrame.toJSON
val schema = new StructType(Array(StructField("JSON", StringType)))
val jsonDataFrame = sqlContext.createDataFrame(
            rdd.map(s => Row(s)), schema)

Note that the resulting jsonDataFrame contains a single column of type StringType. As a result, when this DataFrame is exported to Snowflake with the common SaveMode.Overwrite mode, a new table in Snowflake is created with a single column of type VARCHAR.

To load jsonDataFrame into a VARIANT column:

  1. Create a Snowflake table (connecting to Snowflake using the Snowflake JDBC Driver). See Configuring and Using the JDBC Driver for Snowflake JDBC Driver connection parameter descriptions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;

    public class SnowflakeJDBCExample {
      public static void main(String[] args) throws Exception {
        String jdbcUrl = "jdbc:snowflake://xy12345.snowflakecomputing.com/";

        Properties properties = new Properties();
        properties.put("user", "peter");
        properties.put("password", "test");
        properties.put("account", "xy12345");
        properties.put("warehouse", "mywh");
        properties.put("db", "mydb");
        properties.put("schema", "public");

        // get connection
        System.out.println("Create JDBC connection");
        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
        System.out.println("Done creating JDBC connection\n");

        // create statement
        System.out.println("Create JDBC statement");
        Statement statement = connection.createStatement();
        System.out.println("Done creating JDBC statement\n");

        // create a table with a single VARIANT column
        System.out.println("Create my_variant_table table");
        statement.executeUpdate("create or replace table my_variant_table(json VARIANT)");
        statement.close();
        System.out.println("Done creating demo table\n");

        connection.close();
        System.out.println("Close connection\n");
      }
    }
    
  2. Instead of using SaveMode.Overwrite, use SaveMode.Append to reuse the existing table. When the string value representing JSON is loaded into Snowflake, because the target column is of type VARIANT, it is parsed as JSON. For example:

    df.write
        .format(SNOWFLAKE_SOURCE_NAME)
        .options(sfOptions)
        .option("dbtable", "my_variant_table")
        .mode(SaveMode.Append)
        .save()
    

Executing DDL/DML SQL Statements

Use the runQuery() method of the Utils object to execute DDL/DML SQL statements, in addition to queries, e.g.:

Utils.runQuery(sfOptions, "CREATE TABLE FOO(A INTEGER)")

where sfOptions is the parameters map used to read/write DataFrames.
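
The same method works for DML statements. For example, a minimal sketch (reusing the FOO table created above):

// Insert a row, then remove it; both statements run directly in Snowflake.
Utils.runQuery(sfOptions, "INSERT INTO FOO VALUES (1)")
Utils.runQuery(sfOptions, "DELETE FROM FOO WHERE A = 1")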

Working with Timestamps and Time Zones

Spark provides only one type of timestamp, equivalent to the Scala/Java Timestamp type. It is almost identical in behavior to the TIMESTAMP_LTZ (local time zone) data type in Snowflake. As such, when transferring data between Spark and Snowflake, we recommend using the following approaches to preserve time correctly, relative to time zones:

  • Use only the TIMESTAMP_LTZ data type in Snowflake.

    Note

    The default timestamp data type mapping is TIMESTAMP_NTZ (no time zone), so you must explicitly set the TIMESTAMP_TYPE_MAPPING parameter to use TIMESTAMP_LTZ (see the sketch after this list).

  • Set the Spark time zone to UTC and use this time zone in Snowflake (i.e. don’t set the sfTimezone option for the connector, and don’t explicitly set a time zone in Snowflake). In this scenario, TIMESTAMP_LTZ and TIMESTAMP_NTZ are effectively equivalent.

    To set the time zone, add the following line to your Spark code:

    java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))
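
A minimal sketch combining the two approaches (this assumes the TIMESTAMP_TYPE_MAPPING session parameter is passed through the options map, as described in Passing Snowflake Session Parameters as Options for the Connector in this topic):

// Approach 1: have TIMESTAMP map to TIMESTAMP_LTZ in Snowflake
// (TIMESTAMP_TYPE_MAPPING is a Snowflake session parameter).
sfOptions += ("TIMESTAMP_TYPE_MAPPING" -> "TIMESTAMP_LTZ")

// Approach 2: run Spark in UTC and leave sfTimezone at its default ("spark"),
// so Snowflake uses the same UTC time zone.
java.util.TimeZone.setDefault(java.util.TimeZone.getTimeZone("UTC"))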
    

If you don’t implement either of these approaches, undesired time modifications may occur. For example, consider the following scenario:

  • The time zone in Spark is set to America/New_York.
  • The time zone in Snowflake is set to Europe/Warsaw, which can happen by either:
    • Setting sfTimezone to Europe/Warsaw for the connector.
    • Setting sfTimezone to snowflake for the connector and setting the TIMEZONE session parameter in Snowflake to Europe/Warsaw.
  • Both TIMESTAMP_NTZ and TIMESTAMP_LTZ are in use in Snowflake.

In this scenario:

  1. If a value representing 12:00:00 in a TIMESTAMP_NTZ column in Snowflake is sent to Spark, this value doesn’t carry any time zone information. Spark treats the value as 12:00:00 in New York.
  2. If Spark sends this value 12:00:00 (in New York) back to Snowflake to be loaded into a TIMESTAMP_LTZ column, it is automatically converted and loaded as 18:00:00 (for the Warsaw time zone).
  3. If this value is then converted to TIMESTAMP_NTZ in Snowflake, the user sees 18:00:00, which is different from the original value, 12:00:00.

To summarize, we recommend strictly following at least one of these rules:

  • Use the same time zone, ideally UTC, for both Spark and Snowflake.
  • Use only the TIMESTAMP_LTZ data type for transferring data between Spark and Snowflake.

Sample Scala Program

Important

This sample program assumes you are using version 2.2.0 (or higher) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for storing temporary data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS Options for External Data Transfers (in this topic).

The following Scala program provides a full use case for the Snowflake Connector for Spark. Before using the code, replace the following strings with the appropriate values, as described in Setting Configuration Options for the Connector (in this topic):

  • <account_name>: Name of your account (supplied by Snowflake).
  • <region_id>: Snowflake Region in which your account is located (use only if your account is not in US West).
  • <user_name>, <password>: Login credentials for the Snowflake user.
  • <database>, <schema>, <warehouse>: Defaults for the Snowflake session.

import org.apache.spark.sql._

//
// Configure your Snowflake environment
//
// For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
// For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.<region_id>.snowflakecomputing.com",
    "sfAccount" -> "<account_name>",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

//
// DataFrames can also be populated via a SQL query
//
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("query", "select c1, count(*) t1 group by c1")
    .load()

//
// Join, augment, aggregate, etc. the data in Spark and then use the
// Data Source API to write the data back to a table in Snowflake
//
df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .mode(SaveMode.Overwrite)
    .save()

Using the Connector with Python

Using the connector with Python is very similar to the Scala usage.

We recommend using the bin/pyspark script included in the Spark distribution.

Configuring the pyspark Script

The pyspark script must be configured similarly to the spark-shell script, using the --packages or --jars options. For example:

bin/pyspark --packages net.snowflake:snowflake-jdbc:2.8.1,net.snowflake:spark-snowflake_2.10:2.0.0

For more information about configuring the spark-shell script, see Step 3: Configure the Local Spark Cluster or Amazon EMR-hosted Spark Environment.

Enabling/Disabling Pushdown in a Session

Version 2.1.0 (and higher) of the connector supports query pushdown, which can significantly improve performance by pushing query processing to Snowflake when Snowflake is the Spark data source.

By default, pushdown is enabled.

If it has been disabled, you can re-enable it within a Spark session by invoking the enablePushdownSession() static method, passing your SparkSession object. For example:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

In this example, sc is your SparkContext object; its _jvm attribute provides access to the connector classes on the JVM.

You can also disable it at any time by invoking the disablePushdownSession() method. For example:

sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

Sample Python Script

Important

This sample script assumes you are using version 2.2.0 (or higher) of the connector, which uses a Snowflake internal stage for storing temporary data and, therefore, does not require an S3 location for storing this data. If you are using an earlier version, you must have an existing S3 location and include values for tempdir, awsAccessKey, and awsSecretKey in sfOptions. For more details, see AWS Options for External Data Transfers (in this topic).

Once the pyspark script has been configured, you can perform SQL queries and other operations. Here’s an example Python script that performs a simple SQL query. This script illustrates basic connector usage. Most of the Scala examples in this document can be adapted for use with Python with minimal changes.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local", "Simple App")
spark = SQLContext(sc)
spark_conf = SparkConf().setMaster('local').setAppName('<YOUR_APP_NAME>')

# You might need to set these
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<YOUR_AWS_KEY>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<YOUR_AWS_SECRET>")

# Set options below
# For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
# For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
sfOptions = {
  "sfURL" : "<account_name>.<region_id>.snowflakecomputing.com",
  "sfAccount" : "<account_name>",
  "sfUser" : "<user_name>",
  "sfPassword" : "<password>",
  "sfDatabase" : "<database>",
  "sfSchema" : "<schema>",
  "sfWarehouse" : "<warehouse>",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select 1 as my_num union all select 2 as my_num") \
  .load()

df.show()

Tip

Note the usage of sfOptions and SNOWFLAKE_SOURCE_NAME. This simplifies the code and reduces the chance of errors.

For details about the supported options for sfOptions, see Setting Configuration Options for the Connector (in this topic).

Data Type Mappings

The Spark Connector supports converting between many common data types.

From Spark SQL to Snowflake

Spark Data Type     Snowflake Data Type
IntegerType         INTEGER
LongType            INTEGER
DoubleType          DOUBLE
FloatType           FLOAT
ShortType           INTEGER
ByteType            INTEGER. Snowflake does not support the BYTE type.
BooleanType         BOOLEAN
StringType          If length is specified, VARCHAR; otherwise, STRING
BinaryType          Not supported
TimestampType       TIMESTAMP
DateType            DATE
DecimalType         DECIMAL

From Snowflake to Spark SQL

Snowflake Data Type     Spark Data Type
ARRAY                   null
BIGINT                  If signed, then LongType; otherwise, DecimalType(20, 0)
BINARY                  Not supported
BIT                     Not supported
BLOB                    Not supported
BOOLEAN                 BooleanType
CHAR                    StringType
CLOB                    StringType
DATALINK                null
DATE                    DateType
DECIMAL                 DecimalType
DISTINCT                null
DOUBLE                  DoubleType
FLOAT                   FloatType
INTEGER                 If signed, then IntegerType; otherwise, LongType
JAVA_OBJECT             null
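
For example, a quick way to see which Spark SQL types the connector chose for the columns of a Snowflake table is to print the schema of the resulting DataFrame (a minimal sketch, reusing sfOptions and SNOWFLAKE_SOURCE_NAME from the earlier examples):

val df = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t1")
    .load()

// The printed schema shows the Spark data type selected for each Snowflake column.
df.printSchema()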

Setting Configuration Options for the Connector

The following options configure the behavior of the connector. They can be specified using .option(<key>, <value>) or .options(<map>) on a Spark DataFrameReader or DataFrameWriter.

Tip

To facilitate using the options, we recommend saving them in a single Map variable and using the .options() API.

Required Connection Options

The following options are required for connecting to Snowflake:

sfUrl

URL for your Snowflake account.

The format of the URL domain is different depending on the Snowflake Region where your account is located:

US West: account_name.snowflakecomputing.com
Other regions: account_name.region_id.snowflakecomputing.com

Where account_name is the name of your account (provided by Snowflake) and region_id is:

Cloud Platform     Region ID          Snowflake Region
AWS                us-east-1          US East
AWS                eu-west-1          EU (Dublin)
AWS                eu-central-1       EU (Frankfurt)
AWS                ap-southeast-2     Asia Pacific (Sydney)
Microsoft Azure    east-us-2.azure    East US 2

For example, if your account name is xy12345:

  • In US West, the URL would be xy12345.snowflakecomputing.com.
  • In US East, the URL would be xy12345.us-east-1.snowflakecomputing.com.
  • In EU (Frankfurt), the URL would be xy12345.eu-central-1.snowflakecomputing.com.
  • In East US 2, the URL would be xy12345.east-us-2.azure.snowflakecomputing.com.

sfUser
Login name for the Snowflake user.
sfPassword
Password for the Snowflake user.

Required Context Options

The following options are required for setting the database and schema context for the session:

sfDatabase
The database to use for the session after connecting.
sfSchema
The schema to use for the session after connecting.

Additional Options

The remaining options are optional:

sfAccount
Account name (e.g. xy12345). This option is no longer required because the account name is specified in sfUrl. It is documented here only for backward compatibility.
sfWarehouse
The default virtual warehouse to use for the session after connecting.
sfRole
The default security role to use for the session after connecting.
sfTimezone

The time zone to be used by Snowflake when working with Spark. Note that the parameter only sets the time zone in Snowflake; the Spark environment remains unmodified. The supported values are:

  • spark: Use the time zone from Spark (default).

  • snowflake: Use the current time zone for Snowflake.

  • sf_default: Use the default time zone for the Snowflake user who is connecting.

  • time_zone: Use a specific time zone (e.g. America/New_York), if valid.

    For more information about the impact of setting this option, see Working with Timestamps and Time Zones (in this topic).

sfCompress
If set to on (default), the data passed between Snowflake and Spark is compressed.
s3MaxFileSize
The size of the file used when moving data from Snowflake to Spark. The default is 10MB.
parallelism

The size of the thread pool to use for data uploads and downloads between Snowflake and Spark. The default is 4.

In general, this value does not need to be changed unless you have a specific need to increase or decrease throughput. Parallelism in Spark applications is best managed through partitions and executors. Also, do not set this value to an arbitrarily large number in an attempt to maximize throughput; doing so can have negative and unintended effects, including potentially slower uploads/downloads.

Example:

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    .option("parallelism", "8")
    .mode(SaveMode.Overwrite)
    .save()

preactions

A semicolon-separated list of SQL commands that are executed before data is transferred between Spark and Snowflake.

If a SQL command contains %s, it is replaced with the table name referenced for the operation.

postactions

A semicolon-separated list of SQL commands that are executed after data is transferred between Spark and Snowflake.

If a SQL command contains %s, it is replaced with the table name referenced for the operation.
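
Example (a sketch only; the DELETE predicate and the ANALYST role are hypothetical):

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("dbtable", "t2")
    // %s is replaced with the target table name ("t2") before the transfer starts.
    .option("preactions", "DELETE FROM %s WHERE C1 < 0")
    // Runs in Snowflake after the transfer completes.
    .option("postactions", "GRANT SELECT ON TABLE %s TO ROLE ANALYST")
    .mode(SaveMode.Append)
    .save()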

truncate_columns
If set to on (default), a COPY command automatically truncates text strings that exceed the target column length. If set to off, the COPY statement produces an error if a loaded string exceeds the target column length.
autopushdown

This parameter controls whether automatic query pushdown is enabled.

If pushdown is enabled, then when a query is run on Spark, if part of the query can be “pushed down” to the Snowflake server, it will be pushed down. This improves performance of some queries.

This parameter is optional.

The default value is “on” if the connector is plugged into a compatible version of Spark. Otherwise, the default value is “off”.

If the connector is plugged into a different version of Spark than the connector is intended for (e.g. if version 2.3 of the connector is plugged into version 2.2 of Spark), then auto-pushdown will be disabled even if this parameter is set to “on”.
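
For example, a sketch showing how to turn automatic pushdown off for a single read (reusing sfOptions from the earlier examples):

val df = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    .option("autopushdown", "off")
    .option("dbtable", "t1")
    .load()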

purge

If this is set to “on”, then the connector deletes temporary files created when transferring from Spark to Snowflake via external data transfer. If this parameter is set to “off”, then those files are not automatically deleted by the connector.

Purging works only for transfers from Spark to Snowflake, not for transfers from Snowflake to Spark.

The possible values are “on” and “off”. The default value is “off”.

columnmap

This parameter is useful when writing data from Spark to Snowflake and the column names in the Snowflake table do not match the column names in the Spark table. You can create a map that indicates which Spark source column corresponds to each Snowflake destination column.

The parameter is a single string literal, such as:

"Map(col_2 -> col_b, col_3 -> col_a)"

For example, suppose that a DataFrame named "df" in Spark has three columns:

col_1, col_2, and col_3.

Suppose also that a table named “tb” in Snowflake has two columns:

col_a and col_b.

And suppose that the user wants to copy values:

  • from df.col_2 to tb.col_b
  • from df.col_3 to tb.col_a

The value of the columnmap parameter should be:

"Map(col_2 -> col_b, col_3 -> col_a)"

The user can generate this value by executing the following Scala code:

Map("col_2"->"col_b","col_3"->"col_a").toString()

The default value of this parameter is null. In other words, by default, column names in the source and destination tables should match.

This parameter is used only when writing from Spark to Snowflake; it does not apply when writing from Snowflake to Spark.
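
For example, a sketch applying the mapping above while writing the df DataFrame to the tb table:

df.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfOptions)
    // col_2 and col_3 are written into tb.col_b and tb.col_a, respectively.
    .option("columnmap", Map("col_2" -> "col_b", "col_3" -> "col_a").toString())
    .option("dbtable", "tb")
    .mode(SaveMode.Append)
    .save()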

AWS Options for External Data Transfers

These options are used to specify the AWS S3 location where temporary data is stored and provide authentication details for accessing the location. They are required only if you are doing an external data transfer. External data transfers are required if either of the following is true:

  • You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
  • Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).
tempDir

The S3 location where intermediate data will be stored (e.g. s3n://xy12345-bucket/spark-snowflake-tmp/).

If tempDir is specified, you must also specify either:

  • awsAccessKey and awsSecretKey, or
  • temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token

awsAccessKey, awsSecretKey

These are standard AWS credentials that allow access to the location specified in tempDir. Note that both of these options must be set together.

If these credentials are already set in the Hadoop configuration, they can be retrieved from the existing SparkContext object (as shown in Using Permanent AWS Credentials in this topic).

If you specify these variables, you must also specify tempDir.

These credentials should also be set for the Hadoop cluster.

temporary_aws_access_key_id, temporary_aws_secret_access_key, temporary_aws_session_token

These are temporary AWS credentials that allow access to the location specified in tempDir. Note that all three of these options must be set together.

Also, if these options are set, they take precedence over the awsAccessKey and awsSecretKey options.

If you specify temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token, you must also specify tempDir; otherwise, these parameters are ignored.

check_bucket_configuration

If set to on (default), the connector checks if the bucket used for data transfer has a lifecycle policy configured (see Preparing an AWS S3 Location for more information). If there is no lifecycle policy present, a warning is logged.

Disabling this option (by setting it to off) skips this check. This can be useful if the user can perform data operations on the bucket but cannot view its lifecycle policy configuration. Disabling the option can also speed up query execution times slightly.

For details, see Authenticating S3 for Data Exchange (in this topic).

Azure Options for External Data Transfers

This section describes the parameters that apply to Azure Blob Storage when doing external data transfers. External data transfers are required if either of the following is true:

  • You are using version 2.1.x or lower of the Spark Connector (which does not support internal transfers), or
  • Your transfer is likely to take 36 hours or more (internal transfers use temporary credentials that expire after 36 hours).

When using an external transfer with Azure, you specify the location of the Azure container and the SAS (shared-access signature) for that container using the parameters described below.

tempDir

The Azure Blob Storage container where intermediate data will be stored. This will be in the form of a URL, for example:

wasb://<azure_container>@<azure_account>.<Azure_endpoint>/

temporary_azure_sas_token

Specify the SAS token for Azure Blob Storage.

Specifying Azure Information to Spark

When using Azure Blob Storage to provide temporary storage for transferring data between Spark and Snowflake, you must tell both Spark and the Snowflake Spark Connector where the temporary storage is located.

To notify Spark of the temporary storage location, execute commands similar to the following on your Spark cluster:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<Azure_endpoint>", <Azure_SAS>)

Note that the last of these lines contains the following variables:

  • container
  • account
  • Azure endpoint (This is the endpoint for your Azure deployment location. For example, if you are using a US deployment, the endpoint is likely to be “blob.core.windows.net”.)
  • Azure_SAS (the Shared Access Signature security token)

Replace each of these variables with the proper information for your Azure account.

Passing Snowflake Session Parameters as Options for the Connector

The Snowflake Connector for Spark supports sending arbitrary session-level parameters to Snowflake (see Session Parameters for more info). This can be achieved by adding a ("<key>" -> "<value>") pair to the options object, where <key> is the session parameter name and <value> is the value.

Note

The <value> should be a string enclosed in double quotes, even for parameters that accept numbers or Boolean values, e.g. "1" or "true".

For example, the following code sample passes the USE_CACHED_RESULT session parameter with a value of "false", which disables using the results of previously-executed queries:

// ... assuming sfOptions contains Snowflake connector options

// Disable the use of cached query results for the session
sfOptions += ("USE_CACHED_RESULT" -> "false")

// ... now use sfOptions with the .options() method

Authenticating S3 for Data Exchange

This task is required only in either of the following circumstances:

  • The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
  • The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the AWS token used by the connector to access the internal stage for data exchange.

If you are using an older version of the connector, you need to prepare an S3 location that the connector can use to exchange data between Snowflake and Spark.

To allow access to the S3 bucket/directory used to exchange data between Spark and Snowflake (as specified for tempDir), two authentication methods are supported:

  • Permanent AWS credentials (also used to configure Hadoop/Spark authentication for accessing S3)
  • Temporary AWS credentials

Using Permanent AWS Credentials

This is the standard AWS authentication method. It requires a pair of awsAccessKey and awsSecretKey values.

Note

These values should also be used to configure Hadoop/Spark for accessing S3. For more information, including examples, see Authenticating Hadoop/Spark Using S3A or S3N (in this topic).

For example:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access_key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret_key>")

// Then, configure your Snowflake environment
//
// For accounts in US West, sfURL format is <account_name>.snowflakecomputing.com (i.e. no region ID)
// For all other regions, sfURL format is <account_name>.<region_id>.snowflakecomputing.com
//
var sfOptions = Map(
    "sfURL" -> "<account_name>.<region_id>.snowflakecomputing.com",
    "sfAccount" -> "<account_name>",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>",
    "awsAccessKey" -> sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"),
    "awsSecretKey" -> sc.hadoopConfiguration.get("fs.s3n.awsSecretAccessKey"),
    "tempdir" -> "s3n://<temp-bucket-name>"
)

For details about the options supported by sfOptions, see AWS Options for External Data Transfers (in this topic).

Authenticating Hadoop/Spark Using S3A or S3N

The Hadoop/Spark ecosystem supports two URI schemes for accessing S3:

s3a://

New, recommended method (for Hadoop 2.7 and higher)

To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", <accessKey>)
hadoopConf.set("fs.s3a.secret.key", <secretKey>)

Make sure the tempdir option uses s3a:// as well.

s3n://

Older method (for Hadoop 2.6 and lower)

In some systems, it is necessary to specify the S3N file system implementation explicitly, as shown in the following Scala example:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", <accessKey>)
hadoopConf.set("fs.s3.awsSecretAccessKey", <secretKey>)

Using Temporary AWS Credentials

This method uses the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration options for the connector.

This method allows additional security by providing Snowflake with only temporary access to the S3 bucket/directory used for data exchange.

Note

Temporary credentials can only be used to configure the S3 authentication for the connector; they cannot be used to configure Hadoop/Spark authentication.

Also, if you provide temporary credentials, they take precedence over any permanent credentials that have been provided.

The following Scala code sample provides an example of authenticating using temporary credentials:

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

import net.snowflake.spark.snowflake.Parameters

// ...

val sts_client = new AWSSecurityTokenServiceClient()
val session_token_request = new GetSessionTokenRequest()

// Set the token duration to 2 hours.

session_token_request.setDurationSeconds(7200)
val session_token_result = sts_client.getSessionToken(session_token_request)
val session_creds = session_token_result.getCredentials()

// Create a new set of Snowflake connector options, based on the existing
// sfOptions definition, with additional temporary credential options that override
// the credential options in sfOptions.
// Note that constants from Parameters are used to guarantee correct
// key names, but literal values, such as temporary_aws_access_key_id are, of course,
// also allowed.

var sfOptions2 = collection.mutable.Map[String, String]() ++= sfOptions
sfOptions2 += (Parameters.PARAM_TEMP_KEY_ID -> session_creds.getAccessKeyId())
sfOptions2 += (Parameters.PARAM_TEMP_KEY_SECRET -> session_creds.getSecretAccessKey())
sfOptions2 += (Parameters.PARAM_TEMP_SESSION_TOKEN -> session_creds.getSessionToken())

sfOptions2 can now be used with the options() DataFrame method.

Authenticating Azure for Data Exchange

This section describes how to authenticate when using Azure Blob Storage for data exchange.

Authenticating this way is required only in either of the following circumstances:

  • The Snowflake Connector for Spark version is 2.1.x (or lower). Starting with v2.2.0, the connector uses a Snowflake internal temporary stage for data exchange. If you are not currently using version 2.2.0 (or higher) of the connector, Snowflake strongly recommends upgrading to the latest version.
  • The Snowflake Connector for Spark version is 2.2.0 (or higher), but your jobs regularly exceed 36 hours in length. This is the maximum duration for the Azure token used by the connector to access the internal stage for data exchange.

You need to prepare an Azure container that the connector can use to exchange data between Snowflake and Spark.

Using Azure Credentials

This is the standard Azure Blob Storage authentication method. It requires two values: tempDir (a URL) and temporary_azure_sas_token.

Note

These values should also be used to configure Hadoop/Spark for accessing Azure. For more information, including examples, see Authenticating Hadoop/Spark Using Azure (in this topic).

For example:

sc.hadoopConfiguration.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
sc.hadoopConfiguration.set("fs.azure.sas.<container>.<account>.<Azure_endpoint>", <Azure_SAS>)

// Then, configure your Snowflake environment
//
val sfOptions = Map(
  "sfURL" -> "<snowflake_account>.<snowflake_server_url>",
  "sfAccount" -> "<snowflake_account>",
  "sfUser" -> "<user_name>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database_name>",
  "sfSchema" -> "<schema_name>",
  "sfWarehouse" -> "<warehouse_name>",
  "sfCompress" -> "on",
  "sfSSL" -> "on",
  "tempdir" -> "wasb://<azure_container>@<azure_account>.<Azure_endpoint>/",
  "temporary_azure_sas_token" -> "<azure_sas>"
)

For details about the options supported by sfOptions, see Azure Options for External Data Transfers (in this topic).

Authenticating Hadoop/Spark Using Azure

To use this method, modify the Scala examples in this topic to add the following Hadoop configuration options:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
hadoopConf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb")
hadoopConf.set("fs.azure.sas.<container>.<account>.<Azure_endpoint>", <Azure_SAS>)

Make sure the tempdir option uses wasb:// as well.