Snowflake Connector for Kafka

The Snowflake Connector for Kafka reads data from one or more Apache Kafka (aka “Kafka”) topics and loads the data into a Snowflake table.

Apache Kafka software enables processes to read and write messages asynchronously. The reader does not need to be connected directly to the writer; a writer can queue a message in Kafka for the reader to receive later.

Kafka uses the publish/subscribe model. A writer publishes messages to a topic, and readers subscribe to a topic to receive those messages. Kafka can process, as well as transmit, messages; however, that is outside the scope of this document.

Topics can be divided into partitions to increase scalability. Note that a Kafka topic partition is not the same as a Snowflake micro-partition.

A Kafka Connect cluster is a separate cluster from the Kafka cluster. The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).

The Snowflake Connector for Kafka is designed to run in a Kafka Connect cluster to read data from Kafka topics and write the data into Snowflake tables.

For more details about Apache Kafka, see the Apache Kafka documentation. For more details about Kafka Connect, see the Kafka Connect documentation.

From the perspective of Snowflake, a Kafka topic produces a stream of rows to be inserted into a Snowflake table. In general, each Kafka message contains one row.

Kafka, like many message publish/subscribe platforms, allows a many-to-many relationship between publishers and subscribers. A single writer can publish to many topics, and a single reader can read from multiple topics. With Snowflake, the typical pattern is that one topic supplies messages (rows) for one Snowflake table.

The current version of the Kafka connector is used only for loading data into Snowflake.

Kafka can stream data to Snowflake with or without the Kafka connector. However, the connector makes it easier to add one or more readers that publish messages from a topic to a Snowflake table.

The Kafka connector provides one of many methods for loading data into Snowflake. For information about other ways to load data, see Loading Data into Snowflake.

Schema of Kafka-compatible Snowflake Tables

A Kafka message has an internal structure that depends upon the information being sent. For example, a message from an IoT (Internet of Things) weather sensor might include the timestamp at which the data was recorded, the location of the sensor, the temperature, humidity, etc. A message from an inventory system might include the product ID and the number of items sold, perhaps along with a timestamp indicating when they were sold or shipped.

Typically, each message in a specific topic has the same basic structure. Different topics typically use different structures.

Each Kafka message is passed to Snowflake in JSON format or Avro format. The Kafka connector stores that formatted information in a single column of type VARIANT. The data is not parsed, and the data is not split into multiple columns in the Snowflake table.

Each Snowflake table loaded by the Kafka connector contains one additional column, also of type VARIANT, which contains the Kafka message’s metadata.

Thus, every Snowflake table loaded by the connector has a schema consisting of two VARIANT columns.

You can query the Snowflake tables directly by using the appropriate syntax for VARIANT columns. Alternatively, you can extract the data from these tables and store it in other tables, which typically flatten the VARIANT values into individual columns.
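For example, assuming the table was created by the connector with the usual column names RECORD_METADATA and RECORD_CONTENT (an assumption; verify the names with DESCRIBE TABLE), the VARIANT data can be queried and flattened as a sketch like this:

```sql
-- Query the VARIANT columns directly using Snowflake's colon path syntax.
-- The column names and message field names here are illustrative.
SELECT
    record_metadata:topic::STRING     AS kafka_topic,
    record_metadata:offset::NUMBER    AS kafka_offset,
    record_content:temperature::FLOAT AS temperature
FROM my_kafka_table;

-- Alternatively, extract the data into a table with conventional columns.
CREATE TABLE weather_flat AS
SELECT
    record_content:timestamp::TIMESTAMP_NTZ AS recorded_at,
    record_content:location::STRING         AS location,
    record_content:temperature::FLOAT       AS temperature
FROM my_kafka_table;
```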

Workflow for the Snowflake Connector for Kafka

The connector uses existing Snowflake functionality to load data into Snowflake. At a high level, it does the following:

  1. Reads the configuration file.

  2. Subscribes to one or more Kafka topics.

  3. Reads messages.

  4. Writes the messages to a file in a Snowflake internal stage.

  5. Tells Snowpipe to read the file and load the data into the table.

  6. Monitors the loading process and handles errors as they occur.

For more details, see the Algorithm Details — Optional section (in this topic).

Algorithm Details — Optional

This section provides more detail about how data is loaded by the Snowflake Connector for Kafka:

  1. The Snowflake Connector for Kafka reads configuration information from one of the following sources:

    • The configuration file.

    • The command line.

    • The Confluent Control Center UI, which allows the user to specify parameters when starting the connector.

  2. The connector subscribes to the Kafka topic(s) that were specified in the configuration file.

  3. The connector checks whether the specified Snowflake table exists:

    • If the table does not exist, the connector creates it.

    • If a table with the specified name exists, the connector checks whether the table schema (columns and data types) matches the schema that the connector expects (Schema of Kafka-compatible Snowflake Tables). If the schema doesn’t match, an error message is returned.

  4. The connector buffers messages from the Kafka topic(s).

  5. When a threshold (time or memory or number of messages) is reached, the connector writes the messages to a temporary file in a Snowflake internal stage.

  6. Snowflake Snowpipe is triggered to ingest the temporary file.

  7. Snowflake Snowpipe reads the file from the stage and inserts the rows into the table that was specified in the configuration file.

  8. The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file’s data has been loaded into the table. If a failure occurs during loading, the file is moved to the table stage, and an error message is produced for later investigation.

  9. The connector repeats steps 4-8.

Fault Tolerance

Both Kafka and the Snowflake Connector for Kafka are fault-tolerant. Messages are neither duplicated nor silently dropped. Messages are delivered exactly once, or an error message is generated. If an error is detected while loading a record (for example, the record was expected to be well-formed JSON or Avro, but was not), then the record is not loaded; instead, an error message is returned.

Limitations of Fault Tolerance with the Connector

Kafka topics can be configured with a limit on storage space or retention time.

  • The default retention time is 7 days. If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.

  • If messages in the Kafka topic are deleted or updated, these changes might not be reflected in the Snowflake table.


Instances of the Kafka connector do not communicate with each other. If you start multiple instances of the connector on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the connector.

It’s theoretically possible for messages to flow from Kafka faster than Snowflake can ingest them. In practice, however, this is unlikely. If it does occur, then solving the problem would require performance tuning of the Kafka Connect cluster. For example:

  • Tuning the number of nodes in the Connect cluster.

  • Tuning the number of tasks allocated to the connector.

  • Understanding the impact of the network bandwidth between the connector and the Snowflake deployment.


Although the Kafka connector guarantees exactly-once delivery, it does not guarantee that rows are inserted in the order that they were originally published.

Supported Platforms

The Kafka connector can run in any Kafka Connect cluster, and can send data to any Snowflake instance.

Billing Information

There is no direct charge for using the Kafka connector. However, there are indirect costs:

  • Snowflake Snowpipe is used to load the data that the connector reads from Kafka, and Snowpipe processing time is charged to your account.

  • Data storage is charged to your account.

Installing the Snowflake Connector for Kafka

The connector is supplied as a .jar (Java executable) file.

Pre-Requisites for Installing and Using the Connector

  1. The Snowflake Connector for Kafka is built with Kafka Connect API 2.0.0. You should use a Kafka Connect API version between 2.0.0 and 2.2.0 (inclusive). Earlier versions are not compatible with the connector. Newer versions have not been tested with the connector.

  2. If you use Avro format:

    • Use the Avro parser, version 1.8.2 (or higher), from org.apache.avro.

    • If you use the schema registry feature with Avro, use version 5.0.0 (or higher) of the kafka-connect-avro-converter from io.confluent.

  3. Apache Kafka should be configured with the desired data retention time and/or storage limit.

  4. The Kafka Connect cluster must be installed and configured.

  5. The Kafka Connector must be installed and configured.

  6. Each Kafka Connect cluster node should include enough RAM for the Kafka connector. The minimum recommended amount is 5 MB per Kafka partition. This is in addition to the RAM required for any other work that Kafka Connect is doing.

  7. Snowflake strongly recommends running your Kafka Connect instance in the same region as your Snowflake account. This is not strictly required, but typically improves throughput.

  8. To limit the harm if credentials are compromised, Snowflake recommends that you create a separate user and role for each Kafka instance so that these credentials can be individually revoked if needed. Each role should be assigned as the default role for the user, and should be granted only the minimum required privileges, which generally include:


    • INSERT privilege on the specific table.

    • Create / drop / list stage.

    • Create / drop / query Snowpipe.

    • Insert into tables through Snowpipe.

    • Describe tables.

    • Describe stage.

    • Describe Snowpipe.


    After the connector has started and the target tables have been created, the permissions to create tables can be revoked, and the permissions to the target tables can be limited to insertion only.

    For more information about creating users and roles, see the Snowflake documentation on user and role management.

  9. If a table for the Kafka connector must also be accessed by other users, you must grant appropriate privileges on that table to those users.
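The per-connector user and role setup described in step 8 can be sketched in SQL as follows (all object names are hypothetical; adapt the grants to your security policy):

```sql
-- Hypothetical role and user dedicated to one Kafka connector instance.
CREATE ROLE kafka_connector_role_1;
CREATE USER kafka_connector_user_1 DEFAULT_ROLE = kafka_connector_role_1;
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Minimum privileges: usage on the target database and schema, plus the
-- ability to create the table, stage, and pipe that the connector manages.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
GRANT USAGE, CREATE TABLE, CREATE STAGE, CREATE PIPE
    ON SCHEMA kafka_db.kafka_schema TO ROLE kafka_connector_role_1;

-- After the connector has started and created its target tables,
-- tighten the grants to insertion only:
REVOKE CREATE TABLE ON SCHEMA kafka_db.kafka_schema FROM ROLE kafka_connector_role_1;
GRANT INSERT ON TABLE kafka_db.kafka_schema.my_kafka_table TO ROLE kafka_connector_role_1;
```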

Download the .jar File

The .jar file containing the Snowflake Connector for Kafka can be downloaded from the Maven Central Repository.

The source code for the connector is available in the snowflakedb/snowflake-kafka-connector repository on GitHub.

Install the .jar File

The Snowflake Connector for Kafka can be installed using the instructions for installing other connectors for Kafka Connect. The instructions are available in the Kafka Connect documentation.

Configuring the Snowflake Connector for Kafka

The connector is configured by creating a file in JSON format that specifies parameters such as topic name(s), Snowflake table name(s), etc.

Each configuration file specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be in a single database and schema.

Fields in the Configuration File




name

Application name. This must be unique across all Kafka connectors used by the customer. The name must be a valid Snowflake unquoted identifier. For information about valid identifiers, see Identifier Syntax.




tasks.max

Number of tasks, usually the same as the number of CPU cores across the worker nodes in the Kafka Connect cluster. This number can be set lower or higher; however, we do not recommend setting it higher.


topics

Comma-separated list of topics. By default, Snowflake assumes that the table name is the same as the topic name. If the table name is not the same as the topic name, then use the optional snowflake.topic2table.map parameter (below) to specify the mapping from topic name to table name. Each table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Syntax.

snowflake.topic2table.map

This optional parameter allows a user to specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see the sample configuration file below). Each table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Syntax.


buffer.count.records

Number of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value is 10000 records.


buffer.flush.time

Number of seconds between buffer flushes, where the flush is from the connector’s memory cache to the Snowflake internal stage. The default value is 30 seconds.


buffer.size.bytes

Cumulative size in bytes of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value is 5 MB.

snowflake.url.name

The URL for accessing your Snowflake account, in the form https://<account_name>.<region_id>. The https:// prefix and port number are optional. The region ID is not used if your account is in the AWS US West region and you are not using AWS PrivateLink. For more information about Snowflake account names and region names, see Supported Regions.

snowflake.user.name

User login name for the Snowflake account.


snowflake.private.key

The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. You can provide either an unencrypted key, or an encrypted key together with the snowflake.private.key.passphrase parameter so that Snowflake can decrypt the key.

snowflake.private.key.passphrase

Use this parameter if and only if the snowflake.private.key parameter value is encrypted. If the value of this parameter is not empty, the connector uses this phrase to try to decrypt the private key. This applies to private keys encrypted according to the instructions in Using Key Pair Authentication (in this topic).

snowflake.database.name

The name of the database that contains the table to insert rows into.

snowflake.schema.name

The name of the schema that contains the table to insert rows into.


key.converter

This is the Kafka record’s key converter (e.g. "org.apache.kafka.connect.storage.StringConverter"). This is not used by the Kafka connector, but is required by the Kafka Connect platform.


value.converter

  • If the records are formatted in JSON, this should be "com.snowflake.kafka.connector.records.SnowflakeJsonConverter".

  • If the records are formatted in Avro and use Kafka’s Schema Registry Service, this should be "com.snowflake.kafka.connector.records.SnowflakeAvroConverter".

  • If the records are formatted in Avro and contain the schema (and therefore do not need Kafka’s Schema Registry Service), this should be "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistery".


value.converter.schema.registry.url

If the format is Avro and you are using a Schema Registry Service, this should be the URL of the Schema Registry Service. Otherwise this field should be empty.

jvm.proxy.host

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the host of that proxy server.


jvm.proxy.port

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the port of that proxy server.
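As noted for the private-key parameter, the key must be supplied as a single line without the PEM header and footer. A sketch of producing such a value with standard tools (the file paths are placeholders, and the key generated here is a throwaway used only for illustration):

```shell
# Generate a throwaway unencrypted key, then strip the PEM header/footer
# and line breaks so the key body fits on one line.
openssl genrsa -out /tmp/rsa_key.pem 2048 2>/dev/null
grep -v -e "BEGIN" -e "END" /tmp/rsa_key.pem | tr -d '\n' > /tmp/rsa_key.flat
```

The contents of /tmp/rsa_key.flat can then be pasted as the private-key value in the configuration file.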

Sample Configuration File

    "snowflake.topic2table.map": "<topic1>:<table1>,<topic2>:<table2>",
    "snowflake.url.name": "<account name>.<region_id>",
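A fuller sketch of a configuration file follows. All values are placeholders; the parameter names are the standard ones for this connector, but verify them against your connector version, and note that the exact file shape differs if you submit the configuration through the Kafka Connect REST API.

```json
{
  "name": "mykafkaconnector",
  "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
  "tasks.max": "8",
  "topics": "topic1,topic2",
  "snowflake.topic2table.map": "topic1:table1,topic2:table2",
  "buffer.count.records": "10000",
  "buffer.flush.time": "30",
  "buffer.size.bytes": "5000000",
  "snowflake.url.name": "<account_name>.<region_id>",
  "snowflake.user.name": "<user_name>",
  "snowflake.private.key": "<private_key_body_on_one_line>",
  "snowflake.database.name": "<database_name>",
  "snowflake.schema.name": "<schema_name>",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
}
```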


Because the configuration file typically contains security related information, such as the private key, set read/write privileges appropriately on the file.

Externalizing Secrets

Snowflake strongly recommends externalizing secrets such as the private key and storing them in an encrypted form or in a key management service like AWS KMS, Azure Key Vault, or HashiCorp Vault. This can be accomplished by implementing and installing a ConfigProvider on your Kafka Connect cluster. For more information, see the Confluent description of this service.

Using Key Pair Authentication

Snowflake supports using key pair authentication rather than the typical username/password authentication. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user who uses the Snowflake client.

To configure the public/private key pair:

  1. From the command line in a terminal window, generate a private key.

    You can generate either an encrypted version of the private key or an unencrypted version of the private key.

    To generate an unencrypted version, use the following command:

    $ openssl genrsa -out rsa_key.pem 2048

    To generate an encrypted version, use the following command:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

    It is typically safer to generate an encrypted version.

    If you use the second command to encrypt the private key, then OpenSSL prompts for a passphrase used to encrypt the private key file. We recommend using a strong passphrase to protect the private key. Record this passphrase in a secure location. You will input it when connecting to Snowflake. Note that the passphrase is only used for protecting the private key and will never be sent to Snowflake.

    Sample PEM private key

  2. From the command line, generate the public key by referencing the private key:

    Assuming the private key is encrypted and contained in the file named “rsa_key.p8”, use the following command:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

    Sample PEM public key

    -----BEGIN PUBLIC KEY-----
    -----END PUBLIC KEY-----
  3. Copy the public and private key files to a local directory for storage. Record the path to the files. Note that the private key is stored using the PKCS#8 (Public Key Cryptography Standards) format and is encrypted using the passphrase you specified in the previous step; however, the file should still be protected from unauthorized access using the file permission mechanism provided by your operating system. It is your responsibility to secure the file when it is not being used.

  4. Assign the public key to the Snowflake user using ALTER USER. For example:

    alter user jsmith set rsa_public_key='MIIBIjANBgkqh...';



    • Only security administrators (i.e. users with the SECURITYADMIN role) or higher can alter a user.

    • Exclude the public key header and footer in the SQL statement.

    Verify the user’s public key fingerprint using DESCRIBE USER:

    DESC USER jsmith;
    | property                      | value                                               | default | description                                                                   |
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |


    The RSA_PUBLIC_KEY_2_FP property is described in Key Rotation (in this topic).

The private key should be passed unencrypted to the client.

If the private key is encrypted and you need to decrypt it, you can use the following command:

openssl rsa -in rsa_key.p8 -out rsa_key.priv

Make sure to restrict the privileges on the output file (rsa_key.priv in the above example command) to prevent unauthorized users from reading the file.

Key Rotation

Snowflake supports multiple active keys to allow for uninterrupted rotation. Rotate and replace your public and private keys based on the expiration schedule you follow internally.

Currently, you can use the RSA_PUBLIC_KEY and RSA_PUBLIC_KEY_2 parameters for ALTER USER to associate up to 2 public keys with a single user.

To rotate your keys:

  1. Complete the steps in Using Key Pair Authentication to:

    • Generate a new private and public key set.

    • Assign the public key to the user. Set the public key value to either RSA_PUBLIC_KEY or RSA_PUBLIC_KEY_2 (whichever key value is not currently in use). For example:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
  2. Update the code to connect to Snowflake. Specify the new private key.

    Snowflake verifies the correct active public key for authentication based on the private key submitted with your connection information.

  3. Remove the old public key from the user profile. For example:

    alter user jsmith unset rsa_public_key;

Testing and Using the Snowflake Connector for Kafka

After you install the connector, Snowflake recommends testing with a small amount of data before using the connector in a production system. The process for testing is the same as the process for using the connector normally:

  1. Verify that Kafka and Kafka Connect are running.

  2. Verify that you have created the appropriate Kafka topic.

  3. Create (or use an existing) message publisher. Make sure that the messages published to the topic have the right format (JSON or Avro).

  4. Create a configuration file that specifies the topic and partition to read from, and the Snowflake data table to write to.

  5. (Optional) Create a table into which to write data. This step is optional; if you do not create the table, the Kafka connector creates the table for you. If you do not plan to use the connector to add data to an existing, non-empty table, then Snowflake recommends that you let the connector create the table for you to minimize the possibility of a schema mismatch.

  6. Wait a few minutes for data to propagate through the system, and then check the Snowflake table to verify that the records have been inserted.


Consider verifying your network connection to Snowflake using SnowCD before loading data to Snowflake in your test and production environments.


Troubleshooting

This section provides troubleshooting tips for errors you may encounter.

To help you isolate the source of a problem, you might want to familiarize yourself with how the connector works by reading the Workflow for the Snowflake Connector for Kafka section (in this topic).

Possible errors that you might encounter include:

Configuration error

Possible causes of the error:

  • The connector doesn’t have the proper information to subscribe to the topic.

  • The connector doesn’t have the proper information to write to the Snowflake table (e.g. the key pair for authentication might be wrong).

Note that the Kafka connector validates its parameters. The connector throws an error for each incompatible configuration parameter. The error message is written to the Kafka Connect cluster’s log file. If you suspect a configuration problem, check the errors in that log file.

Read error

The connector might not have been able to read from Kafka for the following reasons:

  • Kafka or Kafka Connect might not be running.

  • The message might not have been sent yet.

  • The message might have been deleted (expired).

Write error (stage)

Possible causes of the error:

  • Insufficient privileges on the stage.

  • Stage is out of space.

  • Stage was dropped.

  • Some other user or process wrote unexpected files to the stage.

Write error (table)

Possible causes of the error:

  • Insufficient privileges on the table.

Additional possible issues include:

  1. Connectivity/firewall issues from Kafka Connect to Snowflake/AWS.

  2. Manual cleanup of the stage where files are put.

  3. Other users make changes to the data in the table.

  4. If the data volume on the Kafka Connect clusters is very high, messages might be delayed.

Reading Log Information

The Snowflake Connector for Kafka writes to the Kafka Connect log file. This file is shared; all Kafka Connect plugins write to the same file.

The name and location of this log file should be in your Kafka Connect configuration file, which is separate from your Snowflake Connector for Kafka configuration file.

If you encounter issues, search this file for Snowflake-related error messages. Most messages have the string ERROR and contain the class name prefix com.snowflake.kafka.connector, which makes them easier to find.
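For example (the log path and message text below are made up for illustration; your Kafka Connect worker configuration determines the real log location):

```shell
LOG=/tmp/connect.log  # hypothetical path; check your Connect worker config

# Fake log content standing in for a real shared Kafka Connect log.
printf '%s\n' \
  "[2019-07-01 12:00:00] INFO worker started" \
  "[2019-07-01 12:00:05] ERROR com.snowflake.kafka.connector.SnowflakeSinkTask: stage write failed" \
  > "$LOG"

# Snowflake-related errors carry both ERROR and the connector package name:
grep "ERROR" "$LOG" | grep "com.snowflake.kafka.connector"
```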

Reporting Issues

Please have the following information available:

  • Configuration file for your Snowflake Connector for Kafka (remove the private key before providing the file to Snowflake).

  • Copy of the Kafka Connect log (ensure that the file does not contain confidential or sensitive information).