Option 2: Automating Snowpipe with AWS Lambda

AWS Lambda is a compute service that runs when triggered by an event and executes code that has been loaded into the system. You can adapt the sample Python code provided in this topic and create a Lambda function that calls the Snowpipe REST API to load data from your external stage (i.e. S3 bucket; Azure containers are not supported). The function is deployed to your AWS account, where it is hosted. Events you define in Lambda (e.g. when files in your S3 bucket are updated) invoke the Lambda function and run the Python code.

This topic describes the steps necessary to configure a Lambda function that uses Snowpipe to load data continuously in micro-batches.

Note

This topic assumes you have configured Snowpipe using the instructions in Preparing to Load Data Using the Snowpipe REST API.

Attention

Automated Snowpipe relies on Amazon S3 event notifications to determine when new files arrive in monitored S3 buckets and are ready to load. Currently, AWS cannot guarantee reliable delivery of event notifications to consumers such as a Lambda function. Therefore, until AWS can provide reliability guarantees for S3 event notifications, Snowflake cannot guarantee loading of all files staged to monitored S3 buckets.

As a workaround, we suggest following our best practices for staging your data files and periodically executing an ALTER PIPE … REFRESH statement to load any missed files. For satisfactory performance, we also recommend supplying a selective path prefix with ALTER PIPE so that fewer files must be listed and checked against the load history (e.g. bucket_name/YYYY/MM/DD/ or even bucket_name/YYYY/MM/DD/HH/, depending on your volume). A sketch of scripting such a refresh follows.
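The statement below is a minimal sketch, not part of this topic's sample code. It assumes the snowflake-connector-python package (not installed in the steps below), placeholder password authentication, and a stage organized by date so that a YYYY/MM/DD/ prefix narrows the refresh; the PREFIX clause is ALTER PIPE's selective-path option mentioned above.

# Hedged sketch: periodically refresh the pipe for a narrow path prefix so
# that files whose S3 event notifications were lost are still loaded.
# The package, credentials, and object names are placeholders.
import datetime
import snowflake.connector

conn = snowflake.connector.connect(
    account='<account_name>',
    user='<user_login_name>',
    password='<password>')

# Restrict the refresh to today's prefix (relative to the stage location) so
# Snowpipe only has to list and de-duplicate a small number of files.
prefix = datetime.datetime.utcnow().strftime('%Y/%m/%d/')
conn.cursor().execute(
    "ALTER PIPE <db_name>.<schema_name>.<pipe_name> "
    "REFRESH PREFIX = '{}'".format(prefix))

You could run this from a scheduled job (e.g. a cron task or a scheduled Lambda function) at whatever interval suits your data volume.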

Note that calling Snowpipe REST endpoints to load data is not subject to this limitation.

In this Topic:

Step 1: Write Python Code Invoking the Snowpipe REST API
Step 2: Create a Lambda Function Deployment Package
Step 3: Create an AWS IAM Role for Lambda
Step 4: Create the Lambda Function
Step 5: Allow Calls to the Lambda Function
Step 6: Register the Lambda Notification Event

Step 1: Write Python Code Invoking the Snowpipe REST API

Sample Python code

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

import os
import sys
import uuid

# Assume the public key has been registered in Snowflake
# private key in PEM format
private_key="""
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
"""

# Proxy object that abstracts the Snowpipe REST API
ingest_manager = SimpleIngestManager(account='<account_name>',
                   host='<account_name>.<region_id>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)

Note

The sample code does not account for error handling. For example, it does not retry failed ingest_manager calls.
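For illustration only (this is not part of the sample above), a bounded retry with backoff around the ingest call might look like the following; the attempt limit and sleep intervals are arbitrary assumptions.

import time

def ingest_with_retry(manager, staged_file_list, max_attempts=3):
    # Retry the ingest request a few times with exponential backoff before
    # giving up; adjust the limits to suit your error-handling policy.
    for attempt in range(1, max_attempts + 1):
        try:
            return manager.ingest_files(staged_file_list)
        except Exception as err:
            print('Ingest attempt {} failed: {}'.format(attempt, err))
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)

In the handler, the ingest_manager.ingest_files(staged_file_list) call could then be replaced with ingest_with_retry(ingest_manager, staged_file_list).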

Before using the sample code, make the following changes:

  1. Update the security parameter:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Specifies the content of the private key file you created in Using Key Pair Authentication (in Preparing to Load Data Using the Snowpipe REST API).

  2. Update the session parameters:

    account='<account_name>'

    Specifies the name of your account (provided by Snowflake).

    host='<account_name>.<region_id>.snowflakecomputing.com'

    Specifies the hostname for your account. Note that the format of the hostname differs depending on the Snowflake Region where your account is located:

    US West

    <account_name>.snowflakecomputing.com

    Other regions

    <account_name>.<region_id>.snowflakecomputing.com

    Where <account_name> is the name of your account (provided by Snowflake) and <region_id> is:

    Region                        Region ID                Notes
    Amazon Web Services (AWS)
      US West                     us-west-2                Required only when configuring AWS PrivateLink for accounts in US West.
      US East                     us-east-1
      Canada                      ca-central-1
      EU (Dublin)                 eu-west-1
      EU (Frankfurt)              eu-central-1
      Asia Pacific (Singapore)    ap-southeast-1
      Asia Pacific (Sydney)       ap-southeast-2
    Microsoft Azure
      East US 2                   east-us-2.azure
      US Gov Virginia             us-gov-virginia.azure    Available only for accounts on Snowflake ESD (or higher).
      West Europe                 west-europe.azure
      Australia East              australia-east.azure

    user='<user_login_name>'

    Specifies the login name of the Snowflake user that will run the Snowpipe code.

    pipe='<db_name>.<schema_name>.<pipe_name>'

    Specifies the fully-qualified name of the pipe to use to load the data, in the form of <db_name>.<schema_name>.<pipe_name>.

  3. Specify the path to your files to import in the file objects list:

    staged_file_list = []

    The path you specify must be relative to the stage where the files are located. Include the complete name for each file, including the file extension (e.g. a gzip-compressed CSV file might have the extension .csv.gz). A short illustration follows these steps.

  4. Save the file in a convenient location.

The remaining instructions in this topic assume the file name to be SnowpipeLambdaCode.py.
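For illustration only, the following sketch shows how a relative path with the full file name might be appended to the list; the file name data_2018_08_05.csv.gz and the path/to/ prefix are hypothetical placeholders, and in the Lambda handler above the key is supplied by the S3 event record instead. StagedFile is the class imported in the sample code.

# Hypothetical example of staging a specific file by its path relative to
# the stage referenced in the pipe; the name and prefix are placeholders.
staged_file_list = []
staged_file_list.append(StagedFile('path/to/data_2018_08_05.csv.gz', None))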

Step 2: Create a Lambda Function Deployment Package

Complete the following instructions to build a Python runtime environment for Lambda and add the Snowpipe code you adapted in Step 1: Write Python Code Invoking the Snowpipe REST API (in this topic). For more information about these steps, see the AWS Lambda deployment package documentation (follow the instructions for Python):

  1. Create an AWS EC2 Linux instance by completing the AWS EC2 instructions. This instance provides a Linux environment for building the Python runtime and packaging the Snowpipe code; the code itself runs in Lambda.

  2. Copy the Snowpipe code file to your new AWS EC2 instance using SCP (Secure Copy):

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    

    Where:

    • <path> is the path to your local SnowpipeLambdaCode.py file.

    • <machine>.<region_id> is the DNS name of the EC2 instance (e.g. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      The DNS name is displayed on the Instances screen in the Amazon EC2 console.

  3. Connect to the EC2 instance using SSH (Secure Shell):

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
  4. Install Python and related libraries on the EC2 instance:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
  5. Create the .zip deployment package (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    

Step 3: Create an AWS IAM Role for Lambda

Follow the AWS Lambda documentation to create an IAM role to execute the Lambda function.

Record the IAM Amazon Resource Name (ARN) for the role. You will use it in the next step.
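If you prefer to script this step rather than use the AWS console, the following is a rough boto3 sketch, not the documented console procedure; the role name lambda-s3-execution-role matches the example command in Step 4, and the attached managed policy is an assumption that covers only basic CloudWatch logging.

import json
import boto3

iam = boto3.client('iam')

# Trust policy allowing the Lambda service to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

role = iam.create_role(
    RoleName='lambda-s3-execution-role',
    AssumeRolePolicyDocument=json.dumps(trust_policy))

# Assumed policy: lets the function write logs to CloudWatch; add further
# permissions as your function requires them.
iam.attach_role_policy(
    RoleName='lambda-s3-execution-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')

print(role['Role']['Arn'])  # record this ARN for Step 4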

Step 4: Create the Lambda Function

Create the Lambda function by uploading the .zip deployment package you created in Step 2: Create a Lambda Function Deployment Package (in this topic):

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

For --role, specify the role ARN you recorded in Step 3: Create an AWS IAM Role for Lambda (in this topic).

Record the ARN for the new function from the output. You will use it when you register the S3 event notification in Step 6: Register the Lambda Notification Event (in this topic).

Step 5: Allow Calls to the Lambda Function

Grant S3 the permissions required to invoke your function.

For --source-arn, specify the ARN of the S3 bucket (<SourceBucket>) that will publish the event notifications, and for --source-account, specify your AWS account ID.

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser

Step 6: Register the Lambda Notification Event

Register a Lambda notification event by completing the AWS S3 Event Notifications instructions. In the input field, specify the function ARN you recorded in Step 4: Create the Lambda Function (in this topic).
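If you would rather script the notification than configure it in the S3 console, the following boto3 sketch is one way to do it; the bucket name, event type, and function ARN are placeholders, and the call assumes the permission granted in Step 5 is already in place.

import boto3

s3 = boto3.client('s3')

# Invoke the Lambda function whenever a new object is created in the bucket.
# Note: this call overwrites the bucket's existing notification configuration.
s3.put_bucket_notification_configuration(
    Bucket='<SourceBucket>',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-west-2:<aws_account_id>:function:IngestFile',
            'Events': ['s3:ObjectCreated:*']
        }]
    })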