
S3 Data Lake

Module s3

Incubating

Important Capabilities

| Capability | Status | Notes |
| --- | --- | --- |
| Data Profiling |  | Optionally enabled via configuration |
| Extract Tags |  | Can extract S3 object/bucket tags if enabled |

This plugin extracts:

  • Row and column counts for each table
  • For each column, if profiling is enabled:
    • null counts and proportions
    • distinct counts and proportions
    • minimum, maximum, mean, median, standard deviation, some quantile values
    • histograms or frequencies of unique values

This connector supports both local files and files stored on AWS S3 (which must be identified using the s3:// prefix). Supported file types are as follows:

  • CSV
  • TSV
  • JSON
  • Parquet
  • Apache Avro

Schemas for Parquet and Avro files are extracted as provided.

Schemas for schemaless formats (CSV, TSV, JSON) are inferred. For CSV and TSV files, we consider the first 100 rows by default; this can be controlled via the max_rows recipe parameter (see below). JSON file schemas are inferred from the entire file (given the difficulty of extracting only the first few objects of the file), which may impact performance. We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.
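
For example, to widen the sampling window used for CSV/TSV schema inference, set max_rows at the top level of the source config. A minimal sketch (the bucket path is a placeholder):

source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/csv_data/*.csv"  # placeholder path
    max_rows: 500  # infer CSV/TSV schemas from the first 500 rows instead of the default 100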

Note that because profiling is run with PySpark, we require Spark 3.0.3 with Hadoop 3.2 to be installed (see the Compatibility section below for more details). If profiling, make sure that permissions for s3a:// access are set, because Spark and Hadoop use the s3a:// protocol to interface with AWS (schema inference outside of profiling requires s3:// access). Enabling profiling will slow down ingestion runs.
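
As a minimal sketch (the bucket path is a placeholder), enabling profiling in a recipe looks like the following; spark_driver_memory is optional and shown only to illustrate tuning Spark's memory:

source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/data/{table}/*.parquet"  # placeholder path
    profiling:
      enabled: true
    spark_driver_memory: 8g  # optional; defaults to 4g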

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[s3]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: s3
  config:
    path_specs:
      - include: "s3://covid19-lake/covid_knowledge_graph/csv/nodes/*.*"

    aws_config:
      aws_access_key_id: *****
      aws_secret_access_key: *****
      aws_region: us-east-2
    env: "PROD"
    profiling:
      enabled: false

# sink configs
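
The recipe above ends with a sink placeholder. As a sketch, a recipe that pushes metadata to a DataHub instance over REST would typically finish with a block like the following (the server address is a placeholder for your own deployment):

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"  # placeholder: your DataHub GMS endpoint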

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

| Field [Required] | Type | Description | Default | Notes |
| --- | --- | --- | --- | --- |
| max_rows [✅] | integer | Maximum number of rows to use when inferring schemas for TSV and CSV files. | 100 | |
| platform [✅] | string | The platform that this source connects to (either 's3' or 'file'). If not specified, the platform will be inferred from the path_specs. | None | |
| platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
| spark_driver_memory [✅] | string | Max amount of memory to grant Spark. | 4g | |
| update_schema_on_partition_file_updates [✅] | boolean | Whether to update the table schema when the schema of files within the partitions is updated. | None | |
| use_s3_bucket_tags [✅] | boolean | Whether or not to create tags in DataHub from the S3 bucket | None | |
| use_s3_object_tags [✅] | boolean | Whether or not to create tags in DataHub from the S3 object | None | |
| verify_ssl [✅] | UnionType (See notes for variants) | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. | True | One of boolean, string |
| env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
| aws_config [✅] | AwsConnectionConfig | AWS configuration (see the example below this table) | None | |
| aws_config.aws_access_key_id [❓ (required if aws_config is set)] | string | AWS access key ID. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None | |
| aws_config.aws_endpoint_url [❓ (required if aws_config is set)] | string | Autodetected. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html | None | |
| aws_config.aws_profile [❓ (required if aws_config is set)] | string | Named AWS profile to use. Only used if access key / secret are unset. If not set, the default will be used. | None | |
| aws_config.aws_proxy [❓ (required if aws_config is set)] | map(str,string) | | None | |
| aws_config.aws_region [❓ (required if aws_config is set)] | string | AWS region code. | None | |
| aws_config.aws_secret_access_key [❓ (required if aws_config is set)] | string | AWS secret access key. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None | |
| aws_config.aws_session_token [❓ (required if aws_config is set)] | string | AWS session token. Can be auto-detected; see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html for details. | None | |
| aws_config.aws_role [❓ (required if aws_config is set)] | UnionType (See notes for variants) | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are documented at https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role | None | One of string, union(anyOf), string, AwsAssumeRoleConfig |
| aws_config.aws_role.ExternalId [❓ (required if aws_role is set)] | string | External ID to use when assuming the role. | None | |
| aws_config.aws_role.RoleArn [❓ (required if aws_role is set)] | string | ARN of the role to assume. | None | |
| path_specs [✅] | array(object) | | None | |
| path_specs.default_extension [❓ (required if path_specs is set)] | string | For files without an extension it will assume the specified file type. If it is not set, files without extensions will be skipped. | None | |
| path_specs.enable_compression [❓ (required if path_specs is set)] | boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. | True | |
| path_specs.exclude [❓ (required if path_specs is set)] | array(string) | | None | |
| path_specs.file_types [❓ (required if path_specs is set)] | array(string) | | None | |
| path_specs.include [❓ (required if path_specs is set)] | string | Path to table (s3 or local file system). The named variable {table} is used to mark the folder with the dataset. In the absence of {table}, a file-level dataset will be created. Check the examples below for more details. | None | |
| path_specs.sample_files [❓ (required if path_specs is set)] | boolean | Instead of listing all the files, only take a handful of sample files to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled. | True | |
| path_specs.table_name [❓ (required if path_specs is set)] | string | Display name of the dataset. A combination of named variables from the include path and strings. | None | |
| profile_patterns [✅] | AllowDenyPattern | Regex patterns for tables to profile | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
| profile_patterns.allow [❓ (required if profile_patterns is set)] | array(string) | | None | |
| profile_patterns.deny [❓ (required if profile_patterns is set)] | array(string) | | None | |
| profile_patterns.ignoreCase [❓ (required if profile_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
| profiling [✅] | DataLakeProfilerConfig | Data profiling configuration | {'enabled': False, 'profile_table_level_only': False, 'max_number_of_fields_to_profile': None, 'include_field_null_count': True, 'include_field_min_value': True, 'include_field_max_value': True, 'include_field_mean_value': True, 'include_field_median_value': True, 'include_field_stddev_value': True, 'include_field_quantiles': True, 'include_field_distinct_value_frequencies': True, 'include_field_histogram': True, 'include_field_sample_values': True} | |
| profiling.enabled [❓ (required if profiling is set)] | boolean | Whether profiling should be done. | None | |
| profiling.include_field_distinct_value_frequencies [❓ (required if profiling is set)] | boolean | Whether to profile for distinct value frequencies. | True | |
| profiling.include_field_histogram [❓ (required if profiling is set)] | boolean | Whether to profile for the histogram for numeric fields. | True | |
| profiling.include_field_max_value [❓ (required if profiling is set)] | boolean | Whether to profile for the max value of numeric columns. | True | |
| profiling.include_field_mean_value [❓ (required if profiling is set)] | boolean | Whether to profile for the mean value of numeric columns. | True | |
| profiling.include_field_median_value [❓ (required if profiling is set)] | boolean | Whether to profile for the median value of numeric columns. | True | |
| profiling.include_field_min_value [❓ (required if profiling is set)] | boolean | Whether to profile for the min value of numeric columns. | True | |
| profiling.include_field_null_count [❓ (required if profiling is set)] | boolean | Whether to profile for the number of nulls for each column. | True | |
| profiling.include_field_quantiles [❓ (required if profiling is set)] | boolean | Whether to profile for the quantiles of numeric columns. | True | |
| profiling.include_field_sample_values [❓ (required if profiling is set)] | boolean | Whether to profile for the sample values for all columns. | True | |
| profiling.include_field_stddev_value [❓ (required if profiling is set)] | boolean | Whether to profile for the standard deviation of numeric columns. | True | |
| profiling.max_number_of_fields_to_profile [❓ (required if profiling is set)] | integer | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. | None | |
| profiling.profile_table_level_only [❓ (required if profiling is set)] | boolean | Whether to perform profiling at table-level only or include column-level profiling as well. | None | |
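
To show how the aws_config fields above fit together, here is a hedged sketch that assumes role-based access and also enables tag extraction; the region and role ARN are placeholders, and the object form of aws_role (with RoleArn and ExternalId) can be used instead of the plain ARN string, as described in the table:

source:
  type: s3
  config:
    path_specs:
      - include: "s3://my-bucket/data/{table}/*.parquet"  # placeholder path
    aws_config:
      aws_region: us-east-1  # placeholder region
      aws_role: "arn:aws:iam::123456789012:role/my-ingestion-role"  # placeholder ARN (string form)
    use_s3_bucket_tags: true   # create DataHub tags from S3 bucket tags
    use_s3_object_tags: true   # create DataHub tags from S3 object tags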

Path Specs

Example - Dataset per file

Bucket structure:

test-s3-bucket
├── employees.csv
└── food_items.csv

Path specs config

path_specs:
  - include: s3://test-s3-bucket/*.csv

Example - Datasets with partitions

Bucket structure:

test-s3-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── returns
    └── year=2021
        └── month=2
            └── 1.parquet

Path specs config:

path_specs:
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet

Example - Datasets with partition and exclude

Bucket structure:

test-s3-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── tmp_orders
    └── year=2021
        └── month=2
            └── 1.parquet


Path specs config:

path_specs:
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
    exclude:
      - "**/tmp_orders/**"

Example - Datasets of mixed nature

Bucket structure:

test-s3-bucket
├── customers
│   ├── part1.json
│   ├── part2.json
│   ├── part3.json
│   └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
    └── year=2022
        └── month=2
            ├── 1.parquet
            ├── 2.parquet
            └── 3.parquet

Path specs config:

path_specs:
  - include: s3://test-s3-bucket/*.csv
    exclude:
      - "**/tmp_10101000.csv"
  - include: s3://test-s3-bucket/{table}/*.json
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet

Valid path_specs.include

s3://my-bucket/foo/tests/bar.avro # single file table   
s3://my-bucket/foo/tests/*.* # multiple file level tables
s3://my-bucket/foo/tests/{table}/*.avro #table without partition
s3://my-bucket/foo/tests/{table}/*/*.avro #table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where neither partitions nor a specific file type are specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
s3://my-bucket/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
s3://my-bucket/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
s3://my-bucket/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in bucket
s3://my-bucket/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in bucket

Valid path_specs.exclude

  • **/tests/**
  • s3://my-bucket/hr/**
  • */tests/*.csv
  • s3://my-bucket/foo/*/my_table/**

Notes

  • {table} represents the folder for which a dataset will be created.
  • The include path must end with *.* or *.[ext] to represent the leaf level.
  • If *.[ext] is provided, only files with the specified extension will be scanned.
  • /*/ represents a single folder.
  • {partition[i]} represents the value of a partition.
  • {partition_key[i]} represents the name of a partition.
  • While extracting, "i" is used to match a partition_key to its partition (see the worked example after these notes).
  • All folder levels need to be specified in the include path. Only the exclude path can use **-style matching.
  • The exclude path cannot contain named variables ( {} ).
  • Folder names should not contain {, }, *, or /.
  • {folder} is reserved for internal use; please do not use it as a named variable.
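
As a concrete illustration of the notes above, take the "Datasets with partitions" example from earlier; the include path and one matching S3 object line up as follows (this is an explanatory sketch, not extra configuration):

include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
object:  s3://test-s3-bucket/orders/year=2022/month=2/1.parquet

{table}            -> orders   (used as the dataset name)
{partition_key[0]} -> year     {partition[0]} -> 2022
{partition_key[1]} -> month    {partition[1]} -> 2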

If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.

caution

Specify as long a fixed prefix (without /*/) as possible in path_specs.include. This reduces scanning time and cost, especially on AWS S3.
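
For instance, when the tables all live under a known fixed folder, a sketch of the preferred pattern versus a wildcard-heavy one (the bucket and folder names are placeholders) is:

path_specs:
  - include: s3://my-bucket/warehouse/prod/{table}/*.parquet   # preferred: long fixed prefix
  # rather than something like s3://my-bucket/*/*/{table}/*.parquet, which forces far more of the bucket to be listed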

caution

Running profiling against many tables or over many rows can run up significant costs. While we've done our best to limit the expensiveness of the queries the profiler runs, you should be prudent about the set of tables profiling is enabled on or the frequency of the profiling runs.

caution

If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.

Compatibility

Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the SPARK_HOME and SPARK_VERSION environment variables to be set. The Spark+Hadoop binary can be downloaded here.

For an example of setting up PyDeequ on AWS, see this guide.

Code Coordinates

  • Class Name: datahub.ingestion.source.s3.source.S3Source
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for S3 Data Lake, feel free to ping us on our Slack