Microsoft Dynamics (via Collector method) - v3.0.0

About Collectors

Collectors are extractors that are developed and managed by you (a customer of K).

KADA provides python libraries that customers can use to quickly deploy a Collector.

Why you should use a Collector

There are several reasons why you may use a Collector instead of the direct connect extractor:

  1. You are using the KADA SaaS offering and it cannot connect to your sources due to firewall restrictions

  2. You want to push metadata to KADA rather than allow it to pull data for security reasons

  3. You want to inspect the metadata before pushing it to K

Using a collector requires you to manage:

  1. Deploying and orchestrating the extract code

  2. Managing a high water mark so the extract only pulls the latest metadata

  3. Storing and pushing the extracts to your K instance
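The high water mark in point 2 can be as simple as a timestamp persisted between runs. A minimal sketch, assuming a local text file (the file name and timestamp format here are illustrative, not prescribed by K):

```python
from datetime import datetime, timezone
from pathlib import Path

HWM_FILE = Path("microsoft_dynamics_hwm.txt")  # illustrative location

def read_hwm(default: str = "1970-01-01 00:00:00") -> str:
    # Return the end timestamp of the last successful extract, or a default
    if HWM_FILE.exists():
        return HWM_FILE.read_text().strip()
    return default

def write_hwm(timestamp: str) -> None:
    # Persist the end timestamp once the extract has been pushed to K
    HWM_FILE.write_text(timestamp)

# Each run extracts the window (start_hwm, end_hwm]
start_hwm = read_hwm()
end_hwm = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
```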


This collector has been tested with Microsoft Dataverse Web API v9.2

https://learn.microsoft.com/en-us/power-apps/developer/data-platform/webapi/web-api-versions


Step 1: Registering a KADA App in Azure AD

This step is performed by the Azure AD Admin.

Skip this step if you have already configured a Kada app in Azure AD for Entra SSO or Power BI integration

  • Log in to your company's Azure Portal and open the Azure Active Directory page

  • Select App Registration in the side panel and click New registration

  • Complete the registration form

    • Name: Enter a name for the integration e.g. KADA Dynamics API Integration

    • Supported account types: Select Accounts in this organisation directory only

    • Redirect URL: Add Web / https://www.kada.ai

  • Click Register to complete the registration

  • Click on the newly created KADA Dynamics API Integration App

  • Save the Application (client) ID and Directory (tenant) ID for use in a later step

  • Click on Endpoints and save the URL for OpenID Connect metadata document for use in a later step

  • Select Certificates & secrets in the side panel and click New client secret

  • Complete the new secret form and save the Secret Value for use in a later step

Make sure you send the following information to the K Admin so that they can complete the next process.

  • Application (client) ID

  • Directory (tenant) ID

  • Secret Value
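To sanity-check the registration, the app can request a token for Dynamics via the OAuth2 client credentials flow. A minimal sketch using only the standard library; the tenant, client, secret, and org URL placeholders are the values saved above:

```python
import json
import urllib.parse
import urllib.request

def build_token_request(tenant: str, client_id: str, secret: str, resource: str):
    # Client credentials request against the Microsoft identity platform;
    # the "<resource>/.default" scope requests the app permissions granted in Azure AD
    url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token"
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": secret,
        "scope": f"{resource}/.default",
    }).encode()
    return url, body

def get_token(tenant: str, client_id: str, secret: str, resource: str) -> str:
    url, body = build_token_request(tenant, client_id, secret, resource)
    with urllib.request.urlopen(urllib.request.Request(url, data=body)) as resp:
        return json.load(resp)["access_token"]

# Example (fill in the values saved above):
# token = get_token("<tenant-id>", "<client-id>", "<secret-value>",
#                   "https://<org-id>.api.crm6.dynamics.com")
```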


Step 2: Kada App access to Dynamics API

This step is performed by a Dynamics admin

  • Log in to your company's Dynamics / Power Apps

  • Select Environments and click on the Environment that you want to load into K.

  • Click on Settings

  • Select Users + permissions and click on Application Users

  • Click New App User. In the slide out:

    • Click Add an App. Search for the App that was created in Step 1 and add it

    • Select an appropriate org

    • Add the Service Reader role

  • Click Create


Step 3: Obtain the Web API Endpoint

This step is performed by a Dynamics admin

  • Log in to your company's Power App Maker

  • Select Settings. Click on Developer resources

  • Save the Web API endpoint for use in a later step
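A quick way to confirm the endpoint and the application user from Step 2 are working is to call the Web API's WhoAmI function. A sketch using the standard library; the endpoint and token here are placeholders:

```python
import json
import urllib.request

def whoami_request(web_api: str, token: str) -> urllib.request.Request:
    # WhoAmI is an unbound Dataverse Web API function; a 200 response with a
    # UserId confirms the application user can authenticate and read
    return urllib.request.Request(
        f"{web_api.rstrip('/')}/WhoAmI",
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "OData-MaxVersion": "4.0",
            "OData-Version": "4.0",
        },
    )

def whoami(web_api: str, token: str) -> str:
    with urllib.request.urlopen(whoami_request(web_api, token)) as resp:
        return json.load(resp)["UserId"]

# Example:
# print(whoami("https://<org-id>.api.crm6.dynamics.com/api/data/v9.2", token))
```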


Step 4: Review the list of entities to be loaded into K (Optional)

This step is performed by a Dynamics admin

The Dynamics integration will extract all entities (and fields). This may include some entities that you do not want to load into K. K supports extracting a list of entities (and associated fields) as well.

To review the list of entities you want to extract:

  • Log in to your company's Power App Maker - https://make.powerapps.com/

  • Select Tables

  • Take down the list of table names that you want to include in the extract
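The list of logical names can also be pulled from the Dataverse metadata itself via the EntityDefinitions endpoint, then pared down into the entity_filter list used in the collector config later. A sketch; the endpoint and token are placeholders:

```python
import json
import urllib.request

def entity_definitions_url(web_api: str) -> str:
    # $select keeps the response small: only each table's logical name
    return f"{web_api.rstrip('/')}/EntityDefinitions?$select=LogicalName"

def list_logical_names(web_api: str, token: str) -> list:
    req = urllib.request.Request(
        entity_definitions_url(web_api),
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return [row["LogicalName"] for row in json.load(resp)["value"]]

# Example:
# names = list_logical_names("https://<org-id>.api.crm6.dynamics.com/api/data/v9.2", token)
```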


Step 5: Create the Source in K

This and the following steps are performed by a K admin

Create a source in K

  • Go to Settings, Select Sources and click Add Source

  • Select Microsoft Dynamics as the Source type

  • Select the "Load from File" option

  • Give the source a Name - e.g. Microsoft Dynamics Production

  • Add the Host name for the Microsoft Dynamics Instance - e.g. https://<org-id>.api.crm6.dynamics.com

  • Click Next & Finish Setup


Step 6: Getting Access to the Source Landing Directory

When using a Collector you will push metadata to a K landing directory.

To find your landing directory you will need to:

  1. Go to Platform Settings - Settings. Note down the value of this setting:

    • If using Azure: storage_azure_storage_account

    • If using AWS:

      • storage_root_folder - the AWS s3 bucket

      • storage_aws_region - the region where the AWS s3 bucket is hosted

  2. Go to Sources - Edit the Source you have configured. Note down the landing directory in the About this Source section.

To connect to the landing directory you will need:

  • If using Azure: a SAS token to push data to the landing directory. Request this from KADA Support (support@kada.ai)

  • If using AWS:

    • An Access key and Secret. Request this from KADA Support (support@kada.ai)

    • OR provide your IAM role to KADA Support to provision access.


Step 7: Install the Collector

You can download the latest Core Library and Microsoft Dynamics whl via Platform Settings → Sources → Download Collectors

Run the following command to install the collector

pip install kada_collectors_extractors_microsoft_dynamics-3.0.0-py3-none-any.whl

You will also need to install the corresponding common library kada_collectors_lib-x.x.x for this collector to function properly.

pip install kada_collectors_lib-x.x.x-py3-none-any.whl

Step 8: Configure the Collector

FIELD | FIELD TYPE | DESCRIPTION | EXAMPLE
client | string | Onboarded client in Azure to access Microsoft Dynamics |
secret | string | Onboarded client secret in Azure to access Microsoft Dynamics |
tenant | string | Tenant ID of where Microsoft Dynamics exists |
resource | string | Microsoft Dynamics resource URL | https://<org-id>.api.crm6.dynamics.com/api/data/v9.2
entity_filter | list<string> | A list of logical entity names to extract | ["appointment", "account", "aaduser"]
output_path | string | Absolute path to the output location | "/tmp/output"
mask | boolean | To enable masking or not | true
compress | boolean | To gzip the output or not | true
timeout | integer | Timeout for the API call | 10
meta_only | boolean | Extract metadata only | false

kada_microsoft_dynamics_extractor_config.json

JSON
{
    "client": "",
    "secret": "",
    "tenant": "",
    "resource": "",
    "entity_filter": [],
    "output_path": "",
    "mask": false,
    "compress": false,
    "timeout": 30,
    "meta_only": true
}

Step 9: Run the Collector

This code sample, kada_microsoft_dynamics_extractor.py, handles the configuration details and runs the extractor

Python
import os
import argparse
from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.microsoft_dynamics import Extractor

get_generic_logger('root')

_type = 'microsoft_dynamics'
dirname = os.path.dirname(__file__)
filename = os.path.join(dirname, 'kada_{}_extractor_config.json'.format(_type))

parser = argparse.ArgumentParser(description='KADA Dynamics API Extractor.')
parser.add_argument('--config', '-c', dest='config', default=filename)
parser.add_argument('--name', '-n', dest='name', default=_type)
args = parser.parse_args()

start_hwm, end_hwm = get_hwm(args.name)

ext = Extractor(**load_config(args.config))
ext.test_connection()
ext.run(**{"start_hwm": start_hwm, "end_hwm": end_hwm})

publish_hwm(args.name, end_hwm)

Step 10: Check the Collector Outputs

K Extracts

A set of files (e.g. metadata, databaselog, linkages, events, etc.) will be generated in the output_path directory.

High Water Mark File

A high water mark file called microsoft_dynamics_hwm.txt is created.

Refer to Collector Integration General Notes for more information.


Step 11: Push the Extracts to K

Once the files have been validated, you can push the files to the K landing directory.
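For an Azure-hosted K instance, the push can be a plain Put Blob REST call authorised by the SAS token from Step 6. A minimal sketch using only the standard library; the account, container, and landing path are the values noted earlier, and this is an illustration rather than the only supported client:

```python
import os
import urllib.request

def blob_put_request(account: str, container: str, blob_path: str,
                     sas_token: str, data: bytes) -> urllib.request.Request:
    # Azure Blob "Put Blob" call; the SAS token in the query string authorises it
    url = (f"https://{account}.blob.core.windows.net/"
           f"{container}/{blob_path}?{sas_token.lstrip('?')}")
    return urllib.request.Request(
        url, data=data, method="PUT",
        headers={"x-ms-blob-type": "BlockBlob"},
    )

def push_extracts(output_path: str, account: str, container: str,
                  landing_path: str, sas_token: str) -> None:
    # Upload every extract file; match '.csv.gz' when compress is true in the config
    for name in os.listdir(output_path):
        if name.endswith((".csv", ".csv.gz")):
            with open(os.path.join(output_path, name), "rb") as f:
                data = f.read()
            req = blob_put_request(account, container,
                                   f"{landing_path}/{name}", sas_token, data)
            urllib.request.urlopen(req)
```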


Example: Using Airflow to orchestrate the Extract and Push to K

The following example is how you can orchestrate the Tableau collector using Airflow and push the files to K hosted on Azure. The code is not expected to be used as-is but as a template for your own DAG.

Python
# built-in
import os

# Installed
from airflow.operators.python_operator import PythonOperator
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from plugins.utils.azure_blob_storage import AzureBlobStorage

from kada_collectors.extractors.utils import load_config, get_hwm, publish_hwm, get_generic_logger
from kada_collectors.extractors.tableau import Extractor

# To be configured by the customer.
# Note variables may change if using a different object store.
KADA_SAS_TOKEN = os.getenv("KADA_SAS_TOKEN")
KADA_CONTAINER = ""
KADA_STORAGE_ACCOUNT = ""
KADA_LANDING_PATH = "lz/tableau/landing"
KADA_EXTRACTOR_CONFIG = {
    "server_address": "http://tabserver",
    "username": "user",
    "password": "password",
    "sites": [],
    "db_host": "tabserver",
    "db_username": "repo_user",
    "db_password": "repo_password",
    "db_port": 8060,
    "db_name": "workgroup",
    "meta_only": False,
    "retries": 5,
    "dry_run": False,
    "output_path": "/set/to/output/path",
    "mask": True,
    "mapping": {}
}

# To be implemented by the customer.
# Upload to your landing zone storage.
# Change '.csv' to '.csv.gz' if you set compress = true in the config
def upload():
  output = KADA_EXTRACTOR_CONFIG['output_path']
  for filename in os.listdir(output):
      if filename.endswith('.csv'):
        file_to_upload_path = os.path.join(output, filename)

        AzureBlobStorage.upload_file_sas_token(
            client=KADA_SAS_TOKEN,
            storage_account=KADA_STORAGE_ACCOUNT,
            container=KADA_CONTAINER,
            blob=f'{KADA_LANDING_PATH}/{filename}',
            local_path=file_to_upload_path
        )

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:

    # To be implemented by the customer.
    # Retrieve the timestamp from the prior run
    start_hwm = 'YYYY-MM-DD HH:mm:SS'
    end_hwm = 'YYYY-MM-DD HH:mm:SS' # timestamp now

    ext = Extractor(**KADA_EXTRACTOR_CONFIG)

    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="extract tableau and upload") as extract_upload:
        task_1 = PythonOperator(
            task_id="extract_tableau",
            python_callable=ext.run,
            op_kwargs={"start_hwm": start_hwm, "end_hwm": end_hwm},
            provide_context=True,
        )

        task_2 = PythonOperator(
            task_id="upload_extracts",
            python_callable=upload,
            op_kwargs={},
            provide_context=True,
        )

        # To be implemented by the customer.
        # Timestamp needs to be saved for next run
        task_3 = DummyOperator(task_id='save_hwm')

    end = DummyOperator(task_id='end')

    start >> extract_upload >> end