Snowflake Connector for Apache Kafka

The Snowflake Connector for Apache Kafka reads data from one or more Apache Kafka topics and loads that data into a Snowflake table.

Introduction

Apache Kafka (aka “Kafka”) software enables processes to read and write messages asynchronously. The reader does not need to be connected directly to the writer; a writer can queue a message in Kafka for the reader to receive later.

Kafka uses the publish/subscribe model. A writer publishes “messages” to a “topic”, and readers subscribe to a topic to receive those messages. Kafka can process, as well as transmit, messages; however, that is outside the scope of this document.

Topics can be divided into “partitions” to increase scalability. (A Kafka topic “partition” is not related to a Snowflake micro-partition.)

A Kafka Connect cluster is a separate cluster from the Kafka Cluster. The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).

The Snowflake Connector for Apache Kafka is a connector designed to run in a Kafka Connect cluster to read data from Kafka topics and write that data into Snowflake tables.

For more information about Apache Kafka, see: https://kafka.apache.org/

For more information about Kafka Connect, see: https://docs.confluent.io/3.0.0/connect/

From the perspective of Snowflake, a Kafka topic produces a stream of rows to be inserted into a Snowflake table. In general, each Kafka message contains one row.

Kafka, like many other message publish/subscribe platforms, allows a many-to-many relationship between publishers and subscribers. A single writer can publish to many topics, and a single reader can read from multiple topics. With Snowflake, the typical pattern is that one topic supplies messages (rows) for one Snowflake table.

The current version of the Snowflake Connector for Apache Kafka is used only for loading data into Snowflake.

Kafka can stream data to Snowflake with or without the Snowflake Connector for Apache Kafka. However, the connector makes it easier to set up a reader that subscribes to a topic and writes the messages to a Snowflake table.

The Snowflake Connector for Apache Kafka is one of many ways to load data into Snowflake. For information about other ways to load data, see Loading Data into Snowflake.

Schema of Kafka-Compatible Snowflake Tables

A Kafka message has an internal structure that depends upon the information being sent. For example, a message from an IoT (Internet of Things) weather sensor might include the timestamp at which the data was recorded, the location of the sensor, the temperature, humidity, etc. A message from an inventory system might include the product ID and the number of items sold, perhaps along with a timestamp indicating when they were sold or shipped.

Typically, each message in a specific topic has the same basic structure. Different topics typically use different structures.

Each Kafka message is passed to Snowflake in JSON format or Apache Avro format. The Snowflake Connector for Apache Kafka stores that formatted information in a single column of type VARIANT. The data is not parsed, and the data is not split into multiple columns in the Snowflake table.

Each Snowflake table loaded by the Kafka connector contains one additional column, also of type VARIANT, which contains the Kafka message’s metadata.

Thus every Snowflake table loaded by the connector has a simple schema: two VARIANT columns.

You can query the Snowflake tables directly by using the appropriate syntax for VARIANT columns (Querying Semi-structured Data). Alternatively, you can extract the data from these tables and store it in other tables (typically tables that split the VARIANT values into individual columns).
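
For example, the following is a minimal sketch of both approaches. The table name (sensor_data), the column names (record_content and record_metadata), and the field names inside the messages are assumptions based on the IoT example above; use DESCRIBE TABLE to confirm the actual column names in your table.

-- Confirm the column names of the table loaded by the connector.
DESCRIBE TABLE sensor_data;

-- Query individual fields inside the VARIANT columns directly.
SELECT
    record_metadata:topic::STRING     AS kafka_topic,
    record_content:sensor_id::STRING  AS sensor_id,
    record_content:temperature::FLOAT AS temperature
FROM sensor_data
WHERE record_content:temperature::FLOAT > 30
LIMIT 10;

-- Alternatively, extract the data into a table with conventional columns.
CREATE TABLE sensor_data_flat AS
SELECT
    record_content:sensor_id::STRING  AS sensor_id,
    record_content:temperature::FLOAT AS temperature
FROM sensor_data;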

How the Snowflake Connector for Apache Kafka Works

The Snowflake Connector for Apache Kafka uses existing Snowflake functionality to load data into Snowflake. At a high level, it does the following:

  1. Reads the configuration file.
  2. Subscribes to one or more Kafka topics.
  3. Reads messages.
  4. Writes the messages to a file in a Snowflake internal stage.
  5. Tells Snowpipe to read the file and load the data into the table.
  6. Monitors the loading process and handles errors as they occur.

For more detail about these steps, see the Algorithm Details (Optional Reading) section below.
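
Because the connector relies on an internal stage and Snowpipe (steps 4 and 5 above), you can observe the objects it creates with standard SQL once it is running. The following is a minimal sketch; the database and schema names are assumptions, and the stage and pipe names themselves are generated by the connector.

-- List the stages and pipes in the target schema.
-- "kafka_db.kafka_schema" is a hypothetical database and schema.
SHOW STAGES IN SCHEMA kafka_db.kafka_schema;
SHOW PIPES IN SCHEMA kafka_db.kafka_schema;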

Algorithm Details (Optional Reading)

This section provides more detail about how data is loaded by the Snowflake Connector for Apache Kafka.

  1. The Snowflake Connector for Apache Kafka reads configuration information from one of the following sources:
    • The configuration file.
    • The command line.
    • The Confluent Control Center UI, which allows the user to specify parameters when starting the connector.
  2. The Snowflake Connector for Apache Kafka subscribes to the Kafka topic(s) that were specified in the configuration file.
  3. The Snowflake Connector for Apache Kafka checks whether the specified Snowflake table exists. If the table does not exist, the connector creates it. If a table with the specified name exists, the connector checks whether the table schema (columns and data types) matches the schema that the connector expects (Schema of Kafka-Compatible Snowflake Tables). If the schema doesn’t match, an error message is emitted.
  4. The connector buffers messages from the Kafka topic(s).
  5. When a threshold (time or memory or number of messages) is reached, the connector writes the messages to a temporary file in a Snowflake internal stage.
  6. Snowflake Snowpipe is triggered to ingest the temporary file.
  7. Snowflake Snowpipe wakes up and reads the file from the stage and inserts the rows into the table that was specified in the configuration file.
  8. The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file’s data has been loaded into the table. If a failure occurs during loading, the file is moved to the table stage and an error message is generated so that the failure can be investigated. (An example of inspecting the table stage is shown after this list.)
  9. The connector repeats steps 4-8.
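
For example, if a load fails and a file is moved to the table stage (step 8), you can inspect that stage directly. This is a minimal sketch; my_kafka_table is a hypothetical table name.

-- List any files left in the target table's table stage after a load failure.
LIST @%my_kafka_table;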

Fault Tolerance

Kafka and the Snowflake Connector for Apache Kafka are fault-tolerant. Messages are neither duplicated nor silently dropped; they are delivered exactly once, or an error message is generated. If an error is detected while loading a record (for example, a record that was expected to be well-formed JSON or Avro but was not), the record is not loaded; instead, an error message is generated.

Limitations of Fault Tolerance with the Snowflake Connector for Apache Kafka

Kafka Topics can be configured with a limit on storage space or retention time.

  • The default retention time is 7 days. If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.
  • If messages in the Kafka topic are deleted or updated, these changes might not be reflected in the Snowflake table.

Warning

Instances of the Snowflake Connector for Apache Kafka do not communicate with each other. If you start multiple instances of the Snowflake Connector for Apache Kafka on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the Snowflake Connector for Apache Kafka.

It’s theoretically possible for messages to flow from Kafka faster than Snowflake can ingest them. In practice, however, this is unlikely. If it does occur, then solving the problem would require performance tuning of the Kafka Connect cluster, for example:

  • Tuning the number of nodes in the Connect cluster.
  • Tuning the number of tasks allocated to the Snowflake Connector for Apache Kafka.
  • Understanding the impact of the network bandwidth between the Connector and the Snowflake deployment.

Important

Although the Snowflake Connector for Apache Kafka guarantees exactly-once delivery, it does not guarantee that rows are inserted in the order that they were originally published.

Platforms

The Snowflake Connector for Apache Kafka can run in any Kafka Connect cluster, and can send data to any Snowflake instance.

Billing Information

There is no direct charge for using the Snowflake Connector for Apache Kafka. However, there are indirect costs:

  • Snowflake Snowpipe is used to load the data that the connector reads from Kafka, and Snowpipe processing time is billable time.
  • Data storage is billable.

Installing the Snowflake Connector for Apache Kafka

The Snowflake Connector for Apache Kafka is supplied as a .jar (Java archive) file.

Pre-Requisites for Installing and Using the Snowflake Connector for Apache Kafka

  1. Apache Kafka should be configured with the desired data retention time and/or storage limit.
  2. The Kafka Connect cluster needs to be installed and configured.
  3. The Kafka Connector needs to be installed and configured.
  4. Each Kafka Connect cluster node should include enough RAM for the Snowflake Connector for Apache Kafka. The minimum recommended amount is 5 MB per Kafka partition. (This is in addition to the RAM required for any other work that Kafka Connect is doing.)
  5. Snowflake strongly recommends that your Kafka Connect instance run in the same cloud region as your Snowflake account. (This is not strictly required, but typically improves throughput.)
  6. To limit the harm if credentials are compromised, Snowflake recommends that you create a separate user and role for each Kafka instance so that the credentials can be revoked individually if needed. Assign the role as the default role for the user, and grant the role only the minimum required privileges, such as the privileges to create tables, stages, and pipes in the target schema. After the connector has started and the target tables have been created, the privilege to create tables can be revoked, and the privileges on the target tables can be limited to insertion only. For more information about creating users and roles, see CREATE USER and CREATE ROLE. (A SQL sketch of this setup follows this list.)
  7. If other users need to use the table, grant the appropriate privileges on that table to those users.
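
The following is a minimal sketch of such a setup. The role, user, database, and schema names are assumptions, and <public_key> is a placeholder for the key described in Using Key Pair Authentication (below).

-- Dedicated role with only the privileges the connector needs in the target schema.
CREATE ROLE kafka_connector_role_1;
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
GRANT USAGE ON SCHEMA kafka_db.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_db.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_db.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_db.kafka_schema TO ROLE kafka_connector_role_1;

-- Dedicated user for this Kafka instance, with the role as its default.
CREATE USER kafka_connector_user_1
  DEFAULT_ROLE = kafka_connector_role_1
  RSA_PUBLIC_KEY = '<public_key>';
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;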

Download the .jar File

The .jar file containing the Snowflake Connector for Apache Kafka can be downloaded from:

https://mvnrepository.com/artifact/com.snowflake

Most users do not need the source code to this connector. However, for users who would like the source code, it is available at:

https://github.com/snowflakedb/snowflake-kafka-connector

Install the .jar File

The Snowflake Connector for Apache Kafka can be installed by using the instructions for installing other connectors for Kafka Connect. These instructions are at:

https://docs.confluent.io/current/connect/userguide.html

Configuring the Snowflake Connector for Apache Kafka

The Snowflake Connector for Apache Kafka is configured by creating a file in JSON format that specifies parameters such as topic name(s), Snowflake table name(s), etc.

Each configuration file specifies the topics and corresponding tables for one database and one schema in that database. (One connector can ingest messages from any number of topics, but the corresponding tables must all be in a single database and schema.)

Fields in the Configuration File

Field Notes
name Application name. This must be unique across all Kafka connectors used by the customer. This name must be a valid Snowflake unquoted identifier. For information about valid identifiers, see Identifier Syntax.
connector.class com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max Number of tasks, usually the same as the number of CPU cores across the worker nodes in the Kafka Connect cluster. This number can be set lower or higher; however, the best practice is not to set it higher.
topics Comma-separated list of topics. By default, Snowflake assumes that the table name is the same as the topic name. If the table name is not the same as the topic name, then use the optional topics.table.map parameter (below) to specify the mapping from topic name to table name. This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Syntax.
snowflake.topic2table.map This optional parameter allows a user to specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon. (See example below.) This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Syntax.
buffer.count.records Number of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value is 10000 records.
buffer.size.bytes Cumulative size in bytes of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value for this is 5MB.
snowflake.url.name The Snowflake URL, in a form similar to “https://<account_name>.<region_name>.snowflakecomputing.com:443”. The “https://” is optional. The port number is optional. The region name is optional if you are using the AWS west region (us-west-2) and are not using AWS PrivateLink. For more information about Snowflake account names and region names, see Snowflake Regions.
snowflake.user.name User login name for the Snowflake account.
snowflake.private.key The private key used to authenticate the Snowflake user (see Using Key Pair Authentication below).
snowflake.database.name The name of the database that contains the table to insert rows into.
snowflake.schema.name The name of the schema that contains the table to insert rows into.
key.converter This is the Kafka record’s key converter, e.g. “org.apache.kafka.connect.storage.StringConverter”. This is not used by the Snowflake Kafka Connector, but is required by the Kafka Connect Platform.
value.converter If the records are formatted in JSON, this should be “com.snowflake.kafka.connector.records.SnowflakeJsonConverter”. If the records are formatted in Avro and use Kafka’s Schema Registry Service, this should be “com.snowflake.kafka.connector.records.SnowflakeAvroConverter”. If the records are formatted in Avro and contain the schema (and therefore do not need Kafka’s Schema Registry Service), this should be “com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry”.
value.converter.schema.registry.url If the format is Avro and you are using a Schema Registry Service, this should be the URL of the Schema Registry Service. Otherwise this field should be empty.

Sample Configuration File

Here is a sample configuration file:

{
  "name":"XYZCompanySensorData",
  "Config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"<topic1>,<topic2>",
    "snowflake.topic2table.map": "<topic1>:<table1>,<topic2>:<table2>",
    "buffer.count.records":"100",
    "buffer.size.bytes":"65536",
    "snowflake.url.name":"<account name>.<region_name>.snowflakecomputing.com:443",
    "snowflake.user.name":"User1",
    "snowflake.private.key":"xyz123",
    "snowflake.database.name":"MyDbName",
    "snowflake.schema.name":"MySchemaName",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081"
  }
}

Externalizing Secrets

Snowflake strongly recommends externalizing secrets such as the private key and storing them in an encrypted form or in a key management service like AWS KMS, Azure Key Vault, or HashiCorp Vault. This can be accomplished by implementing and installing a ConfigProvider on your Kafka Connect cluster. For more information, see Confluent’s description of this service (https://docs.confluent.io/current/connect/security.html#externalizing-secrets).

Using Key Pair Authentication

Snowflake supports using key pair authentication rather than the typical username/password authentication. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user who uses the Snowflake client.

To configure the public/private key pair:

  1. From the command line in a terminal window, generate a private key.

    You can generate either an encrypted version of the private key or an unencrypted version of the private key.

    To generate an unencrypted version, use the following command:

    $ openssl genrsa -out rsa_key.pem 2048
    

    To generate an encrypted version, use the following command:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    It is typically safer to generate an encrypted version.

    If you use the second command to encrypt the private key, then OpenSSL prompts for a passphrase used to encrypt the private key file. We recommend using a strong passphrase to protect the private key. Record this passphrase in a secure location. You will input it when connecting to Snowflake. Note that the passphrase is only used for protecting the private key and will never be sent to Snowflake.

    Sample PEM private key

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. From the command line, generate the public key by referencing the private key:

    Assuming the private key is encrypted and contained in the file named “rsa_key.p8”, use the following command:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Sample PEM public key

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copy the public and private key files to a local directory for storage. Record the path to the files. Note that the private key is stored using the PKCS#8 (Public Key Cryptography Standards) format and is encrypted using the passphrase you specified in the previous step; however, the file should still be protected from unauthorized access using the file permission mechanism provided by your operating system. It is your responsibility to secure the file when it is not being used.

  4. Assign the public key to the Snowflake user using ALTER USER. For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Only security administrators (i.e. users with the SECURITYADMIN role) or higher can alter a user.
    • Exclude the public key header and footer (the “-----BEGIN PUBLIC KEY-----” and “-----END PUBLIC KEY-----” lines) from the value in the SQL statement.

    Verify the user’s public key fingerprint using DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    The RSA_PUBLIC_KEY_2_FP property is described in Key Rotation (in this topic).

The private key must be passed to the connector in unencrypted form (i.e. as the value of the snowflake.private.key configuration field).

If the private key is encrypted and you need to decrypt it, you can use the following command:

$ openssl rsa -in rsa_key.p8 -out rsa_key.priv

Make sure to restrict the privileges on the output file (“rsa_key.priv” in the sample above) to prevent unauthorized users from reading the file.

Key Rotation

Snowflake supports multiple active keys to allow for uninterrupted rotation. Rotate and replace your public and private keys based on the expiration schedule you follow internally.

Currently, you can use the RSA_PUBLIC_KEY and RSA_PUBLIC_KEY_2 parameters for ALTER USER to associate up to 2 public keys with a single user.

To rotate your keys:

  1. Complete the steps in Using Key Pair Authentication to:

    • Generate a new private and public key set.

    • Assign the public key to the user. Set the public key value to either RSA_PUBLIC_KEY or RSA_PUBLIC_KEY_2 (whichever key value is not currently in use). For example:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Update the configuration that you use to connect to Snowflake so that it specifies the new private key.

    Snowflake verifies the correct active public key for authentication based on the private key submitted with your connection information.

  3. Remove the old public key from the user profile. For example:

    alter user jsmith unset rsa_public_key;
    

Testing and Using the Snowflake Connector for Apache Kafka

After you install the Snowflake Connector for Apache Kafka, Snowflake recommends testing the connector with a small amount of data before using the connector in a production system. The process for testing is the same as the process for using the connector normally:

  1. Verify that Kafka and Kafka Connect are running.
  2. Verify that you have created the appropriate Kafka topic.
  3. Create (or use an existing) message publisher. Make sure that the messages published to the topic have the right format (JSON or Avro).
  4. Create a configuration file that specifies the topic(s) to read from and the Snowflake table to write to.
  5. Optional: Create the table into which to write the data. (If you do not create the table, the Snowflake Connector for Apache Kafka creates it for you. Unless you plan to use the connector to add data to an existing, non-empty table, Snowflake recommends letting the connector create the table to minimize the possibility of a schema mismatch.)
  6. Start (or restart) the connector in your Kafka Connect cluster so that it begins reading from the topic.
  7. Wait a few minutes for data to propagate through the system, and then check the Snowflake table to verify that the records have been inserted (see the example query after this list).
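
For example, a quick sanity check might look like the following; my_kafka_table is a hypothetical table name (use the table specified in, or derived from, your configuration).

-- Confirm that rows have arrived and inspect a few of them.
SELECT COUNT(*) FROM my_kafka_table;
SELECT * FROM my_kafka_table LIMIT 10;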

Troubleshooting

If you see an error message, this section provides troubleshooting tips.

To help you isolate the source of a problem, review how the connector works by re-reading How the Snowflake Connector for Apache Kafka Works and Algorithm Details (Optional Reading) above.

Possible problems include:

  1. Configuration errors:

    1. The connector doesn’t have the proper information to subscribe to the topic.
    2. The connector doesn’t have the proper information to write to the Snowflake table (for example, the key pair for authentication might be wrong).

    Note that the Snowflake Connector for Apache Kafka validates its parameters. The connector throws an error for each incompatible configuration parameter. The error message is written to the Kafka Connect cluster’s log file. If you suspect a configuration problem, check the errors in that log file.

  2. Read errors: The Snowflake Connector for Apache Kafka might not have been able to read from Kafka.

    1. Kafka or Kafka Connect might not be running.
    2. The message might not have been sent yet.
    3. The message might have been deleted (expired).
  3. Problems writing to the Snowflake internal stage:

    1. Privileges.
    2. Out of space.
    3. Stage was dropped.
    4. Some other user or process wrote unexpected files to the stage.
  4. Problems writing to the table:

    1. Privileges.
  5. Connectivity/firewall issues from Kafka Connect to Snowflake/AWS.

  6. Manual cleanup of the stage where files are put.

  7. Other users making changes to the data in the table.

  8. If the data volume on the Kafka Connect clusters is very high, messages might be delayed.

Reading Log Information

The Snowflake Connector for Apache Kafka writes to the Kafka Connect log file. This file is shared; all Kafka Connect plugins write to the same log file.

The name and location of this log file should be in your Kafka Connect configuration file (not your Snowflake Connector for Apache Kafka configuration file).

If you have problems, search this file for Snowflake-related error messages. Many such messages contain the string “ERROR” and the package name “com.snowflake.kafka.connector…”, which makes them easier to find.

Reporting Problems

Please have the following information available:

  • Your Snowflake Connector for Apache Kafka configuration file (after you remove the private key).
  • A copy of the Kafka Connect log (after ensuring that it does not contain confidential information).