
Dagster Integration

This connector supports extracting:

  • Dagster Pipeline and Task Metadata
  • Pipeline Run Status
  • Table Lineage

from Dagster.

Using DataHub's Dagster Sensor

Dagster Sensors allow us to perform actions when important events occur in Dagster. DataHub's Dagster Sensor allows you to emit metadata after every Dagster pipeline run. This sensor can emit Pipelines, Tasks, and run results. For more details about Dagster Sensors, please refer to the documentation.

Prerequisites

  1. Create a Dagster project. See Create New Project.
  2. Create a Definitions class or a Repository.
  3. Note that a new Dagster project created via the UI uses a Definitions class to define Dagster pipelines.

Setup

  1. Install the DataHub Dagster plugin package:

    pip install acryl_datahub_dagster_plugin
  2. Import the DataHub Dagster Sensor, which is provided in the plugin package, into your Dagster Definitions or Repository before starting the Dagster UI:

    Example Definitions Class:

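    The snippet below is a minimal sketch of such a setup. It assumes the plugin's `make_datahub_sensor` helper and the `datahub_client_config` option listed below; see `basic_setup.py` in the plugin repository for the canonical version:

    ```python
    from dagster import Definitions
    from datahub.ingestion.graph.client import DatahubClientConfig
    from datahub_dagster_plugin.sensors.datahub_sensors import (
        DatahubDagsterSourceConfig,
        make_datahub_sensor,
    )

    # Point the sensor at your DataHub GMS and your Dagster webserver.
    config = DatahubDagsterSourceConfig(
        datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
        dagster_url="http://localhost:3000",
    )

    datahub_sensor = make_datahub_sensor(config=config)

    # Register the sensor in your Definitions so it shows up in the Dagster UI.
    defs = Definitions(sensors=[datahub_sensor])
    ```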
  3. The DataHub Dagster plugin-provided sensor internally uses the following configurations. You can override the default config values using environment variables.

    Configuration options:

    | Configuration Option | Default Value | Description |
    |----------------------|---------------|-------------|
    | `datahub_client_config` | | The DataHub client config. |
    | `dagster_url` | | The URL to your Dagster webserver. |
    | `capture_asset_materialization` | `True` | Whether to capture asset keys as DataHub Datasets on AssetMaterialization events. |
    | `capture_input_output` | `True` | Whether to capture and try to parse inputs and outputs from HANDLED_OUTPUT and LOADED_INPUT events. Currently only PathMetadataValue metadata is supported. (EXPERIMENTAL) |
    | `platform_instance` | | The instance of the platform that all assets produced by this recipe belong to. Optional. |
    | `asset_lineage_extractor` | | A custom callback for capturing asset lineage information; see the example below for details. |
    | `enable_asset_query_metadata_parsing` | | Whether to enable parsing queries from asset metadata; see below for details. |
  4. Once the Dagster UI is running, enable the provided sensor: open the Overview tab, select the Sensors tab, and toggle the DataHub sensor on.

Woohoo! Now, the DataHub Sensor is ready to emit metadata after every pipeline run.

How to Validate Installation

  1. Navigate to the Dagster UI.

  2. Go to Overview > Sensors and look for datahub_sensor.

  3. Start a Dagster Job. In the daemon logs, you should see DataHub-related log messages:

    datahub_sensor - Emitting metadata...

    This means that DataHub's sensor is correctly configured to emit metadata to DataHub.

Capturing Table Lineage

There are a few ways to extract lineage, or relationships between tables, from Dagster. We recommend one or more of the following approaches to extract lineage automatically.

But First: Extracting Asset Identifiers

When naming Dagster Assets, we recommend the following structure:

key_prefix=["env", "platform", "db_name", "schema_name"]

This ensures that we correctly resolve the Asset name to a Dataset URN in DataHub.

For example:

```python
@asset(
    # Fully qualified asset name: identifies the platform and keeps the key unique.
    key_prefix=["prod", "snowflake", "db_name", "schema_name"],
    deps=[iris_dataset],
)
def my_asset() -> None: ...
```

If you properly name your Dagster Asset, you can establish a connection between the Asset and the dataset it is referring to, which is likely already stored in DataHub. This allows for accurate tracking and lineage information in the next steps.

If you follow a different naming convention, you can create your own asset_keys_to_dataset_urn_converter logic and set a custom callback function. This can be used to generate a DataHub Dataset URN in any way you please, from metadata or otherwise.

Here is an example that can create a DataHub URN from the Asset key naming convention specified above:

```python
from typing import Optional, Sequence

from datahub.metadata.urns import DatasetUrn


def asset_keys_to_dataset_urn_converter(
    self, asset_key: Sequence[str]
) -> Optional[DatasetUrn]:
    """
    Convert an asset key to a dataset URN.

    By default, we assume the following asset key structure:
    key_prefix=["prod", "snowflake", "db_name", "schema_name"]
    """
    if len(asset_key) >= 3:
        return DatasetUrn(
            platform=asset_key[1],
            env=asset_key[0],
            name=".".join(asset_key[2:]),
        )
    return None
```

Using SQL Query Parsing to Extract Lineage

DataHub's Dagster integration can automatically capture dataset inputs and outputs for Software-Defined Assets by parsing the SQL queries they run. Simply add the executed query to the asset metadata under the `Query` key.

Here is an example of a Software-Defined Asset annotated with the `Query` metadata:

```python
from dagster import MaterializeResult, MetadataValue, asset
from dagster_snowflake import SnowflakeResource


@asset(key_prefix=["prod", "snowflake", "db_name", "schema_name"])
def my_asset_table_a(snowflake: SnowflakeResource) -> MaterializeResult:
    query = """
        create or replace table db_name.schema_name.my_asset_table_a as (
            SELECT *
            FROM db_name.schema_name.my_asset_table_b
        );
    """
    with snowflake.get_connection() as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
    return MaterializeResult(
        # Add the query to the metadata so the SQL parser can derive lineage from it.
        metadata={
            "Query": MetadataValue.text(query),
        }
    )
```

For the above example, the plugin will automatically extract and set the upstream lineage as db_name.schema_name.my_asset_table_b.

Please note that it is important to name the asset properly, because the query parser infers the SQL dialect from the platform in the generated URN. In the above example, the dialect will be `snowflake`.

[See a full example job here](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/dagster-plugin/examples/iris.py).
Using the SnowflakePandasIOManager

The plugin can automatically capture Snowflake assets created by the SnowflakePandasIOManager, and it also adds the DataHub URN and a link to DataHub to the assets in Dagster.

To set this up, simply use DataHubSnowflakePandasIOManager in place of SnowflakePandasIOManager.

DataHubSnowflakePandasIOManager accepts the following additional parameters:

  • datahub_base_url: The base URL of the DataHub UI, used to generate a direct link to the Snowflake dataset in DataHub. If this is not set, no link is generated.
  • datahub_env: The environment to use when generating DataHub URNs. Defaults to PROD if not set.

```python
from datahub_dagster_plugin.modules.snowflake_pandas.datahub_snowflake_pandas_io_manager import (
    DataHubSnowflakePandasIOManager,
)

...

resources={
    "snowflake_io_manager": DataHubSnowflakePandasIOManager(
        database="MY_DB",
        account="my_snowflake_account",
        warehouse="MY_WAREHOUSE",
        user="my_user",
        password="my_password",
        role="my_role",
        datahub_base_url="http://localhost:9002",
    ),
}
```

Using Dagster Ins and Out

We can provide inputs and outputs to both Assets and Ops explicitly, using dictionaries of Ins and Outs corresponding to the decorated function's arguments, and we can attach metadata to those declarations.

To create upstream and downstream dataset dependencies for Assets and Ops, supply the ins and out dictionaries with DataHub metadata, as in the sketch below. For reference, also look at the sample jobs created using assets (assets_job.py) or ops (ops_job.py).
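
Here is a minimal sketch of the op-level approach, assuming the `datahub.inputs` / `datahub.outputs` metadata keys used in those sample jobs (the platform, table names, and op are illustrative):

```python
from dagster import In, Out, op
from datahub.metadata.urns import DatasetUrn


@op(
    ins={
        # Upstream dataset(s) this op reads; the DataHub sensor picks up
        # the "datahub.inputs" metadata key to build lineage.
        "data_in": In(
            metadata={
                "datahub.inputs": [
                    DatasetUrn("snowflake", "db_name.schema_name.table_a").urn()
                ]
            }
        )
    },
    out={
        # Downstream dataset(s) this op writes, via "datahub.outputs".
        "data_out": Out(
            metadata={
                "datahub.outputs": [
                    DatasetUrn("snowflake", "db_name.schema_name.table_b").urn()
                ]
            }
        )
    },
)
def transform(data_in):
    # Illustrative body: pass the data through unchanged.
    return data_in
```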

Using Custom Logic for Extracting Lineage

You can define your own logic to capture asset lineage information and register it through the asset_lineage_extractor configuration option described above.

The function returns a dictionary keyed by op name; each value is a DatasetLineage, a pair holding the input (upstream) and output (downstream) dataset URNs for that op.

```python
from typing import Dict

from dagster import RunStatusSensorContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub_dagster_plugin.client.dagster_generator import (
    DagsterGenerator,
    DatasetLineage,
)


def asset_lineage_extractor(
    context: RunStatusSensorContext,
    dagster_generator: DagsterGenerator,
    graph: DataHubGraph,
) -> Dict[str, DatasetLineage]:
    dataset_lineage: Dict[str, DatasetLineage] = {}

    # Extract input and output assets from the run context here and
    # populate dataset_lineage, keyed by op name.
    return dataset_lineage
```
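
To wire the callback into the sensor, pass it through the sensor config. This is a sketch assuming the `make_datahub_sensor` setup shown earlier; the field name mirrors the asset_lineage_extractor option listed above:

```python
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub_dagster_plugin.sensors.datahub_sensors import (
    DatahubDagsterSourceConfig,
    make_datahub_sensor,
)

config = DatahubDagsterSourceConfig(
    datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
    dagster_url="http://localhost:3000",
    # Register the custom callback defined above.
    asset_lineage_extractor=asset_lineage_extractor,
)

datahub_sensor = make_datahub_sensor(config=config)
```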

See an example job here.

Debugging

Connection Error for DataHub Rest URL

If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your DataHub GMS service is not up or is unreachable at that address. Make sure GMS is running before re-enabling the sensor; on a standard quickstart deployment, `curl http://localhost:8080/health` should return a success response.