Using the Snowflake Connector for Python

This topic provides a series of examples that illustrate how to use the Snowflake Connector to perform standard Snowflake operations such as user login, database and table creation, warehouse creation, data insertion/loading, and querying.

The sample code at the end of this topic combines the examples into a single, working Python program.

Connecting to Snowflake

First, import the snowflake.connector module:

import snowflake.connector

Then set the user information in variables that will be used in the next example to log into Snowflake:

# Setting your account information
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

Note

In the above example, replace the variables to match your Snowflake login information (name, password, etc.) as needed.

After setting the variables, connect using either the default authenticator or federated authentication (if enabled).

Connecting Using the Default Authenticator

Connect to Snowflake using the required login parameters:

# Connecting to Snowflake using the default authenticator
cnx = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

Connecting Using Federated Authentication

Alternatively, if you use a SAML 2.0-compliant identity provider (IdP) for federated authentication, you can configure the Snowflake Connector to use browser-based SSO:

# Connecting to Snowflake using SAML 2.0-compliant IdP federated authentication
cnx = snowflake.connector.connect(
  user=USER,
  account=ACCOUNT,
  authenticator='externalbrowser',
)

Browser-based SSO uses the following workflow:

  1. The Python application calls the snowflake.connector.connect method with the appropriate parameters.
  2. The default web browser set for the user’s operating system launches or opens a new tab or window, displaying the IdP authentication page.
  3. The user enters their IdP username and password.
  4. If the user is enrolled in MFA (multi-factor authentication) in Snowflake, they are prompted to type the MFA passcode (sent from another device) or confirm the authentication (on the other device).
  5. When the IdP has authenticated the user’s credentials, the browser displays a success message. The user can then return to the terminal window and use the Snowflake session that has been initiated.

Note

This feature is only supported in terminal windows with web browser access. For example, a terminal window on a remote machine with an SSH (Secure Shell) session may require additional setup to open a web browser.

If your IdP is Okta, you can also choose to use native Okta SSO, instead of browser-based SSO, for federated authentication. For example:

# Connecting to Snowflake using native SSO authentication
OKTA_USER = '<okta_login_name>'
OKTA_PASSWORD = '<okta_password>'
cnx = snowflake.connector.connect(
  user=OKTA_USER,
  password=OKTA_PASSWORD,
  account=ACCOUNT,
  authenticator='https://<okta_account_name>.okta.com/',
)

For more information about federated authentication and SSO, see Managing/Using Federated Authentication.

Caching OCSP Responses in Memory and File

To ensure all communications are secure, the Snowflake Connector for Python uses the HTTPS protocol to connect to Snowflake, as well as to connect to all other services (e.g. AWS S3 for staging data files and Okta for federated authentication). In addition to the regular HTTPS protocol, the connector also checks the TLS/SSL certificate revocation status on each connection via OCSP (Online Certificate Status Protocol) and aborts the connection if it finds the certificate is revoked or the OCSP status is not reliable.

Because each Snowflake connection triggers up to three round trips with the OCSP server, a memory and file cache for OCSP responses has been introduced to reduce the network overhead added to the connection. The cache also addresses availability issues for OCSP servers, i.e. in the event the actual OCSP server is down, as long as the cache is valid, the connector can still validate the certificate revocation status.

By default, the cache is enabled in the following locations, so no additional configuration tasks are required:

Linux: ~/.cache/snowflake/ocsp_response_cache
Mac OS: ~/Library/Caches/Snowflake/ocsp_response_cache
Windows: %USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache

However, if you want to specify a different location and/or file name for the OCSP response cache file, the connect method accepts the ocsp_response_cache_filename parameter, which specifies the path and name for the OCSP cache file in the form of a URI.
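For example, a connection that keeps the cache in a custom location might look like the following sketch; the path shown is arbitrary, and note that the value is a file URI rather than a plain path:

```python
import snowflake.connector

# Storing the OCSP response cache in a custom location (example path).
# The ocsp_response_cache_filename value is a URI (file:// scheme).
cnx = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    ocsp_response_cache_filename='file:///tmp/my_ocsp_response_cache.txt',
)
```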

Creating a Database, Schema, and Warehouse

After login, create a database, schema, and warehouse, if they don’t yet exist, using the CREATE DATABASE, CREATE SCHEMA, and CREATE WAREHOUSE commands.

For example, create a warehouse named tiny_warehouse, a database named testdb, and a schema named testschema:

# Creating a database, schema, and warehouse if they don't exist
cnx.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
cnx.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
cnx.cursor().execute("USE DATABASE testdb")
cnx.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

Using the Database, Schema and Warehouse

Specify the database and schema in which you want to create tables. Also specify the warehouse that will provide resources for executing DML statements and queries.

For example, to use the database testdb, schema testschema and warehouse tiny_warehouse (created in the previous section):

# Using Database, Schema and Warehouse
cnx.cursor().execute("USE WAREHOUSE tiny_warehouse")
cnx.cursor().execute("USE testdb.testschema")

Creating Tables and Inserting Data

Use the CREATE TABLE command to create tables and the INSERT command to populate the tables with data.

For example, create a table named testtable and insert two rows into the table:

# Creating Table and Inserting Data
cnx.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

Loading Data

Instead of inserting data into tables using individual INSERT commands, you can bulk load data from files staged in either an internal or external location.

Copying Data from an Internal Location

To load data from files on your host machine into a table, first use the PUT command to stage the file in an internal location, then use the COPY INTO table command to copy the data in the files into the table.

For example:

# Putting Data
cnx.cursor().execute("PUT file:///tmp/data/file* @%testtable")
cnx.cursor().execute("COPY INTO testtable")

This example assumes that your CSV data is stored in a local directory named /tmp/data in a Linux or Mac OS environment, and that the directory contains files named file0, file1, … file100.

Copying Data from an External Location

To load data from files already staged in an external location (i.e. your own S3 bucket) into a table, use the COPY INTO table command.

For example:

# Copying Data
cnx.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
    CREDENTIALS = (
        aws_key_id='{aws_access_key_id}',
        aws_secret_key='{aws_secret_access_key}')
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

This example assumes that s3://<your_s3_bucket>/data/ specifies your S3 bucket, that the files in the bucket are prefixed with data, and that the bucket is accessible with the specified AWS credentials.

Querying Data

Using cursor to Fetch Values

Fetch values from a table using the cursor object iterator method.

For example, to fetch values from testtable:

# Querying Data
cur = cnx.cursor()
try:
    cur.execute("SELECT * FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

Alternatively, the Snowflake Connector for Python provides a convenient shortcut:

for (col1, col2) in cnx.cursor().execute("SELECT * FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

If you need to get a single result, use the fetchone method:

col1, col2 = cnx.cursor().execute("SELECT * FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

If you need to get the specified number of rows at a time, use the fetchmany method with the number of rows:

cur = cnx.cursor().execute("SELECT * FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

Note

Use fetchone or fetchmany if the result set is too large to fit into memory.

If you need to get all results at once:

results = cnx.cursor().execute("SELECT * FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

To set a timeout for a query, execute a “begin” command and include the timeout parameter on the query. If the query exceeds the timeout value, an error is produced and a rollback occurs.

In the following code, error 604 means the query was canceled. The timeout parameter starts a Timer() that cancels the query if it does not finish within the specified time.

from snowflake.connector.errors import ProgrammingError

cnx.cursor().execute("create or replace table testtbl(a int, b string)")

cnx.cursor().execute("begin")
try:
    cnx.cursor().execute(
        "insert into testtbl(a,b) values(3, 'test3'), (4,'test4')",
        timeout=10)  # long-running query
except ProgrammingError as e:
    if e.errno == 604:
        print("timeout")
        cnx.cursor().execute("rollback")
    else:
        raise e
else:
    cnx.cursor().execute("commit")

Using DictCursor to Fetch Values by Column Name

If you want to fetch a value by column name, create a cursor object of type DictCursor.

For example:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = cnx.cursor(DictCursor)
try:
    cur.execute("SELECT * FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

Canceling a Query by Query ID

Cancel a query by query ID:

cur = cnx.cursor()

try:
    cur.execute("SELECT SYSTEM$CANCEL_QUERY('<query_id>')")
    result = cur.fetchall()
    print(len(result))
    print(result[0])
finally:
    cur.close()

Binding Data

Occasionally you may want to bind data to a placeholder in a query. By default, the Snowflake Connector for Python supports both pyformat and format, so you can use %(name)s or %s as the placeholder. For example:

  • Using %(name)s as the placeholder:

    # Binding Data on the client side
    cnx.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%(col1)s, %(col2)s)", {
            'col1': 789,
            'col2': 'test string3',
            })
    
  • Using %s as the placeholder:

    # Binding Data on the client side
    cnx.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

You can also use a list object to bind data for the IN operator:

# Binding data for IN operator
cnx.cursor().execute(
    "SELECT * FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

If paramstyle is specified as qmark or numeric in the connection parameters, the bind variables should be ? or :N, respectively, and the binding occurs on the server side.
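To switch to server-side binding, set paramstyle before creating the connection; a minimal sketch reusing the variables defined earlier (both forms shown are options, not requirements):

```python
import snowflake.connector

# Option 1: change the default placeholder style for all connections
snowflake.connector.paramstyle = 'qmark'

# Option 2: specify the style for a single connection
cnx = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    paramstyle='qmark',  # or 'numeric'
)
```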

For example:

  • Using ? as the placeholder:

    # Binding data in the server side
    cnx.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • Using :N as the placeholder:

    # Binding data in the server side
    cnx.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
  • Binding datetime with TIMESTAMP:

    If the Python datetime data type is bound, specify the Snowflake timestamp data type (i.e. TIMESTAMP_LTZ or TIMESTAMP_TZ for those columns) in the form of a tuple. For example:

    cnx.cursor().execute(
        "CREATE OR REPLACE TABLE testtable2 ("
        "   col1 int, "
        "   col2 string, "
        "   col3 timestamp_ltz"
        ")"
    )
    
    from datetime import datetime

    # Requires server-side binding (paramstyle 'qmark')
    cnx.cursor().execute(
        "INSERT INTO testtable2(col1,col2,col3) "
        "VALUES(?,?,?)", (
            987,
            'test string4',
            ("TIMESTAMP_LTZ", datetime.now())
        )
    )
    

    Unlike client-side binding, server-side binding requires the Snowflake data type for the column. Most common Python data types already have implicit mappings to Snowflake data types (e.g. int is mapped to FIXED). However, because datetime can be bound to multiple Snowflake data types (TIMESTAMP_NTZ, TIMESTAMP_LTZ, or TIMESTAMP_TZ), and the default mapping is TIMESTAMP_NTZ, the data type must be specified explicitly, as shown in the above example, when binding to TIMESTAMP_LTZ.

Important

Don’t bind data using Python’s formatting functions because you risk SQL injection. For example:

# Binding data (UNSAFE EXAMPLE)
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )
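To see concretely what can go wrong, consider a hypothetical malicious input value: because the formatting happens before the SQL ever reaches Snowflake, the value can close the string literal and smuggle in a second statement.

```python
# Demonstrating the injection risk of client-side string formatting.
# 'malicious' is a hypothetical attacker-controlled value.
malicious = "x'); DROP TABLE testtable; --"
query = ("INSERT INTO testtable(col1, col2) "
         "VALUES({col1}, '{col2}')".format(col1=789, col2=malicious))
print(query)
```

The resulting string embeds a destructive second statement; proper parameter binding would instead transmit the value safely.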

Retrieving Column Metadata

Column metadata is stored in the Cursor object in the description attribute.

The following simple example retrieves the list of column names:

# Retrieving the column names
cur = cnx.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
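Each entry in description is a sequence whose first two items are the column name and an internal type code (following the Python DB API). The sketch below runs the same name-extraction over an illustrative description value; the tuples are hypothetical stand-ins for what the connector would return for testtable, not actual connector output:

```python
# Hypothetical description for a two-column result set, shaped like
# DB API tuples: (name, type_code, display_size, internal_size,
# precision, scale, null_ok)
description = [
    ('COL1', 0, None, None, 38, 0, True),
    ('COL2', 2, None, None, None, None, True),
]

# Same extraction as above: keep only the column names
names = ','.join([col[0] for col in description])
print(names)  # → COL1,COL2
```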

Retrieving Snowflake Query IDs

A query ID is assigned to each query executed by Snowflake. In the Snowflake web interface, query IDs are displayed in the History page and when checking the status of a query.

The Snowflake Connector for Python provides a special attribute, sfqid, in the Cursor object so that you can associate queries with their status in the web interface. To retrieve the Snowflake query ID, execute the query first, then retrieve it through the sfqid attribute:

# Retrieving a Snowflake Query ID
cur = cnx.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

Handling Errors

The application must properly handle exceptions raised by the Snowflake Connector and decide whether to continue or stop running the code.

# Catching the syntax error
cur = cnx.cursor()
try:
    cur.execute("SELECT * FROOOOM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # custom error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

Using execute_stream to Execute SQL Scripts

The execute_stream function enables you to run one or more SQL scripts in a stream:

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in cnx.execute_stream(f):
        for ret in cur:
            print(ret)

Logging

The Snowflake Connector for Python leverages the standard Python logging module to log status at regular intervals so that the application can trace its activity behind the scenes. The simplest way to enable logging is to call logging.basicConfig() at the beginning of the application.

For example, to set the logging level to INFO and store the logs in a file named /tmp/snowflake_python_connector.log:

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)
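Because Python loggers follow the module hierarchy, you can also raise verbosity for the connector alone while keeping other libraries quieter; a sketch (the log file path is an example):

```python
import logging

# Global default: only warnings and above go to the log file
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.WARNING)

# Raise the level for the connector's own loggers only
logging.getLogger('snowflake.connector').setLevel(logging.INFO)
```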

Sample Program

The following sample code combines most of the examples described in the previous sections into a working Python program:

Note

In the section where you set your account and login information, make sure to replace the variables as needed to match your Snowflake login information (name, password, etc.).

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Setting your account and login information (replace the
# variables with the necessary values)
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# Only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
cnx = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if they don't exist
cnx.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
cnx.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
cnx.cursor().execute("USE DATABASE testdb")
cnx.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
cnx.cursor().execute("USE WAREHOUSE tiny_warehouse")
cnx.cursor().execute("USE testdb.testschema")

# Creating a table and inserting data
cnx.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
cnx.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
cnx.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <your_s3_bucket> with the name of your bucket)
cnx.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
     CREDENTIALS = (
        aws_key_id='{aws_access_key_id}',
        aws_secret_key='{aws_secret_access_key}')
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = cnx.cursor()
try:
    cur.execute("SELECT * FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
cnx.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = cnx.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = cnx.cursor()
try:
    cur.execute("SELECT * FROOOOM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # custom error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = cnx.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)