Collector Integration General Notes
Adding Custom SQL
In your organisation there may be scenarios where you want to customise the extracts generated by the collector. For example, you may want to filter out a specific database from the metadata extract.
KADA provides basic methods to view and edit extraction SQL, which can be called prior to calling the run method.
You can instantiate the extractor and list the SQL statements it uses as follows:
from kada_collectors.extractors.snowflake import Extractor
ext = Extractor()
print(ext.list_sql())
def list_sql(self: Self@BaseExtractor) -> list
To see the statement being used you can call
from kada_collectors.extractors.snowflake import Extractor
ext = Extractor()
print(ext.get_sql('DATABASE_LOGS_SQL'))
def get_sql(self: Self@BaseExtractor, sql_name: str) -> Any
sql_name: a name returned by list_sql
If you have custom logic you wish to add to the SQL, you may overwrite the object’s SQL as follows:
from kada_collectors.extractors.snowflake import Extractor
my_new_statement = "Some SQL"
ext = Extractor()
ext.overwrite_sql('DATABASE_LOGS_SQL', my_new_statement)
print(ext.get_sql('DATABASE_LOGS_SQL')) # See the updated SQL
def overwrite_sql(self: Self@BaseExtractor, sql_name: str, statement: str) -> None
sql_name: a name returned by list_sql
statement: new SQL statement to replace the existing statement
Note that the SQL uses string formatting, so do not remove or overwrite any formatting placeholders, e.g. {0}, {1}, and do not change their numbers.
You should only add filters and adjust SELECT clauses where required.
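One way to guard against accidentally dropping or renumbering a placeholder is to compare the placeholders in the original and replacement statements before overwriting. The sketch below uses only the Python standard library; the helper names and the example statement are hypothetical and are not part of the KADA package.

```python
from string import Formatter


def placeholder_fields(statement: str) -> list:
    """Return the sorted, de-duplicated format fields (e.g. '0', '1') in a statement."""
    fields = {field for _, field, _, _ in Formatter().parse(statement) if field is not None}
    return sorted(fields)


def keeps_placeholders(original: str, replacement: str) -> bool:
    """True when the replacement uses exactly the same placeholders as the original."""
    return placeholder_fields(original) == placeholder_fields(replacement)


# Hypothetical statement, standing in for the result of ext.get_sql(...)
original = "SELECT * FROM {0}.query_log WHERE start_time > '{1}'"
# Add a filter only; {0} and {1} are left untouched
replacement = original + " AND database_name <> 'SCRATCH_DB'"

if keeps_placeholders(original, replacement):
    # With a real extractor: ext.overwrite_sql('DATABASE_LOGS_SQL', replacement)
    print("Placeholders preserved, safe to overwrite")
```

In practice the original would come from ext.get_sql and the replacement would be passed to ext.overwrite_sql only when the check passes.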
Storing High Water Marks (HWM)
By default, KADA provides HWM methods to retrieve and store _hwm.txt files in a local directory. You may choose a custom location by specifying a file_path.
from kada_collectors.extractors.utils import get_hwm
start_hwm, end_hwm = get_hwm('mysource', file_path='/tmp/somedir/hwm')
def get_hwm: (prefix: Any, file_path: str = None) -> tuple[str, str]
prefix: The prefix for the _hwm.txt file
file_path: Optional. Set this if the hwm file is located elsewhere; the default is the current execution directory
Likewise, the same can be done when publishing the HWM:
from kada_collectors.extractors.utils import publish_hwm
publish_hwm('mysource', '2022-03-29 00:00:00',
            file_path='/tmp/somedir/hwm')  # publish_hwm returns None, so there is nothing to assign
def publish_hwm: (prefix: Any, date: Any, file_path: str = None) -> None
prefix: The prefix for the _hwm.txt file
date: a str in the format %Y-%m-%d %H:%M:%S
file_path: Optional. Set this if the hwm file is located elsewhere; the default is the current execution directory
Storing the HWM using the K Landing Area
You may choose to persist the hwm files by downloading and uploading to the K landing area for that particular source.
You may have been asked to upload the extract files to a path such as
lz/landing_directory_name/landing
You may use the following folder to persist hwm files
lz/landing_directory_name/hwm
e.g. lz/landing_directory_name/hwm/snowflake_hwm.txt
Alternatively you can choose to store the HWM values elsewhere such as a database by writing your own methods to retrieve and publish the HWM values.
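As an illustration of storing HWM values in a database, the sketch below keeps one row per source in SQLite. Everything in it is an assumption made for the example: the table layout, the function names get_hwm_db and publish_hwm_db, the default start value, and the choice to return the current time as end_hwm. It mirrors the (start_hwm, end_hwm) tuple shape of get_hwm but is not the KADA implementation.

```python
import sqlite3
from datetime import datetime

HWM_FORMAT = "%Y-%m-%d %H:%M:%S"


def init_store(conn):
    """Create the (hypothetical) hwm table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS hwm (prefix TEXT PRIMARY KEY, value TEXT NOT NULL)"
    )


def get_hwm_db(conn, prefix, default_start="1970-01-01 00:00:00"):
    """Return (start_hwm, end_hwm): the stored value (or a default) and the current time."""
    row = conn.execute("SELECT value FROM hwm WHERE prefix = ?", (prefix,)).fetchone()
    start_hwm = row[0] if row else default_start
    end_hwm = datetime.now().strftime(HWM_FORMAT)
    return start_hwm, end_hwm


def publish_hwm_db(conn, prefix, date):
    """Store the HWM for a source, rejecting values not in the expected format."""
    datetime.strptime(date, HWM_FORMAT)  # raises ValueError on a malformed date
    conn.execute("INSERT OR REPLACE INTO hwm (prefix, value) VALUES (?, ?)", (prefix, date))
    conn.commit()


conn = sqlite3.connect(":memory:")  # use a file path or another database in practice
init_store(conn)
publish_hwm_db(conn, "mysource", "2022-03-29 00:00:00")
start_hwm, end_hwm = get_hwm_db(conn, "mysource")
```

The in-memory connection is only for demonstration; a persistent store is needed for the values to survive between runs.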
Extractor Class
Each source extractor is initialised as a class instance.
The configuration for the extractor is passed in as arguments to the class. See each collector page for the parameters needed to initialise the class.
Extractor run method
The Extractor class implements a run method which takes hwm inputs.
The method returns a dict of files produced and record count of each file.
from kada_collectors.extractors.snowflake import Extractor
kwargs = {}  # Populate with the parameters from your collector page, however you choose to construct them
hwm_kwargs = {"start_hwm": "2022-03-28 00:00:00", "end_hwm": "2022-03-29 00:00:00"}  # Example hwm values
ext = Extractor(**kwargs)
ext.run(**hwm_kwargs)
def run: (self: Self@Extractor, start_hwm: str = None, end_hwm: str = None) -> dict[str, int]
start_hwm: Start range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS
end_hwm: End range for event filtering, timestamp in format YYYY-MM-DD HH:MM:SS
Before calling run, you may wish to check that the extractor can in fact connect to the source. The test_connection method allows this:
def test_connection: (self: Self@Extractor) -> None
For testing access to actual databases, tables, or API routes, there is a test_access method that is not yet implemented. The method exists but currently raises a NotImplementedError; you may wish to extend it yourself.
def test_access: (self: Self@Extractor) -> None
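The methods described in this section can be combined into a single incremental run. The sketch below is one possible ordering, not a prescribed one: it assumes test_connection raises when the source is unreachable, and that the end_hwm of a successful run is the value to publish for the next run. The run_incremental helper and the Fake* stand-ins are hypothetical; the stand-ins exist only so the example runs without the KADA package, and would be replaced by a real Extractor, get_hwm and publish_hwm.

```python
def run_incremental(extractor, get_hwm, publish_hwm, prefix, file_path=None):
    """Check the connection, run the extract for the HWM window, then store the new HWM."""
    extractor.test_connection()  # assumed to raise if the source is unreachable
    start_hwm, end_hwm = get_hwm(prefix, file_path=file_path)
    files = extractor.run(start_hwm=start_hwm, end_hwm=end_hwm)
    # Publish only after run() has returned, so a failed run is retried from the same start
    publish_hwm(prefix, end_hwm, file_path=file_path)
    return files


# Stand-ins so the sketch runs without the KADA package installed
class FakeExtractor:
    def test_connection(self):
        return None

    def run(self, start_hwm=None, end_hwm=None):
        return {"example_extract.csv": 0}


published = {}


def fake_get_hwm(prefix, file_path=None):
    return "2022-03-28 00:00:00", "2022-03-29 00:00:00"


def fake_publish_hwm(prefix, date, file_path=None):
    published[prefix] = date


files = run_incremental(FakeExtractor(), fake_get_hwm, fake_publish_hwm, "mysource")
```

Passing the HWM functions in as arguments keeps the flow the same whether the values live in local _hwm.txt files, the K landing area, or a custom store.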
Referencing the KADA package whl in setup.py
The KADA whl packages can be added to install_requires in your Python setup.py using the format '<module> @ <url>'
Example
setup(
    name='<package>',
    ...
    install_requires=[
        '<your_normal_dependency>',
        # KADA wheel
        'kada_collectors_extractors_snowflake @ <Blob SAS URL>',
    ],
)
Reference: https://stackoverflow.com/a/53706140