Snowflake Connector for Kafka

The Snowflake Connector for Kafka reads data from one or more Apache Kafka (aka “Kafka”) topics and loads the data into a Snowflake table.

Introduction

Apache Kafka software enables processes to read and write messages asynchronously. The reader does not need to be connected directly to the writer; a writer can queue a message in Kafka for the reader to receive later.

Kafka uses the publish/subscribe model. A writer publishes messages to a topic, and readers subscribe to a topic to receive those messages. Kafka can process, as well as transmit, messages; however, that is outside the scope of this document.

Topics can be divided into partitions to increase scalability. Note that a Kafka topic partition is not the same as a Snowflake micro-partition.

A Kafka Connect cluster is a separate cluster from the Kafka cluster. The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).

The Snowflake Connector for Kafka is designed to run in a Kafka Connect cluster to read data from Kafka topics and write the data into Snowflake tables.

For more details about Apache Kafka, see https://kafka.apache.org/. For more details about Kafka Connect, see https://docs.confluent.io/3.0.0/connect/.

From the perspective of Snowflake, a Kafka topic produces a stream of rows to be inserted into a Snowflake table. In general, each Kafka message contains one row.

Kafka, like many message publish/subscribe platforms, allows a many-to-many relationship between publishers and subscribers. A single writer can publish to many topics, and a single reader can read from multiple topics. With Snowflake, the typical pattern is that one topic supplies messages (rows) for one Snowflake table.

The current version of the Kafka connector is used only for loading data into Snowflake.

Kafka can stream data to Snowflake with or without the Kafka connector. However, the connector makes it easier to add one or more readers that publish messages from a topic to a Snowflake table.

The Kafka connector provides one of many methods for loading data into Snowflake. For information about other ways to load data, see Loading Data into Snowflake.

Schema of Kafka-compatible Snowflake Tables

Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns:

  • RECORD_DATA. This contains the Kafka message.

  • RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.

If Snowflake creates the table, then the table contains only these two columns. If the user creates the table for the Kafka connector to add rows to, then the table can contain more than these two columns. (Any additional columns must allow NULL values, because data from the connector does not include values for those columns.)
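
For example, a user-created target table might be defined similar to the following. This is a minimal sketch; the table name sensor_data and the extra column source_name are hypothetical, and the extra column allows NULL values as required.

create table sensor_data (
    -- The two columns that the Kafka connector expects:
    record_metadata variant,
    record_data variant,
    -- Any additional user-defined column must allow NULL values,
    -- because the connector does not supply values for it:
    source_name varchar
);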

The RECORD_DATA column contains the Kafka message. A Kafka message has an internal structure that depends upon the information being sent. For example, a message from an IoT (Internet of Things) weather sensor might include the timestamp at which the data was recorded, the location of the sensor, the temperature, humidity, etc. A message from an inventory system might include the product ID and the number of items sold, perhaps along with a timestamp indicating when they were sold or shipped.

Typically, each message in a specific topic has the same basic structure. Different topics typically use different structures.

Each Kafka message is passed to Snowflake in JSON format or Avro format. The Kafka connector stores that formatted information in a single column of type VARIANT. The data is not parsed, and the data is not split into multiple columns in the Snowflake table.

The RECORD_METADATA column contains the following fields:

  • topic (Java data type: String; SQL data type: VARCHAR; required)

    The name of the Kafka topic that the record came from.

  • partition (Java data type: String; SQL data type: VARCHAR; required)

    The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.)

  • offset (Java data type: long; SQL data type: BIGINT; required)

    The offset in that partition.

  • CreateTime / LogAppendTime (Java data type: long; SQL data type: BIGINT; optional)

    The timestamp associated with the message in the Kafka topic, expressed as milliseconds since midnight January 1, 1970, UTC. For more information, see: https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

  • key (Java data type: String; SQL data type: VARCHAR; optional)

    If the message is a Kafka KeyedMessage, this is the key for that message. For the connector to store the key in RECORD_METADATA, the key.converter parameter in the Kafka configuration file must be set to "org.apache.kafka.connect.storage.StringConverter"; otherwise, the connector ignores keys.

  • schema_id (Java data type: int; SQL data type: INTEGER; optional)

    When using Avro with a schema registry to specify a schema, this is the schema’s ID in that registry.

  • headers (Java data type: Object; SQL data type: OBJECT; optional)

    A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers.

The field names and values are case-sensitive.

Because topic names are often used as the basis for table names, Snowflake recommends that, when possible, topic names follow the rules for table names, as documented at Object Identifiers. For more information about how Kafka topic names are converted to Snowflake table names when the topic names do not follow the rules for Snowflake identifiers, see Kafka Connector Artifact Names.

In the future, Snowflake might store additional metadata in the RECORD_METADATA column.

Expressed in JSON syntax, a sample message might look similar to the following:

{
    "meta":
    {
        "offset": 1,
        "topic": "PressureOverloadWarning",
        "partition": 12,
        "key": "key name",
        "schema_id": 123,
        "CreateTime": 1234567890,
        "headers":
        {
            "name1": "value1",
            "name2": "value2"
        }
    },
    "content":
    {
        "ID": 62,
        "PSI": 451,
        "etc": "..."
    }
}

You can query the Snowflake tables directly by using the appropriate syntax for querying VARIANT columns.

Here is a simple example of extracting data based on the topic in the RECORD_METADATA:

select
       record_metadata:CreateTime,
       record_data:ID
    from table1
    where record_metadata:topic = 'PressureOverloadWarning';

The output would look similar to:

+------------+-----+
| CREATETIME | ID  |
+------------+-----+
| 1234567890 | 451 |
+------------+-----+

Alternatively, you can extract the data from these tables, flatten the data into individual columns, and store the data in other tables, which typically are easier to query.
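
For example, a statement similar to the following could flatten fields from the sample message above into a separate table. This is a sketch only; the target table name pressure_warnings and the chosen fields are hypothetical.

create or replace table pressure_warnings as
    select
        record_metadata:CreateTime::bigint as create_time,
        record_data:ID::integer            as id,
        record_data:PSI::integer           as psi
    from table1
    where record_metadata:topic = 'PressureOverloadWarning';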

Workflow for the Snowflake Connector for Kafka

The connector uses existing Snowflake functionality to load data into Snowflake. At a high level, it does the following:

  1. Reads the configuration file.

  2. Subscribes to one or more Kafka topics.

  3. Reads messages.

  4. Writes the messages to a file in a named Snowflake internal stage.

  5. Tells Snowpipe to read the file and load the data into the table.

  6. Monitors the loading process and handles errors as they occur.

For more details, see the Algorithm Details — Optional section (in this topic).

Algorithm Details — Optional

This section provides more detail about how data is loaded by the Snowflake Connector for Kafka:

  1. The Snowflake Connector for Kafka reads configuration information from one of the following sources:

    • The configuration file.

    • The command line.

    • The Confluent Control Center UI, which allows the user to specify parameters when starting the connector.

  2. The connector subscribes to the Kafka topic(s) that were specified in the configuration file.

  3. The connector checks whether the specified Snowflake table exists:

    • If the table does not exist, the connector creates it.

    • If a table with the specified name exists, the connector checks whether the table schema (columns and data types) matches the schema that the connector expects (Schema of Kafka-compatible Snowflake Tables). If the schema doesn’t match, an error message is returned.

  4. The connector buffers messages from the Kafka topic(s).

  5. When a threshold (time or memory or number of messages) is reached, the connector writes the messages to a temporary file in a Snowflake internal stage.

  6. Snowflake Snowpipe is triggered to ingest the temporary file.

  7. Snowflake Snowpipe reads the file from the internal stage and inserts the rows into the table that was specified in the configuration file.

  8. The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file’s data has been loaded into the table. If a failure occurs during loading, the file is moved to the table stage and an error message is generated so that the failure can be investigated later (see the example after this list).

  9. The connector repeats steps 4-8.
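
If you need to investigate files that were moved to the table stage after a load failure (step 8), you can list them with a command similar to the following sketch, where mytable is a hypothetical table name:

list @%mytable;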

Fault Tolerance

Both Kafka and the Snowflake Connector for Kafka are fault-tolerant. Messages are neither duplicated nor silently dropped. Messages are delivered exactly once, or an error message is generated. If an error is detected while loading a record (for example, the record was expected to be well-formed JSON or Avro but was not), then the record is not loaded; instead, an error message is returned.

Limitations of Fault Tolerance with the Connector

Kafka topics can be configured with a limit on storage space or retention time.

  • The default retention time is 7 days. If the system is offline for more than the retention time, then expired records will not be loaded. Similarly, if Kafka’s storage space limit is exceeded, some messages will not be delivered.

  • If messages in the Kafka topic are deleted or updated, these changes might not be reflected in the Snowflake table.

Attention

Instances of the Kafka connector do not communicate with each other. If you start multiple instances of the connector on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the connector.

It is theoretically possible for messages to flow from Kafka faster than Snowflake can ingest them. In practice, however, this is unlikely. If it does occur, then solving the problem would require performance tuning of the Kafka Connect cluster. For example:

  • Tuning the number of nodes in the Connect cluster.

  • Tuning the number of tasks allocated to the connector.

  • Understanding the impact of the network bandwidth between the connector and the Snowflake deployment.

Important

Although the Kafka connector guarantees exactly-once delivery, it does not guarantee that rows are inserted in the order that they were originally published.

Supported Platforms

The Kafka connector can run in any Kafka Connect cluster, and can send data to any Snowflake instance.

Billing Information

There is no direct charge for using the Kafka connector. However, there are indirect costs:

  • Snowflake Snowpipe is used to load the data that the connector reads from Kafka, and Snowpipe processing time is charged to your account (see the example query after this list).

  • Data storage is charged to your account.
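
For example, to get a rough view of the Snowpipe processing attributable to the connector, you might query the PIPE_USAGE_HISTORY table function, as in the following sketch (the date range is arbitrary):

select *
    from table(information_schema.pipe_usage_history(
        date_range_start => dateadd('day', -7, current_date())));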

Installing the Snowflake Connector for Kafka

The connector is supplied as a .jar (Java archive) file.

Pre-Requisites for Installing and Using the Connector

  1. The Snowflake Connector for Kafka is built for Kafka Connect API 2.0.0. You should use a Kafka Connect API version between 2.0.0 and 2.3.0 (inclusive). Earlier versions are not compatible with the connector. Newer versions have not been tested with the connector.

  2. If you use Avro format:

  3. Apache Kafka should be configured with the desired data retention time and/or storage limit.

  4. The Kafka Connect cluster must be installed and configured.

  5. The Kafka Connector must be installed and configured.

  6. Each Kafka Connect cluster node should include enough RAM for the Kafka connector. The minimum recommended amount is 5 MB per Kafka partition. This is in addition to the RAM required for any other work that Kafka Connect is doing.

  7. Snowflake strongly recommends running your Kafka Connect instance in the same cloud region as your Snowflake account. This is not strictly required, but typically improves throughput.

  8. To limit the harm if credentials are compromised, Snowflake recommends that you create a separate user and role for each Kafka connector instance so that these credentials can be individually revoked if needed. Each role should be assigned as the default role for the user, and should be granted only the minimum required privileges, which generally include the following (see the example statements after this list):

    • CREATE TABLE.

    • INSERT privilege on the specific table.

    • Create / drop / list stage.

    • Create / drop / query Snowpipe.

    • Insert into tables through Snowpipe.

    • Describe tables.

    • Describe stage.

    • Describe Snowpipe.

    Note

    After the connector has started and the target tables have been created, the permissions to create tables can be revoked, and the permissions to the target tables can be limited to insertion only.

    For more information about creating users and roles, see:

  9. If a table for the Kafka connector must also be accessed by other users, you must grant appropriate privileges on that table to those users.
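
As referenced in the privileges list above, the following statements sketch one way to create a dedicated user and role with minimal privileges. All names (kafka_connector_role_1, kafka_connector_user_1, kafka_db, kafka_schema, and the table name) are hypothetical; adjust them and the grants to your environment.

-- Create a role and user dedicated to a single Kafka connector instance.
create role kafka_connector_role_1;
create user kafka_connector_user_1 default_role = kafka_connector_role_1;
grant role kafka_connector_role_1 to user kafka_connector_user_1;

-- Grant the minimum privileges on the target database and schema.
grant usage on database kafka_db to role kafka_connector_role_1;
grant usage on schema kafka_db.kafka_schema to role kafka_connector_role_1;
grant create table on schema kafka_db.kafka_schema to role kafka_connector_role_1;
grant create stage on schema kafka_db.kafka_schema to role kafka_connector_role_1;
grant create pipe on schema kafka_db.kafka_schema to role kafka_connector_role_1;

-- After the connector has created the target table, CREATE TABLE can be
-- revoked and access limited to inserting into that table, for example:
--   revoke create table on schema kafka_db.kafka_schema from role kafka_connector_role_1;
--   grant insert on table kafka_db.kafka_schema.temperature_data to role kafka_connector_role_1;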

Download the .jar File

The .jar file containing the Snowflake Connector for Kafka can be downloaded from:

The source code for the connector is available at:

Install the .jar File

The Snowflake Connector for Kafka can be installed using the instructions for installing other connectors for Kafka Connect. The instructions are available at:

Configuring the Snowflake Connector for Kafka

The connector is configured by creating a file in JSON format that specifies parameters such as topic name(s), Snowflake table name(s), etc.

Each configuration file specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be in a single database and schema.

Fields in the Configuration File

Each field name below is followed by notes describing that field.

name

Application name. This must be unique across all Kafka connectors used by the customer. This name must be a valid Snowflake unquoted identifier. For information about valid identifiers, see Identifier Requirements.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

tasks.max

Number of tasks, usually the same as the number of CPU cores across the worker nodes in the Kafka Connect cluster. This number can be set lower or higher; however, Snowflake does not recommend setting it higher.

topics

Comma-separated list of topics. By default, Snowflake assumes that the table name is the same as the topic name. If the table name is not the same as the topic name, then use the optional topic2table.map parameter (below) to specify the mapping from topic name to table name. This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Requirements.

topics.regex

This is a regular expression (“regex”) that specifies the topics that contain the messages to load into Snowflake tables. The connector loads data from any topic name that matches the regex. The regex must follow the rules for Java regular expressions (i.e. be compatible with java.util.regex.Pattern). The configuration file should contain either topics or topics.regex, not both.

snowflake.topic2table.map

This optional parameter allows a user to specify which topics should be mapped to which tables. Each topic and its table name should be separated by a colon (see example below). This table name must be a valid Snowflake unquoted identifier. For information about valid table names, see Identifier Requirements.

buffer.count.records

Number of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value is 10000 records.

buffer.flush.time

Number of seconds between buffer flushes, where the flush is from the Snowflake Connector’s memory cache to the Snowflake stage. The default value is 30 seconds.

buffer.size.bytes

Cumulative size in bytes of records buffered in memory per Kafka partition before ingesting to Snowflake. The default value for this is 5 MB.

snowflake.url.name

The URL for accessing your Snowflake account, in the form of https://<account_name>.<region_id>.snowflakecomputing.com:443. Note that the https:// and port number are optional. The region ID is not used if your account is in the AWS US West region and you are not using AWS PrivateLink. For more information about Snowflake account names and region names, see Supported Regions.

snowflake.user.name

User login name for the Snowflake account.

snowflake.private.key

The private key to authenticate the user. Include only the key, not the header or footer. If the key is split across multiple lines, remove the line breaks. You can provide either an unencrypted key, or you can provide an encrypted key along with the snowflake.private.key.passphrase parameter so that Snowflake can decrypt the key.

snowflake.private.key.passphrase

If the value of this parameter is not empty, the Snowflake Connector uses this phrase to try to decrypt the private key. Use this parameter if and only if the snowflake.private.key parameter value is encrypted. This decrypts private keys that were encrypted according to the instructions at: https://docs.snowflake.net/manuals/user-guide/jdbc-configure.html#using-key-pair-authentication

snowflake.database.name

The name of the database that contains the table to insert rows into.

snowflake.schema.name

The name of the schema that contains the table to insert rows into.

key.converter

This is the Kafka record’s key converter (e.g. "org.apache.kafka.connect.storage.StringConverter"). This is not used by the Kafka connector, but is required by the Kafka Connect Platform.

value.converter

If the records are formatted in JSON, this should be "com.snowflake.kafka.connector.records.SnowflakeJsonConverter". If the records are formatted in Avro and use Kafka’s Schema Registry Service, this should be "com.snowflake.kafka.connector.records.SnowflakeAvroConverter". If the records are formatted in Avro and contain the schema (and therefore do not need Kafka’s Schema Registry Service), this should be "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistery".

value.converter.schema.registry.url

If the format is Avro and you are using a Schema Registry Service, this should be the URL of the Schema Registry Service. Otherwise this field should be empty.

jvm.proxy.host

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the host of that proxy server.

jvm.proxy.port

To enable the Snowflake Kafka Connector to access Snowflake through a proxy server, set this parameter to specify the port of that proxy server.

value.converter.basic.auth.credentials.source

If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string “USER_INFO”, and set the value.converter.basic.auth.user.info parameter described below. Otherwise, omit this parameter.

value.converter.basic.auth.user.info

If you are using the Avro data format and require secure access to the Kafka schema registry, set this parameter to the string “<user_ID>:<password>”, and set the value.converter.basic.auth.credentials.source parameter described above. Otherwise, omit this parameter.

Sample Configuration File

{
  "name":"XYZCompanySensorData",
  "Config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"<topic1>,<topic2>",
    "snowflake.topic2table.map": "<topic1>:<table1>,<topic2>:<table2>",
    "buffer.count.records":"100",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"65536",
    "snowflake.url.name":"<account name>.<region_id>.snowflakecomputing.com",
    "snowflake.user.name":"User1",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"MyDbName",
    "snowflake.schema.name":"MySchemaName",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"TomSmith:MyPassword"
  }
}

Important

Because the configuration file typically contains security-related information, such as the private key, set read/write privileges appropriately on the file to limit access.

Externalizing Secrets

Snowflake strongly recommends externalizing secrets such as the private key and storing them in an encrypted form or in a key management service like AWS KMS, Azure Key Vault, or HashiCorp Vault. This can be accomplished by implementing and installing a ConfigProvider on your Kafka Connect cluster. For more information, see the Confluent description of this service (https://docs.confluent.io/current/connect/security.html#externalizing-secrets).

Using Key Pair Authentication

Snowflake supports using key pair authentication rather than the typical username/password authentication. This authentication method requires a 2048-bit (minimum) RSA key pair. Generate the public-private key pair using OpenSSL. The public key is assigned to the Snowflake user who uses the Snowflake client.

To configure the public/private key pair:

  1. From the command line in a terminal window, generate a private key.

    You can generate either an encrypted version of the private key or an unencrypted version of the private key.

    To generate an unencrypted version, use the following command:

    $ openssl genrsa -out rsa_key.pem 2048
    

    To generate an encrypted version, use the following command:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    It is typically safer to generate an encrypted version.

    If you use the second command to encrypt the private key, then OpenSSL prompts for a passphrase used to encrypt the private key file. We recommend using a strong passphrase to protect the private key. Record this passphrase in a secure location. You will input it when connecting to Snowflake. Note that the passphrase is only used for protecting the private key and will never be sent to Snowflake.

    Sample PEM private key

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. From the command line, generate the public key by referencing the private key:

    Assuming the private key is encrypted and contained in the file named “rsa_key.p8”, use the following command:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Sample PEM public key

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copy the public and private key files to a local directory for storage. Record the path to the files. Note that the private key is stored using the PKCS#8 (Public Key Cryptography Standards) format and is encrypted using the passphrase you specified in the previous step; however, the file should still be protected from unauthorized access using the file permission mechanism provided by your operating system. It is your responsibility to secure the file when it is not being used.

  4. Assign the public key to the Snowflake user using ALTER USER. For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Only security administrators (i.e. users with the SECURITYADMIN role) or higher can alter a user.

    • Exclude the public key header and footer in the SQL statement.

    Verify the user’s public key fingerprint using DESCRIBE USER:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    The RSA_PUBLIC_KEY_2_FP property is described in Key Rotation (in this topic).

Key Rotation

Snowflake supports multiple active keys to allow for uninterrupted rotation. Rotate and replace your public and private keys based on the expiration schedule you follow internally.

Currently, you can use the RSA_PUBLIC_KEY and RSA_PUBLIC_KEY_2 parameters for ALTER USER to associate up to 2 public keys with a single user.

To rotate your keys:

  1. Complete the steps in Using Key Pair Authentication to:

    • Generate a new private and public key set.

    • Assign the public key to the user. Set the public key value to either RSA_PUBLIC_KEY or RSA_PUBLIC_KEY_2 (whichever key value is not currently in use). For example:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Update the code to connect to Snowflake. Specify the new private key.

    Snowflake verifies the correct active public key for authentication based on the private key submitted with your connection information.

  3. Remove the old public key from the user profile. For example:

    alter user jsmith unset rsa_public_key;
    

Testing and Using the Snowflake Connector for Kafka

After you install the connector, Snowflake recommends testing with a small amount of data before using the connector in a production system. The process for testing is the same as the process for using the connector normally:

  1. Verify that Kafka and Kafka Connect are running.

  2. Verify that you have created the appropriate Kafka topic.

  3. Create (or use an existing) message publisher. Make sure that the messages published to the topic have the right format (JSON or Avro).

  4. Create a configuration file that specifies the topic and partition to read from, and the Snowflake data table to write to.

  5. (Optional) Create a table into which to write data. This step is optional; if you do not create the table, the Kafka connector creates the table for you. If you do not plan to use the connector to add data to an existing, non-empty table, then Snowflake recommends that you let the connector create the table for you to minimize the possibility of a schema mismatch.

  6. Wait a few minutes for data to propagate through the system, and then check the Snowflake table to verify that the records have been inserted (for example, with a query similar to the sketch below).
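
For example, a quick verification query might look similar to the following sketch, where mytable is a hypothetical table name:

select record_metadata:topic as topic, count(*) as row_count
    from mytable
    group by 1;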

Tip

Consider verifying your network connection to Snowflake using SnowCD before loading data to Snowflake in your test and production environments.

Manual Clean Up

In order for the Kafka Connector to read data from Kafka and write to Snowflake tables, the connector uses the following Snowflake resources:

  • Tables (one for each topic).

  • Stages (one for each table).

  • Snowpipes (one for each topic partition in each topic).

Each named internal stage stores not only files to be loaded into tables, but also “state” information that is used to ensure exactly-once delivery of rows from Kafka to the table. If a stage and its state information are preserved, then if the connector is stopped and restarted, the connector automatically tries to resume at the point where it left off. If a stage is removed, the connector cannot resume where it left off, and exactly-once delivery is not guaranteed.

When the Kafka connector is shut down, it does not know whether the customer plans to restart it with the intention of resuming where processing left off, so the connector does not clean up the stages and snowpipes that it was using. If the customer stops the connector and does not plan to restart it to load more data into the tables, the customer should manually clean up the stages and snowpipes by using commands such as:

  • DROP PIPE.

  • DROP STAGE.

If the data loaded into the tables is no longer needed, the customer might also want to drop the tables by using the DROP TABLE command.

To drop pipes, stages, and tables, the user must know the names of those pipes, stages, and tables. The rules for composing names (based on topic names) are below in the section Kafka Connector Artifact Names.

Kafka Connector Artifact Names

The Snowflake Connector for Kafka creates Snowflake objects, in particular tables, stages, and snowpipes. Objects created by the connector are called “connector artifacts”. To clean up a connector artifact manually (for example, to drop a table), you must know the name of the connector artifact. This section describes the naming rules the connector follows when creating these artifacts.

Table name

If you want to specify the name of the table for each topic, you can set the snowflake.topic2table.map parameter in the configuration file.

If you do not specify a table name for each topic, then the connector creates the table names based on the topic names.

If the topic name follows the rules for Snowflake identifiers, then the table name will be the same as the topic name, or will be the upper-case version of the topic name (for example, the topic name “temperature_data” will be converted to the table name “TEMPERATURE_DATA”).

If the topic name does not follow the rules for table names, and if the snowflake.topic2table.map parameter is not set, then the connector converts the topic name to a valid Snowflake table name by using the following rules:

  • If the first character in the topic name is not an English language letter (a-z, or A-Z) or an underscore (_), then the table name is generated by adding an underscore to the start of the topic name, for example:

    topic name: 99LuftBallons

    table name: _99LuftBallons

  • If any character inside the topic name is not a legal character for a Snowflake table name, that character is replaced with the underscore character (“_”). (For more information about which characters are valid in table names, see Identifier Requirements.)

  • If any of the preceding adjustments to the name were made, then it is possible for topics that had different names to result in table names that are not different. For example, if you have topics “numbers+x” and “numbers-x”, then after the “+” and “-” are replaced with the underscore character, the names of both tables would be “numbers_x”. To avoid accidental duplication of table names, the connector appends a suffix to the table name. The suffix is an underscore followed by a hash code generated by calling the Java expression:

    Math.abs(<topic_name_string>.hashCode())

    where the <topic_name_string> is the original topic name string (without the added or substituted underscore characters). It is extremely unlikely that this will result in duplicate names.

Stage name

For each table, there is one stage. The stage name is SNOWFLAKE_KAFKA_CONNECTOR_<connector_name>_STAGE_<table_name>.

Pipe name

The connector creates one pipe for each topic partition. The name is:

SNOWFLAKE_KAFKA_CONNECTOR_<connector_name>_PIPE_<table_name>_<partition_number>.
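
For example, for a hypothetical connector named mysensordata that loads the topic temperature_data (partition 0) into the table TEMPERATURE_DATA, the clean-up commands described in Manual Clean Up might look similar to:

drop pipe SNOWFLAKE_KAFKA_CONNECTOR_mysensordata_PIPE_TEMPERATURE_DATA_0;
drop stage SNOWFLAKE_KAFKA_CONNECTOR_mysensordata_STAGE_TEMPERATURE_DATA;
drop table TEMPERATURE_DATA;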

Tip

Snowflake recommends that you choose topic names that follow the rules for Snowflake identifier names, when possible.

Troubleshooting

This section provides troubleshooting tips for errors you might encounter.

To help you isolate the source of a problem, you might want to familiarize yourself with how the connector works by reading the Workflow for the Snowflake Connector for Kafka section (in this topic).

Possible errors that you might encounter include:

Configuration error

Possible causes of the error:

  • The connector doesn’t have the proper information to subscribe to the topic.

  • The connector doesn’t have the proper information to write to the Snowflake table (e.g. the key pair for authentication might be wrong).

Note that the Kafka connector validates its parameters. The connector throws an error for each incompatible configuration parameter. The error message is written to the Kafka Connect cluster’s log file. If you suspect a configuration problem, check the errors in that log file.

Read error

The connector might not have been able to read from Kafka for the following reasons:

  • Kafka or Kafka Connect might not be running.

  • The message might not have been sent yet.

  • The message might have been deleted (expired).

Write error (stage)

Possible causes of the error:

  • Insufficient privileges on the stage.

  • Stage is out of space.

  • Stage was dropped.

  • Some other user or process wrote unexpected files to the stage.

Write error (table)

Possible causes of the error:

  • Insufficient privileges on the table.

Additional possible issues include:

  1. Connectivity/firewall issues from Kafka Connect to Snowflake/AWS.

  2. Manual cleanup of the stage where files are put.

  3. Other users make changes to the data in the table.

  4. If the data volume on the Kafka Connect clusters is very high, messages might be delayed.

Reading Log Information

The Snowflake Connector for Kafka writes to the Kafka Connect log file. This file is shared; all Kafka Connect plugins write to the same file.

The name and location of this log file should be in your Kafka Connect configuration file, which is separate from your Snowflake Connector for Kafka configuration file.

If you encounter issues, search this file for Snowflake-related error messages. Most messages have the string ERROR and contain the name com.snowflake.kafka.connector..., which makes these messages easier to find.

Reporting Issues

Please have the following information available:

  • Configuration file for your Snowflake Connector for Kafka (remove the private key before providing the file to Snowflake).

  • Copy of the Kafka Connect log file (ensure that the file does not contain confidential or sensitive information).