Automating Snowpipe for Azure Blob Storage

This topic provides instructions for triggering Snowpipe data loads automatically using Microsoft Azure Event Grid messages for Blob storage events. The instructions explain how to create an event message for the target path in Blob storage where your data files are stored.

Note

  • This feature is limited to Snowflake accounts hosted on Microsoft Azure.

  • This feature is limited to Azure Blob storage accounts and General Purpose v2 (GPv2) storage accounts.

Limitations of Automated Snowpipe Using Azure Event Grid

  • Snowpipe with auto-ingest enabled uses Event Grid messages to determine when new files arrive in monitored Azure containers and are ready to load. Event Grid messages contain the event and a list of the file names. They do not include the actual data in the files.
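For illustration only, an abridged Microsoft.Storage.BlobCreated event message (Event Grid schema) looks roughly like the following. The field values are hypothetical and the actual message includes additional metadata fields:

{
  "topic": "/subscriptions/<subscription_id>/resourceGroups/<resource_group_name>/providers/Microsoft.Storage/storageAccounts/<storage_account_name>",
  "subject": "/blobServices/default/containers/mycontainer/blobs/files/data1.json",
  "eventType": "Microsoft.Storage.BlobCreated",
  "eventTime": "2019-08-01T17:10:00.000Z",
  "data": {
    "api": "PutBlob",
    "contentLength": 524288,
    "blobType": "BlockBlob",
    "url": "https://myaccount.blob.core.windows.net/mycontainer/files/data1.json"
  }
}

Snowpipe uses the blob path in the message to identify the new file; the file contents themselves are read from the stage during the load.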

Configuring Automated Snowpipe Using Azure Event Grid

Note

The instructions in this topic assume a target table already exists in the Snowflake database where your data will be loaded.

Step 1: Configure the Event Grid Subscription

This section describes how to set up an Event Grid subscription for Azure Storage events using the Azure CLI. For more information about the steps described in this section, see the Azure Event Grid documentation.

Creating a Resource Group

An Event Grid topic provides an endpoint where the source (i.e. Azure Storage) sends events. A topic is used for a collection of related events. Event Grid topics are Azure resources, and must be placed in an Azure resource group.

Execute the following command to create a resource group:

az group create --name <resource_group_name> --location <location>

Where:

  • resource_group_name is the name of the new resource group.

  • location is the location, or region in Snowflake terminology, of your Azure Storage account.
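For example, to create a resource group named mysnowflakerg in the West US region (illustrative values):

az group create --name mysnowflakerg --location westus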

Enabling the Event Grid Resource Provider

Execute the following command to register the Event Grid resource provider. Note that this step is only required if you have not previously used Event Grid with your Azure account:

az provider register --namespace Microsoft.EventGrid
az provider show --namespace Microsoft.EventGrid --query "registrationState"
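The second command reports the registration state. Registration can take a few minutes; while it is in progress, the state is reported as "Registering". When it completes, the command returns:

"Registered"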

Creating a Storage Account for Data Files

Execute the following command to create a storage account to store your data files. This account must be either a Blob storage (i.e. BlobStorage kind) or GPv2 (i.e. StorageV2 kind) account, because only these two account types support event messages.

Note

If you already have a Blob storage or GPv2 account, you can use that account instead.

For example, create a Blob storage account:

az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location> --kind BlobStorage --access-tier Hot

Where:

  • resource_group_name is the name of the resource group you created in Creating a Resource Group.

  • storage_account_name is the name of the new storage account.

  • location is the location of your Azure Storage account.

Creating a Storage Account for the Storage Queue

Execute the following command to create a storage account to host your storage queue. This account must be a GPv2 account, because only this kind of account supports event messages to a storage queue.

Note

If you already have a GPv2 account, you can use that account to host both your data files and your storage queue.

For example, create a GPv2 account:

az storage account create --resource-group <resource_group_name> --name <storage_account_name> --sku Standard_LRS --location <location> --kind StorageV2

Where:

  • resource_group_name is the name of the resource group you created in Creating a Resource Group.

  • storage_account_name is the name of the new storage account.

  • location is the location of your Azure Storage account.

Creating a Storage Queue

A single storage queue can collect the event messages for many Event Grid subscriptions. For best performance, Snowflake recommends creating a single storage queue to accommodate all of your subscriptions related to Snowflake. Snowpipe routes messages to the correct pipe even when the messages are relayed through the same storage queue.

Execute the following command to create a storage queue. A storage queue stores a set of messages, in this case event messages from Event Grid:

az storage queue create --name <storage_queue_name> --account-name <storage_account_name>

Where:

  • storage_queue_name is the name of the new storage queue.

  • storage_account_name is the name of the storage account you created in Creating a Storage Account for the Storage Queue.

Exporting the Storage Account and Queue IDs for Reference

Execute the following commands to set environment variables for the storage account and queue IDs that are referenced later in these instructions:

  • Linux or macOS:

    export storageid=$(az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    export queuestorageid=$(az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv)
    export queueid="$queuestorageid/queueservices/default/queues/<storage_queue_name>"
    
  • Windows (cmd.exe does not support the $( ) command substitution used above, so capture the command output with for /f; use %i at the Command Prompt, or %%i in a batch file):

    for /f "delims=" %i in ('az storage account show --name <data_storage_account_name> --resource-group <resource_group_name> --query id --output tsv') do set storageid=%i
    for /f "delims=" %i in ('az storage account show --name <queue_storage_account_name> --resource-group <resource_group_name> --query id --output tsv') do set queuestorageid=%i
    set "queueid=%queuestorageid%/queueservices/default/queues/<storage_queue_name>"
    

Where:

  • data_storage_account_name is the name of the storage account you created in Creating a Storage Account for Data Files.

  • queue_storage_account_name is the name of the storage account you created in Creating a Storage Account for the Storage Queue.

  • resource_group_name is the name of the resource group you created in Creating a Resource Group.

  • storage_queue_name is the name of the storage queue you created in Creating a Storage Queue.
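Before continuing, you can optionally confirm that the variables are populated (Linux or macOS shown):

echo $storageid
echo $queueid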

Installing the Event Grid Extension

Execute the following command to install the Event Grid extension for Azure CLI:

az extension add --name eventgrid

Creating the Event Grid Subscription

Execute the following command to create the Event Grid subscription. Subscribing to a topic informs Event Grid which events to track:

  • Linux or macOS:

    az eventgrid event-subscription create \
    --source-resource-id $storageid \
    --name <subscription_name> --endpoint-type storagequeue \
    --endpoint $queueid
    
  • Windows (use the caret character for line continuation at the Command Prompt):

    az eventgrid event-subscription create ^
    --source-resource-id %storageid% ^
    --name <subscription_name> --endpoint-type storagequeue ^
    --endpoint %queueid%
    

Where:

  • subscription_name is the name of the new Event Grid subscription.
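To verify that the subscription was created and delivers events to the storage queue, you can optionally list the subscriptions on the data storage account (Linux or macOS variable syntax shown):

az eventgrid event-subscription list --source-resource-id $storageid --output table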

Step 2: Create an Integration in Snowflake

Retrieving the Storage Queue URL and Tenant ID

  1. Log into the Microsoft Azure portal.

  2. Navigate to Storage account » Queue service » Queues. Record the URL for the queue you created in Creating a Storage Queue for reference later. The URL has the following format:

    https://<storage_account_name>.queue.core.windows.net/<storage_queue_name>
    
  3. Navigate to Azure Active Directory » Properties. Record the Directory ID value for reference later. The directory ID, or tenant ID, is needed to generate the consent URL that grants Snowflake access to the Event Grid subscription.

Creating the Integration

Create an integration using the CREATE NOTIFICATION INTEGRATION command. An integration is a Snowflake object that references the Azure storage queue you created.

Note

Only account administrators (users with the ACCOUNTADMIN role) can execute this SQL command.

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = '<queue_URL>'
  AZURE_TENANT_ID = '<directory_ID>';

Where:

  • integration_name is the name of the new integration.

  • queue_URL is the URL of the storage queue you recorded in Retrieving the Storage Queue URL and Tenant ID.

  • directory_ID is the Directory ID (i.e. tenant ID) you recorded in Retrieving the Storage Queue URL and Tenant ID.
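For example, the following statement creates an integration named myint for a queue named mystoragequeue hosted in the myqueuestorage account; all values shown are illustrative:

CREATE NOTIFICATION INTEGRATION myint
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
  AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://myqueuestorage.queue.core.windows.net/mystoragequeue'
  AZURE_TENANT_ID = 'a123b4c5-1234-123a-a12b-1a23b45678c9';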

Granting Snowflake Access to the Storage Queue

  1. Execute the DESCRIBE INTEGRATION command to retrieve the consent URL:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    

    Where integration_name is the name of the integration you created in Creating the Integration.

  2. Note the URL in the AZURE_CONSENT_URL column, which has the following format:

    https://login.microsoftonline.com/<tenant_id>/oauth2/authorize?client_id=<snowflake_application_id>
    
  3. Navigate to the URL in a web browser. The page displays a Microsoft permissions request page.

  4. Click the Accept button to register Snowflake in Active Directory.

  5. Log into the Microsoft Azure portal.

  6. Navigate to Azure Active Directory » App registrations. Verify the Snowflake application is listed.

  7. Navigate to Queues » storage_queue_name, where storage_queue_name is the name of the storage queue you created in Creating a Storage Queue.

  8. Click Access Control (IAM) » Add role assignment.

  9. Search for the Snowflake application.

  10. Grant the Snowflake app the following permissions:

    • Role: Storage Queue Data Contributor (Preview)

    • Assign access to: Azure AD user, group, or service principal

    • Select: snowflake_application_id

    The Snowflake application should now be listed under Storage Queue Data Contributor (on the same dialog).
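If you prefer the Azure CLI to the portal for this step, an equivalent role assignment can typically be created with the az role assignment create command. The sketch below assumes the $queueid variable exported in Step 1 and uses a placeholder for the Snowflake application (service principal) ID:

az role assignment create \
  --role "Storage Queue Data Contributor" \
  --assignee "<snowflake_application_id>" \
  --scope "$queueid"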

Step 3: Create a Stage (If Needed)

Create an external (i.e. Azure) stage using the CREATE STAGE command, or use an existing Azure stage. Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table.

Note

The URL in the stage definition must point to the Azure Blob storage account that you created in Creating a Storage Account for Data Files.

For example, set snowpipe_db.public as the current schema for the current user session, and then create a stage named mystage:

USE SCHEMA snowpipe_db.public;

CREATE OR REPLACE STAGE mystage
  URL = 'azure://myaccount.blob.core.windows.net/mycontainer/files/'
  CREDENTIALS = (AZURE_SAS_TOKEN='**************');
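To confirm that the stage URL and SAS token grant access to the container, you can optionally list any files already staged under the path:

LIST @mystage;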

Step 4: Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

For example, create a pipe in the snowpipe_db.public schema that loads the data from files staged in the mystage stage into the mytable table:

create or replace pipe snowpipe_db.public.mypipe
  auto_ingest = true
  integration = '<integration_name>'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');

Where:

  • integration_name is the name of the notification integration you created in Creating the Integration.
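After creating the pipe, you can optionally confirm that it exists in the expected schema:

SHOW PIPES LIKE 'mypipe' IN SCHEMA snowpipe_db.public;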

Important

  • The integration name must be typed in all uppercase.

  • Compare the stage reference in the pipe definition with existing pipes. Verify that the directory paths for the same Azure container do not overlap; otherwise, multiple pipes could load the same set of data files multiple times, into one or more target tables. This can happen, for example, when multiple stages reference the same Azure container with different levels of granularity, such as azure://myaccount.blob.core.windows.net/mycontainer/path1 and azure://myaccount.blob.core.windows.net/mycontainer/path1/path2. In this use case, if files are staged in azure://myaccount.blob.core.windows.net/mycontainer/path1/path2, the pipes for both stages would load a copy of the files.

    This is different from the manual Snowpipe setup (with auto-ingest disabled), which requires users to submit a named set of files to a REST API to queue the files for loading. With auto-ingest enabled, each pipe receives a generated file list from the Event Grid messages. Additional care is required to avoid data duplication.

Note

Cloning a database or schema clones all objects, including pipes, in the source database or schema. When a data file is created in a stage location (e.g. Blob storage container), a copy of the notification is sent to every pipe that matches the stage location. This results in the following behavior:

  • If a table is fully qualified in the COPY statement in the pipe definition (in the form of db_name.schema_name.table_name or schema_name.table_name), then Snowpipe loads duplicate data into the source table (i.e. the database.schema.table in the COPY statement) for each pipe.

  • If a table is not fully qualified in the pipe definition, then Snowpipe loads the data into the same table (e.g. mytable) in the source and cloned databases/schemas.

Snowpipe with auto-ingest is now configured!

When new data files are added to the Azure container, the event message informs Snowpipe to load them into the target table defined in the pipe.

Step 5: Load Historical Files

To load any backlog of data files that existed in the external stage before Event Grid messages were configured, execute an ALTER PIPE … REFRESH statement.
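For example, to load the backlog of files for the pipe created in Step 4:

ALTER PIPE snowpipe_db.public.mypipe REFRESH;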

SYSTEM$PIPE_STATUS Output

The SYSTEM$PIPE_STATUS function retrieves a JSON representation of the current status of a pipe.
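For example, to check the status of the pipe created in Step 4:

SELECT SYSTEM$PIPE_STATUS('snowpipe_db.public.mypipe');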

For pipes with AUTO_INGEST set to TRUE, the function returns a JSON object containing the following name/value pairs (if applicable to the current pipe status):

{
  "executionState": "<value>",
  "oldestFileTimestamp": <value>,
  "pendingFileCount": <value>,
  "notificationChannelName": "<value>",
  "numOutstandingMessagesOnChannel": <value>,
  "lastReceivedMessageTimestamp": "<value>",
  "lastForwardedMessageTimestamp": "<value>",
  "error": <value>,
  "fault": <value>
}

Where:

executionState

Current execution state of the pipe; could be any one of the following:

  • RUNNING (i.e. everything is normal; Snowflake may or may not be actively processing files for this pipe)

  • STOPPED_FEATURE_DISABLED

  • STOPPED_STAGE_DROPPED

  • STOPPED_FILE_FORMAT_DROPPED

  • STOPPED_MISSING_PIPE

  • STOPPED_MISSING_TABLE

  • STALLED_COMPILATION_ERROR

  • STALLED_INITIALIZATION_ERROR

  • STALLED_EXECUTION_ERROR

  • STALLED_INTERNAL_ERROR

  • PAUSED

  • PAUSED_BY_SNOWFLAKE_ADMIN

  • PAUSED_BY_ACCOUNT_ADMIN

oldestFileTimestamp

Earliest timestamp among data files currently queued (if applicable), where the timestamp is set when the file is added to the queue.

pendingFileCount

Number of files currently being processed by the pipe. If the pipe is paused, this value will decrease as any files queued before the pipe was paused are processed. When this value is 0, either there are no files queued for this pipe or the pipe is effectively paused.

notificationChannelName

Azure storage queue associated with the pipe.

numOutstandingMessagesOnChannel

Number of messages in the storage queue that have been queued but not received yet.

lastReceivedMessageTimestamp

Timestamp of the last message received from the storage queue. Note that this message might not apply to the specific pipe, e.g., if the path/prefix associated with the message does not match the path/prefix in the pipe definition. In addition, only messages triggered by created data objects are consumed by auto-ingest pipes.

lastForwardedMessageTimestamp

Timestamp of the last “create object” event message with a matching path/prefix that was forwarded to the pipe.

error

Error message produced when the pipe was last compiled for execution (if applicable); often caused by problems accessing the necessary objects (i.e. table, stage, file format) due to permission problems or dropped objects.

fault

Most recent internal Snowflake process error (if applicable). Used primarily by Snowflake for debugging purposes.

Integration DDL

To support creating and managing integrations, Snowflake provides the following set of special DDL commands:

  • CREATE NOTIFICATION INTEGRATION

  • ALTER NOTIFICATION INTEGRATION

  • DROP INTEGRATION

  • SHOW INTEGRATIONS

  • DESCRIBE INTEGRATION