Categories:
Data Loading / Unloading DDL

CREATE PIPE

Creates a new pipe in the system, defining the COPY INTO <table> statement that Snowpipe uses to load data from an ingestion queue into tables.

See also:
ALTER PIPE, DESCRIBE PIPE, DROP PIPE, SHOW PIPES

Syntax

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
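As a minimal sketch of the full syntax, a pipe created with a fully qualified name and the OR REPLACE clause might look like the following (the database, schema, stage, and table names are hypothetical):

```sql
-- Hypothetical names: mydb, myschema, mypipe, mytable, mystage
CREATE OR REPLACE PIPE mydb.myschema.mypipe
  AS COPY INTO mydb.myschema.mytable FROM @mydb.myschema.mystage;
```

Note that OR REPLACE and IF NOT EXISTS are mutually exclusive; use one or the other.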

Required Parameters

name

Identifier for the pipe; must be unique for the schema in which the pipe is created.

The identifier must start with an alphabetic character and cannot contain spaces or special characters unless the entire identifier string is enclosed in double quotes (e.g. "My object"). Identifiers enclosed in double quotes are also case-sensitive.

For more details, see Identifier Syntax.

copy_statement

COPY INTO <table> statement used to load data from queued files into a Snowflake table. This statement serves as the text/definition for the pipe and is displayed in the SHOW PIPES output.

Optional Parameters

COMMENT = 'string_literal'

Specifies a comment for the pipe.

Default: No value
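For instance, a comment supplied at creation time is displayed in the comment column of the SHOW PIPES output. A sketch, with hypothetical object names:

```sql
-- Hypothetical names: mypipe_orders, mytable, mystage
CREATE PIPE IF NOT EXISTS mypipe_orders
  COMMENT = 'Continuously loads order files staged in mystage'
  AS COPY INTO mytable FROM @mystage;
```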

Usage Notes

  • A pipe supports any internal stage (i.e. Snowflake named stage or table stage, but not user stage) or external stage (i.e. AWS S3 or Microsoft Azure) when calling the public Snowpipe REST endpoints to load data.

  • All COPY options are supported except for the following:

    • FILES = ( 'file_name1' [ , 'file_name2', ... ] )

    • PATTERN = 'regex_pattern'

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = num

    • PURGE = TRUE | FALSE (i.e. automatic purging while loading)

      Note that you can manually remove files from an internal (i.e. Snowflake) stage (after they’ve been loaded) using the REMOVE command.

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • Using a query as the source for the COPY statement (i.e. transforming data during a load) is supported. For usage examples, see Transforming Data During a Load.

  • Cloning a database or schema clones all objects, including pipes, in the source database or schema. This results in the following behavior:

    • If a table is fully qualified in the COPY statement in the pipe definition (in the form of db_name.schema_name.table_name or schema_name.table_name), then calls to the insertFiles REST endpoint result in duplicate data getting loaded into the source table (i.e. the database.schema.table in the COPY statement) for each pipe.

    • If a table is not fully qualified in the pipe definition, then calls to the insertFiles REST endpoint result in the data getting loaded into the same table (e.g. mytable) in the source and cloned databases/schemas.

  • Pipe definitions are not dynamic (i.e. a pipe is not automatically updated if the underlying stage or table changes, such as renaming or dropping the stage/table). Instead, you must create a new pipe and submit the new pipe name in future Snowpipe REST API calls.
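Because pipe definitions are not dynamic, a stage or table rename requires recreating the pipe. A sketch of the workflow, assuming a stage renamed from mystage to mystage_v2 (all object names here are hypothetical):

```sql
-- The old pipe definition still references @mystage after the rename.
-- Create a replacement pipe against the renamed stage, then use the
-- new pipe name (mypipe_v2) in subsequent Snowpipe REST API calls.
CREATE PIPE mypipe_v2
  AS COPY INTO mytable FROM @mystage_v2;

-- Optionally drop the now-stale pipe.
DROP PIPE mypipe;
```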

Examples

Create a pipe in the current schema that loads all the data from files staged in the mystage stage into mytable:

create pipe if not exists mypipe as copy into mytable from @mystage;

SHOW PIPES;

+-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------+
| created_on                    | name    | database_name | schema_name | definition                                                   | owner    | comment |
|-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------|
| 2017-08-24 06:36:24.339 -0700 | MYPIPE  | MYDATABASE    | PUBLIC      | copy into mytable from @mystage                              | SYSADMIN |         |
+-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------+

Same as the previous example, but with a data transformation. Only load data from the 4th and 5th columns in the staged files, in reverse order:

create pipe if not exists mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);

SHOW PIPES;

+-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------+
| created_on                    | name    | database_name | schema_name | definition                                                   | owner    | comment |
|-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------|
| 2017-08-24 06:36:24.339 -0700 | MYPIPE  | MYDATABASE    | PUBLIC      | copy into mytable from @mystage                              | SYSADMIN |         |
| 2017-08-24 06:42:40.550 -0700 | MYPIPE2 | MYDATABASE    | PUBLIC      | copy into mytable(C1, C2) from (select $5, $4 from @mystage) | SYSADMIN |         |
+-------------------------------+---------+---------------+-------------+--------------------------------------------------------------+----------+---------+