Kafka Connect

Integration Details

This plugin extracts the following:

  • Source and Sink Connectors in Kafka Connect as Data Pipelines
  • For source connectors - Data Jobs representing lineage from the source dataset to the Kafka topic, one per {connector_name}:{source_dataset} combination
  • For sink connectors - Data Jobs representing lineage from the Kafka topic to the destination dataset, one per {connector_name}:{topic} combination
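
For example, a JDBC source connector named mysql_source1 reading the table librarydb.member would be modeled roughly as follows (a hypothetical illustration; the connector and table names are made up):

urn:li:dataFlow:(kafka-connect,mysql_source1,PROD)                                    # the connector, as a Data Pipeline
urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source1,PROD),librarydb.member)  # one job per {connector_name}:{source_dataset}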

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

Source Concept  | DataHub Concept | Notes
"kafka-connect" | Data Platform   |
Connector       | DataFlow        |
Kafka Topic     | Dataset         |

Current limitations

Lineage extraction currently works only for:

  • Source connectors: JDBC, Debezium, Mongo, and generic connectors with a user-defined lineage graph (see the sketch after this list)
  • Sink connectors: BigQuery
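
For connectors outside this list, you can declare the lineage yourself with the generic_connectors config option. A minimal sketch (the connector, platform, and dataset names here are hypothetical):

generic_connectors:
  - connector_name: my_custom_source_connector
    source_platform: postgres
    source_dataset: my_db.my_table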

Module kafka-connect

Certified

Important Capabilities

Capability        | Status | Notes
Platform Instance | ✅     | Enabled by default

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[kafka-connect]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "kafka-connect"
  config:
    # Coordinates
    connect_uri: "http://localhost:8083"

    # Credentials
    username: admin
    password: password

    # Optional
    platform_instance_map:
      bigquery: bigquery_platform_instance_id

sink:
  # sink configs
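
Once the recipe is saved, e.g. as kafka_connect_recipe.yaml (the filename is your choice), run it with the DataHub CLI:

datahub ingest -c kafka_connect_recipe.yaml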

Config Details

Note that a . is used to denote nested fields in the YAML recipe.
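
For example, stateful_ingestion.enabled in the table below corresponds to the following YAML:

stateful_ingestion:
  enabled: true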

Field [Required] | Type | Description | Default
cluster_name [✅] | string | Cluster to ingest from. | connect-cluster
connect_to_platform_map [✅] | object | Platform instance mapping for when multiple instances of a platform are available. An entry for a platform should be in either platform_instance_map or connect_to_platform_map, not both. See the example after this table. | None
connect_uri [✅] | string | URI to connect to. | http://localhost:8083/
convert_lineage_urns_to_lowercase [✅] | boolean | Whether to convert the URNs of ingested lineage datasets to lowercase. | None
password [✅] | string | Kafka Connect password. | None
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to. | None
platform_instance_map [✅] | map(str,string) | Platform instance mapping to use when constructing URNs, e.g. platform_instance_map: { "bigquery": "bigquery_platform_instance_id" }. | None
username [✅] | string | Kafka Connect username. | None
env [✅] | string | The environment that all assets produced by this connector belong to. | PROD
connector_patterns [✅] | AllowDenyPattern | Regex patterns for connectors to filter for ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
connector_patterns.allow [❓ (required if connector_patterns is set)] | array(string) | Regex patterns for connectors to include. | None
connector_patterns.deny [❓ (required if connector_patterns is set)] | array(string) | Regex patterns for connectors to exclude. | None
connector_patterns.ignoreCase [❓ (required if connector_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True
generic_connectors [✅] | array(object) | User-defined lineage for generic connectors whose lineage the plugin cannot infer automatically (see Current limitations). | None
generic_connectors.connector_name [❓ (required if generic_connectors is set)] | string | Name of the connector. | None
generic_connectors.source_dataset [❓ (required if generic_connectors is set)] | string | Source dataset the connector reads from. | None
generic_connectors.source_platform [❓ (required if generic_connectors is set)] | string | Platform of the source dataset. | None
provided_configs [✅] | array(object) | Externally provided configuration values (see Advanced Configurations below). | None
provided_configs.path_key [❓ (required if provided_configs is set)] | string | Key under which the value is provided. | None
provided_configs.provider [❓ (required if provided_configs is set)] | string | Name of the configuration provider. | None
provided_configs.value [❓ (required if provided_configs is set)] | string | Value the provider resolves to at runtime. | None
stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | Base specialized config for stateful ingestion with stale-metadata removal capability. | None
stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | Whether or not to enable stateful ingestion. | None
stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | None
stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | None
stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes entities present in the last successful run but missing in the current run, when stateful_ingestion is enabled. | True
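
For example, to map a specific connector to a specific platform instance via connect_to_platform_map (reconstructed from the description above; the connector and instance names are illustrative):

connect_to_platform_map:
  postgres-connector-finance-db:
    postgres: core_finance_instance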

Advanced Configurations

Kafka Connect supports pluggable configuration providers, which can load configuration data from external sources at runtime. These values are not available to the DataHub ingestion source through the Kafka Connect APIs. If you use such provided configurations to specify connection URLs (database, etc.) in a Kafka Connect connector configuration, you will also need to add them to the provided_configs section of the recipe for DataHub to generate correct lineage.

# Optional mapping of provider configurations, if used
provided_configs:
  - provider: env
    path_key: MYSQL_CONNECTION_URL
    value: jdbc:mysql://test_mysql:3306/librarydb
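
On the Kafka Connect side, the connector configuration would then reference this provided value with the standard config-provider placeholder syntax, along the lines of (illustrative):

connection.url=${env:MYSQL_CONNECTION_URL}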

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka_connect.KafkaConnectSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Kafka Connect, feel free to ping us on our Slack.