Kafka Connect
Integration Details
This plugin extracts the following:
- Source and Sink Connectors in Kafka Connect as Data Pipelines
- For Source connectors - Data Jobs to represent lineage information between source dataset to Kafka topic per
{connector_name}:{source_dataset}
combination - For Sink connectors - Data Jobs to represent lineage information between Kafka topic to destination dataset per
{connector_name}:{topic}
combination
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
"kafka-connect" | Data Platform | |
Connector | DataFlow | |
Kafka Topic | Dataset |
Current limitations
Works only for
- Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph
- Sink connectors: BigQuery
Module kafka-connect
Important Capabilities
Capability | Status | Notes |
---|---|---|
Platform Instance | ✅ | Enabled by default |
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[kafka-connect]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: "kafka-connect"
config:
# Coordinates
connect_uri: "http://localhost:8083"
# Credentials
username: admin
password: password
# Optional
platform_instance_map:
bigquery: bigquery_platform_instance_id
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
View All Configuration Options
Field [Required] | Type | Description | Default | Notes |
---|---|---|---|---|
cluster_name [✅] | string | Cluster to ingest from. | connect-cluster | |
connect_to_platform_map [✅] | object | Platform instance mapping when multiple instances for a platform is available. Entry for a platform should be in either platform_instance_map or connect_to_platform_map . e.g.connect_to_platform_map: { "postgres-connector-finance-db": "postgres": "core_finance_instance" } | None | |
connect_uri [✅] | string | URI to connect to. | http://localhost:8083/ | |
convert_lineage_urns_to_lowercase [✅] | boolean | Whether to convert the urns of ingested lineage dataset to lowercase | None | |
password [✅] | string | Kafka Connect password. | None | |
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
platform_instance_map [✅] | map(str,string) | None | ||
username [✅] | string | Kafka Connect username. | None | |
env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
connector_patterns [✅] | AllowDenyPattern | regex patterns for connectors to filter for ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
connector_patterns.allow [❓ (required if connector_patterns is set)] | array(string) | None | ||
connector_patterns.deny [❓ (required if connector_patterns is set)] | array(string) | None | ||
connector_patterns.ignoreCase [❓ (required if connector_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
generic_connectors [✅] | array(object) | None | ||
generic_connectors.connector_name [❓ (required if generic_connectors is set)] | string | None | ||
generic_connectors.source_dataset [❓ (required if generic_connectors is set)] | string | None | ||
generic_connectors.source_platform [❓ (required if generic_connectors is set)] | string | None | ||
provided_configs [✅] | array(object) | None | ||
provided_configs.path_key [❓ (required if provided_configs is set)] | string | None | ||
provided_configs.provider [❓ (required if provided_configs is set)] | string | None | ||
provided_configs.value [❓ (required if provided_configs is set)] | string | None | ||
stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | None | |
stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | The type of the ingestion state provider registered with datahub. | None | |
stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | None | |
stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | None | |
stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True |
The JSONSchema for this configuration is inlined below.
{
"title": "KafkaConnectSourceConfig",
"description": "Any source that connects to a platform should inherit this class",
"type": "object",
"properties": {
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance_map": {
"title": "Platform Instance Map",
"description": "Platform instance mapping to use when constructing URNs. e.g.`platform_instance_map: { \"hive\": \"warehouse\" }`",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"connect_uri": {
"title": "Connect Uri",
"description": "URI to connect to.",
"default": "http://localhost:8083/",
"type": "string"
},
"username": {
"title": "Username",
"description": "Kafka Connect username.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Kafka Connect password.",
"type": "string"
},
"cluster_name": {
"title": "Cluster Name",
"description": "Cluster to ingest from.",
"default": "connect-cluster",
"type": "string"
},
"convert_lineage_urns_to_lowercase": {
"title": "Convert Lineage Urns To Lowercase",
"description": "Whether to convert the urns of ingested lineage dataset to lowercase",
"default": false,
"type": "boolean"
},
"connector_patterns": {
"title": "Connector Patterns",
"description": "regex patterns for connectors to filter for ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"provided_configs": {
"title": "Provided Configs",
"description": "Provided Configurations",
"type": "array",
"items": {
"$ref": "#/definitions/ProvidedConfig"
}
},
"connect_to_platform_map": {
"title": "Connect To Platform Map",
"description": "Platform instance mapping when multiple instances for a platform is available. Entry for a platform should be in either `platform_instance_map` or `connect_to_platform_map`. e.g.`connect_to_platform_map: { \"postgres-connector-finance-db\": \"postgres\": \"core_finance_instance\" }`",
"type": "object"
},
"generic_connectors": {
"title": "Generic Connectors",
"description": "Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector",
"default": [],
"type": "array",
"items": {
"$ref": "#/definitions/GenericConnectorConfig"
}
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"ProvidedConfig": {
"title": "ProvidedConfig",
"type": "object",
"properties": {
"provider": {
"title": "Provider",
"type": "string"
},
"path_key": {
"title": "Path Key",
"type": "string"
},
"value": {
"title": "Value",
"type": "string"
}
},
"required": [
"provider",
"path_key",
"value"
],
"additionalProperties": false
},
"GenericConnectorConfig": {
"title": "GenericConnectorConfig",
"type": "object",
"properties": {
"connector_name": {
"title": "Connector Name",
"type": "string"
},
"source_dataset": {
"title": "Source Dataset",
"type": "string"
},
"source_platform": {
"title": "Source Platform",
"type": "string"
}
},
"required": [
"connector_name",
"source_dataset",
"source_platform"
],
"additionalProperties": false
}
}
}
Advanced Configurations
Kafka Connect supports pluggable configuration providers which can load configuration data from external sources at runtime. These values are not available to DataHub ingestion source through Kafka Connect APIs. If you are using such provided configurations to specify connection url (database, etc) in Kafka Connect connector configuration then you will need also add these in provided_configs
section in recipe for DataHub to generate correct lineage.
# Optional mapping of provider configurations if using
provided_configs:
- provider: env
path_key: MYSQL_CONNECTION_URL
value: jdbc:mysql://test_mysql:3306/librarydb
Code Coordinates
- Class Name:
datahub.ingestion.source.kafka_connect.KafkaConnectSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Kafka Connect, feel free to ping us on our Slack