K Knowledge Base

DBT Cloud (via Collector method) - v3.0.0

Prerequisites

  • Python 3.6 - 3.10

  • Access to K landing directory

  • Access to DBT Cloud

Unlike the other collectors, the DBT extractor produces manifest, catalog, and run_results JSON files instead of CSV files. Do not be alarmed if you see these.

This collector only works for DBT Cloud, not DBT Core.


Step 1: Create the Source in K

Create a DBT Cloud source in K

  • Go to Settings, select Sources, and click Add Source

  • Select "Load from File system" option

  • Give the source a Name - e.g. DBT Cloud Production

  • Add the Host name for the DBT Cloud Server

  • Click Finish Setup


Step 2: Getting Access to the Source Landing Directory


Step 3: Install the Collector

You can download the latest Core Library and whl via Platform Settings → Sources → Download Collectors

Run the following command to install the collector:

pip install kada_collectors_extractors_<version>-none-any.whl

You will also need to install the common library kada_collectors_lib for this collector to function properly.

pip install kada_collectors_lib-<version>-none-any.whl

Step 4: Configure the Collector

FIELD       | FIELD TYPE | DESCRIPTION                                                                          | EXAMPLE
----------- | ---------- | ------------------------------------------------------------------------------------ | -------
account_id  | string     | DBT Cloud account ID                                                                 | "xxxxx.australia-east.azure"
token       | string     | Token generated from the DBT Cloud console                                           |
output_path | string     | Absolute path to the output location                                                 | "/tmp/output"
timeout     | integer    | Seconds to wait for the DBT Cloud API to respond (default 20)                        | 20
mapping     | JSON       | Mapping between DBT project IDs and their corresponding database host values in K    | {"60125": "af33141.australia-east.azure"}
dry_run     | boolean    | If enabled, the extractor produces only the mapping.json file                        | false
compress    | boolean    | Whether to gzip the output files                                                     | true
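The mapping field tells K which database source each DBT project's models belong to, so lineage lands on the right source. A minimal illustration of the lookup (the project ID and host values are the examples from the table above; resolve_host is an illustrative helper, not part of the KADA library):

```python
# Illustrative only: how a DBT Cloud project ID resolves to the
# database host value registered as a source in K.
mapping = {"60125": "af33141.australia-east.azure"}

def resolve_host(project_id: str, mapping: dict) -> str:
    """Return the K database host for a DBT project ID, or raise if unmapped."""
    try:
        return mapping[project_id]
    except KeyError:
        raise KeyError(
            f"DBT project {project_id} has no entry in 'mapping'; "
            "add it to the collector config."
        )

print(resolve_host("60125", mapping))  # af33141.australia-east.azure
```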

Example configuration file: kada_dbt_extractor_config.json

{
    "account_id": "",
    "token": "",
    "output_path": "/tmp/output",
    "timeout": 20,
    "mapping": {},
    "dry_run": false,
    "compress": true
}
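Before running the collector it can help to sanity-check the config file. A small sketch (the required keys are taken from the table above; check_config is an illustrative helper, not part of the KADA library):

```python
import json

# Keys the DBT collector config is expected to contain.
REQUIRED_KEYS = {"account_id", "token", "output_path", "timeout",
                 "mapping", "dry_run", "compress"}

def check_config(path: str) -> dict:
    """Load the collector config and fail fast on missing keys."""
    with open(path) as f:
        config = json.load(f)
    missing = REQUIRED_KEYS - config.keys()
    if missing:
        raise ValueError(f"config {path} is missing keys: {sorted(missing)}")
    return config
```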

Step 5: Run the Collector

This is the wrapper script: kada_dbt_extractor.py

import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.dbt import Extractor

get_generic_logger('root')

_type = 'dbt'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA DBT Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename)
args = parser.parse_args()

# Fetch the high water marks for this extractor type
start_hwm, end_hwm = get_hwm(_type)

ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

# Publish the new high water mark once the run succeeds
publish_hwm(_type, end_hwm)

Step 6: Check the Collector Outputs

K Extracts

A set of files (e.g. metadata, databaselog, linkages, events) will be generated in the output_path directory.
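A quick way to confirm the run produced something before pushing is to list the output directory. A sketch, assuming output_path from the config (exact file names vary by run and by whether compress is enabled):

```python
import os

def list_extracts(output_path: str) -> list:
    """Return the generated extract files (JSON, CSV, or gzipped) in output_path."""
    exts = (".json", ".csv", ".gz")
    return sorted(f for f in os.listdir(output_path) if f.endswith(exts))
```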

High Water Mark File

A high water mark file, dbt_hwm.txt, is created.
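The high water mark is how the collector stays incremental: each run starts from the watermark the previous run published. A simplified stand-alone sketch of the mechanism (the real get_hwm/publish_hwm live in kada_collectors; the default value here is illustrative):

```python
import os

HWM_FILE = "dbt_hwm.txt"

def read_hwm(path: str = HWM_FILE) -> str:
    """Return the last published watermark, or an epoch default on first run."""
    if os.path.exists(path):
        with open(path) as f:
            return f.read().strip()
    return "1970-01-01T00:00:00+00:00"

def write_hwm(value: str, path: str = HWM_FILE) -> None:
    """Persist the new watermark for the next run to pick up."""
    with open(path, "w") as f:
        f.write(value)
```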


Step 7: Push the Extracts to K

Once the files have been validated, you can push the files to the K landing directory.
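How the push happens depends on how your K landing directory is exposed. The sketch below assumes it is mounted as a local filesystem path (for object storage you would swap in the relevant SDK or CLI); push_to_landing is an illustrative helper, not part of the KADA library:

```python
import os
import shutil

def push_to_landing(output_path: str, landing_dir: str) -> list:
    """Copy validated extract files from output_path into the K landing directory."""
    pushed = []
    for name in sorted(os.listdir(output_path)):
        src = os.path.join(output_path, name)
        if os.path.isfile(src):
            shutil.copy2(src, os.path.join(landing_dir, name))
            pushed.append(name)
    return pushed
```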


Example: Using Airflow to orchestrate the Extract and Push to K