Skip to main content
Skip table of contents

Additional Notes

Adding Custom SQL

In your organisation there maybe scenario where you want to customise the extracts generated by the collector. For example you would like to filter out a specific database from the metadata extract.

KADA provides basic methods to view and edit extraction SQL, which can be called prior to calling the run method.

You can declare the extractor and list the available SQL that is used by the extractor like so

PY
from kada_collectors.extractors.snowflake import Extractor

ext = Extractor()
print(ext.list_sql())

PY
def list_sql(self: Self@BaseExtractor) -> list

To see the statement being used you can call

PY
from kada_collectors.extractors.snowflake import Extractor

ext = Extractor()
print(ext.get_sql('DATABASE_LOG_SQL'))

PY
def get_sql(self: Self@BaseExtractor, sql_name: str) -> Any

sql_name: from get_sql


If you have custom logic you wish to add to the SQL, you may overwrite the object’s sql like so

PY
from kada_collectors.extractors.snowflake import Extractor

my_new_statement = "Some SQL"

ext = Extractor()
ext.overwrite_sql('DATABASE_LOG_SQL', my_new_statement)
print(ext.get_sql('DATABASE_LOG_SQL')) # See the updated SQL

PY
def overwrite_sql(self: Self@BaseExtractor, sql_name: str, statement: str) -> None

sql_name: from get_sql
statement: new SQL statement to replace the existing statement

Note that the SQL does use formatting, so please refrain from overwrite any formatting place holder e.g. {0} {1} etc. Also do not change these numbers.

You should only add filters and adjust SELECT clauses where required.


Storing High Water Marks (HWM)

By default KADA provides hwm methods to retrieve and store _hwm.txt files a local directory, You may choose a custom location for this by specifying a file_path.

PY
from kada_collectors.extractors.utils import get_hwm

start_hwm, end_hwm = get_hwm('mysource', file_path='/tmp/somedir/hwm')
PY
def get_hwm: (prefix: Any, file_path: str = None) -> tuple[str, str]

prefix: The prefix for the _hwm.txt file
file_path: Optional if the hwm if located else where, otherwise default is current execution directory

Likewise the same can be done for publish

PY
from kada_collectors.extractors.utils import publish_hwm

start_hwm, end_hwm = publish_hwm('mysource', '2022-03-29 00:00:00', \
    file_path='/tmp/somedir/hwm')
PY
def publish_hwm: (prefix: Any, date: Any, file_path: str = None) -> None

prefix: The prefix for the _hwm.txt file
date: is in str format %Y-%m-%d %H:%M:%S
file_path: Optional if the hwm if located else where, otherwise default is current execution directory


Storing the HWM using the K Landing Area

You may choose to persist the hwm files by downloading and uploading to the K landing area for that particular source.

You may have been asked to upload the extract files to a path such as

CODE
lz/landing_directory_name/landing

You may use the following folder to persist hwm files

CODE
lz/landing_directory_name/hwm
e.g. lz/landing_directory_name/hwm/snowflake_hwm.txt

Alternatively you can choose to store the HWM values elsewhere such as a database by writing your own methods to retrieve and publish the HWM values.


Extractor Class

Each source extractor is initialised as a class instance

The configuration for the extractor inputted to the class args. See each collector page for the parameters to initialised the class.


Extractor run method

The Extractor class implements a run method which takes hwm inputs.

The method returns a dict of files produced and record count of each file.

PY
from kada_collectors.extractors.snowflake import Extractor

kwargs = {my args} # However you choose to construct your args
hwm_kwrgs = {"start_hwm": "end_hwm": } # The hwm values

ext = Extractor(**kwargs)
ext.run(**hwm_kwrgs)

PY
def run: (self: Self@Extractor, start_hwm: str = None, end_hwm: str = None) -> dict[str, int]

start_hwm: Start range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS
end_hwm: End range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS

Prior to calling the run you may wish to sanity check and make sure the extractor can in fact connect to the source. There is a method called test_connection to allow this

PY
def test_connection: (self: Self@Extractor) -> None

For testing access to actual databases/tables/api routes there is an upcoming method which is yet to be implemented which you may wish to extend upon yourself, the method is available but will throw a NotImplimentedError

PY
def test_access: (self: Self@Extractor) -> None


Referencing the KADA package whl in setup.py

The KADA whl packages can be added to your python setup.py using the format '<module> @ url'

Example

CODE
setup(
    name='<package>',
...
    install_requires=[
        '<your_normal_dependency>',
         # KADA wheel
        'kada_collectors_extractors_snowflake @ <Blob SAS URL>',
    ],
)

Reference: https://stackoverflow.com/a/53706140

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.