About Collectors
Pre-requisites
- Python 3.8 - 3.11
- Access to the K landing directory
- Access to DBT Cloud
Unlike the other collectors, the DBT extractor produces manifest, catalog and run_results JSON files instead of CSV files. Do not be alarmed if you see these.
Note: this collector only works with DBT Cloud, not DBT Core.
Step 1: Create the Source in K
Create a DBT Cloud source in K:
- Go to Settings, select Sources and click Add Source
- Select the "Load from File system" option
- Give the source a Name - e.g. DBT Cloud Production
- Add the Host name for the DBT Cloud server
- Click Finish Setup
Step 2: Getting Access to the Source Landing Directory
Step 3: Install the Collector
You can download the latest Core Library and whl via Platform Settings → Sources → Download Collectors.
Run the following command to install the collector:

```shell
pip install kada_collectors_extractors_<version>-none-any.whl
```

You will also need to install the common library kada_collectors_lib for this collector to function properly:

```shell
pip install kada_collectors_lib-<version>-none-any.whl
```
Step 4: Configure the Collector
| FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE |
|---|---|---|---|
| account_id | string | DBT Cloud account id | "xxxxx.australia-east.azure" |
| environment_ids | list&lt;string&gt; | List of environment ids to extract | ["12345", "234234"] |
| token | string | Token generated from the DBT Cloud console | |
| output_path | string | Absolute path to the output location | "/tmp/output" |
| timeout | integer | Seconds to wait for the API to respond (default: 20) | 20 |
| mapping | JSON | Mapping between DBT project ids and their corresponding database host value in K | {"60125": "af33141.australia-east.azure"} |
| dry_run | boolean | If enabled, the extractor only produces the mapping.json file | false |
| compress | boolean | Whether to gzip the output files | true |
Example kada_dbt_extractor_config.json:

```json
{
    "account_id": "",
    "token": "",
    "output_path": "/tmp/output",
    "timeout": 20,
    "mapping": {},
    "dry_run": false,
    "compress": true,
    "environment_ids": [123, 64]
}
```
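Before running the collector, it can be useful to sanity-check the config file against the fields in the table above. The snippet below is a minimal sketch (not part of the KADA library); `REQUIRED_KEYS` and `validate_config` are illustrative names, and it validates a sample config written to a temporary file so it is self-contained.

```python
import json
import os
import tempfile

# Keys taken from the configuration table above; adjust if your collector version differs.
REQUIRED_KEYS = {
    "account_id", "token", "output_path", "timeout",
    "mapping", "dry_run", "compress", "environment_ids",
}

def validate_config(path):
    """Load a collector config file and fail loudly if any expected key is missing."""
    with open(path) as f:
        config = json.load(f)
    missing = REQUIRED_KEYS - config.keys()
    if missing:
        raise ValueError("Config is missing keys: {}".format(sorted(missing)))
    return config

# Write the example config from above to a temp file and validate it.
sample = {
    "account_id": "",
    "token": "",
    "output_path": "/tmp/output",
    "timeout": 20,
    "mapping": {},
    "dry_run": False,
    "compress": True,
    "environment_ids": [123, 64],
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(sample, f)
    path = f.name

config = validate_config(path)
os.unlink(path)
print("Config OK, environments:", config["environment_ids"])
```

Running the same check against a config with a key removed raises a `ValueError` naming the missing field, which is easier to debug than a failure mid-extraction.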
Step 5: Run the Collector
This is the wrapper script: kada_dbt_extractor.py

```python
import os
import argparse

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.dbt import Extractor

get_generic_logger('root')  # set up logging

_type = 'dbt'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA DBT Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename)
parser.add_argument('--name', '-n', dest='name', default=_type)
args = parser.parse_args()

start_hwm, end_hwm = get_hwm(args.name)  # extraction window for this source
ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})
publish_hwm(args.name, end_hwm)  # persist the new high water mark on success
```
Step 6: Check the Collector Outputs
K Extracts
A set of files (e.g. metadata, databaselog, linkages, events) will be generated in the output_path directory.
High Water Mark File
A high water mark file called dbt_hwm.txt is created.
Step 7: Push the Extracts to K
Once the files have been validated, you can push the files to the K landing directory.
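A minimal sketch of this step, assuming the extracts are simply copied to a mounted landing path. The landing directory location is environment-specific (obtained in Step 2), so the paths below are placeholders, and the `touch` line stands in for files the extractor would have produced.

```shell
OUTPUT_PATH="/tmp/output"             # matches output_path in the config
LANDING_DIR="/tmp/k_landing/dbt"      # placeholder for your K landing directory

mkdir -p "$OUTPUT_PATH" "$LANDING_DIR"
touch "$OUTPUT_PATH/manifest.json"    # stand-in for a real extractor output

# Copy all generated extracts (gzipped variants too, if compress=true) to landing
cp "$OUTPUT_PATH"/* "$LANDING_DIR"/
ls "$LANDING_DIR"
```

If the landing directory is on a remote host rather than a mount, the same copy can be done with `scp` or your organisation's preferred transfer tool.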