Using the Python Connector

This topic provides a series of examples that illustrate how to use the Snowflake Connector to perform standard Snowflake operations such as user login, database and table creation, warehouse creation, data insertion/loading, and querying.

The sample code at the end of this topic combines the examples into a single, working Python program.

In this Topic:

Verifying the Network Connection to Snowflake with SnowCD

After configuring your driver, you can evaluate and troubleshoot your network connectivity to Snowflake by using the Snowflake Connectivity Diagnostic Tool (SnowCD).

You can use SnowCD during the initial configuration process and on-demand at any time to evaluate and troubleshoot your network connection to Snowflake.

Connecting to Snowflake

Import the snowflake.connector module:

import snowflake.connector

Read login information from environment variables, the command line, a configuration file, or another appropriate source. For example:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

The ACCOUNT parameter might require the region and cloud platform where your account is located, in the form of:

'<your_account_name>.<region_id>.<cloud>' (e.g. 'xy12345.east-us-2.azure').

Note

For descriptions of available connector parameters, see the snowflake.connector methods.

If you will copy data from your own AWS S3 bucket, then you need the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

Note

If your data is stored in a Microsoft Azure container, provide the credentials directly in the COPY statement.

After reading the connection information, connect using either the default authenticator or federated authentication (if enabled).

Connecting Using the Default Authenticator

Connect to Snowflake using the login parameters:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

You might need to extend this with other information.

Connecting Using Federated Authentication

Alternatively, if you use a SAML 2.0-compliant identity provider (IdP) for federated authentication, you can specify the Snowflake connector to use browser-based SSO:

# Connecting to Snowflake using SAML 2.0-compliant IdP federated authentication
con = snowflake.connector.connect(
  user=<USER>,
  account=<ACCOUNT>,
  authenticator='externalbrowser',
  warehouse=<WAREHOUSE>,
  database=<DATABASE>,
  schema=<SCHEMA>
)

Browser-based SSO uses the following workflow:

  1. The Python application calls the snowflake.connector.connect method with the appropriate parameters.

  2. The default web browser set for the user’s operating system launches or opens a new tab or window, displaying the IdP authentication page.

  3. The user enters their IdP username and password.

  4. If the user is enrolled in MFA (multi-factor authentication) in Snowflake, they are prompted to type the MFA passcode (sent from another device) or confirm the authentication (on the other device).

  5. When the IdP has authenticated the user’s credentials, the browser displays a success message. Then can then return to the terminal window and use the Snowflake session that has been initiated.

Note

This feature is only supported in terminal windows with web browser access. For example, a terminal window on a remote machine with a SSH (Secure Shell) session may require additional setup to open a web browser.

If your IdP is Okta, you can also choose to use native Okta SSO, instead of browser-based SSO, for federated authentication. For example:

# Connecting to Snowflake using native SSO authentication
OKTA_USER = '<okta_login_name>'
OKTA_PASSWORD = '<okta_password>'
con = snowflake.connector.connect(
  user=<OKTA_USER>,
  password=<OKTA_PASSWORD>,
  account=<SNOWFLAKE_ACCOUNT>,
  authenticator='https://<okta_account_name>.okta.com/',
  warehouse=<WAREHOUSE>,
  database=<DATABASE>,
  schema=<SCHEMA>
)

For more information about federated authentication and SSO, see Managing/Using Federated Authentication.

Using Key Pair Authentication

Snowflake supports using key pair authentication rather than the typical username/password authentication. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the PEM (Privacy Enhanced Mail) public-private key pair using OpenSSL. The public key is assigned to the Snowflake user who will use the Snowflake client.

To configure the public/private key pair:

  1. From the command line in a terminal window, generate a private key.

    You can generate either an encrypted version of the private key or an unencrypted version of the private key.

    To generate an unencrypted version, use the following command:

    $ openssl genrsa -out rsa_key.pem 2048
    

    To generate an encrypted version, use the following command:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    It is typically safer to generate an encrypted version.

    If you use the second command to encrypt the private key, then OpenSSL prompts for a passphrase used to encrypt the private key file. We recommend using a strong passphrase to protect the private key. Record this passphrase in a secure location. You will input it when connecting to Snowflake. Note that the passphrase is only used for protecting the private key and will never be sent to Snowflake.

    Sample PEM private key

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. From the command line, generate the public key by referencing the private key:

    Assuming the private key is encrypted and contained in the file named “rsa_key.p8”, use the following command:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Sample PEM public key

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copy the public and private key files to a local directory for storage. Record the path to the files. Note that the private key is stored using the PKCS#8 (Public Key Cryptography Standards) format and is encrypted using the passphrase you specified in the previous step; however, the file should still be protected from unauthorized access using the file permission mechanism provided by your operating system. It is your responsibility to secure the file when it is not being used.

  4. Assign the public key to the Snowflake user using ALTER USER. For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Only security administrators (i.e. users with the SECURITYADMIN role) or higher can alter a user.

    • Exclude the public key header and footer in the SQL statement.

    Verify the user’s public key fingerprint using DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    The RSA_PUBLIC_KEY_2_FP property is described in Key Rotation (in this topic).

  5. Modify and execute the sample code, below. The code decrypts the private key file and passes it to the Snowflake driver to create a connection:

  • Update the security parameters:

    • path: Specifies the local path to the private key file you created.

  • Update the session parameters:

    • user: Specifies your Snowflake login name.

    • account: Specifies the full name of your account (provided by Snowflake). Depending on the cloud platform (AWS or Azure) and region where your account is hosted, the full account name may require additional segments. For more details, see Usage Notes for the account Parameter (for the connect Method).

    • region: Deprecated

      Specifies the ID for the region where your account is located. Note that this parameter is deprecated and is documented only for backward compatibility.

      New code should specify the region (if required) as part of account.

Sample code

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

Key Rotation

Snowflake supports multiple active keys to allow for uninterrupted rotation. Rotate and replace your public and private keys based on the expiration schedule you follow internally.

Currently, you can use the RSA_PUBLIC_KEY and RSA_PUBLIC_KEY_2 parameters for ALTER USER to associate up to 2 public keys with a single user.

To rotate your keys:

  1. Complete the steps in Using Key Pair Authentication to:

    • Generate a new private and public key set.

    • Assign the public key to the user. Set the public key value to either RSA_PUBLIC_KEY or RSA_PUBLIC_KEY_2 (whichever key value is not currently in use). For example:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Update the code to connect to Snowflake. Specify the new private key.

    Snowflake verifies the correct active public key for authentication based on the private key submitted with your connection information.

  3. Remove the old public key from the user profile. For example:

    alter user jsmith unset rsa_public_key;
    

Using a Proxy Server

To use a proxy server, configure the following environment variables:

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

Note

The proxy parameters (i.e. proxy_host, proxy_port, proxy_user and proxy_password) are deprecated. Use the environment variables instead.

For example:

Linux or macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

Tip

Snowflake’s security model does not allow Secure Sockets Layer (SSL) proxies (using an HTTPS certificate). Your proxy server must use a publicly-available Certificate Authority (CA), reducing potential security risks such as a MITM (Man In The Middle) attack through a compromised proxy.

If you must use your SSL proxy, we strongly recommend that you update the server policy to pass through the Snowflake certificate such that no certificate is altered in the middle of communications.

Optionally NO_PROXY can be used to bypass the proxy for specific communications. For example, access to AWS S3 can bypass the proxy server by specifying NO_PROXY=".amazonaws.com".

NO_PROXY does not support wildcards. Each value specified should be one of the following:

  • The end of a hostname (or a complete hostname), for example:

    • .amazonaws.com

    • xy12345.snowflakecomputing.com

  • An IP address, for example:

    • 192.196.1.15

If more than one value is specified, values should be separated by commas, for example:

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

OCSP

When the driver connects, Snowflake sends a certificate to confirm that the connection is to Snowflake rather than to a host that is impersonating Snowflake. The driver sends that certificate to an OCSP (Online Certificate Status Protocol) server to verify that the certificate has not been revoked.

If the driver cannot reach the OCSP server to verify the certificate, the driver can “fail open” or “fail closed”.

Choosing Fail-Open or Fail-Close Mode

Versions of the Snowflake Connector for Python prior to 1.8.0 default to fail-close mode. Versions 1.8.0 and later default to fail-open. You can override the default behavior by setting the optional connection parameter ocsp_fail_open when calling the connect() method. For example:

con = snowflake.connector.connect(
    account=<account>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

Verifying the OCSP Connector or Driver Version

The driver or connector version and its configuration both determine the OCSP behavior. For more information about the driver or connector version, their configuration, and OCSP behavior, see OCSP Client & Driver Configuration.

Caching OCSP Responses

To ensure all communications are secure, the Snowflake Connector for Python uses the HTTPS protocol to connect to Snowflake, as well as to connect to all other services (e.g. AWS S3 for staging data files and Okta for federated authentication). In addition to the regular HTTPS protocol, the connector also checks the TLS/SSL certificate revocation status on each connection via OCSP (Online Certificate Status Protocol) and aborts the connection if it finds the certificate is revoked or the OCSP status is not reliable.

Because each Snowflake connection triggers up to three round trips with the OCSP server, multiple levels of cache for OCSP responses have been introduced to reduce the network overhead added to the connection:

  • Memory cache, which persists for the life of the process.

  • File cache, which persists until the cache directory (e.g. ~/.cache/snowflake) is purged.

  • OCSP response server cache.

Caching also addresses availability issues for OCSP servers (i.e. in the event the actual OCSP server is down). As long as the cache is valid, the connector can still validate the certificate revocation status.

If none of the cache layers contain the OCSP response, the client attempts to fetch the validation status directly from the CA’s OCSP server.

Modifying the OCSP Response File Cache Location

By default, the file cache is enabled in the following locations, so no additional configuration tasks are required:

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

However, if you want to specify a different location and/or file name for the OCSP response cache file, the connect method accepts the ocsp_response_cache_filename parameter, which specifies the path and name for the OCSP cache file in the form of a URI.

OCSP Response Cache Server

Note

The OCSP response cache server is currently supported by the Snowflake Connector for Python 1.6.0 and higher.

The memory and file types of OCSP cache work well for applications connected to Snowflake using one of the clients Snowflake provides, with a persistent host. However, they don’t work in dynamically-provisioned environments such as AWS Lambda or Docker.

To address this situation, Snowflake provides a third level of caching: the OCSP response cache server. The OCSP response cache server fetches OCSP responses hourly from the CA’s OCSP servers and stores them for 24 hours. Clients can then request the validation status of a given Snowflake certificate from this server cache.

Important

If your server policy denies access to most or all external IP addresses and web sites, you must whitelist the cache server address to allow normal service operation. The cache server URL is ocsp*.snowflakecomputing.com:80.

If you need to disable the cache server for any reason, set the SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED environment variable to false. Note that the value is case-sensitive and must be in lowercase.

Creating a Database, Schema, and Warehouse

After you log in, create a database, schema, and warehouse if they don’t yet exist, using the CREATE DATABASE, CREATE SCHEMA, and CREATE WAREHOUSE commands.

The example below shows how to create a warehouse named tiny_warehouse, database named testdb, and a schema named testschema. Note that when you create the schema, you must either specify the name of the database in which to create the schema, or you must already be connected to the database in which to create the schema. The example below executes a USE DATABASE command before the CREATE SCHEMA command to ensure that the schema is created in the correct database.

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

Using the Database, Schema, and Warehouse

Specify the database and schema in which you want to create tables. Also specify the warehouse that will provide resources for executing DML statements and queries.

For example, to use the database testdb, schema testschema and warehouse tiny_warehouse (created earlier):

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

Creating Tables and Inserting Data

Use the CREATE TABLE command to create tables and the INSERT command to populate the tables with data.

For example, create a table named testtable and insert two rows into the table:

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

Loading Data

Instead of inserting data into tables using individual INSERT commands, you can bulk load data from files staged in either an internal or external location.

Copying Data from an Internal Location

To load data from files on your host machine into a table, first use the PUT command to stage the file in an internal location, then use the COPY INTO <table> command to copy the data in the files into the table.

For example:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

Where your CSV data is stored in a local directory named /tmp/data in a Linux or macOS environment, and the directory contains files named file0, file1, … file100.

Copying Data from an External Location

To load data from files already staged in an external location (i.e. your own S3 bucket) into a table, use the COPY INTO <table> command.

For example:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
    CREDENTIALS = (
        aws_key_id='{aws_access_key_id}',
        aws_secret_key='{aws_secret_access_key}')
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

Where s3://<your_s3_bucket>/data/ specifies the name of your S3 bucket, the files in the bucket are prefixed with data, and the bucket is accessible by the specified AWS credentials.

Note

This example uses the format() function to compose the statement. If your environment has a risk of SQL injection attacks, you might prefer to bind values rather than use format().

Querying Data

Using cursor to Fetch Values

Fetch values from a table using the cursor object iterator method.

For example, to fetch columns named “col1” and “col2” from the table named testtable, which was created earlier (in Creating Tables and Inserting Data), use code similar to the following:

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

Alternatively, the Snowflake Connector for Python provides a convenient shortcut:

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

If you need to get a single result (i.e. a single row), use the fetchone method:

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

If you need to get the specified number of rows at a time, use the fetchmany method with the number of rows:

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

Note

Use fetchone or fetchmany if the result set is too large to fit into memory.

If you need to get all results at once:

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

To set a timeout for a query, execute a “begin” command and include a timeout parameter on the query. If the query exceeds the length of the parameter value, an error is produced and a rollback occurs.

In the following code, error 604 means the query was canceled. The timeout parameter starts Timer() and cancels if the query does not finish within the specified time.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

Using DictCursor to Fetch Values by Column Name

If you want to fetch a value by column name, create a cursor object of type DictCursor.

For example:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

Canceling a Query by Query ID

Cancel a query by query ID:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

Replace the string “queryID” with the actual query ID.

Improving Query Performance by Bypassing Data Conversion

To improve query performance, use the SnowflakeNoConverterToPython class in the snowflake.connector.converter_null module to bypass data conversions from the Snowflake internal data type to the native Python data type, e.g.:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

As a result, all data is represented in string form such that the application is responsible for converting it to the native Python data types. For example, TIMESTAMP_NTZ and TIMESTAMP_LTZ data are the epoch time represented in string form, and TIMESTAMP_TZ data is the epoch time followed by a space followed by the offset to UTC in minutes represented in string form.

No impact is made to binding data; Python native data can still be bound for updates.

Binding Data

To specify values to be used in a SQL statement, you can include literals in the statement, or you can bind variables. When you bind variables, you put one or more placeholders in the text of the SQL statement, and then specify the variable (the value to be used) for each placeholder.

The following example contrasts the use of literals and binding:

Literals:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

Binding:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

Snowflake supports the following types of binding:

  • pyformat.

  • format.

  • qmark.

  • numeric.

Each of these is explained below.

pyformat or format binding

Both pyformat binding and format binding bind data on the client side rather than on the server side.

By default, the Snowflake Connector for Python supports both pyformat and format, so you can use %(name)s or %s as the placeholder. For example:

  • Using %(name)s as the placeholder:

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • Using %s as the placeholder:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

You can also use a list object to bind data for the IN operator:

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

qmark or numeric binding

Both qmark binding and numeric binding bind data on the server side rather than on the client side.

To use qmark or numeric style binding, set the mode to ‘qmark’ by executing:

snowflake.connector.paramstyle='qmark'

Important

You must set the paramstyle before you call the connect() method.

If paramstyle is specified as qmark or numeric in the connection parameter, the binding variables should be ? or :N, respectively, and the binding occurs on the server side.

For example:

  • Using ? as the placeholder:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • Using :N as the placeholder:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
  • Binding datetime with TIMESTAMP using qmark binding:

    When using qmark or numeric binding to bind data to a Snowflake TIMESTAMP data type, specify the Snowflake timestamp data type, i.e., TIMESTAMP_LTZ or TIMESTAMP_TZ, in the form of a tuple.

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "CREATE OR REPLACE TABLE testtable2 ("
        "   col1 int, "
        "   col2 string, "
        "   col3 timestamp_ltz"
        ")"
    )
    
    con.cursor().execute(
        "INSERT INTO testtable2(col1,col2,col3) "
        "VALUES(?,?,?)", (
            987,
            'test string4',
            ("TIMESTAMP_LTZ", datetime.now())
        )
    )
    

    Unlike client side binding, the server side binding requires the Snowflake data type for the column. Most common Python data types already have implicit mappings to Snowflake data types (e.g. int is mapped to FIXED). However, because Python datetime data can be bound with multiple Snowflake data types (for example, TIMESTAMP_NTZ, TIMESTAMP_LTZ or TIMESTAMP_TZ), and the default mapping is TIMESTAMP_NTZ, binding Python datetime data requires the user to specify a specific type type (e.g. TIMESTAMP_LTZ), and therefore the data type must be specified as shown in the above example.

Avoid SQL Injection Attacks

Avoid binding data using Python’s formatting function because you risk SQL injection. For example:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

Instead, store the values in variables, check those values (for example, by looking for suspicious semicolons inside strings), and then bind the parameters using qmark or numeric binding style.

Retrieving Column Metadata

Column metadata is stored in the Cursor object in the description attribute.

The following simple example retrieves the list of column names:

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

Retrieving Snowflake Query IDs

A query ID is assigned to each query executed by Snowflake. In the Snowflake web interface, query IDs are displayed in the History History tab page and when checking the status of a query.

The Snowflake Connector for Python provides a special attribute, sfqid, in the Cursor object so that you can associate it with the status in the web interface. In order to retrieve the Snowflake query ID, execute the query first and then retrieve it through the sfqid attribute:

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

Handling Errors

The application must handle exceptions raised from Snowflake Connector properly and decide to continue or stop running the code.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

Using execute_stream to Execute SQL Scripts

The execute_stream function enables you to run one or more SQL scripts in a stream:

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

Closing the Connection

As a best practice, close the connection by calling the close method:

        connection.close()

This ensures the collected client metrics are submitted to the server and the session is deleted. Also, try-finally blocks help ensure the connection is closed even if an exception is raised in the middle:

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

Using Context Manager to Connect and Control Transactions

The Snowflake Connector for Python supports a context manager that allocates and releases resources as required. The context manager is useful for committing or rolling back transactions based on the statement status when autocommit is disabled.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

In the above example, when the third statement fails, the context manager rolls back the changes in the transaction and closes the connection. If all statements were successful, the context manager would commit the changes and close the connection.

An equivalent code with try and except blocks is as follows:

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

Logging

The Snowflake Connector for Python leverages the standard Python logging module to log status at regular intervals so that the application can trace its activity working behind the scenes. The simplest way to enable logging is call logging.basicConfig() in the beginning of the application.

For example, to set the logging level to INFO and store the logs in a file named /tmp/snowflake_python_connector.log:

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

More comprehensive logging can be enabled by setting the logging level to DEBUG as follows:

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

Note

botocore and boto3 are available through the AWS (Amazon Web Services) SDK for Python.

Sample Program

The following sample code combines many of the examples described in the previous sections into a working python program. This example contains two parts:

  • A parent class (“python_veritas_base”) contains the code for many common operations, such as connecting to the server.

  • A child class (“python_connector_example”) represents the custom portions of a particular client, for example, querying a table.

This sample code is imported directly from one of our tests to help ensure that it is has been executed on a recent build of the product.

Because this is taken from a test, it includes a small amount of code to set an alternative port and protocol used in some tests. Customers should not set the protocol or port number; instead, omit these and use the defaults.

This also contains some section markers (sometimes called “snippet tags”) to identify code that can be imported independently into the documentation. Section markers typically look similar to:

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

These section markers are not required in customer client code.

The first part of the code sample contains the common subroutines to:

  • Read command-line arguments (for example “–warehouse MyWarehouse)”) that contain connection information.

  • Connect to the server.

  • Create and USE a warehouse, database, and schema.

  • Drop the schema, database, and warehouse when you are done with them.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This connects gets account and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account and login information from environment variables and
        # command-line parameters.
        # Note that ACCOUNT might require the region and cloud platform where
        # your account is located, in the form of
        #     '<your_account_name>.<region>.<cloud>'
        # for example
        #     'xy12345.us-east-1.azure')
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
                PROTOCOL = connection_parameters["protocol"]
            except:
                PORT = ""
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


The second part of the code sample creates a table, inserts rows into it, etc.:


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

To run this sample, do the following:

  1. Copy the first piece of code to a file named “python_veritas_base.py”.

  2. Copy the second piece of code to a file named “python_connector_example.py”

  3. Set the SNOWSQL_PWD environment variable to your password, for example:

    export SNOWSQL_PWD='MyPassword'
    
  4. Execute the program using a command line similar to the following (replace the user and account information with your own user and account information, of course).

    Warning

    This deletes the warehouse, database, and schema at the end of the program! Do not use the name of an existing database because you will lose it!

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account xy98764 --user MyUserName
    

Here is the output:

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

Here is a longer example:

Note

In the section where you set your account and login information, make sure to replace the variables as needed to match your Snowflake login information (name, password, etc.).

This example uses the format() function to compose the statement. If your environment has a risk of SQL injection attacks, you might prefer to bind values rather than use format().

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values). Note that ACCOUNT might also require the
# region and cloud platform where your account is located, in the form of
# '<your_account_name>.<region_id>.<cloud_platform>' (e.g. 'xy12345.east-us-2.azure')
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# Only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <your_s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
     CREDENTIALS = (
        aws_key_id='{aws_access_key_id}',
        aws_secret_key='{aws_secret_access_key}')
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()