Redshift
There are 2 sources that provide integration with Redshift
Source Module | Documentation |
| This plugin extracts the following:
tip You can also get fine-grained usage statistics for Redshift using the PrerequisitesThis source needs to access system tables that require extra permissions. To grant these permissions, please alter your datahub Redshift user the following way:
note Giving a user unrestricted access to system tables gives the user visibility to data generated by other users. For example, STL_QUERY and STL_QUERYTEXT contain the full text of INSERT, UPDATE, and DELETE statements. LineageThere are multiple lineage collector implementations as Redshift does not support table lineage out of the box. stl_scan_basedThe stl_scan based collector uses Redshift's stl_insert and stl_scan system tables to discover lineage between tables. Pros:
Cons:
sql_basedThe sql_based based collector uses Redshift's stl_insert to discover all the insert queries and uses sql parsing to discover the dependecies. Pros:
Cons:
mixedUsing both collector above and first applying the sql based and then the stl_scan based one. Pros:
Cons:
note The redshift stl redshift tables which are used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window. |
| This plugin extracts usage statistics for datasets in Amazon Redshift. Note: Usage information is computed by querying the following system tables -
To grant access this plugin for all system tables, please alter your datahub Redshift user the following way:
This plugin has the below functionalities -
note This source only does usage statistics. To get the tables, views, and schemas in your Redshift warehouse, ingest using the note Redshift system tables have some latency in getting data from queries. In addition, these tables only maintain logs for 2-5 days. You can find more information from the official documentation here. |
To get all metadata from Redshift you need to use two plugins redshift
and redshift-usage
. Both of them are described in this page. These will require 2 separate recipes. We understand this is not ideal and we plan to make this easier in the future.
Module redshift
Important Capabilities
Capability | Status | Notes |
---|---|---|
Data Profiling | ✅ | Optionally enabled via configuration |
Dataset Usage | ❌ | Not provided by this module, use redshift-usage for that. |
Descriptions | ✅ | Enabled by default |
Detect Deleted Entities | ✅ | Enabled via stateful ingestion |
Domains | ✅ | Supported via the domain config field |
Platform Instance | ✅ | Enabled by default |
Table-Level Lineage | ✅ | Optionally enabled via configuration |
This plugin extracts the following:
- Metadata for databases, schemas, views and tables
- Column types associated with each table
- Also supports PostGIS extensions
- Table, row, and column statistics via optional SQL profiling
- Table lineage
You can also get fine-grained usage statistics for Redshift using the redshift-usage
source described below.
Prerequisites
This source needs to access system tables that require extra permissions. To grant these permissions, please alter your datahub Redshift user the following way:
ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED;
GRANT SELECT ON pg_catalog.svv_table_info to datahub_user;
GRANT SELECT ON pg_catalog.svl_user_info to datahub_user;
Giving a user unrestricted access to system tables gives the user visibility to data generated by other users. For example, STL_QUERY and STL_QUERYTEXT contain the full text of INSERT, UPDATE, and DELETE statements.
Lineage
There are multiple lineage collector implementations as Redshift does not support table lineage out of the box.
stl_scan_based
The stl_scan based collector uses Redshift's stl_insert and stl_scan system tables to discover lineage between tables. Pros:
- Fast
- Reliable
Cons:
- Does not work with Spectrum/external tables because those scans do not show up in stl_scan table.
- If a table is depending on a view then the view won't be listed as dependency. Instead the table will be connected with the view's dependencies.
sql_based
The sql_based based collector uses Redshift's stl_insert to discover all the insert queries and uses sql parsing to discover the dependecies.
Pros:
- Works with Spectrum tables
- Views are connected properly if a table depends on it
Cons:
- Slow.
- Less reliable as the query parser can fail on certain queries
mixed
Using both collector above and first applying the sql based and then the stl_scan based one.
Pros:
- Works with Spectrum tables
- Views are connected properly if a table depends on it
- A bit more reliable than the sql_based one only
Cons:
- Slow
- May be incorrect at times as the query parser can fail on certain queries
The redshift stl redshift tables which are used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window.
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[redshift]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: redshift
config:
# Coordinates
host_port: example.something.us-west-2.redshift.amazonaws.com:5439
database: DemoDatabase
# Credentials
username: user
password: pass
# Options
options:
# driver_option: some-option
include_views: True # whether to include views, defaults to True
include_tables: True # whether to include views, defaults to True
sink:
# sink configs
#------------------------------------------------------------------------------
# Extra options when running Redshift behind a proxy</summary>
# This requires you to have already installed the Microsoft ODBC Driver for SQL Server.
# See https://docs.microsoft.com/en-us/sql/connect/python/pyodbc/step-1-configure-development-environment-for-pyodbc-python-development?view=sql-server-ver15
#------------------------------------------------------------------------------
source:
type: redshift
config:
host_port: my-proxy-hostname:5439
options:
connect_args:
sslmode: "prefer" # or "require" or "verify-ca"
sslrootcert: ~ # needed to unpin the AWS Redshift certificate
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
View All Configuration Options
Field [Required] | Type | Description | Default | Notes |
---|---|---|---|---|
bucket_duration [✅] | Enum | Size of the time window to aggregate usage stats. | DAY | |
capture_lineage_query_parser_failures [✅] | boolean | Whether to capture lineage query parser errors with dataset properties for debuggings | None | |
database [✅] | string | database (catalog) | None | |
database_alias [✅] | string | Alias to apply to database when ingesting. | None | |
default_schema [✅] | string | The default schema to use if the sql parser fails to parse the schema with sql_based lineage collector | public | |
end_time [✅] | string(date-time) | Latest date of usage to consider. Default: Current time in UTC | None | |
host_port [✅] | string | host URL | None | |
include_copy_lineage [✅] | boolean | Whether lineage should be collected from copy commands | True | |
include_table_lineage [✅] | boolean | Whether table lineage should be ingested. | True | |
include_table_location_lineage [✅] | boolean | If the source supports it, include table lineage to the underlying storage location. | True | |
include_tables [✅] | boolean | Whether tables should be ingested. | True | |
include_unload_lineage [✅] | boolean | Whether lineage should be collected from unload commands | True | |
include_view_lineage [✅] | boolean | Include table lineage for views | None | |
include_views [✅] | boolean | Whether views should be ingested. | True | |
options [✅] | object | Any options specified here will be passed to SQLAlchemy's create_engine as kwargs. See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. | None | |
password [✅] | string(password) | password | None | |
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
platform_instance_map [✅] | map(str,string) | None | ||
sqlalchemy_uri [✅] | string | URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters. | None | |
start_time [✅] | string(date-time) | Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ) | None | |
table_lineage_mode [✅] | Enum | Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed] | stl_scan_based | |
username [✅] | string | username | None | |
env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
domain [✅] | map(str,AllowDenyPattern) | A class to store allow deny regexes | None | |
domain.key .allow [❓ (required if domain is set)] | array(string) | None | ||
domain.key .deny [❓ (required if domain is set)] | array(string) | None | ||
domain.key .ignoreCase [❓ (required if domain is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
profile_pattern [✅] | AllowDenyPattern | Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the table_pattern will be considered. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
profile_pattern.allow [❓ (required if profile_pattern is set)] | array(string) | None | ||
profile_pattern.deny [❓ (required if profile_pattern is set)] | array(string) | None | ||
profile_pattern.ignoreCase [❓ (required if profile_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
s3_lineage_config [✅] | S3LineageProviderConfig | Common config for S3 lineage generation | None | |
s3_lineage_config.path_specs [❓ (required if s3_lineage_config is set)] | array(object) | None | ||
s3_lineage_config.path_specs.default_extension [❓ (required if path_specs is set)] | string | For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped. | None | |
s3_lineage_config.path_specs.enable_compression [❓ (required if path_specs is set)] | boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. | True | |
s3_lineage_config.path_specs.exclude [❓ (required if path_specs is set)] | array(string) | None | ||
s3_lineage_config.path_specs.file_types [❓ (required if path_specs is set)] | array(string) | None | ||
s3_lineage_config.path_specs.include [❓ (required if path_specs is set)] | string | Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details. | None | |
s3_lineage_config.path_specs.sample_files [❓ (required if path_specs is set)] | boolean | Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled | True | |
s3_lineage_config.path_specs.table_name [❓ (required if path_specs is set)] | string | Display name of the dataset.Combination of named variables from include path and strings | None | |
schema_pattern [✅] | AllowDenyPattern | {'allow': ['.*'], 'deny': ['information_schema'], 'ignoreCase': True} | ||
schema_pattern.allow [❓ (required if schema_pattern is set)] | array(string) | None | ||
schema_pattern.deny [❓ (required if schema_pattern is set)] | array(string) | None | ||
schema_pattern.ignoreCase [❓ (required if schema_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
table_pattern [✅] | AllowDenyPattern | Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
table_pattern.allow [❓ (required if table_pattern is set)] | array(string) | None | ||
table_pattern.deny [❓ (required if table_pattern is set)] | array(string) | None | ||
table_pattern.ignoreCase [❓ (required if table_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
view_pattern [✅] | AllowDenyPattern | Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
view_pattern.allow [❓ (required if view_pattern is set)] | array(string) | None | ||
view_pattern.deny [❓ (required if view_pattern is set)] | array(string) | None | ||
view_pattern.ignoreCase [❓ (required if view_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
profiling [✅] | GEProfilingConfig | {'enabled': False, 'limit': None, 'offset': None, 'report_dropped_profiles': False, 'turn_off_expensive_profiling_metrics': False, 'profile_table_level_only': False, 'include_field_null_count': True, 'include_field_distinct_count': True, 'include_field_min_value': True, 'include_field_max_value': True, 'include_field_mean_value': True, 'include_field_median_value': True, 'include_field_stddev_value': True, 'include_field_quantiles': False, 'include_field_distinct_value_frequencies': False, 'include_field_histogram': False, 'include_field_sample_values': True, 'field_sample_values_limit': 20, 'max_number_of_fields_to_profile': None, 'profile_if_updated_since_days': None, 'profile_table_size_limit': 5, 'profile_table_row_limit': 5000000, 'profile_table_row_count_estimate_only': False, 'max_workers': 20, 'query_combiner_enabled': True, 'catch_exceptions': True, 'partition_profiling_enabled': True, 'partition_datetime': None} | ||
profiling.catch_exceptions [❓ (required if profiling is set)] | boolean | True | ||
profiling.enabled [❓ (required if profiling is set)] | boolean | Whether profiling should be done. | None | |
profiling.field_sample_values_limit [❓ (required if profiling is set)] | integer | Upper limit for number of sample values to collect for all columns. | 20 | |
profiling.include_field_distinct_count [❓ (required if profiling is set)] | boolean | Whether to profile for the number of distinct values for each column. | True | |
profiling.include_field_distinct_value_frequencies [❓ (required if profiling is set)] | boolean | Whether to profile for distinct value frequencies. | None | |
profiling.include_field_histogram [❓ (required if profiling is set)] | boolean | Whether to profile for the histogram for numeric fields. | None | |
profiling.include_field_max_value [❓ (required if profiling is set)] | boolean | Whether to profile for the max value of numeric columns. | True | |
profiling.include_field_mean_value [❓ (required if profiling is set)] | boolean | Whether to profile for the mean value of numeric columns. | True | |
profiling.include_field_median_value [❓ (required if profiling is set)] | boolean | Whether to profile for the median value of numeric columns. | True | |
profiling.include_field_min_value [❓ (required if profiling is set)] | boolean | Whether to profile for the min value of numeric columns. | True | |
profiling.include_field_null_count [❓ (required if profiling is set)] | boolean | Whether to profile for the number of nulls for each column. | True | |
profiling.include_field_quantiles [❓ (required if profiling is set)] | boolean | Whether to profile for the quantiles of numeric columns. | None | |
profiling.include_field_sample_values [❓ (required if profiling is set)] | boolean | Whether to profile for the sample values for all columns. | True | |
profiling.include_field_stddev_value [❓ (required if profiling is set)] | boolean | Whether to profile for the standard deviation of numeric columns. | True | |
profiling.limit [❓ (required if profiling is set)] | integer | Max number of documents to profile. By default, profiles all documents. | None | |
profiling.max_number_of_fields_to_profile [❓ (required if profiling is set)] | integer | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. | None | |
profiling.max_workers [❓ (required if profiling is set)] | integer | Number of worker threads to use for profiling. Set to 1 to disable. | 20 | |
profiling.offset [❓ (required if profiling is set)] | integer | Offset in documents to profile. By default, uses no offset. | None | |
profiling.partition_datetime [❓ (required if profiling is set)] | string(date-time) | For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this. | None | |
profiling.partition_profiling_enabled [❓ (required if profiling is set)] | boolean | True | ||
profiling.profile_if_updated_since_days [❓ (required if profiling is set)] | number | Profile table only if it has been updated since these many number of days. If set to null , no constraint of last modified time for tables to profile. Supported only in snowflake and BigQuery . | None | |
profiling.profile_table_level_only [❓ (required if profiling is set)] | boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. | None | |
profiling.profile_table_row_count_estimate_only [❓ (required if profiling is set)] | boolean | Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres. | None | |
profiling.profile_table_row_limit [❓ (required if profiling is set)] | integer | Profile tables only if their row count is less then specified count. If set to null , no limit on the row count of tables to profile. Supported only in snowflake and BigQuery | 5000000 | |
profiling.profile_table_size_limit [❓ (required if profiling is set)] | integer | Profile tables only if their size is less then specified GBs. If set to null , no limit on the size of tables to profile. Supported only in snowflake and BigQuery | 5 | |
profiling.query_combiner_enabled [❓ (required if profiling is set)] | boolean | This feature is still experimental and can be disabled if it causes issues. Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible. | True | |
profiling.report_dropped_profiles [❓ (required if profiling is set)] | boolean | Whether to report datasets or dataset columns which were not profiled. Set to True for debugging purposes. | None | |
profiling.turn_off_expensive_profiling_metrics [❓ (required if profiling is set)] | boolean | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10. | None | |
stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | None | |
stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | The type of the ingestion state provider registered with datahub. | None | |
stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | None | |
stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | None | |
stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True |
The JSONSchema for this configuration is inlined below.
{
"title": "RedshiftConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"s3_lineage_config": {
"title": "S3 Lineage Config",
"description": "Common config for S3 lineage generation",
"allOf": [
{
"$ref": "#/definitions/S3LineageProviderConfig"
}
]
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance_map": {
"title": "Platform Instance Map",
"description": "A holder for platform -> platform_instance mappings to generate correct dataset urns",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"bucket_duration": {
"description": "Size of the time window to aggregate usage stats.",
"default": "DAY",
"allOf": [
{
"$ref": "#/definitions/BucketDuration"
}
]
},
"end_time": {
"title": "End Time",
"description": "Latest date of usage to consider. Default: Current time in UTC",
"type": "string",
"format": "date-time"
},
"start_time": {
"title": "Start Time",
"description": "Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`)",
"type": "string",
"format": "date-time"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"options": {
"title": "Options",
"description": "Any options specified here will be passed to SQLAlchemy's create_engine as kwargs. See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details.",
"type": "object"
},
"schema_pattern": {
"title": "Schema Pattern",
"default": {
"allow": [
".*"
],
"deny": [
"information_schema"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"view_pattern": {
"title": "View Pattern",
"description": "Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profile_pattern": {
"title": "Profile Pattern",
"description": "Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the `table_pattern` will be considered.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"domain": {
"title": "Domain",
"description": "Attach domains to databases, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like \"Marketing\".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"include_views": {
"title": "Include Views",
"description": "Whether views should be ingested.",
"default": true,
"type": "boolean"
},
"include_tables": {
"title": "Include Tables",
"description": "Whether tables should be ingested.",
"default": true,
"type": "boolean"
},
"include_table_location_lineage": {
"title": "Include Table Location Lineage",
"description": "If the source supports it, include table lineage to the underlying storage location.",
"default": true,
"type": "boolean"
},
"profiling": {
"title": "Profiling",
"default": {
"enabled": false,
"limit": null,
"offset": null,
"report_dropped_profiles": false,
"turn_off_expensive_profiling_metrics": false,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"field_sample_values_limit": 20,
"max_number_of_fields_to_profile": null,
"profile_if_updated_since_days": null,
"profile_table_size_limit": 5,
"profile_table_row_limit": 5000000,
"profile_table_row_count_estimate_only": false,
"max_workers": 20,
"query_combiner_enabled": true,
"catch_exceptions": true,
"partition_profiling_enabled": true,
"partition_datetime": null
},
"allOf": [
{
"$ref": "#/definitions/GEProfilingConfig"
}
]
},
"username": {
"title": "Username",
"description": "username",
"type": "string"
},
"password": {
"title": "Password",
"description": "password",
"type": "string",
"writeOnly": true,
"format": "password"
},
"host_port": {
"title": "Host Port",
"description": "host URL",
"type": "string"
},
"database": {
"title": "Database",
"description": "database (catalog)",
"type": "string"
},
"database_alias": {
"title": "Database Alias",
"description": "Alias to apply to database when ingesting.",
"type": "string"
},
"sqlalchemy_uri": {
"title": "Sqlalchemy Uri",
"description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
"type": "string"
},
"include_view_lineage": {
"title": "Include View Lineage",
"description": "Include table lineage for views",
"default": false,
"type": "boolean"
},
"default_schema": {
"title": "Default Schema",
"description": "The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
"default": "public",
"type": "string"
},
"include_table_lineage": {
"title": "Include Table Lineage",
"description": "Whether table lineage should be ingested.",
"default": true,
"type": "boolean"
},
"include_copy_lineage": {
"title": "Include Copy Lineage",
"description": "Whether lineage should be collected from copy commands",
"default": true,
"type": "boolean"
},
"include_unload_lineage": {
"title": "Include Unload Lineage",
"description": "Whether lineage should be collected from unload commands",
"default": true,
"type": "boolean"
},
"capture_lineage_query_parser_failures": {
"title": "Capture Lineage Query Parser Failures",
"description": "Whether to capture lineage query parser errors with dataset properties for debuggings",
"default": false,
"type": "boolean"
},
"table_lineage_mode": {
"description": "Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed]",
"default": "stl_scan_based",
"allOf": [
{
"$ref": "#/definitions/LineageMode"
}
]
}
},
"required": [
"host_port"
],
"additionalProperties": false,
"definitions": {
"PathSpec": {
"title": "PathSpec",
"type": "object",
"properties": {
"include": {
"title": "Include",
"description": "Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details.",
"type": "string"
},
"exclude": {
"title": "Exclude",
"description": "list of paths in glob pattern which will be excluded while scanning for the datasets",
"type": "array",
"items": {
"type": "string"
}
},
"file_types": {
"title": "File Types",
"description": "Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.",
"default": [
"csv",
"tsv",
"json",
"parquet",
"avro"
],
"type": "array",
"items": {
"type": "string"
}
},
"default_extension": {
"title": "Default Extension",
"description": "For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.",
"type": "string"
},
"table_name": {
"title": "Table Name",
"description": "Display name of the dataset.Combination of named variables from include path and strings",
"type": "string"
},
"enable_compression": {
"title": "Enable Compression",
"description": "Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
"default": true,
"type": "boolean"
},
"sample_files": {
"title": "Sample Files",
"description": "Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
"default": true,
"type": "boolean"
}
},
"required": [
"include"
],
"additionalProperties": false
},
"S3LineageProviderConfig": {
"title": "S3LineageProviderConfig",
"description": "Any source that produces s3 lineage from/to Datasets should inherit this class.",
"type": "object",
"properties": {
"path_specs": {
"title": "Path Specs",
"description": "List of PathSpec. See below the details about PathSpec",
"type": "array",
"items": {
"$ref": "#/definitions/PathSpec"
}
}
},
"required": [
"path_specs"
],
"additionalProperties": false
},
"BucketDuration": {
"title": "BucketDuration",
"description": "An enumeration.",
"enum": [
"DAY",
"HOUR"
],
"type": "string"
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"GEProfilingConfig": {
"title": "GEProfilingConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"limit": {
"title": "Limit",
"description": "Max number of documents to profile. By default, profiles all documents.",
"type": "integer"
},
"offset": {
"title": "Offset",
"description": "Offset in documents to profile. By default, uses no offset.",
"type": "integer"
},
"report_dropped_profiles": {
"title": "Report Dropped Profiles",
"description": "Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
"default": false,
"type": "boolean"
},
"turn_off_expensive_profiling_metrics": {
"title": "Turn Off Expensive Profiling Metrics",
"description": "Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
"default": false,
"type": "boolean"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_distinct_count": {
"title": "Include Field Distinct Count",
"description": "Whether to profile for the number of distinct values for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": false,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": false,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": false,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
},
"field_sample_values_limit": {
"title": "Field Sample Values Limit",
"description": "Upper limit for number of sample values to collect for all columns.",
"default": 20,
"type": "integer"
},
"max_number_of_fields_to_profile": {
"title": "Max Number Of Fields To Profile",
"description": "A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
"exclusiveMinimum": 0,
"type": "integer"
},
"profile_if_updated_since_days": {
"title": "Profile If Updated Since Days",
"description": "Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
"exclusiveMinimum": 0,
"type": "number"
},
"profile_table_size_limit": {
"title": "Profile Table Size Limit",
"description": "Profile tables only if their size is less then specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5,
"type": "integer"
},
"profile_table_row_limit": {
"title": "Profile Table Row Limit",
"description": "Profile tables only if their row count is less then specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5000000,
"type": "integer"
},
"profile_table_row_count_estimate_only": {
"title": "Profile Table Row Count Estimate Only",
"description": "Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres. ",
"default": false,
"type": "boolean"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
},
"query_combiner_enabled": {
"title": "Query Combiner Enabled",
"description": "*This feature is still experimental and can be disabled if it causes issues.* Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible.",
"default": true,
"type": "boolean"
},
"catch_exceptions": {
"title": "Catch Exceptions",
"default": true,
"type": "boolean"
},
"partition_profiling_enabled": {
"title": "Partition Profiling Enabled",
"default": true,
"type": "boolean"
},
"partition_datetime": {
"title": "Partition Datetime",
"description": "For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.",
"type": "string",
"format": "date-time"
}
},
"additionalProperties": false
},
"LineageMode": {
"title": "LineageMode",
"description": "An enumeration.",
"enum": [
"sql_based",
"stl_scan_based",
"mixed"
]
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.sql.redshift.RedshiftSource
- Browse on GitHub
Module redshift-usage
Important Capabilities
Capability | Status | Notes |
---|---|---|
Platform Instance | ✅ | Enabled by default |
This plugin extracts usage statistics for datasets in Amazon Redshift.
Note: Usage information is computed by querying the following system tables -
- stl_scan
- svv_table_info
- stl_query
- svl_user_info
To grant access this plugin for all system tables, please alter your datahub Redshift user the following way:
ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED;
This plugin has the below functionalities -
- For a specific dataset this plugin ingests the following statistics -
- top n queries.
- top users.
- Aggregation of these statistics into buckets, by day or hour granularity.
This source only does usage statistics. To get the tables, views, and schemas in your Redshift warehouse, ingest using the redshift
source described above.
Redshift system tables have some latency in getting data from queries. In addition, these tables only maintain logs for 2-5 days. You can find more information from the official documentation here.
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[redshift-usage]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
type: redshift-usage
config:
# Coordinates
host_port: db_host:port
database: dev
email_domain: acryl.io
# Credentials
username: username
password: "password"
sink:
# sink configs
Config Details
- Options
- Schema
Note that a .
is used to denote nested fields in the YAML recipe.
View All Configuration Options
Field [Required] | Type | Description | Default | Notes |
---|---|---|---|---|
bucket_duration [✅] | Enum | Size of the time window to aggregate usage stats. | DAY | |
capture_lineage_query_parser_failures [✅] | boolean | Whether to capture lineage query parser errors with dataset properties for debuggings | None | |
database [✅] | string | database (catalog) | None | |
database_alias [✅] | string | Alias to apply to database when ingesting. | None | |
default_schema [✅] | string | The default schema to use if the sql parser fails to parse the schema with sql_based lineage collector | public | |
email_domain [✅] | string | Email domain of your organisation so users can be displayed on UI appropriately. | None | |
end_time [✅] | string(date-time) | Latest date of usage to consider. Default: Current time in UTC | None | |
format_sql_queries [✅] | boolean | Whether to format sql queries | None | |
host_port [✅] | string | host URL | None | |
include_copy_lineage [✅] | boolean | Whether lineage should be collected from copy commands | True | |
include_operational_stats [✅] | boolean | Whether to display operational stats. | True | |
include_read_operational_stats [✅] | boolean | Whether to report read operational stats. Experimental. | None | |
include_table_lineage [✅] | boolean | Whether table lineage should be ingested. | True | |
include_table_location_lineage [✅] | boolean | If the source supports it, include table lineage to the underlying storage location. | True | |
include_tables [✅] | boolean | Whether tables should be ingested. | True | |
include_top_n_queries [✅] | boolean | Whether to ingest the top_n_queries. | True | |
include_unload_lineage [✅] | boolean | Whether lineage should be collected from unload commands | True | |
include_view_lineage [✅] | boolean | Include table lineage for views | None | |
include_views [✅] | boolean | Whether views should be ingested. | True | |
options [✅] | object | Any options specified here will be passed to SQLAlchemy's create_engine as kwargs.See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. | None | |
password [✅] | string(password) | password | None | |
platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
platform_instance_map [✅] | map(str,string) | None | ||
sqlalchemy_uri [✅] | string | URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters. | None | |
start_time [✅] | string(date-time) | Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration ) | None | |
table_lineage_mode [✅] | Enum | Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed] | stl_scan_based | |
top_n_queries [✅] | integer | Number of top queries to save to each table. | 10 | |
username [✅] | string | username | None | |
env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
domain [✅] | map(str,AllowDenyPattern) | A class to store allow deny regexes | None | |
domain.key .allow [❓ (required if domain is set)] | array(string) | None | ||
domain.key .deny [❓ (required if domain is set)] | array(string) | None | ||
domain.key .ignoreCase [❓ (required if domain is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
profile_pattern [✅] | AllowDenyPattern | Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the table_pattern will be considered. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
profile_pattern.allow [❓ (required if profile_pattern is set)] | array(string) | None | ||
profile_pattern.deny [❓ (required if profile_pattern is set)] | array(string) | None | ||
profile_pattern.ignoreCase [❓ (required if profile_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
s3_lineage_config [✅] | S3LineageProviderConfig | Common config for S3 lineage generation | None | |
s3_lineage_config.path_specs [❓ (required if s3_lineage_config is set)] | array(object) | None | ||
s3_lineage_config.path_specs.default_extension [❓ (required if path_specs is set)] | string | For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped. | None | |
s3_lineage_config.path_specs.enable_compression [❓ (required if path_specs is set)] | boolean | Enable or disable processing compressed files. Currently .gz and .bz files are supported. | True | |
s3_lineage_config.path_specs.exclude [❓ (required if path_specs is set)] | array(string) | None | ||
s3_lineage_config.path_specs.file_types [❓ (required if path_specs is set)] | array(string) | None | ||
s3_lineage_config.path_specs.include [❓ (required if path_specs is set)] | string | Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details. | None | |
s3_lineage_config.path_specs.sample_files [❓ (required if path_specs is set)] | boolean | Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled | True | |
s3_lineage_config.path_specs.table_name [❓ (required if path_specs is set)] | string | Display name of the dataset.Combination of named variables from include path and strings | None | |
schema_pattern [✅] | AllowDenyPattern | {'allow': ['.*'], 'deny': ['information_schema'], 'ignoreCase': True} | ||
schema_pattern.allow [❓ (required if schema_pattern is set)] | array(string) | None | ||
schema_pattern.deny [❓ (required if schema_pattern is set)] | array(string) | None | ||
schema_pattern.ignoreCase [❓ (required if schema_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
table_pattern [✅] | AllowDenyPattern | Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
table_pattern.allow [❓ (required if table_pattern is set)] | array(string) | None | ||
table_pattern.deny [❓ (required if table_pattern is set)] | array(string) | None | ||
table_pattern.ignoreCase [❓ (required if table_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
user_email_pattern [✅] | AllowDenyPattern | regex patterns for user emails to filter in usage. | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
user_email_pattern.allow [❓ (required if user_email_pattern is set)] | array(string) | None | ||
user_email_pattern.deny [❓ (required if user_email_pattern is set)] | array(string) | None | ||
user_email_pattern.ignoreCase [❓ (required if user_email_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
view_pattern [✅] | AllowDenyPattern | Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*' | {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | |
view_pattern.allow [❓ (required if view_pattern is set)] | array(string) | None | ||
view_pattern.deny [❓ (required if view_pattern is set)] | array(string) | None | ||
view_pattern.ignoreCase [❓ (required if view_pattern is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
profiling [✅] | GEProfilingConfig | {'enabled': False, 'limit': None, 'offset': None, 'report_dropped_profiles': False, 'turn_off_expensive_profiling_metrics': False, 'profile_table_level_only': False, 'include_field_null_count': True, 'include_field_distinct_count': True, 'include_field_min_value': True, 'include_field_max_value': True, 'include_field_mean_value': True, 'include_field_median_value': True, 'include_field_stddev_value': True, 'include_field_quantiles': False, 'include_field_distinct_value_frequencies': False, 'include_field_histogram': False, 'include_field_sample_values': True, 'field_sample_values_limit': 20, 'max_number_of_fields_to_profile': None, 'profile_if_updated_since_days': None, 'profile_table_size_limit': 5, 'profile_table_row_limit': 5000000, 'profile_table_row_count_estimate_only': False, 'max_workers': 20, 'query_combiner_enabled': True, 'catch_exceptions': True, 'partition_profiling_enabled': True, 'partition_datetime': None} | ||
profiling.catch_exceptions [❓ (required if profiling is set)] | boolean | True | ||
profiling.enabled [❓ (required if profiling is set)] | boolean | Whether profiling should be done. | None | |
profiling.field_sample_values_limit [❓ (required if profiling is set)] | integer | Upper limit for number of sample values to collect for all columns. | 20 | |
profiling.include_field_distinct_count [❓ (required if profiling is set)] | boolean | Whether to profile for the number of distinct values for each column. | True | |
profiling.include_field_distinct_value_frequencies [❓ (required if profiling is set)] | boolean | Whether to profile for distinct value frequencies. | None | |
profiling.include_field_histogram [❓ (required if profiling is set)] | boolean | Whether to profile for the histogram for numeric fields. | None | |
profiling.include_field_max_value [❓ (required if profiling is set)] | boolean | Whether to profile for the max value of numeric columns. | True | |
profiling.include_field_mean_value [❓ (required if profiling is set)] | boolean | Whether to profile for the mean value of numeric columns. | True | |
profiling.include_field_median_value [❓ (required if profiling is set)] | boolean | Whether to profile for the median value of numeric columns. | True | |
profiling.include_field_min_value [❓ (required if profiling is set)] | boolean | Whether to profile for the min value of numeric columns. | True | |
profiling.include_field_null_count [❓ (required if profiling is set)] | boolean | Whether to profile for the number of nulls for each column. | True | |
profiling.include_field_quantiles [❓ (required if profiling is set)] | boolean | Whether to profile for the quantiles of numeric columns. | None | |
profiling.include_field_sample_values [❓ (required if profiling is set)] | boolean | Whether to profile for the sample values for all columns. | True | |
profiling.include_field_stddev_value [❓ (required if profiling is set)] | boolean | Whether to profile for the standard deviation of numeric columns. | True | |
profiling.limit [❓ (required if profiling is set)] | integer | Max number of documents to profile. By default, profiles all documents. | None | |
profiling.max_number_of_fields_to_profile [❓ (required if profiling is set)] | integer | A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up. | None | |
profiling.max_workers [❓ (required if profiling is set)] | integer | Number of worker threads to use for profiling. Set to 1 to disable. | 20 | |
profiling.offset [❓ (required if profiling is set)] | integer | Offset in documents to profile. By default, uses no offset. | None | |
profiling.partition_datetime [❓ (required if profiling is set)] | string(date-time) | For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this. | None | |
profiling.partition_profiling_enabled [❓ (required if profiling is set)] | boolean | True | ||
profiling.profile_if_updated_since_days [❓ (required if profiling is set)] | number | Profile table only if it has been updated since these many number of days. If set to null , no constraint of last modified time for tables to profile. Supported only in snowflake and BigQuery . | None | |
profiling.profile_table_level_only [❓ (required if profiling is set)] | boolean | Whether to perform profiling at table-level only, or include column-level profiling as well. | None | |
profiling.profile_table_row_count_estimate_only [❓ (required if profiling is set)] | boolean | Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres. | None | |
profiling.profile_table_row_limit [❓ (required if profiling is set)] | integer | Profile tables only if their row count is less then specified count. If set to null , no limit on the row count of tables to profile. Supported only in snowflake and BigQuery | 5000000 | |
profiling.profile_table_size_limit [❓ (required if profiling is set)] | integer | Profile tables only if their size is less then specified GBs. If set to null , no limit on the size of tables to profile. Supported only in snowflake and BigQuery | 5 | |
profiling.query_combiner_enabled [❓ (required if profiling is set)] | boolean | This feature is still experimental and can be disabled if it causes issues. Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible. | True | |
profiling.report_dropped_profiles [❓ (required if profiling is set)] | boolean | Whether to report datasets or dataset columns which were not profiled. Set to True for debugging purposes. | None | |
profiling.turn_off_expensive_profiling_metrics [❓ (required if profiling is set)] | boolean | Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10. | None | |
stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | None | |
stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | The type of the ingestion state provider registered with datahub. | None | |
stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | None | |
stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | None | |
stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True |
The JSONSchema for this configuration is inlined below.
{
"title": "RedshiftUsageConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"bucket_duration": {
"description": "Size of the time window to aggregate usage stats.",
"default": "DAY",
"allOf": [
{
"$ref": "#/definitions/BucketDuration"
}
]
},
"end_time": {
"title": "End Time",
"description": "Latest date of usage to consider. Default: Current time in UTC",
"type": "string",
"format": "date-time"
},
"start_time": {
"title": "Start Time",
"description": "Earliest date of usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`)",
"type": "string",
"format": "date-time"
},
"top_n_queries": {
"title": "Top N Queries",
"description": "Number of top queries to save to each table.",
"default": 10,
"exclusiveMinimum": 0,
"type": "integer"
},
"user_email_pattern": {
"title": "User Email Pattern",
"description": "regex patterns for user emails to filter in usage.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"include_operational_stats": {
"title": "Include Operational Stats",
"description": "Whether to display operational stats.",
"default": true,
"type": "boolean"
},
"include_read_operational_stats": {
"title": "Include Read Operational Stats",
"description": "Whether to report read operational stats. Experimental.",
"default": false,
"type": "boolean"
},
"format_sql_queries": {
"title": "Format Sql Queries",
"description": "Whether to format sql queries",
"default": false,
"type": "boolean"
},
"include_top_n_queries": {
"title": "Include Top N Queries",
"description": "Whether to ingest the top_n_queries.",
"default": true,
"type": "boolean"
},
"s3_lineage_config": {
"title": "S3 Lineage Config",
"description": "Common config for S3 lineage generation",
"allOf": [
{
"$ref": "#/definitions/S3LineageProviderConfig"
}
]
},
"platform_instance_map": {
"title": "Platform Instance Map",
"description": "A holder for platform -> platform_instance mappings to generate correct dataset urns",
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
},
"options": {
"title": "Options",
"description": "Any options specified here will be passed to SQLAlchemy's create_engine as kwargs.See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details.",
"default": {},
"type": "object"
},
"schema_pattern": {
"title": "Schema Pattern",
"default": {
"allow": [
".*"
],
"deny": [
"information_schema"
],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"view_pattern": {
"title": "View Pattern",
"description": "Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"profile_pattern": {
"title": "Profile Pattern",
"description": "Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the `table_pattern` will be considered.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"domain": {
"title": "Domain",
"description": "Attach domains to databases, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like \"Marketing\".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/AllowDenyPattern"
}
},
"include_views": {
"title": "Include Views",
"description": "Whether views should be ingested.",
"default": true,
"type": "boolean"
},
"include_tables": {
"title": "Include Tables",
"description": "Whether tables should be ingested.",
"default": true,
"type": "boolean"
},
"include_table_location_lineage": {
"title": "Include Table Location Lineage",
"description": "If the source supports it, include table lineage to the underlying storage location.",
"default": true,
"type": "boolean"
},
"profiling": {
"title": "Profiling",
"default": {
"enabled": false,
"limit": null,
"offset": null,
"report_dropped_profiles": false,
"turn_off_expensive_profiling_metrics": false,
"profile_table_level_only": false,
"include_field_null_count": true,
"include_field_distinct_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"include_field_mean_value": true,
"include_field_median_value": true,
"include_field_stddev_value": true,
"include_field_quantiles": false,
"include_field_distinct_value_frequencies": false,
"include_field_histogram": false,
"include_field_sample_values": true,
"field_sample_values_limit": 20,
"max_number_of_fields_to_profile": null,
"profile_if_updated_since_days": null,
"profile_table_size_limit": 5,
"profile_table_row_limit": 5000000,
"profile_table_row_count_estimate_only": false,
"max_workers": 20,
"query_combiner_enabled": true,
"catch_exceptions": true,
"partition_profiling_enabled": true,
"partition_datetime": null
},
"allOf": [
{
"$ref": "#/definitions/GEProfilingConfig"
}
]
},
"username": {
"title": "Username",
"description": "username",
"type": "string"
},
"password": {
"title": "Password",
"description": "password",
"type": "string",
"writeOnly": true,
"format": "password"
},
"host_port": {
"title": "Host Port",
"description": "host URL",
"type": "string"
},
"database": {
"title": "Database",
"description": "database (catalog)",
"type": "string"
},
"database_alias": {
"title": "Database Alias",
"description": "Alias to apply to database when ingesting.",
"type": "string"
},
"sqlalchemy_uri": {
"title": "Sqlalchemy Uri",
"description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
"type": "string"
},
"include_view_lineage": {
"title": "Include View Lineage",
"description": "Include table lineage for views",
"default": false,
"type": "boolean"
},
"default_schema": {
"title": "Default Schema",
"description": "The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
"default": "public",
"type": "string"
},
"include_table_lineage": {
"title": "Include Table Lineage",
"description": "Whether table lineage should be ingested.",
"default": true,
"type": "boolean"
},
"include_copy_lineage": {
"title": "Include Copy Lineage",
"description": "Whether lineage should be collected from copy commands",
"default": true,
"type": "boolean"
},
"include_unload_lineage": {
"title": "Include Unload Lineage",
"description": "Whether lineage should be collected from unload commands",
"default": true,
"type": "boolean"
},
"capture_lineage_query_parser_failures": {
"title": "Capture Lineage Query Parser Failures",
"description": "Whether to capture lineage query parser errors with dataset properties for debuggings",
"default": false,
"type": "boolean"
},
"table_lineage_mode": {
"description": "Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed]",
"default": "stl_scan_based",
"allOf": [
{
"$ref": "#/definitions/LineageMode"
}
]
},
"email_domain": {
"title": "Email Domain",
"description": "Email domain of your organisation so users can be displayed on UI appropriately.",
"type": "string"
}
},
"required": [
"host_port",
"email_domain"
],
"additionalProperties": false,
"definitions": {
"BucketDuration": {
"title": "BucketDuration",
"description": "An enumeration.",
"enum": [
"DAY",
"HOUR"
],
"type": "string"
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"PathSpec": {
"title": "PathSpec",
"type": "object",
"properties": {
"include": {
"title": "Include",
"description": "Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details.",
"type": "string"
},
"exclude": {
"title": "Exclude",
"description": "list of paths in glob pattern which will be excluded while scanning for the datasets",
"type": "array",
"items": {
"type": "string"
}
},
"file_types": {
"title": "File Types",
"description": "Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.",
"default": [
"csv",
"tsv",
"json",
"parquet",
"avro"
],
"type": "array",
"items": {
"type": "string"
}
},
"default_extension": {
"title": "Default Extension",
"description": "For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.",
"type": "string"
},
"table_name": {
"title": "Table Name",
"description": "Display name of the dataset.Combination of named variables from include path and strings",
"type": "string"
},
"enable_compression": {
"title": "Enable Compression",
"description": "Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
"default": true,
"type": "boolean"
},
"sample_files": {
"title": "Sample Files",
"description": "Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled",
"default": true,
"type": "boolean"
}
},
"required": [
"include"
],
"additionalProperties": false
},
"S3LineageProviderConfig": {
"title": "S3LineageProviderConfig",
"description": "Any source that produces s3 lineage from/to Datasets should inherit this class.",
"type": "object",
"properties": {
"path_specs": {
"title": "Path Specs",
"description": "List of PathSpec. See below the details about PathSpec",
"type": "array",
"items": {
"$ref": "#/definitions/PathSpec"
}
}
},
"required": [
"path_specs"
],
"additionalProperties": false
},
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "The type of the ingestion state provider registered with datahub.",
"default": false,
"type": "boolean"
},
"ignore_old_state": {
"title": "Ignore Old State",
"description": "If set to True, ignores the previous checkpoint state.",
"default": false,
"type": "boolean"
},
"ignore_new_state": {
"title": "Ignore New State",
"description": "If set to True, ignores the current checkpoint state.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"GEProfilingConfig": {
"title": "GEProfilingConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"limit": {
"title": "Limit",
"description": "Max number of documents to profile. By default, profiles all documents.",
"type": "integer"
},
"offset": {
"title": "Offset",
"description": "Offset in documents to profile. By default, uses no offset.",
"type": "integer"
},
"report_dropped_profiles": {
"title": "Report Dropped Profiles",
"description": "Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
"default": false,
"type": "boolean"
},
"turn_off_expensive_profiling_metrics": {
"title": "Turn Off Expensive Profiling Metrics",
"description": "Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
"default": false,
"type": "boolean"
},
"profile_table_level_only": {
"title": "Profile Table Level Only",
"description": "Whether to perform profiling at table-level only, or include column-level profiling as well.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_distinct_count": {
"title": "Include Field Distinct Count",
"description": "Whether to profile for the number of distinct values for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_mean_value": {
"title": "Include Field Mean Value",
"description": "Whether to profile for the mean value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_median_value": {
"title": "Include Field Median Value",
"description": "Whether to profile for the median value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_stddev_value": {
"title": "Include Field Stddev Value",
"description": "Whether to profile for the standard deviation of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_quantiles": {
"title": "Include Field Quantiles",
"description": "Whether to profile for the quantiles of numeric columns.",
"default": false,
"type": "boolean"
},
"include_field_distinct_value_frequencies": {
"title": "Include Field Distinct Value Frequencies",
"description": "Whether to profile for distinct value frequencies.",
"default": false,
"type": "boolean"
},
"include_field_histogram": {
"title": "Include Field Histogram",
"description": "Whether to profile for the histogram for numeric fields.",
"default": false,
"type": "boolean"
},
"include_field_sample_values": {
"title": "Include Field Sample Values",
"description": "Whether to profile for the sample values for all columns.",
"default": true,
"type": "boolean"
},
"field_sample_values_limit": {
"title": "Field Sample Values Limit",
"description": "Upper limit for number of sample values to collect for all columns.",
"default": 20,
"type": "integer"
},
"max_number_of_fields_to_profile": {
"title": "Max Number Of Fields To Profile",
"description": "A positive integer that specifies the maximum number of columns to profile for any table. `None` implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.",
"exclusiveMinimum": 0,
"type": "integer"
},
"profile_if_updated_since_days": {
"title": "Profile If Updated Since Days",
"description": "Profile table only if it has been updated since these many number of days. If set to `null`, no constraint of last modified time for tables to profile. Supported only in `snowflake` and `BigQuery`.",
"exclusiveMinimum": 0,
"type": "number"
},
"profile_table_size_limit": {
"title": "Profile Table Size Limit",
"description": "Profile tables only if their size is less then specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5,
"type": "integer"
},
"profile_table_row_limit": {
"title": "Profile Table Row Limit",
"description": "Profile tables only if their row count is less then specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
"default": 5000000,
"type": "integer"
},
"profile_table_row_count_estimate_only": {
"title": "Profile Table Row Count Estimate Only",
"description": "Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres. ",
"default": false,
"type": "boolean"
},
"max_workers": {
"title": "Max Workers",
"description": "Number of worker threads to use for profiling. Set to 1 to disable.",
"default": 20,
"type": "integer"
},
"query_combiner_enabled": {
"title": "Query Combiner Enabled",
"description": "*This feature is still experimental and can be disabled if it causes issues.* Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible.",
"default": true,
"type": "boolean"
},
"catch_exceptions": {
"title": "Catch Exceptions",
"default": true,
"type": "boolean"
},
"partition_profiling_enabled": {
"title": "Partition Profiling Enabled",
"default": true,
"type": "boolean"
},
"partition_datetime": {
"title": "Partition Datetime",
"description": "For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.",
"type": "string",
"format": "date-time"
}
},
"additionalProperties": false
},
"LineageMode": {
"title": "LineageMode",
"description": "An enumeration.",
"enum": [
"sql_based",
"stl_scan_based",
"mixed"
]
}
}
}
Code Coordinates
- Class Name:
datahub.ingestion.source.usage.redshift_usage.RedshiftUsageSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Redshift, feel free to ping us on our Slack