Iceberg
Module iceberg
Important Capabilities
Capability | Status | Notes |
---|---|---|
Data Profiling | ✅ | Optionally enabled via configuration. |
Descriptions | ✅ | Enabled by default. |
Detect Deleted Entities | ✅ | Enabled via stateful ingestion. |
Domains | ❌ | Currently not supported. |
Extract Ownership | ✅ | Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership. |
Partition Support | ❌ | Currently not supported. |
Platform Instance | ✅ | Optionally enabled via configuration. An Iceberg instance represents the name of the data lake where the table is stored. |
Integration Details
The DataHub Iceberg source plugin extracts metadata from Iceberg tables stored in a distributed or local file system.
Typically, Iceberg tables are stored in a distributed file system such as S3 or Azure Data Lake Storage (ADLS) and registered in a catalog. Catalog implementations vary:
they can be filesystem-based, RDBMS-based, or even REST-based. This Iceberg source plugin relies on the
Iceberg python_legacy library, whose catalog support is currently limited.
A new version of the Iceberg Python library is in development and should lift this limitation.
Until then, this source plugin only ingests HadoopCatalog-based tables that have a version-hint.text
metadata file.
Ingestion of tables happens in two steps:
- Discover Iceberg tables stored in the file system.
- Load the discovered tables using the Iceberg python_legacy library.
The current implementation of the Iceberg source plugin will only discover tables stored in a local file system or in ADLS. Support for S3 could be added fairly easily.
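The discovery step can be pictured with a minimal sketch for a local file system. It is not the plugin's actual code: the function name `discover_tables` is hypothetical, and it assumes a folder counts as a HadoopCatalog table when it contains `metadata/version-hint.text` and that `max_path_depth` counts folder levels below the warehouse root.

```python
import os
from typing import Iterator, Tuple


def discover_tables(root: str, max_path_depth: int = 2) -> Iterator[Tuple[str, ...]]:
    """Yield the folder parts (namespace..., table) of each candidate table.

    Assumption: a folder is a table when it holds metadata/version-hint.text.
    """
    root = os.path.abspath(root)
    for dirpath, dirnames, _filenames in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        parts = () if rel == "." else tuple(rel.split(os.sep))
        hint = os.path.join(dirpath, "metadata", "version-hint.text")
        if parts and os.path.isfile(hint):
            yield parts
            dirnames[:] = []  # do not descend into the table's own folders
        elif len(parts) >= max_path_depth:
            dirnames[:] = []  # deeper folders are silently ignored
```

Each yielded tuple would then be handed to the second step, which loads the table through the Iceberg library.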
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[iceberg]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: "iceberg"
  config:
    env: PROD
    adls:
      # Will be translated to https://{account_name}.dfs.core.windows.net
      account_name: my_adls_account
      # Can use sas_token or account_key
      sas_token: "${SAS_TOKEN}"
      # account_key: "${ACCOUNT_KEY}"
      container_name: warehouse
      base_path: iceberg
    platform_instance: my_iceberg_catalog
    table_pattern:
      allow:
        - marketing.*
    profiling:
      enabled: true
sink:
  # sink configs
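A recipe can also be written as a Python dictionary and run programmatically. The sketch below is a hypothetical variant of the starter recipe that crawls a local file system instead of ADLS. The path `/data/warehouse`, the `datahub-rest` sink, and its server URL are assumptions for illustration, and the sketch presumes a DataHub release whose iceberg source matches the options on this page.

```python
# Hypothetical localfs variant of the starter recipe, as a Python dict.
recipe = {
    "source": {
        "type": "iceberg",
        "config": {
            "env": "PROD",
            "localfs": "/data/warehouse",  # assumed local warehouse path
            "max_path_depth": 3,
            "table_pattern": {"allow": ["marketing.*"]},
            "profiling": {"enabled": True},
        },
    },
    "sink": {
        "type": "datahub-rest",  # assumed sink, replace with your own
        "config": {"server": "http://localhost:8080"},
    },
}


def run_recipe(recipe: dict) -> None:
    """Run the recipe through DataHub's ingestion pipeline."""
    # Imported lazily so the dict can be inspected without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
```

Calling `run_recipe(recipe)` starts the ingestion and raises if the run reported failures.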
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
Field [Required] | Type | Description | Default | Notes |
---|---|---|---|---|
group_ownership_property | string | Iceberg table property to look for a CorpGroup owner. Can only hold a single group value. If property has no value, no owner information will be emitted. | None | |
localfs | string | Local path to crawl for Iceberg tables. This is one filesystem type supported by this source and only one can be configured. | None | |
max_path_depth | integer | Maximum folder depth to crawl for Iceberg tables. Folders deeper than this value will be silently ignored. | 2 | |
platform_instance | string | The instance of the platform that all assets produced by this recipe belong to | None | |
user_ownership_property | string | Iceberg table property to look for a CorpUser owner. Can only hold a single user value. If property has no value, no owner information will be emitted. | owner | |
env | string | The environment that all assets produced by this connector belong to | PROD | |
adls | AdlsSourceConfig | Azure Data Lake Storage to crawl for Iceberg tables. This is one filesystem type supported by this source and only one can be configured. | None | |
adls.account_key | string | Azure storage account access key that can be used as a credential. An account key, a SAS token or a client secret is required for authentication. | None | |
adls.account_name [❓ (required if adls is set)] | string | Name of the Azure storage account. See Microsoft official documentation on how to create a storage account. | None | |
adls.base_path | string | Base folder in hierarchical namespaces to start from. | / | |
adls.client_id | string | Azure client (Application) ID required when a client_secret is used as a credential. | None | |
adls.client_secret | string | Azure client secret that can be used as a credential. An account key, a SAS token or a client secret is required for authentication. | None | |
adls.container_name [❓ (required if adls is set)] | string | Azure storage account container name. | None | |
adls.sas_token | string | Azure storage account Shared Access Signature (SAS) token that can be used as a credential. An account key, a SAS token or a client secret is required for authentication. | None | |
adls.tenant_id | string | Azure tenant (Directory) ID required when a client_secret is used as a credential. | None | |
table_pattern | AllowDenyPattern | Regex patterns for tables to filter in ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
table_pattern.allow | array(string) | List of regex patterns to include in ingestion | ['.*'] | |
table_pattern.deny | array(string) | List of regex patterns to exclude from ingestion. | [] | |
table_pattern.ignoreCase | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
profiling | IcebergProfilingConfig | | {'enabled': False, 'include_field_null_count': True, 'include_field_min_value': True, 'include_field_max_value': True} | |
profiling.enabled | boolean | Whether profiling should be done. | False | |
profiling.include_field_max_value | boolean | Whether to profile for the max value of numeric columns. | True | |
profiling.include_field_min_value | boolean | Whether to profile for the min value of numeric columns. | True | |
profiling.include_field_null_count | boolean | Whether to profile for the number of nulls for each column. | True | |
stateful_ingestion | StatefulStaleMetadataRemovalConfig | Iceberg Stateful Ingestion Config. | None | |
stateful_ingestion.enabled | boolean | The type of the ingestion state provider registered with datahub. | False | |
stateful_ingestion.ignore_new_state | boolean | If set to True, ignores the current checkpoint state. | False | |
stateful_ingestion.ignore_old_state | boolean | If set to True, ignores the previous checkpoint state. | False | |
stateful_ingestion.remove_stale_metadata | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True | |
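To get a feel for how table_pattern filters table names, here is a minimal sketch of allow/deny matching. It is not DataHub's AllowDenyPattern class: the helper `is_allowed` is hypothetical, and it assumes that patterns are matched from the start of the name with `re.match` and that a deny match takes precedence over an allow match.

```python
import re
from typing import Sequence


def is_allowed(
    name: str,
    allow: Sequence[str] = (".*",),
    deny: Sequence[str] = (),
    ignore_case: bool = True,
) -> bool:
    """Return True when a table name passes the allow/deny patterns."""
    flags = re.IGNORECASE if ignore_case else 0
    # Assumption: any deny match rejects the name, whatever allow says.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)
```

With the starter recipe's `marketing.*` pattern, `is_allowed("marketing.campaigns", allow=["marketing.*"])` passes while `is_allowed("sales.orders", allow=["marketing.*"])` does not.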
The JSONSchema for this configuration is inlined below.
{
"title": "IcebergSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Iceberg Stateful Ingestion Config.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"adls": {
"title": "Adls",
"description": "[Azure Data Lake Storage](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction) to crawl for Iceberg tables. This is one filesystem type supported by this source and **only one can be configured**.",
"allOf": [
{
"$ref": "#/definitions/AdlsSourceConfig"
}
]
},
"localfs": {
"title": "Localfs",
"description": "Local path to crawl for Iceberg tables. This is one filesystem type supported by this source and **only one can be configured**.",
"type": "string"
},
"max_path_depth": {
"title": "Max Path Depth",
"description": "Maximum folder depth to crawl for Iceberg tables. Folders deeper than this value will be silently ignored.",
"default": 2,
"type": "integer"
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"user_ownership_property": {
"title": "User Ownership Property",
"description": "Iceberg table property to look for a `CorpUser` owner. Can only hold a single user value. If property has no value, no owner information will be emitted.",
"default": "owner",
"type": "string"
},
"group_ownership_property": {
"title": "Group Ownership Property",
"description": "Iceberg table property to look for a `CorpGroup` owner. Can only hold a single group value. If property has no value, no owner information will be emitted.",
"type": "string"
},
"profiling": {
"title": "Profiling",
"default": {
"enabled": false,
"include_field_null_count": true,
"include_field_min_value": true,
"include_field_max_value": true
},
"allOf": [
{
"$ref": "#/definitions/IcebergProfilingConfig"
}
]
}
},
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AdlsSourceConfig": {
"title": "AdlsSourceConfig",
"description": "Common Azure credentials config.\n\nhttps://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python",
"type": "object",
"properties": {
"base_path": {
"title": "Base Path",
"description": "Base folder in hierarchical namespaces to start from.",
"default": "/",
"type": "string"
},
"container_name": {
"title": "Container Name",
"description": "Azure storage account container name.",
"type": "string"
},
"account_name": {
"title": "Account Name",
"description": "Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
"type": "string"
},
"account_key": {
"title": "Account Key",
"description": "Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
"type": "string"
},
"sas_token": {
"title": "Sas Token",
"description": "Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
"type": "string"
},
"client_secret": {
"title": "Client Secret",
"description": "Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
"type": "string"
},
"client_id": {
"title": "Client Id",
"description": "Azure client (Application) ID required when a `client_secret` is used as a credential.",
"type": "string"
},
"tenant_id": {
"title": "Tenant Id",
"description": "Azure tenant (Directory) ID required when a `client_secret` is used as a credential.",
"type": "string"
}
},
"required": [
"container_name",
"account_name"
],
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"IcebergProfilingConfig": {
"title": "IcebergProfilingConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
}
}
}
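The field descriptions in the schema carry a few rules that the schema itself does not enforce: only one filesystem type can be configured, ADLS needs one of three credentials, and a client secret needs a client ID and tenant ID. The sketch below checks them on a plain config dictionary. The helper `check_filesystem_config` is hypothetical and is not the plugin's validator. It also assumes that exactly one of adls or localfs must be set.

```python
from typing import Any, Dict, List


def check_filesystem_config(config: Dict[str, Any]) -> List[str]:
    """Return the problems found in the filesystem part of a source config."""
    problems: List[str] = []
    adls = config.get("adls")
    localfs = config.get("localfs")
    # Assumption: exactly one filesystem type must be configured.
    if (adls is None) == (localfs is None):
        problems.append("configure exactly one of 'adls' or 'localfs'")
    if adls is not None:
        for key in ("account_name", "container_name"):
            if not adls.get(key):
                problems.append(f"adls.{key} is required")
        credentials = ("account_key", "sas_token", "client_secret")
        if not any(adls.get(key) for key in credentials):
            problems.append("adls needs an account_key, a sas_token or a client_secret")
        if adls.get("client_secret") and not (adls.get("client_id") and adls.get("tenant_id")):
            problems.append("adls.client_secret also needs client_id and tenant_id")
    return problems
```

An empty list means the filesystem section looks consistent, for example `check_filesystem_config({"localfs": "/data/warehouse"})`.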
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
iceberg | Data Platform | |
Table | Dataset | Each Iceberg table maps to a Dataset named using the parent folders. If a table is stored under my/namespace/table, the dataset name will be my.namespace.table. If a Platform Instance is configured, it will be used as a prefix: <platform_instance>.my.namespace.table. |
Table property | User (a.k.a. CorpUser) | The value of a table property can be used as the name of a CorpUser owner. This table property name can be configured with the source option user_ownership_property. |
Table property | CorpGroup | The value of a table property can be used as the name of a CorpGroup owner. This table property name can be configured with the source option group_ownership_property. |
Table parent folders (excluding warehouse catalog location) | Container | Available in a future release |
Table schema | SchemaField | Maps to the fields defined within the Iceberg table schema definition. |
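The naming rules in the table above can be illustrated with a short sketch. The helper names are hypothetical and this is not the plugin's code. The URN layouts follow DataHub's usual dataset, CorpUser, and CorpGroup URN formats, and building the dataset URN directly from the prefixed name is an assumption.

```python
from typing import Optional


def dataset_name(table_path: str, platform_instance: Optional[str] = None) -> str:
    """Turn a table folder path such as my/namespace/table into a dataset name."""
    name = ".".join(part for part in table_path.split("/") if part)
    return f"{platform_instance}.{name}" if platform_instance else name


def dataset_urn(name: str, env: str = "PROD") -> str:
    """Build a dataset URN on the iceberg data platform."""
    return f"urn:li:dataset:(urn:li:dataPlatform:iceberg,{name},{env})"


def owner_urn(owner: str, is_group: bool = False) -> str:
    """Build the URN for an owner read from a table property."""
    return f"urn:li:corpGroup:{owner}" if is_group else f"urn:li:corpuser:{owner}"
```

For example, `dataset_name("my/namespace/table", "my_iceberg_catalog")` gives `my_iceberg_catalog.my.namespace.table`.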
Code Coordinates
- Class Name: datahub.ingestion.source.iceberg.iceberg.IcebergSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Iceberg, feel free to ping us on our Slack.