CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = <string> ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
name
Identifier for the pipe; must be unique for the schema in which the pipe is created.
The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.
For more details, see Identifier Requirements.
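As a brief illustration of the identifier rules above, the following sketch creates one pipe with an unquoted name and one with a quoted name. The pipe names are hypothetical; mytable and mystage are assumed to exist already.

```sql
-- Unquoted identifier: starts with a letter, no spaces or special characters.
create pipe mypipe_plain as copy into mytable from @mystage;

-- Quoted identifier: may contain spaces and is case-sensitive, so later
-- statements must use the same double quotes and the same casing.
create pipe "My pipe" as copy into mytable from @mystage;
```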
AUTO_INGEST = TRUE | FALSE
Specifies whether to automatically load data files from the specified external stage and optional path when event notifications are received from a configured message service.
TRUE enables automatic data loading.
FALSE disables automatic data loading. You must make calls to the Snowpipe REST API endpoints to load data files.
AWS_SNS_TOPIC = string
Required only when configuring AUTO_INGEST for Amazon S3 stages using Amazon Simple Notification Service (SNS). Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket. The CREATE PIPE statement subscribes the Amazon Simple Queue Service (SQS) queue to the specified SNS topic. Event notifications received via the SNS topic then trigger the pipe to copy files to the ingest queue. For more information, see Automating Snowpipe for Amazon S3.
INTEGRATION = 'string'
Required only when configuring AUTO_INGEST for Microsoft Azure stages. Specifies the existing notification integration used to access an Azure storage queue. For more information, see Automating Snowpipe for Azure Blob Storage.
COMMENT = 'string_literal'
Specifies a comment for the pipe.
Default: No value
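The INTEGRATION parameter refers to a notification integration that must already exist when the pipe is created. A minimal sketch of creating one for an Azure storage queue is shown below. The integration name myint matches the Azure example on this page; the queue URI and tenant ID are placeholders to be replaced with your own values.

```sql
-- Placeholders in angle brackets are assumptions, not real values.
create notification integration myint
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = 'https://<storage_account>.queue.core.windows.net/<queue_name>'
  azure_tenant_id = '<tenant_id>';
```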
Pipes support any internal stage (i.e. Snowflake named stage or table stage, but not user stage) or external stage (Amazon S3, Google Cloud Storage, or Microsoft Azure) when calling the public Snowpipe REST endpoints to load data.
All COPY options are supported except for the following:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
PATTERN = 'regex_pattern'
ON_ERROR = ABORT_STATEMENT
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE | CASE_INSENSITIVE | NONE
SIZE_LIMIT = num
PURGE = TRUE | FALSE (i.e. automatic purging while loading)
FORCE = TRUE | FALSE
Note that you can manually remove files from an internal (i.e. Snowflake) stage (after they’ve been loaded) using the REMOVE command.
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
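As a sketch of working within these restrictions, the pipe below uses an ON_ERROR value other than ABORT_STATEMENT and cleans up staged files manually, since PURGE is not available. The pipe name and the file pattern are hypothetical, and mystage is assumed to be an internal named stage.

```sql
-- ON_ERROR = ABORT_STATEMENT is not supported in a pipe definition;
-- SKIP_FILE is used here instead.
create pipe mypipe_skip as
  copy into mytable
  from @mystage
  file_format = (type = 'CSV')
  on_error = skip_file;

-- Files are not purged automatically after loading. REMOVE deletes every
-- staged file that matches the pattern, whether or not it has been loaded,
-- so run it only once the load has been confirmed.
remove @mystage pattern = '.*[.]csv';
```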
Using a query as the source for the COPY statement for column reordering, column omission, and casts (i.e. transforming data during a load) is supported. For usage examples, see Transforming Data During a Load. Note that only simple SELECT statements are supported. Filtering using a WHERE clause is not supported.
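For instance, a cast can be combined with column reordering in the same simple SELECT. This is a sketch only: the pipe name is hypothetical, and it assumes that mytable has columns c1 (a DATE) and c2, and that the staged files are CSV.

```sql
-- Load the 2nd staged column into c1 as a date and the 1st into c2.
-- No WHERE clause is used, because filtering is not supported.
create pipe mypipe_cast as
  copy into mytable (c1, c2)
  from (
    select t.$2::date, t.$1
    from @mystage t
  )
  file_format = (type = 'CSV');
```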
Cloning a database or schema clones all contained pipes that reference external (Amazon S3, Google Cloud Storage, or Microsoft Azure) stages. This results in the following behavior:
If a table is fully qualified in the COPY statement in the pipe definition (in the form of schema_name.table_name), then calls to the insertFiles REST endpoint result in duplicate data getting loaded into the source table (i.e. the database.schema.table in the COPY statement) for each pipe.
If a table is not fully qualified in the pipe definition, then calls to the insertFiles REST endpoint result in the data getting loaded into the same table (e.g. mytable) in the source and cloned databases/schemas.
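The two cases can be sketched as follows. The pipe names, the external stage my_ext_stage, and the clone name snowpipe_db_clone are hypothetical; the comments restate the behavior described above rather than adding to it.

```sql
-- Fully qualified target: after cloning, the cloned pipe still names
-- snowpipe_db.public.mytable, so both pipes load into the source table.
create pipe snowpipe_db.public.mypipe_qualified as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.my_ext_stage;

-- Unqualified target: data is loaded into the table named mytable
-- in the source and in the cloned database/schema.
create pipe snowpipe_db.public.mypipe_unqualified as
  copy into mytable
  from @my_ext_stage;

-- Cloning the database clones both pipes, because they reference an external stage.
create database snowpipe_db_clone clone snowpipe_db;
```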
Pipe definitions are not dynamic (i.e. a pipe is not automatically updated if the underlying stage or table changes, such as renaming or dropping the stage/table). Instead, you must create a new pipe and submit this pipe name in future Snowpipe REST API calls.
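A minimal sketch of that workflow after a table rename is shown below. The names mytable_v2 and mypipe_v2 are hypothetical, and mypipe is assumed to be an existing pipe that loads into mytable.

```sql
-- Rename the target table; the existing pipe definition is not updated.
alter table mytable rename to mytable_v2;

-- Create a new pipe against the renamed table and use this pipe name
-- in future Snowpipe REST API calls.
create pipe mypipe_v2 as copy into mytable_v2 from @mystage;

-- Optionally remove the old pipe once nothing refers to it.
drop pipe mypipe;
```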
Create a pipe in the current schema that loads all the data from files staged in the mystage stage into mytable:
create pipe mypipe as copy into mytable from @mystage;
Same as the previous example, but with a data transformation. Only load data from the 4th and 5th columns in the staged files, in reverse order:
create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
Create a pipe in the current schema for automatic loading of data using event notifications received from a messaging service:
Amazon S3
create or replace pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');
Microsoft Azure
create or replace pipe mypipe_azure
  auto_ingest = true
  integration = 'myint'
  as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');
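After creating an auto-ingest pipe, it can be useful to confirm that it exists and is running. The statements below are a sketch that assumes the mypipe_s3 pipe from the Amazon S3 example above and a session whose current schema is snowpipe_db.public.

```sql
-- List the pipe and its definition.
show pipes like 'mypipe_s3';

-- Return a JSON description of the pipe's current state.
select system$pipe_status('snowpipe_db.public.mypipe_s3');
```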