Collector Integration General Notes

Adding Custom SQL

In your organisation there may be scenarios where you want to customise the extracts generated by the collector. For example, you may want to filter a specific database out of the metadata extract.

KADA provides basic methods to view and edit extraction SQL, which can be called prior to calling the run method.

You can instantiate the extractor and list the SQL statements it uses like so:

Python
from kada_collectors.extractors.snowflake import Extractor

ext = Extractor()
print(ext.list_sql())

Python
def list_sql(self: Self@BaseExtractor) -> list

To see the statement being used you can call:

Python
from kada_collectors.extractors.snowflake import Extractor

ext = Extractor()
print(ext.get_sql('DATABASE_LOGS_SQL'))

Python
def get_sql(self: Self@BaseExtractor, sql_name: str) -> Any

sql_name: name of the SQL statement, as returned by list_sql


If you have custom logic you wish to add to the SQL, you may overwrite the extractor's SQL statement like so:

Python
from kada_collectors.extractors.snowflake import Extractor

my_new_statement = "Some SQL"

ext = Extractor()
ext.overwrite_sql('DATABASE_LOGS_SQL', my_new_statement)
print(ext.get_sql('DATABASE_LOGS_SQL')) # See the updated SQL

Python
def overwrite_sql(self: Self@BaseExtractor, sql_name: str, statement: str) -> None

sql_name: name of the SQL statement, as returned by list_sql
statement: new SQL statement to replace the existing statement

Note: The SQL statements use string formatting, so do not remove or renumber any formatting placeholders such as {0} and {1}. You should only add filters and adjust SELECT clauses where required.
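One way to guard against accidentally dropping a placeholder is to compare the placeholder fields of the original and replacement statements before calling overwrite_sql. The sketch below assumes the statements are formatted with Python's str.format, as the {0} {1} placeholders suggest. The helper names and the sample SQL are hypothetical and are not part of the KADA package.

```python
from string import Formatter


def placeholder_fields(statement: str) -> list[str]:
    """Return the sorted, de-duplicated str.format field names used in a statement."""
    fields = {
        field_name
        for _, field_name, _, _ in Formatter().parse(statement)
        if field_name is not None
    }
    return sorted(fields)


def placeholders_preserved(original: str, replacement: str) -> bool:
    """True when both statements use exactly the same set of placeholders."""
    return placeholder_fields(original) == placeholder_fields(replacement)


# Hypothetical statement for illustration only; real statements come from ext.get_sql(...).
original_sql = (
    "SELECT database_name, created FROM databases "
    "WHERE created > '{0}' AND created <= '{1}'"
)

# Adds a filter without touching the existing placeholders.
filtered_sql = original_sql + " AND database_name <> 'SANDBOX'"

# A replacement that loses {1} no longer matches the original.
broken_sql = "SELECT database_name FROM databases WHERE created > '{0}'"
```

With the real extractor, the original statement would be read with get_sql and the check run before passing the new statement to overwrite_sql.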


Storing High Water Marks (HWM)

By default KADA provides hwm methods to retrieve and store _hwm.txt files in a local directory. You may choose a custom location for this by specifying a file_path.

Python
from kada_collectors.extractors.utils import get_hwm

start_hwm, end_hwm = get_hwm('mysource', file_path='/tmp/somedir/hwm')

Python
def get_hwm(prefix: Any, file_path: str = None) -> tuple[str, str]

prefix: The prefix for the _hwm.txt file
file_path: Optional. Directory where the hwm file is located; defaults to the current execution directory

The same can be done when publishing the hwm:

Python
from kada_collectors.extractors.utils import publish_hwm

# publish_hwm returns None, so there is nothing to unpack
publish_hwm('mysource', '2022-03-29 00:00:00', file_path='/tmp/somedir/hwm')

Python
def publish_hwm(prefix: Any, date: Any, file_path: str = None) -> None

prefix: The prefix for the _hwm.txt file
date: The hwm value as a str in the format %Y-%m-%d %H:%M:%S
file_path: Optional. Directory where the hwm file is located; defaults to the current execution directory
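The date string can be produced from a datetime object with the standard library. A minimal sketch; the constant and variable names are illustrative only:

```python
from datetime import datetime

HWM_FORMAT = "%Y-%m-%d %H:%M:%S"  # Format documented for the date argument

# Format a datetime as an hwm string.
hwm_date = datetime(2022, 3, 29, 0, 0, 0).strftime(HWM_FORMAT)

# Parse an hwm string back into a datetime, e.g. to validate it before publishing.
# strptime raises ValueError if the string does not match the format.
parsed = datetime.strptime(hwm_date, HWM_FORMAT)
```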


Storing the HWM using the K Landing Area

You may choose to persist the hwm files in the K landing area for that particular source, downloading them before each run and uploading them afterwards.

You may have been asked to upload the extract files to a path such as:

lz/landing_directory_name/landing

You may use the following folder to persist hwm files:

lz/landing_directory_name/hwm
e.g. lz/landing_directory_name/hwm/snowflake_hwm.txt

Alternatively you can choose to store the HWM values elsewhere such as a database by writing your own methods to retrieve and publish the HWM values.
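As a rough illustration of writing your own methods, the sketch below keeps one HWM value per prefix in a SQLite table. The table name, function names and the choice of SQLite are all assumptions made for this example; none of them come from the KADA package, and any database would work the same way.

```python
import sqlite3
from datetime import datetime
from typing import Optional

HWM_FORMAT = "%Y-%m-%d %H:%M:%S"


def init_hwm_table(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) hwm table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS hwm (prefix TEXT PRIMARY KEY, value TEXT NOT NULL)"
    )
    conn.commit()


def fetch_hwm(conn: sqlite3.Connection, prefix: str) -> Optional[str]:
    """Return the stored HWM for a prefix, or None if nothing has been stored."""
    row = conn.execute(
        "SELECT value FROM hwm WHERE prefix = ?", (prefix,)
    ).fetchone()
    return row[0] if row else None


def store_hwm(conn: sqlite3.Connection, prefix: str, date: str) -> None:
    """Insert or replace the HWM for a prefix after validating its format."""
    datetime.strptime(date, HWM_FORMAT)  # Raises ValueError on a malformed date
    conn.execute(
        "INSERT OR REPLACE INTO hwm (prefix, value) VALUES (?, ?)", (prefix, date)
    )
    conn.commit()
```

The stored value can then be passed to the extractor as start_hwm, and the end of the extracted window stored again once the run has succeeded.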


Extractor Class

Each source extractor is initialised as a class instance.

The extractor's configuration is passed as arguments when the class is initialised. See each collector page for the parameters required.


Extractor run method

The Extractor class implements a run method which takes hwm inputs.

The method returns a dict mapping each file produced to its record count.

Python
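from kada_collectors.extractors.snowflake import Extractor
from kada_collectors.extractors.utils import get_hwm

kwargs = {}  # Populate with the parameters listed on your collector page

# The hwm values, here read from the local _hwm.txt file
start_hwm, end_hwm = get_hwm('mysource')
hwm_kwargs = {"start_hwm": start_hwm, "end_hwm": end_hwm}

ext = Extractor(**kwargs)
ext.run(**hwm_kwargs)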

Python
def run(self: Self@Extractor, start_hwm: str = None, end_hwm: str = None) -> dict[str, int]

start_hwm: Start range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS
end_hwm: End range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS

Before calling run, you may wish to check that the extractor can connect to the source. The test_connection method allows this:

Python
def test_connection(self: Self@Extractor) -> None

To test access to the actual databases, tables and API routes, there is an upcoming method test_access (yet to be fully implemented):

Python
def test_access(self: Self@Extractor) -> None

Referencing the KADA package whl in setup.py

The KADA whl packages can be added to the install_requires list in your Python setup.py using the format '<module> @ <url>'

Example:

Python
from setuptools import setup

setup(
    name='<package>',
    install_requires=[
        '<your_normal_dependency>',
        # KADA wheel
        'kada_collectors_extractors_snowflake @ <Blob SAS URL>',
    ],
)