
Delta Lake

Module delta-lake

Incubating

Important Capabilities

  • Extract Tags: Can extract S3 object/bucket tags if enabled

This plugin extracts:

  • Column types and schema associated with each delta table
  • Custom properties: number_of_files, partition_columns, table_creation_time, location, version, etc.
Caution: If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[delta-lake]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: delta-lake
  config:
    env: "PROD"
    platform_instance: "my-delta-lake"
    base_path: "/path/to/data/folder"

sink:
  # sink configs
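The "# sink configs" placeholder is whichever sink you want to write metadata to. As one possible example, the Usage Guide below uses a datahub-rest sink pointed at a local DataHub instance; adjust the server URL to wherever your instance is running:

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"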

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

Field [Required] | Type | Description | Default | Notes
base_path [✅] | string | Path to table (s3 or local file system). If the path is not a delta table path, all subfolders will be scanned to detect and ingest delta tables. | None |
platform [✅] | string | The platform that this source connects to. | delta-lake |
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to. | None |
relative_path [✅] | string | If set, delta tables will be searched at '<base_path>/<relative_path>' and URNs will be created using relative_path only. | None |
require_files [✅] | boolean | Whether DeltaTable should track files. Consider setting this to False for large delta tables to significantly reduce memory use during ingestion. When set to False, number_of_files cannot be reported for the delta table. | True |
version_history_lookback [✅] | integer | Number of previous versions to ingest. Defaults to 1. If set to -1, all version history will be ingested. | 1 |
env [✅] | string | The environment that all assets produced by this connector belong to. | PROD |
s3 [✅] | S3 | | None |
s3.use_s3_bucket_tags [❓ (required if s3 is set)] | boolean | Whether or not to create tags in DataHub from the S3 bucket. | None |
s3.use_s3_object_tags [❓ (required if s3 is set)] | boolean | Whether or not to create tags in DataHub from the S3 object. | None |
s3.aws_config [❓ (required if s3 is set)] | AwsConnectionConfig | AWS configuration. | None |
s3.aws_config.aws_access_key_id [❓ (required if aws_config is set)] | string | AWS access key ID. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None |
s3.aws_config.aws_endpoint_url [❓ (required if aws_config is set)] | string | Auto-detected. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html | None |
s3.aws_config.aws_profile [❓ (required if aws_config is set)] | string | Named AWS profile to use. Only used if access key / secret are unset. If not set, the default profile will be used. | None |
s3.aws_config.aws_proxy [❓ (required if aws_config is set)] | map(str,string) | | None |
s3.aws_config.aws_region [❓ (required if aws_config is set)] | string | AWS region code. | None |
s3.aws_config.aws_secret_access_key [❓ (required if aws_config is set)] | string | AWS secret access key. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None |
s3.aws_config.aws_session_token [❓ (required if aws_config is set)] | string | AWS session token. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None |
s3.aws_config.aws_role [❓ (required if aws_config is set)] | UnionType (see Notes for variants) | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role | None | One of string, union(anyOf), string, AwsAssumeRoleConfig
s3.aws_config.aws_role.ExternalId [❓ (required if aws_role is set)] | string | External ID to use when assuming the role. | None |
s3.aws_config.aws_role.RoleArn [❓ (required if aws_role is set)] | string | ARN of the role to assume. | None |
table_pattern [✅] | AllowDenyPattern | Regex patterns for tables to filter in ingestion. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.allow [❓ (required if table_pattern is set)] | array(string) | | None |
table_pattern.deny [❓ (required if table_pattern is set)] | array(string) | | None |
table_pattern.ignoreCase [❓ (required if table_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True |
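To see how several of these options fit together, here is a sketch of a fuller S3 recipe. The bucket path, region, profile name, and deny pattern are placeholder values, not defaults; only the source section is shown.

source:
  type: delta-lake
  config:
    env: "PROD"
    base_path: "s3://my-bucket/delta-tables"    # placeholder bucket/prefix; subfolders are scanned for delta tables
    require_files: false            # skip file tracking for large tables (number_of_files will not be reported)
    version_history_lookback: 5     # ingest the last 5 versions instead of the default 1
    table_pattern:
      deny:
        - ".*_staging"              # example deny pattern
    s3:
      use_s3_bucket_tags: true      # create DataHub tags from S3 bucket tags
      use_s3_object_tags: true      # create DataHub tags from S3 object tags
      aws_config:
        aws_region: "us-east-1"     # placeholder region
        aws_profile: "my-creds"     # placeholder named profile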

Usage Guide

If you are new to Delta Lake and want to test out a simple integration with Delta Lake and DataHub, you can follow this guide.

Delta Table on Local File System

Step 1

If you don't already have a delta table you can point to, create one using the sample PySpark code below.

import uuid
import random
from pyspark.sql import SparkSession
from delta.tables import DeltaTable


def generate_data():
    return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
            for d in range(1, 29)
            for m in range(1, 13)
            for y in range(2000, 2021)]


jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

table_path = "quickstart/my-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()

Step 2

Create a DataHub ingestion YAML file (delta.dhub.yaml) to ingest metadata from the delta table you just created.

source:
  type: "delta-lake"
  config:
    base_path: "quickstart/my-table"

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"

Note: Make sure you run the Spark code and the recipe from the same folder; otherwise, use absolute paths.
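For example, if you run the recipe from a different directory, the same recipe with an absolute path might look like the following (the path below is a hypothetical location of the table created in Step 1):

source:
  type: "delta-lake"
  config:
    base_path: "/home/username/quickstart/my-table"   # hypothetical absolute path to the delta table

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"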

Step 3

Execute the ingestion recipe:

datahub ingest -c delta.dhub.yaml

Delta Table on S3

Step 1

Set up your AWS credentials by creating an AWS credentials config file, typically at '$HOME/.aws/credentials'.

[my-creds]
aws_access_key_id: ######
aws_secret_access_key: ######

Step 2

Create a Delta Table using the PySpark sample code below unless you already have Delta Tables in your S3 bucket.

import os
import uuid
import random
from configparser import ConfigParser

from pyspark.sql import SparkSession
from delta.tables import DeltaTable


def generate_data():
    return [(y, m, d, str(uuid.uuid4()), str(random.randrange(10000) % 26 + 65) * 3, random.random() * 10000)
            for d in range(1, 29)
            for m in range(1, 13)
            for y in range(2000, 2021)]


jar_packages = ["org.apache.hadoop:hadoop-aws:3.2.3", "io.delta:delta-core_2.12:1.2.1"]
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", ",".join(jar_packages)) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read the AWS credentials created in Step 1. Note that "$HOME" is not expanded by
# Python, so expand the home directory explicitly.
config_object = ConfigParser()
config_object.read(os.path.expanduser("~/.aws/credentials"))
profile_info = config_object["my-creds"]
access_id = profile_info["aws_access_key_id"]
access_key = profile_info["aws_secret_access_key"]

# Pass the credentials to the S3A filesystem so Spark can write to S3.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)

table_path = "s3a://my-bucket/my-folder/sales-table"
columns = ["year", "month", "day", "sale_id", "customer", "total_cost"]
spark.sparkContext.parallelize(generate_data()).toDF(columns).repartition(1).write.format("delta").save(table_path)

df = spark.read.format("delta").load(table_path)
df.show()

Step 3

Create a DataHub ingestion YAML file (delta.s3.dhub.yaml) to ingest metadata from the delta table you just created.

source:
  type: "delta-lake"
  config:
    base_path: "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_access_key_id: <<Access key>>
        aws_secret_access_key: <<secret key>>

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
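If you would rather not embed keys in the recipe, the connector can also pick up credentials from a named profile or an assumed role (see s3.aws_config.aws_profile and s3.aws_config.aws_role in Config Details). A minimal sketch assuming the my-creds profile created in Step 1:

source:
  type: "delta-lake"
  config:
    base_path: "s3://my-bucket/my-folder/sales-table"
    s3:
      aws_config:
        aws_profile: "my-creds"   # named profile from $HOME/.aws/credentials; used only when access key / secret are unset

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"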

Step 4

Execute the ingestion recipe:

datahub ingest -c delta.s3.dhub.yaml

Note: The recipes above are minimal. Please refer to the Config Details section for the full set of configuration options.

Code Coordinates

  • Class Name: datahub.ingestion.source.delta_lake.source.DeltaLakeSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Delta Lake, feel free to ping us on our Slack.