About Collectors
Collectors are extractors that are developed and managed by you (a customer of K).
KADA provides Python libraries that you can use to quickly deploy a Collector.
Why you should use a Collector
There are several reasons why you may use a Collector instead of the direct connect extractor:
- You are using the KADA SaaS offering and it cannot connect to your sources due to firewall restrictions
- You want to push metadata to KADA rather than allow it to pull data, for security reasons
- You want to inspect the metadata before pushing it to K
Using a Collector means you are responsible for:
- Deploying and orchestrating the extract code
- Maintaining a high water mark so the extract only pulls the latest metadata
- Storing and pushing the extracts to your K instance
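The high water mark bookkeeping can be sketched as follows. This is a minimal illustration only, not KADA's implementation: the timestamp format and the helper names `read_hwm`, `write_hwm` and `next_window` are assumptions made for this example. The KADA library ships its own `get_hwm` and `publish_hwm` helpers, which the wrapper script in Step 6 uses.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple

HWM_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed timestamp format for this sketch


def read_hwm(path: Path, default: str = "1970-01-01 00:00:00") -> str:
    """Return the stored high water mark, or a default on the first run."""
    if path.exists():
        return path.read_text().strip()
    return default


def write_hwm(path: Path, value: str) -> None:
    """Persist the high water mark so the next run starts from this point."""
    datetime.strptime(value, HWM_FORMAT)  # fail fast on a malformed timestamp
    path.write_text(value)


def next_window(path: Path, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (start_hwm, end_hwm) for the next extract run."""
    now = now or datetime.now(timezone.utc)
    return read_hwm(path), now.strftime(HWM_FORMAT)
```

A run would call `next_window`, extract the metadata for that window, and call `write_hwm` with the end value only after the extract succeeds, so that a failed run is retried from the same starting point.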
Pre-requisites
Collector Server Minimum Requirements
For the collector to operate effectively, deploy it on a server that meets the following minimum specifications:
- CPU: 2 vCPU
- Memory: 8GB
- Storage: 30GB (depends on the amount of historical data extracted)
- OS: a Unix distribution (RHEL preferred); Windows Server also works
- Python 3.10.x or later
- Access to the K landing directory
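A quick preflight check of the host against these minimums might look like the sketch below. It uses only the Python standard library; the function name `check_host` is an assumption for this example, and memory is not checked because the standard library has no portable way to read it.

```python
import os
import shutil
import sys

MIN_PYTHON = (3, 10)
MIN_CPUS = 2
MIN_FREE_GB = 30


def check_host(path=".", python_version=None, cpus=None, free_gb=None):
    """Return a list of problems; an empty list means the host passes.

    The keyword arguments let you override the detected values, which is
    mainly useful for testing.
    """
    python_version = tuple(python_version or sys.version_info[:2])
    cpus = cpus if cpus is not None else (os.cpu_count() or 0)
    if free_gb is None:
        free_gb = shutil.disk_usage(path).free / 1024 ** 3

    problems = []
    if python_version < MIN_PYTHON:
        problems.append(f"Python {python_version} is older than {MIN_PYTHON}")
    if cpus < MIN_CPUS:
        problems.append(f"{cpus} CPU(s) found, {MIN_CPUS} required")
    if free_gb < MIN_FREE_GB:
        problems.append(f"{free_gb:.1f}GB free, {MIN_FREE_GB}GB required")
    return problems


if __name__ == "__main__":
    for problem in check_host() or ["Host meets the checked minimums"]:
        print(problem)
```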
Glue Requirements
- Access to Glue
Step 1: Establish Glue Access
We advise creating a new role for the service user provided to KADA and attaching a policy that allows the actions below. See Identity and access management in the AWS Glue documentation.
The service user/account/role requires the following permissions:
- GET and LIST access to the S3 resources that you want the user to reach, if required
- Permission to call the following Glue APIs:
  - get_tables
  - get_databases
The example role policy below allows Glue access with least privilege for these actions. Replace YOUR-REGION and AWS-ACCOUNT-ID with your own values.
Note that this is a CloudFormation template written in YAML, not JSON.
AWSTemplateFormatVersion: "2010-09-09"
Description: 'AWS IAM Role - Glue Access to KADA'
Resources:
  KadaGlueRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: "KadaGlueRole"
      MaxSessionDuration: 43200
      Path: "/"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              AWS: "[ACCOUNT ARN]"
            Action: "sts:AssumeRole"
  KadaGluePolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyName: root
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - glue:GetTables
              - glue:GetDatabases
            Resource:
              - 'arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:catalog'
              - 'arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:database/*'
              - 'arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:table/*/*'
      Roles:
        - !Ref KadaGlueRole
Alternatively, you can create just the policy using this example JSON:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KadaGluePolicy",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabases",
        "glue:GetTables"
      ],
      "Resource": [
        "arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:catalog",
        "arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:database/*",
        "arn:aws:glue:YOUR-REGION:AWS-ACCOUNT-ID:table/*/*"
      ]
    }
  ]
}
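Once the policy is attached, you may want to confirm that the credentials can actually call the two Glue APIs. The sketch below is one way to do that with boto3 paginators; the helper names `list_glue_tables` and `build_glue_client` are assumptions for this example, and the region and catalog ID in the usage comment are the sample values used elsewhere on this page.

```python
def list_glue_tables(glue_client, catalog_id=None):
    """Return {database_name: [table_name, ...]} via get_databases and get_tables."""
    extra = {"CatalogId": catalog_id} if catalog_id else {}
    tables = {}
    for page in glue_client.get_paginator("get_databases").paginate(**extra):
        for database in page["DatabaseList"]:
            name = database["Name"]
            tables[name] = []
            table_pages = glue_client.get_paginator("get_tables").paginate(
                DatabaseName=name, **extra
            )
            for table_page in table_pages:
                tables[name].extend(t["Name"] for t in table_page["TableList"])
    return tables


def build_glue_client(region_name):
    """Create a Glue client from the default AWS credential chain."""
    import boto3  # imported here so list_glue_tables can be tested without boto3

    return boto3.client("glue", region_name=region_name)


# Example usage (requires AWS credentials that carry the policy above):
# client = build_glue_client("ap-southeast-2")
# print(list_glue_tables(client, catalog_id="43234234"))
```

An access-denied error from either call usually points to a missing action or a resource ARN that does not match your region or account.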
Step 1 Optional: Creating Glue Crawlers over S3
You may also wish to set up a crawler over your S3 data to ingest it into Glue. See https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html for more details.
- The crawler will need an IAM role with the following policies attached directly
  - AWSGlueServiceRole
  - AmazonS3FullAccess
- Create a Database in the Glue Console
  - In the left navigation pane, choose "Databases"
  - Click "Add database" and provide a name for the database
- Create an S3 Crawler
  - In the left navigation pane, choose "Crawlers"
  - Click "Add crawler" and provide a name for the crawler
  - Choose "Data stores" and select "S3" as the data store type
  - Specify the S3 path to the bucket you want to crawl
  - Choose "Next" and select the IAM role you created earlier
  - Choose "Next" and select the database you created in the previous step
  - Configure other settings such as the crawl frequency
  - Choose "Next" to review the settings, then "Finish"
- Run the Crawler
  - Select the crawler you created, click "Run Crawler" and wait for it to complete
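If you prefer to script the console steps, the sketch below does the same with the boto3 Glue client. The function name `create_and_run_crawler` and all argument values are assumptions for this example. It must run with credentials that are allowed to create Glue databases and crawlers, which the read-only KADA policy from Step 1 does not grant.

```python
def create_and_run_crawler(glue_client, crawler_name, role_arn, database_name, s3_path):
    """Create a Glue database and an S3 crawler, then start the crawler."""
    # Raises an error if the database already exists; skip this call in that case.
    glue_client.create_database(DatabaseInput={"Name": database_name})

    # The role must be the crawler role described above, not the KADA role.
    glue_client.create_crawler(
        Name=crawler_name,
        Role=role_arn,
        DatabaseName=database_name,
        Targets={"S3Targets": [{"Path": s3_path}]},
    )

    # The crawl runs asynchronously; check its state in the console.
    glue_client.start_crawler(Name=crawler_name)


# Example usage (all values are placeholders):
# import boto3
# create_and_run_crawler(
#     boto3.client("glue", region_name="ap-southeast-2"),
#     crawler_name="example-crawler",
#     role_arn="arn:aws:iam::AWS-ACCOUNT-ID:role/example-crawler-role",
#     database_name="example_db",
#     s3_path="s3://example-bucket/data/",
# )
```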
Step 2: Create the Source in K
Create a Glue source in K
- Go to Settings, select Sources and click Add Source
- Select the "Load from File system" option
- Give the source a Name, e.g. Glue Production
- Add the Host name for the Glue source. The recommended convention is [AWS ACCOUNT ID]_glue, e.g. 3255667_glue
- Click Finish Setup
Step 3: Getting Access to the Source Landing Directory
When using a Collector you will push metadata to a K landing directory.
To find your landing directory you will need to:
- Go to Platform Settings - Settings. Note down the value of this setting:
  - If using Azure: storage_azure_storage_account
  - If using AWS:
    - storage_root_folder - the AWS S3 bucket
    - storage_aws_region - the region where the AWS S3 bucket is hosted
- Go to Sources - Edit the Source you have configured. Note down the landing directory in the About this Source section.
To connect to the landing directory you will need:
- If using Azure: a SAS token to push data to the landing directory. Request this from KADA Support (support@kada.ai)
- If using AWS:
  - An Access key and Secret. Request this from KADA Support (support@kada.ai)
  - OR provide your IAM role to KADA Support to provision access.
Step 4: Install the Collector
You can download the latest Core Library and Glue whl via Platform Settings → Sources → Download Collectors
Run the following command to install the collector
pip install kada_collectors_extractors_<version>-none-any.whl
You will also need to install the common library kada_collectors_lib for this collector to function properly.
pip install kada_collectors_lib-<version>-none-any.whl
Under the covers this uses boto3, which may have OS dependencies. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
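To confirm the installation before configuring anything, a small check like the one below can help. It is a sketch only: the top-level module name `kada_collectors` is inferred from the imports in the wrapper script in Step 6, and `missing_modules` is a hypothetical helper name.

```python
import importlib.util


def missing_modules(names=("kada_collectors", "boto3")):
    """Return the top-level modules from `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    print("All modules found" if not missing else f"Missing modules: {missing}")
```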
Step 5: Configure the Collector
| FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE |
|---|---|---|---|
| key | string | Key for the AWS user | "xcvsdsdfsdf" |
| secret | string | Secret for the AWS user | "sgsdfdsfg" |
| server | string | This is the host that was onboarded in K for Glue | "43234234_glue" |
| regions | list<string> | A list of regions in which you have Glue set up | ["ap-southeast-2"] |
| catalogId | string | This is generally your AWS Account Id | "43234234" |
| role | string | If your access requires role assumption, place the full ARN value here | "" |
| output_path | string | Absolute path to the output location | "/tmp/output" |
| mask | boolean | Whether to enable masking | true |
| compress | boolean | Whether to gzip the output | true |
kada_glue_extractor_config.json
{
  "key": "",
  "secret": "",
  "server": "43234234_glue",
  "regions": ["ap-southeast-2"],
  "catalogId": "43234234",
  "role": "",
  "output_path": "/tmp/output",
  "mask": true,
  "compress": true,
  "meta_only": true
}
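Because a typo in the config only surfaces when the collector runs, it can be worth validating the file first. The sketch below checks the fields and types listed in the table above. Treating every listed field as required, and `meta_only` (which appears only in the sample file) as optional, are assumptions of this example, as are the helper names.

```python
import json

# Field names and types follow the configuration table; whether each one is
# strictly required by the collector is an assumption of this sketch.
EXPECTED_FIELDS = {
    "key": str,
    "secret": str,
    "server": str,
    "regions": list,
    "catalogId": str,
    "role": str,
    "output_path": str,
    "mask": bool,
    "compress": bool,
}


def validate_config(config):
    """Return a list of problems found in the config dict (empty if none)."""
    problems = []
    for field, expected in EXPECTED_FIELDS.items():
        if field not in config:
            problems.append(f"missing field: {field}")
        elif not isinstance(config[field], expected):
            problems.append(f"{field} should be {expected.__name__}")
    return problems


def load_and_validate(path):
    """Load the JSON config file and raise ValueError if it looks wrong."""
    with open(path) as handle:
        config = json.load(handle)
    problems = validate_config(config)
    if problems:
        raise ValueError("; ".join(problems))
    return config
```

For example, `load_and_validate("kada_glue_extractor_config.json")` returns the parsed config, or raises with a message such as `regions should be list`.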
Step 6: Run the Collector
This is the wrapper script: kada_glue_extractor.py
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.glue import Extractor

# Set up logging using the KADA helper
get_generic_logger('root')

# By default, look for the config file next to this script
_type = 'glue'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

# Allow the config path and the high water mark name to be overridden
parser = argparse.ArgumentParser(description='KADA Glue Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename)
parser.add_argument('--name', '-n', dest='name', default=_type)
args = parser.parse_args()

# Read the high water mark window for this run
start_hwm, end_hwm = get_hwm(args.name)

# Test the connection, then run the extract for the window
ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

# Publish the new high water mark once the extract has completed
publish_hwm(args.name, end_hwm)
Step 7: Check the Collector Outputs
K Extracts
A set of files (e.g. metadata, databaselog, linkages, events) will be generated in the output_path directory.
High Water Mark File
A high water mark file is created called glue_hwm.txt.
Refer to Collector Integration General Notes for more information.
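A quick way to check the outputs is to list the extract files and flag any that are empty. This is a sketch only: the helper names are hypothetical, and the `.csv` and `.csv.gz` suffixes are assumed from the upload example later on this page.

```python
import os


def summarise_outputs(output_path, suffixes=(".csv", ".csv.gz")):
    """Return {filename: size_in_bytes} for extract files in output_path."""
    summary = {}
    for name in sorted(os.listdir(output_path)):
        if name.endswith(suffixes):
            summary[name] = os.path.getsize(os.path.join(output_path, name))
    return summary


def empty_files(summary):
    """Return the names of files that contain no data at all."""
    return [name for name, size in summary.items() if size == 0]


# Example usage with the output_path from the sample config:
# summary = summarise_outputs("/tmp/output")
# print(summary, empty_files(summary))
```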
Step 8: Push the Extracts to K
Once the files have been validated, you can push the files to the K landing directory.
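For a K instance hosted on AWS, the push could look like the sketch below, which uploads each extract file with the boto3 S3 client. It assumes that the landing directory noted in Step 3 is a key prefix inside the storage_root_folder bucket; the function name `push_extracts` and the values in the usage comment are placeholders for this example.

```python
import os


def push_extracts(s3_client, output_path, bucket, landing_prefix,
                  suffixes=(".csv", ".csv.gz")):
    """Upload extract files to the landing directory and return the keys used."""
    uploaded = []
    for name in sorted(os.listdir(output_path)):
        if not name.endswith(suffixes):
            continue  # ignore anything that is not an extract file
        key = f"{landing_prefix.strip('/')}/{name}"
        s3_client.upload_file(os.path.join(output_path, name), bucket, key)
        uploaded.append(key)
    return uploaded


# Example usage (bucket and prefix are placeholders; use your own values):
# import boto3
# s3 = boto3.client("s3", region_name="ap-southeast-2")
# push_extracts(s3, "/tmp/output", "example-bucket", "path/to/landing")
```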
Example: Using Airflow to orchestrate the Extract and Push to K
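The following example shows how you can orchestrate a collector using Airflow and push the files to K hosted on Azure. It uses the Tableau collector; for Glue, import Extractor from kada_collectors.extractors.glue and use the configuration from Step 5 instead. The code is not expected to be used as-is but as a template for your own DAG.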
# built-in
import os

# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# To be configured by the customer.
# Note variables may change if using a different object store.
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = ""
KADA_STORAGE_ACCOUNT = ""
KADA_LANDING_PATH = "lz/tableau/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}


# To be implemented by the customer.
# Upload to your landing zone storage.
# Change '.csv' to '.csv.gz' if you set compress = true in the config
def upload():
    output = KADA_EXTRACTOR_CONFIG['output_path']
    for filename in os.listdir(output):
        if filename.endswith('.csv'):
            file_to_upload_path = os.path.join(output, filename)
            AzureBlobStorage.upload_file_sas_token(
                client=KADA_SAS_TOKEN,
                storage_account=KADA_STORAGE_ACCOUNT,
                container=KADA_CONTAINER,
                blob=f'{KADA_LANDING_PATH}/{filename}',
                local_path=file_to_upload_path
            )


with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:

    # To be implemented by the customer.
    # Retrieve the timestamp from the prior run
    start_hwm = 'YYYY-MM-DD HH:mm:SS'
    end_hwm = 'YYYY-MM-DD HH:mm:SS'  # timestamp now

    ext = Extractor(**KADA_EXTRACTOR_CONFIG)

    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run,
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )

        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload,
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer.
        # Timestamp needs to be saved for next run
        task_3 = DummyOperator(task_id='save_hwm')

        # Extract first, then upload, then save the high water mark
        task_1 >> task_2 >> task_3

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end