
Snowflake (via Collector method) - v3.4.0

About Collectors


Pre-requisites

Collector Server Minimum Requirements

Snowflake Requirements

  • Access to Snowflake (see section below)

Snowflake Access

Create a Snowflake user with read access to the following views in the SNOWFLAKE database.

  • account_usage.query_history

  • account_usage.views

  • account_usage.tables

  • account_usage.columns

  • account_usage.copy_history

  • account_usage.grants_to_roles

  • account_usage.grants_to_users

  • account_usage.schemata

  • account_usage.databases

  • account_usage.policy_references

  • account_usage.access_history (If you have Enterprise Edition)

The user must also be able to run:

  • SHOW STREAMS IN ACCOUNT

  • SHOW PRIMARY KEYS IN ACCOUNT

To create a user with general access to the metadata available in the Snowflake Account Usage schema:

-- Log in with a user that has the permissions to create a role/user

-- Create a new role for the Catalog user
create role CATALOG_READ_ONLY;

-- Grant the role access to the Account Usage schema
-- (only IMPORTED PRIVILEGES can be granted on the shared SNOWFLAKE database)
grant imported privileges on database SNOWFLAKE to role CATALOG_READ_ONLY;
grant monitor on account to role CATALOG_READ_ONLY;

-- Create a new user for K and grant it the role (remove the [])
create user [kada_user] password=['abc123!@#'] default_role = CATALOG_READ_ONLY default_warehouse = [warehouse];
grant role CATALOG_READ_ONLY to user [kada_user];
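
To confirm the new user has the access described above, you can run a quick check before continuing. A minimal sketch using the snowflake-connector-python package (the package choice and the placeholder credentials are assumptions, not part of the collector):

Python
import snowflake.connector

# Placeholder credentials for the user created above
conn = snowflake.connector.connect(
    account='abc123.australia-east.azure',
    user='kada_user',
    password='abc123!@#',
    role='CATALOG_READ_ONLY',
    warehouse='<warehouse>',
)
cur = conn.cursor()

# Each of these should succeed if the grants above worked
cur.execute('select count(*) from SNOWFLAKE.ACCOUNT_USAGE.DATABASES')
cur.execute('show streams in account')
cur.execute('show primary keys in account')
print('Access checks passed.')
conn.close()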

To create a user with access to only specific metadata in Snowflake Account Usage, you will need to create a new Snowflake database with views that select from the SNOWFLAKE database. This is a known Snowflake limitation: privileges cannot be granted on individual objects in the shared SNOWFLAKE database, so local views are needed to scope access.

-- create a new database
create database CATALOG_METADATA;

-- create a new schema
create schema CATALOG_METADATA.ACCOUNT_USAGE;

-- account_usage.query_history
create view CATALOG_METADATA.ACCOUNT_USAGE.QUERY_HISTORY
    as select * from SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY;

-- account_usage.access_history (Enterprise Edition only)
create view CATALOG_METADATA.ACCOUNT_USAGE.ACCESS_HISTORY
    as select * from SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY;

-- account_usage.views
create view CATALOG_METADATA.ACCOUNT_USAGE.VIEWS
    as select * from SNOWFLAKE.ACCOUNT_USAGE.VIEWS;

-- account_usage.tables
create view CATALOG_METADATA.ACCOUNT_USAGE.TABLES
    as select * from SNOWFLAKE.ACCOUNT_USAGE.TABLES;

-- account_usage.columns
create view CATALOG_METADATA.ACCOUNT_USAGE.COLUMNS
    as select * from SNOWFLAKE.ACCOUNT_USAGE.COLUMNS;

-- account_usage.copy_history
create view CATALOG_METADATA.ACCOUNT_USAGE.COPY_HISTORY
    as select * from SNOWFLAKE.ACCOUNT_USAGE.COPY_HISTORY;

-- account_usage.grants_to_roles
create view CATALOG_METADATA.ACCOUNT_USAGE.GRANTS_TO_ROLES
    as select * from SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES;

-- account_usage.grants_to_users
create view CATALOG_METADATA.ACCOUNT_USAGE.GRANTS_TO_USERS
    as select * from SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_USERS;

-- account_usage.schemata
create view CATALOG_METADATA.ACCOUNT_USAGE.SCHEMATA
    as select * from SNOWFLAKE.ACCOUNT_USAGE.SCHEMATA;

-- account_usage.databases
create view CATALOG_METADATA.ACCOUNT_USAGE.DATABASES
    as select * from SNOWFLAKE.ACCOUNT_USAGE.DATABASES;

-- account_usage.policy_references
create view CATALOG_METADATA.ACCOUNT_USAGE.POLICY_REFERENCES
    as select * from SNOWFLAKE.ACCOUNT_USAGE.POLICY_REFERENCES;

-- create a new role
create role CATALOG_READ_ONLY;

-- grant access
grant usage on warehouse [MY_WAREHOUSE] to role CATALOG_READ_ONLY;
grant usage, monitor on database CATALOG_METADATA to role CATALOG_READ_ONLY;
grant usage, monitor on schema CATALOG_METADATA.ACCOUNT_USAGE to role CATALOG_READ_ONLY;
grant select on all views in schema CATALOG_METADATA.ACCOUNT_USAGE to role CATALOG_READ_ONLY;
grant select on future views in schema CATALOG_METADATA.ACCOUNT_USAGE to role CATALOG_READ_ONLY;

-- create a new Kada user and grant it the role (remove the [])
create user [kada_user] password=['<add password>'] default_role = CATALOG_READ_ONLY default_warehouse = [warehouse];
grant role CATALOG_READ_ONLY to user [kada_user];
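
If you are scripting this setup, the CREATE VIEW statements can be generated rather than typed by hand. A minimal sketch (the view list simply mirrors the statements above):

Python
# View names mirror the account_usage objects listed above
VIEWS = [
    'QUERY_HISTORY', 'ACCESS_HISTORY', 'VIEWS', 'TABLES', 'COLUMNS',
    'COPY_HISTORY', 'GRANTS_TO_ROLES', 'GRANTS_TO_USERS',
    'SCHEMATA', 'DATABASES', 'POLICY_REFERENCES',
]

for view in VIEWS:
    print(
        f'create view CATALOG_METADATA.ACCOUNT_USAGE.{view}\n'
        f'    as select * from SNOWFLAKE.ACCOUNT_USAGE.{view};'
    )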

From the above, record the following details for use during setup:

  1. User name / Password

  2. Role

  3. Warehouse

  4. (If creating a new database for metadata) Database name

  5. Snowflake account (found in the URL of your Snowflake instance, between https:// and .snowflakecomputing.com/…; see the sketch below)
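
A small sketch of how the account and host values relate to the instance URL (illustrative only; the URL is a placeholder):

Python
from urllib.parse import urlparse

# e.g. https://abc123.australia-east.azure.snowflakecomputing.com/
url = 'https://abc123.australia-east.azure.snowflakecomputing.com/'
host = urlparse(url).netloc                              # the "host" value
account = host.removesuffix('.snowflakecomputing.com')   # the "account" value (Python 3.9+)
print(account)  # abc123.australia-east.azure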


Step 1: Create the Source in K

Create a Snowflake source in K

  • Go to Settings, select Sources, and click Add Source

  • Select the "Load from File" option

  • Give the source a Name - e.g. Snowflake Production

  • Add the Host name for the Snowflake Server

  • Click Finish Setup


Step 2: Getting Access to the Source Landing Directory


Step 3: Install the Collector

You can download the latest Core Library and Snowflake whl via Platform Settings → Sources → Download Collectors

pip install kada_collectors_extractors_<version>-none-any.whl
pip install kada_collectors_lib-<version>-none-any.whl

Depending on your OS, the following system packages may also be required:

| OS | Packages |
| --- | --- |
| CentOS | libffi-devel openssl-devel |
| Ubuntu | libssl-dev libffi-dev |


Step 4: Configure the Collector

| FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE |
| --- | --- | --- | --- |
| account | string | Snowflake account | "abc123.australia-east.azure" |
| username | string | Username to log into the Snowflake account | |
| password | string | Password to log into the Snowflake account | |
| information_database | string | Database where all the required tables are located | "snowflake" |
| role | string | The role to access the required account_usage tables | "accountadmin" |
| warehouse | string | The warehouse to execute the queries against | "xs_analytics" |
| databases | list<string> | A list of databases to extract from Snowflake | ["dwh", "adw"] |
| login_timeout | integer | The maximum time in seconds to wait for a connection | 5 |
| output_path | string | Absolute path to the output location | "/tmp/output" |
| mask | boolean | Whether to enable masking | true |
| compress | boolean | Whether to gzip the output | true |
| use_private_key | boolean | Whether to authenticate with a private key instead of a password | false |
| private_key | string | The private key value as text | "-----BEGIN ENCRYPTED PRIVATE KEY-----\n…\n-----END ENCRYPTED PRIVATE KEY-----" |
| host | string | The host value for Snowflake that was onboarded in K | "abc123.australia-east.azure.snowflakecomputing.com" |
| enterprise | boolean | Whether you have Snowflake Enterprise Edition | false |

kada_snowflake_extractor_config.json

JSON
{
    "account": "",
    "username": "",
    "password": "",
    "information_database": "",
    "role": "",
    "warehouse": "",
    "databases": [],
    "login_timeout": 5,
    "output_path": "/tmp/output",
    "mask": true,
    "compress": true,
    "use_private_key": false,
    "private_key": "",
    "host": "",
    "enterprise": false
}
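
Before running the collector, it can help to sanity-check this file. A minimal sketch (the required-key list below is an assumption based on the table above, not the collector's own validation; password is skipped because it may be empty when use_private_key is set):

Python
import json

REQUIRED = ['account', 'username', 'role', 'warehouse', 'output_path', 'host']

with open('kada_snowflake_extractor_config.json') as f:
    config = json.load(f)

# Flag any required key that is missing or left blank
missing = [k for k in REQUIRED if not config.get(k)]
if missing:
    raise SystemExit('Config is missing values for: ' + ', '.join(missing))
print('Config looks complete.')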

Step 5: Run the Collector

This is the wrapper script: kada_snowflake_extractor.py

Python
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.snowflake import Extractor

get_generic_logger('root')

_type = 'snowflake'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA Snowflake Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename)
parser.add_argument('--name', '-n', dest='name', default=_type)
args = parser.parse_args()

# Fetch the high water mark window for this run
start_hwm, end_hwm = get_hwm(args.name)

# Load the config, verify connectivity, then extract for the window
ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

# Record the new high water mark so the next run resumes from here
publish_hwm(args.name, end_hwm)
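
Run the wrapper from the command line. By default it looks for kada_snowflake_extractor_config.json alongside the script; --config overrides the config path and --name overrides the name used to track the high water mark, e.g. python kada_snowflake_extractor.py --config /path/to/kada_snowflake_extractor_config.json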

class Extractor(account: str = None,
    username: str = None,
    password: str = None,
    databases: list = [],
    information_database: str = 'snowflake',
    role: str = 'accountadmin',
    output_path: str = './output',
    warehouse: str = None,
    login_timeout: int = 5,
    mask: bool = False,
    compress: bool = False,
    host: str = None,
    use_private_key: bool = False,
    private_key: str = None,
    enterprise: bool = False,
    ) -> None

enterprise: set to true if you have the Enterprise Edition of Snowflake
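
If you prefer not to use the wrapper, the Extractor can also be instantiated directly. A minimal sketch using key-pair authentication (all values are placeholders; per the config table, the key is supplied as text):

Python
from kada_collectors.extractors.snowflake import Extractor

ext = Extractor(
    account='abc123.australia-east.azure',
    username='kada_user',
    role='CATALOG_READ_ONLY',
    warehouse='xs_analytics',
    databases=['dwh', 'adw'],
    output_path='/tmp/output',
    host='abc123.australia-east.azure.snowflakecomputing.com',
    use_private_key=True,
    private_key=open('rsa_key.p8').read(),  # placeholder key file
)
ext.test_connection()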


Step 6: Check the Collector Outputs

K Extracts

A set of files (e.g. metadata, databaselog, linkages, events) will be generated in the output_path directory.

High Water Mark File

A high water mark file is created called snowflake_hwm.txt.


Step 7: Push the Extracts to K

Once the files have been validated, you can push the files to the K landing directory.


Example: Using Airflow to orchestrate the Extract and Push to K
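
A minimal sketch of such a DAG, assuming the wrapper script from Step 5 is deployed on the Airflow worker; the paths, schedule, and push command are placeholders that depend on how your landing directory (Step 2) is exposed:

Python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='kada_snowflake_extract',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    # Run the Step 5 wrapper script to produce the extracts
    extract = BashOperator(
        task_id='run_snowflake_extractor',
        bash_command='python /opt/kada/kada_snowflake_extractor.py '
                     '--config /opt/kada/kada_snowflake_extractor_config.json',
    )

    # Placeholder: replace with however you copy files to the K landing directory
    push = BashOperator(
        task_id='push_extracts_to_k',
        bash_command='cp /tmp/output/* /mnt/k_landing/snowflake/',
    )

    extract >> push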