Automating Snowpipe for AWS S3

This topic provides instructions for triggering Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket.

Note

This feature is limited to Snowflake accounts hosted on AWS.

Attention

Automated Snowpipe relies on Amazon S3 event notifications to determine when new files arrive in monitored S3 buckets and are ready to load. Currently, AWS cannot guarantee reliable delivery of event notifications to consumers such as an SQS queue. Therefore, until AWS can provide reliability guarantees for S3 event notifications, Snowflake cannot guarantee loading of all files staged to monitored S3 buckets.

As a workaround, we suggest following our best practices for staging your data files and periodically executing an ALTER PIPE … REFRESH statement to load any missed files. For satisfactory performance, we also recommend using a selective path prefix with ALTER PIPE to reduce the number of files that must be listed and checked to determine whether they have already been loaded (e.g. bucket_name/YYYY/MM/DD/ or even bucket_name/YYYY/MM/DD/HH/, depending on your volume).
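
For example, assuming a pipe named mypipe whose stage points to the root of the bucket, a daily catch-up scoped to a single day's path might look like the following (a sketch using the optional PREFIX parameter of ALTER PIPE … REFRESH; the prefix is relative to the stage location):

-- Load any files under the 2021/07/14/ path that event notifications missed
ALTER PIPE mypipe REFRESH PREFIX='2021/07/14/';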

Note that calling Snowpipe REST endpoints to load data is not subject to this limitation.

Limitations of Automated Snowpipe Using Amazon SQS

  • Virtual Private Snowflake (VPS) customers: Amazon SQS is not currently supported by AWS as a VPC endpoint. Although AWS services within a VPC (including VPS) can communicate with SQS, this traffic is not within the VPC, and therefore is not protected by the VPC.

  • Snowpipe with auto-ingest enabled uses SQS notifications to determine when new files arrive in monitored S3 buckets and are ready to load. SQS notifications contain the S3 event and a list of the file names. They do not include the actual data in the files.

Configuring Automated Snowpipe Using Amazon SQS

Before proceeding, determine whether an S3 event notification exists for the target path (or “prefix,” in AWS terminology) in your S3 bucket where your data files are located. AWS rules prohibit creating conflicting notifications for the same path.

The following options for automating Snowpipe using Amazon SQS are supported:

  • Option 1. New S3 event notification: Create an event notification for the target path in your S3 bucket. The event notification informs Snowpipe via an SQS queue when files are ready to load.

    This is the most common option.

    Important

    If a conflicting event notification exists for your S3 bucket, use Option 2 instead.

  • Option 2. Existing event notification: Configure Amazon Simple Notification Service (SNS) as a broadcaster to share notifications for a given path with multiple endpoints (or “subscribers,” e.g. SQS queues or AWS Lambda workloads), including the Snowflake SQS queue for Snowpipe automation. An S3 event notification published by SNS informs Snowpipe via an SQS queue when files are ready to load.

Option 1: Creating a New S3 Event Notification to Automate Snowpipe

This section describes the most common option for triggering Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket. The steps explain how to create an event notification for the target path (or “prefix,” in AWS terminology) in your S3 bucket where your data files are stored.

Important

If a conflicting event notification exists for your S3 bucket, use Option 2: Configuring Amazon SNS to Automate Snowpipe Using SQS Notifications (in this topic) instead. AWS rules prohibit creating conflicting notifications for the same target path.

The following diagram shows the Snowpipe auto-ingest process flow:

Snowpipe Auto-ingest Process Flow
  1. Data files are loaded in a stage.

  2. An S3 event notification informs Snowpipe via an SQS queue that files are ready to load. Snowpipe copies the files into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.

Note

The instructions in this topic assume a target table already exists in the Snowflake database where your data will be loaded.

Step 1: Create a Stage (If Needed)

Create an external S3 stage using the CREATE STAGE command, or use an existing external stage. (The Snowpipe auto-ingest feature does not currently support internal (i.e. Snowflake) stages or external Azure stages.) Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table.

For example, set snowpipe_db.public as the current schema for the current user session, and then create a stage named mystage:

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL='s3://mybucket/load/files'
  CREDENTIALS=(AWS_KEY_ID='**************' AWS_SECRET_KEY='*********************');

Step 2: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

For example, create a pipe in the snowpipe_db.public schema that loads the data from files staged in the mystage stage into the mytable table:

create or replace pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

The AUTO_INGEST=true parameter specifies that the pipe reads event notifications sent from an S3 bucket to an SQS queue to determine when new data is ready to load.

Important

Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same S3 bucket do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same S3 bucket with different levels of granularity, such as s3://mybucket/path1 and s3://mybucket/path1/path2. In this use case, if files are staged in s3://mybucket/path1/path2, the pipes for both stages would load a copy of the files.

This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the S3 event notifications. Additional care is required to avoid data duplication.
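
To check for overlapping stage references among existing pipes, you can inspect each pipe's definition; a minimal sketch using the Information Schema PIPES view (assuming the pipes live in snowpipe_db):

-- List each pipe and its COPY statement; review the stage references
-- for overlapping directory paths in the same S3 bucket.
select pipe_name, definition
  from snowpipe_db.information_schema.pipes;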

Note

Cloning a database or schema clones all objects, including pipes, in the source database or schema. When a data file is created in a stage location (e.g. an S3 bucket), a copy of the notification is sent to every pipe that matches the stage location. This results in the following behavior (illustrated in the sketch after this list):

  • If a table is fully qualified in the COPY statement in the pipe definition (in the form of db_name.schema_name.table_name or schema_name.table_name), then Snowpipe loads duplicate data into the source table (i.e. the database.schema.table in the COPY statement), once for each pipe.

  • If a table is not fully qualified in the pipe definition, then Snowpipe loads the data into the same table (e.g. mytable) in the source and cloned databases/schemas.
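
For illustration, the following hypothetical pipe definitions show the difference (a sketch; only the table reference in the COPY statement varies):

-- Fully qualified table: after cloning, the source pipe and the cloned pipe
-- both resolve to this exact table, which therefore receives duplicate data.
create pipe snowpipe_db.public.mypipe_qualified auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

-- Unqualified table: each pipe resolves mytable in its own database/schema,
-- so the source and cloned pipes load into separate tables.
create pipe snowpipe_db.public.mypipe_unqualified auto_ingest=true as
  copy into mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');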

Step 3: Configure Security

For each user who will execute continuous data loads using Snowpipe, grant sufficient access control privileges on the objects for the data load (i.e. the target database, schema, and table; the stage object; and the pipe).

Note

To follow the general principle of “least privilege”, we recommend creating a separate user and role to use for ingesting files using a pipe. The user should be created with this role as its default role.

Using Snowpipe requires a role with the following privileges:

Object               Privilege        Notes
-------------------  ---------------  -----------------------------------------
Named pipe           OWNERSHIP
Named stage          USAGE, READ
Named file format    USAGE            Optional; only needed if the stage you created in Step 1: Create a Stage (If Needed) references a named file format.
Target database      USAGE
Target schema        USAGE
Target table         INSERT, SELECT

Use the GRANT <privileges> … TO ROLE command to grant privileges to the role.

Note

Only security administrators (i.e. users with the SECURITYADMIN role) or higher, or another role with both the CREATE ROLE privilege on the account and the global MANAGE GRANTS privilege, can create roles and grant privileges.

For example, create a role named snowpipe1 that can access a set of snowpipe_db.public database objects as well as a pipe named mypipe; then, grant the role to a user:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

Step 4: Configure Event Notifications

Configure event notifications for your S3 bucket to notify Snowpipe when new data is available to load. The auto-ingest feature relies on SQS queues to deliver event notifications from S3 to Snowpipe.

For ease of use, Snowpipe SQS queues are created and managed by Snowflake. The SHOW PIPES command output displays the Amazon Resource Name (ARN) of your SQS queue.

  1. Execute the SHOW PIPES command:

    SHOW PIPES;
    

    Note the ARN of the SQS queue for the stage in the notification_channel column. Copy the ARN to a convenient location.

    Note

    Following AWS guidelines, Snowflake designates no more than one SQS queue per S3 bucket. This SQS queue may be shared among multiple buckets in the same AWS account. The SQS queue coordinates notifications for all pipes connecting the external stages for the S3 bucket to the target tables. When a data file is uploaded into the bucket, all pipes that match the stage directory path perform a one-time load of the file into their corresponding target tables.

  2. Log into the Amazon S3 console.

  3. Configure an event notification for your S3 bucket using the instructions provided in the AWS S3 documentation. Complete the fields as follows:

    • Name: Name of the event notification (e.g. Auto-ingest Snowflake).

    • Events: Select the ObjectCreate (All) option.

    • Send to: Select SQS Queue from the dropdown list.

    • SQS: Select Add SQS queue ARN from the dropdown list.

    • SQS queue ARN: Paste the SQS queue ARN from the SHOW PIPES output.

Note

These instructions create a single event notification that monitors activity for the entire S3 bucket. This is the simplest approach. This notification handles all pipes configured at a more granular level in the S3 bucket directory. Snowpipe only loads data files as specified in pipe definitions. Note, however, that a high volume of notifications for activity outside a pipe definition could negatively impact the rate at which Snowpipe filters notifications and takes action.

Alternatively, in the above steps, configure one or more paths and/or file extensions (or prefixes and suffixes, in AWS terminology) to filter event activity. For instructions, see the object key name filtering information in the relevant AWS documentation topic. Repeat these steps for each additional path or file extension you want the notification to monitor.
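
As an illustration, a bucket notification configuration that filters on a path and file extension might look like the following (a sketch of the AWS S3 NotificationConfiguration JSON; the queue ARN is a placeholder for the value from SHOW PIPES):

{
  "QueueConfigurations": [
    {
      "QueueArn": "arn:aws:sqs:us-west-2:001234567890:sf-snowpipe-example",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "load/files/" },
            { "Name": "suffix", "Value": ".json" }
          ]
        }
      }
    }
  ]
}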

Note that AWS limits the number of these notification queue configurations to a maximum of 100 per S3 bucket.

Also note that AWS does not allow overlapping queue configurations (across event notifications) for the same S3 bucket. For example, if an existing notification is configured for s3://mybucket/load/path1, then you cannot create another notification at a higher level, such as s3://mybucket/load, or vice-versa.

Snowpipe with auto-ingest is now configured!

When new data files are added to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

Step 5: Load Historical Files

To load any backlog of data files that existed in the external stage before SQS notifications were configured, execute an ALTER PIPE … REFRESH statement.
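
For example, to queue any files already present in the stage location for loading through the pipe created in Step 2:

ALTER PIPE snowpipe_db.public.mypipe REFRESH;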

Option 2: Configuring Amazon SNS to Automate Snowpipe Using SQS Notifications

This section describes how to trigger Snowpipe data loads automatically using Amazon SQS (Simple Queue Service) notifications for an S3 bucket. The steps explain how to configure Amazon Simple Notification Service (SNS) as a broadcaster to publish event notifications for your S3 bucket to multiple subscribers (e.g. SQS queues or AWS Lambda workloads), including the Snowflake SQS queue for Snowpipe automation.

Note

These instructions assume an event notification exists for the target path in your S3 bucket where your data files are located. If no event notification exists, either create one that publishes to an SNS topic and then complete the steps in this section, or follow Option 1: Creating a New S3 Event Notification to Automate Snowpipe (in this topic) instead.

The following diagram shows the process flow for Snowpipe auto-ingest with AWS SNS:

Snowpipe Auto-ingest Process Flow with AWS SNS
  1. Data files are loaded in a stage.

  2. An S3 event notification published by SNS informs Snowpipe via an SQS queue that files are ready to load. Snowpipe copies the files into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.

Note

The instructions assume a target table already exists in the Snowflake database where your data will be loaded.

Prerequisite: Create an AWS SNS Topic and Subscription

  1. Create an SNS topic in your AWS account to handle all messages for the Snowflake stage location on your S3 bucket.

  2. Subscribe your target destinations for the S3 event notifications (e.g. other SQS queues or AWS Lambda workloads) to this topic. SNS publishes event notifications for your bucket to all subscribers to the topic.

For instructions, see the SNS documentation.

Step 1: Subscribe the Snowflake SQS Queue to the SNS Topic

  1. Log into the AWS Management Console.

  2. From the home dashboard, choose Simple Notification Service (SNS).

  3. Choose Topics from the left-hand navigation pane.

  4. Locate the topic for your S3 bucket. Note the topic ARN.

  5. Using a Snowflake client, query the SYSTEM$GET_AWS_SNS_IAM_POLICY system function with your SNS topic ARN:

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    

    The function returns an IAM policy that grants a Snowflake SQS queue permission to subscribe to the SNS topic.

    For example:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
  6. Return to the AWS Management Console. Choose Topics from the left-hand navigation pane.

  7. Select the checkbox beside the topic for your S3 bucket, and from the Actions menu, click Edit topic policy. Click the Advanced view tab to edit the JSON format of the policy.

  8. Merge the IAM policy addition from the SYSTEM$GET_AWS_SNS_IAM_POLICY function results into the JSON document (see the example merged policy after these steps).

  9. Click the Update policy button.
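
For example, merging the policy addition returned in the example above into a default SNS topic policy might produce a document similar to the following (a simplified sketch; your existing default statement will differ):

{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": { "AWS": "*" },
      "Action": ["SNS:Publish"],
      "Resource": "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
    },
    {
      "Sid": "1",
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::123456789001:user/vj4g-a-abcd1234" },
      "Action": ["sns:Subscribe"],
      "Resource": ["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]
    }
  ]
}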

Step 2: Create a Stage (If Needed)

Create an external S3 stage using the CREATE STAGE command, or use an existing external stage. (The Snowpipe auto-ingest feature does not currently support internal (i.e. Snowflake) stages or external Azure stages.) Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table.

For example, set snowpipe_db.public as the current schema for the current user session, and then create a stage named mystage:

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL='s3://mybucket/load/files'
  CREDENTIALS=(AWS_KEY_ID='**************' AWS_SECRET_KEY='*********************');

Step 3: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table. In the CREATE PIPE statement, specify the SNS topic ARN from Prerequisite: Create an AWS SNS Topic and Subscription.

For example, create a pipe in the snowpipe_db.public schema that loads the data from files staged in the mystage stage into the mytable table:

create or replace pipe snowpipe_db.public.mypipe auto_ingest=true aws_sns_topic='<sns_topic_arn>' as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Where:

AUTO_INGEST = true

Specifies that the pipe reads event notifications sent from an S3 bucket to an SQS queue to determine when new data is ready to load.

AWS_SNS_TOPIC = <sns_topic_arn>

Specifies the ARN for the SNS topic for your S3 bucket (e.g. arn:aws:sns:us-west-2:001234567890:s3_mybucket in the current example). The CREATE PIPE statement subscribes the Snowflake SQS queue to the specified SNS topic. Note that the pipe only copies files to the ingest queue when triggered by event notifications received via the SNS topic.

To remove either parameter from a pipe, it is currently necessary to recreate the pipe using the CREATE OR REPLACE PIPE syntax.

Important

Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same S3 bucket do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same S3 bucket with different levels of granularity, such as s3://mybucket/path1 and s3://mybucket/path1/path2. In this use case, if files are staged in s3://mybucket/path1/path2, the pipes for both stages would load a copy of the files.

This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the S3 event notifications. Additional care is required to avoid data duplication.

Step 4: Configure Security

For each user who will execute continuous data loads using Snowpipe, grant sufficient access control privileges on the objects for the data load (i.e. the target database, schema, and table; the stage object; and the pipe).

Note

To follow the general principle of “least privilege”, we recommend creating a separate user and role to use for ingesting files using a pipe. The user should be created with this role as its default role.

Using Snowpipe requires a role with the following privileges:

Object               Privilege        Notes
-------------------  ---------------  -----------------------------------------
Named pipe           OWNERSHIP
Named stage          USAGE, READ
Named file format    USAGE            Optional; only needed if the stage you created in Step 2: Create a Stage (If Needed) references a named file format.
Target database      USAGE
Target schema        USAGE
Target table         INSERT, SELECT

Use the GRANT <privileges> … TO ROLE command to grant privileges to the role.

Note

Only security administrators (i.e. users with the SECURITYADMIN role) or higher can create roles.

For example, create a role named snowpipe1 that can access a set of snowpipe_db.public database objects as well as a pipe named mypipe; then, grant the role to a user:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe1;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe1;

grant usage on schema snowpipe_db.public to role snowpipe1;

grant insert, select on snowpipe_db.public.mytable to role snowpipe1;

grant usage on stage snowpipe_db.public.mystage to role snowpipe1;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe1;

-- Grant the role to a user
grant role snowpipe1 to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe1;

Snowpipe with auto-ingest is now configured!

When new data files are added to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

Step 5: Load Historical Files

To load any backlog of data files that existed in the external stage before SQS notifications were configured, execute an ALTER PIPE … REFRESH statement.

SYSTEM$PIPE_STATUS Output

The SYSTEM$PIPE_STATUS function retrieves a JSON representation of the current status of a pipe.
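
For example, to retrieve the status of the pipe created earlier in this topic:

select system$pipe_status('snowpipe_db.public.mypipe');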

For pipes with AUTO_INGEST set to TRUE, the function returns a JSON object containing the following name/value pairs (if applicable to the current pipe status):

{
  "executionState": "<value>",
  "oldestFileTimestamp": <value>,
  "pendingFileCount": <value>,
  "notificationChannelName": "<value>",
  "numOutstandingMessagesOnChannel": <value>,
  "lastReceivedMessageTimestamp": "<value>",
  "lastForwardedMessageTimestamp": "<value>",
  "error": <value>,
  "fault": <value>
}

Where:

executionState

Current execution state of the pipe; could be any one of the following:

  • RUNNING (i.e. everything is normal; Snowflake may or may not be actively processing files for this pipe)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Earliest timestamp among data files currently queued (if applicable), where the timestamp is set when the file is added to the queue.

pendingFileCount

Number of files currently being processed by the pipe. If the pipe is paused, this value will decrease as any files queued before the pipe was paused are processed. When this value is 0, either there are no files queued for this pipe or the pipe is effectively paused.

notificationChannelName

AWS SQS queue associated with the pipe.

numOutstandingMessagesOnChannel

Number of messages in the SQS queue that have been queued but not yet received.

lastReceivedMessageTimestamp

Timestamp of the last message received from the SQS queue. Note that this message might not apply to the specific pipe, e.g., if the path/prefix associated with the message does not match the path/prefix in the pipe definition. In addition, only messages triggered by created data objects are consumed by auto-ingest pipes.

lastForwardedMessageTimestamp

Timestamp of the last “create object” event message with a matching path/prefix that was forwarded to the pipe.

error

Error message produced when the pipe was last compiled for execution (if applicable); often caused by problems accessing the necessary objects (i.e. table, stage, file format) due to permission problems or dropped objects.

fault

Most recent internal Snowflake process error (if applicable). Used primarily by Snowflake for debugging purposes.