Pulsar
Integration Details
The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests the information into DataHub. The plugin uses the Pulsar admin REST API to interact with the Pulsar instance. It calls the API to:
- Get the list of existing tenants
- Get the list of namespaces associated with each tenant
- Get the list of topics associated with each namespace, covering:
  - persistent topics
  - persistent partitioned topics
  - non-persistent topics
  - non-persistent partitioned topics
- Get the latest schema associated with each topic
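The traversal above can be sketched in Python. This is a minimal illustration, not the plugin's code. The fetch callable is hypothetical: in practice it might wrap requests.get against web_service_url with the configured auth header, verify_ssl and timeout, returning None on a 404 for topics without a schema. The endpoint paths are the Pulsar admin v2 paths as we understand them from the Apache Pulsar REST API documentation. The DataHub source may issue its calls differently.

```python
from typing import Any, Callable, Dict, List, Optional

# Hypothetical fetch signature: takes an admin REST path and returns the decoded JSON body
# (None for a topic that has no schema, i.e. an HTTP 404 in a real client).
Fetch = Callable[[str], Any]

# Pulsar admin v2 endpoints listing the four topic kinds of a "tenant/namespace".
TOPIC_LIST_PATHS: List[str] = [
    "/admin/v2/persistent/{ns}",
    "/admin/v2/persistent/{ns}/partitioned",
    "/admin/v2/non-persistent/{ns}",
    "/admin/v2/non-persistent/{ns}/partitioned",
]


def crawl(fetch: Fetch) -> Dict[str, Optional[dict]]:
    """Walk tenants -> namespaces -> topics and map each topic to its latest schema (or None)."""
    topics: Dict[str, Optional[dict]] = {}
    for tenant in fetch("/admin/v2/tenants"):
        # Namespaces are returned as "tenant/namespace" strings.
        for ns in fetch(f"/admin/v2/namespaces/{tenant}"):
            for template in TOPIC_LIST_PATHS:
                for topic in fetch(template.format(ns=ns)):
                    # "persistent://t/ns/orders" -> "t/ns/orders" for the schema endpoint.
                    path = topic.split("://", 1)[1]
                    topics[topic] = fetch(f"/admin/v2/schemas/{path}/schema")
    return topics
```

Injecting fetch keeps the walk testable without a live cluster. Note that the plain persistent listing may also return the individual partitions of partitioned topics, which is the situation the exclude_individual_partitions option deals with.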
Data is extracted per tenant and namespace. Topics, together with their corresponding schemas (if available), are ingested into DataHub as Datasets. Additional values such as the schema description, schema_version, schema_type and partitioned are included as DatasetProperties.
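As an illustration of how those extra values could be assembled, here is a hypothetical helper, dataset_properties, that is not part of the plugin. It assumes the Pulsar schema response carries "version", "type" and "data" keys. It also assumes the schema description is the top-level "doc" attribute of an Avro/JSON definition; this is an assumption, so check the plugin source for where the description really comes from.

```python
import json
from typing import Dict, Optional, Tuple


def dataset_properties(
    schema_info: Optional[dict], partitioned: bool
) -> Tuple[Optional[str], Dict[str, str]]:
    """Hypothetical helper: return (description, custom properties) for one topic."""
    # DataHub custom properties are string-to-string, so every value is stringified.
    props: Dict[str, str] = {"partitioned": str(partitioned).lower()}
    description: Optional[str] = None
    if schema_info is not None:
        props["schema_version"] = str(schema_info.get("version"))
        props["schema_type"] = str(schema_info.get("type"))
        # Assumption: "data" holds the Avro/JSON definition as a JSON string,
        # and its top-level "doc" attribute serves as the schema description.
        try:
            description = json.loads(schema_info.get("data", "")).get("doc")
        except (TypeError, ValueError, AttributeError):
            description = None
    return description, props
```

Topics without a schema still get the partitioned flag, which matches the "(if available)" wording above.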
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
pulsar | Data Platform | |
Pulsar Topic | Dataset | subType: topic |
Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition. |
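To make the Pulsar Schema to SchemaField row concrete, the sketch below lists the top-level fields of an Avro record definition. It is a simplified stand-in: DataHub has its own Avro-to-SchemaField conversion that also handles nested fields, so avro_top_level_fields is a hypothetical name used only for illustration.

```python
import json
from typing import List, Tuple


def avro_top_level_fields(schema_json: str) -> List[Tuple[str, str]]:
    """List (field name, type label) for the top-level fields of an Avro record schema."""
    schema = json.loads(schema_json)
    fields: List[Tuple[str, str]] = []
    for field in schema.get("fields", []):
        ftype = field["type"]
        if isinstance(ftype, list):
            # Union such as ["null", "string"]; label each branch by its type name.
            branches = [t if isinstance(t, str) else t.get("type", "?") for t in ftype]
            label = "union[" + ",".join(branches) + "]"
        elif isinstance(ftype, dict):
            # Complex type (record, array, map, enum, ...): keep only its kind.
            label = ftype.get("type", "?")
        else:
            # Primitive or named type reference, e.g. "long" or "com.acme.Address".
            label = ftype
        fields.append((field["name"], label))
    return fields
```

Each returned pair would correspond to one SchemaField on the topic's Dataset.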
Metadata Ingestion Quickstart
For context on getting started with ingestion, check out our metadata ingestion guide.
Module pulsar
Important Capabilities
Capability | Status | Notes |
---|---|---|
Domains | ✅ | Supported via the domain config field |
Platform Instance | ✅ | Enabled by default |
PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)
NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
Prerequisites
In order to ingest metadata from Apache Pulsar, you will need:
- Access to a Pulsar instance and, if authentication is enabled, a valid access token.
- Pulsar version >= 2.7.0
NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.
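The superUser requirement suggests a simple rule for choosing tenants. Based on the tenants option description ("alternatively you can set a list of tenants ... using the tenant admin role"), an explicit list should avoid the superUser-only listing call. The helper below is a hypothetical sketch of that rule, not the plugin's code.

```python
from typing import Callable, List


def resolve_tenants(configured: List[str], list_all_tenants: Callable[[], List[str]]) -> List[str]:
    """Hypothetical helper: prefer the configured tenants, else list them all."""
    if configured:
        # An explicit list never triggers the superUser-only "list all tenants" call.
        return list(configured)
    # Listing every tenant requires the superUser role.
    return list_all_tenants()
```

With a tenant admin role, setting tenants in the recipe is therefore the safer choice.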
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[pulsar]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"
    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"
    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>
    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}
    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2
    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"
sink:
  # sink configs
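The same recipe can be expressed as a Python dictionary, which is handy when ingestion is driven from code. In this sketch, build_pulsar_recipe is a hypothetical helper, and the datahub-rest sink pointing at http://localhost:8080 is an assumption because the starter recipe leaves the sink unspecified. Secrets are read from the environment, mirroring the ${CLIENT_ID} and ${CLIENT_SECRET} substitution.

```python
import os
from typing import Any, Dict, List


def build_pulsar_recipe(
    tenants: List[str], issuer_url: str, server: str = "http://localhost:8080"
) -> Dict[str, Any]:
    """Hypothetical helper: programmatic equivalent of the starter recipe."""
    return {
        "source": {
            "type": "pulsar",
            "config": {
                "env": "TEST",
                "platform_instance": "local",
                "web_service_url": "https://localhost:8443",
                "verify_ssl": "/opt/certs/ca.cert.pem",
                "issuer_url": issuer_url,
                # Secrets come from the environment instead of being hard-coded.
                "client_id": os.environ["CLIENT_ID"],
                "client_secret": os.environ["CLIENT_SECRET"],
                "tenants": tenants,
                "topic_patterns": {"allow": [".*sales.*"]},
            },
        },
        # Assumption: a DataHub REST sink; adjust to your deployment.
        "sink": {"type": "datahub-rest", "config": {"server": server}},
    }
```

With the plugin installed, such a dictionary can be passed to Pipeline.create from datahub.ingestion.run.pipeline, followed by run() and raise_from_status().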
Config Details
Note that a "." is used to denote nested fields in the YAML recipe.
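For example, the dotted option name topic_patterns.allow corresponds to an allow key nested under topic_patterns. The hypothetical helper below expands dotted names into the nested structure a recipe uses. It only illustrates the naming convention and is not part of DataHub.

```python
from typing import Any, Dict


def nest(dotted: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: expand dotted option names into nested recipe dicts."""
    out: Dict[str, Any] = {}
    for key, value in dotted.items():
        *parents, leaf = key.split(".")
        node = out
        for part in parents:
            # Create intermediate levels on demand.
            node = node.setdefault(part, {})
        node[leaf] = value
    return out
```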
Field [Required] | Type | Description | Default | Notes |
---|---|---|---|---|
client_id [✅] | string | The application's client ID | None | |
client_secret [✅] | string | The application's client secret | None | |
exclude_individual_partitions [✅] | boolean | Exclude the individual partitions of partitioned topics. For example, when turned off, a topic with 100 partitions will result in 100 Datasets. | True | |
issuer_url [✅] | string | The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication. | None | |
oid_config [✅] | object | Placeholder for OpenId discovery document | None | |
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
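tenants [✅] | array(string) | Listing all tenants requires the superUser role; alternatively, you can set a list of tenants to scrape using the tenant admin role. | [] | |
timeout [✅] | integer | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up | 5 | |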
token [✅] | string | The access token for the application. Mandatory for token based authentication. | None | |
verify_ssl [✅] | UnionType (See notes for variants) | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. | True | One of boolean,string |
web_service_url [✅] | string | The web URL for the cluster. | http://localhost:8080 | |
env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
domain [✅] | map(str,AllowDenyPattern) | Domain patterns: maps each domain key to an AllowDenyPattern of allow/deny regexes. | None | |
domain.key.allow [❓ (required if domain is set)] | array(string) | List of regex patterns to include in ingestion | ['.*'] | |
domain.key.deny [❓ (required if domain is set)] | array(string) | List of regex patterns to exclude from ingestion. | [] | |
domain.key.ignoreCase [❓ (required if domain is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
namespace_patterns [✅] | AllowDenyPattern | List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied. | {'allow': ['.*'], 'deny': ['public/functions'], 'ignoreCase': True} | |
namespace_patterns.allow [❓ (required if namespace_patterns is set)] | array(string) | List of regex patterns to include in ingestion | ['.*'] | |
namespace_patterns.deny [❓ (required if namespace_patterns is set)] | array(string) | List of regex patterns to exclude from ingestion. | [] | |
namespace_patterns.ignoreCase [❓ (required if namespace_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
tenant_patterns [✅] | AllowDenyPattern | List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed except those matching the deny pattern pulsar. | {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase': True} | |
tenant_patterns.allow [❓ (required if tenant_patterns is set)] | array(string) | List of regex patterns to include in ingestion | ['.*'] | |
tenant_patterns.deny [❓ (required if tenant_patterns is set)] | array(string) | List of regex patterns to exclude from ingestion. | [] | |
tenant_patterns.ignoreCase [❓ (required if tenant_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
topic_patterns [✅] | AllowDenyPattern | List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied. | {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase': True} | |
topic_patterns.allow [❓ (required if topic_patterns is set)] | array(string) | List of regex patterns to include in ingestion | ['.*'] | |
topic_patterns.deny [❓ (required if topic_patterns is set)] | array(string) | List of regex patterns to exclude from ingestion. | [] | |
topic_patterns.ignoreCase [❓ (required if topic_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | See Stateful Ingestion. | None | |
stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | Whether stateful ingestion is enabled. | False | |
stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | False | |
stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | False | |
stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True | |
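The tenant_patterns, namespace_patterns and topic_patterns options share allow/deny semantics in which a deny match excludes a name even if an allow pattern also matches. The sketch below models this with a hypothetical allowed function. It uses re.search (unanchored) as an assumption; DataHub's own AllowDenyPattern may anchor matches differently, so verify edge cases against its implementation. The topic strings are illustrative.

```python
import re
from typing import List


def allowed(name: str, allow: List[str], deny: List[str], ignore_case: bool = True) -> bool:
    """Hypothetical model of allow/deny filtering: deny patterns take precedence."""
    flags = re.IGNORECASE if ignore_case else 0
    # Any deny match excludes the name outright.
    if any(re.search(p, name, flags) for p in deny):
        return False
    # Otherwise the name must match at least one allow pattern.
    return any(re.search(p, name, flags) for p in allow)
```

Under this model, the default topic deny pattern /__.*$ filters out system topics such as __change_events, while ignoreCase lets .*sales.* match SALES as well.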
The JSONSchema for this configuration is inlined below.
{
"title": "PulsarSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "see Stateful Ingestion",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"web_service_url": {
"title": "Web Service Url",
"description": "The web URL for the cluster.",
"default": "http://localhost:8080",
"type": "string"
},
"timeout": {
"title": "Timeout",
"description": "Timeout setting: how long to wait for the Pulsar REST API to send data before giving up",
"default": 5,
"type": "integer"
},
"issuer_url": {
"title": "Issuer Url",
"description": "The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication.",
"type": "string"
},
"client_id": {
"title": "Client Id",
"description": "The application's client ID",
"type": "string"
},
"client_secret": {
"title": "Client Secret",
"description": "The application's client secret",
"type": "string"
},
"token": {
"title": "Token",
"description": "The access token for the application. Mandatory for token based authentication.",
"type": "string"
},
"verify_ssl": {
"title": "Verify Ssl",
"description": "Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.",
"default": true,
"anyOf": [
{
"type": "boolean"
},
{
"type": "string"
}
]
},
"tenant_patterns": {
"title": "Tenant Patterns",
"description": "List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed.",
"default": {
"allow": [
".*"
],
"deny": [
"pulsar"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"namespace_patterns": {
"title": "Namespace Patterns",
"description": "List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied.",
"default": {
"allow": [
".*"
],
"deny": [
"public/functions"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"topic_patterns": {
"title": "Topic Patterns",
"description": "List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied.",
"default": {
"allow": [
".*"
],
"deny": [
"/__.*$"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"exclude_individual_partitions": {
"title": "Exclude Individual Partitions",
"description": "Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datasets.",
"default": true,
"type": "boolean"
},
"tenants": {
"title": "Tenants",
"description": "Listing all tenants requires the superUser role; alternatively, you can set a list of tenants to scrape using the tenant admin role",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"domain": {
"title": "Domain",
"description": "Domain patterns",
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"oid_config": {
"title": "Oid Config",
"description": "Placeholder for OpenId discovery document",
"type": "object"
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
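The remove_stale_metadata behavior described above boils down to a set difference between two runs. The sketch is a simplified model: stale_urns is a hypothetical helper, and the identifiers are illustrative rather than real DataHub URNs. The actual checkpointing is handled by DataHub's stateful ingestion framework.

```python
from typing import Iterable, List


def stale_urns(previous_run: Iterable[str], current_run: Iterable[str]) -> List[str]:
    """Hypothetical helper: entities seen in the last successful run but not in this one."""
    current = set(current_run)
    # These are the candidates for soft-deletion when remove_stale_metadata is True.
    return sorted(u for u in set(previous_run) if u not in current)
```

For example, a topic deleted from Pulsar, or newly excluded by topic_patterns, would drop out of the current run and be soft-deleted.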
Code Coordinates
- Class Name: datahub.ingestion.source.pulsar.PulsarSource
Questions
If you've got any questions on configuring ingestion for Pulsar, feel free to ping us on our Slack.