
DataHub Actions: Announcing Support for DataHub Cloud
We’re excited to announce a major update to the DataHub Actions Framework available in v1.9: support for connecting to DataHub Cloud instances!
This makes it easier than ever to integrate DataHub with your broader event-driven architectures and create customized workflows based on real-time metadata changes.
In this post, we’ll:
- Provide an overview of the Actions Framework and its capabilities.
- Walk you through how to define and run a custom Action using DataHub Cloud.
- Highlight resources for exploring advanced use cases and building custom actions.
Let’s dive in!
What is DataHub Actions?
The DataHub Actions Framework is a powerful tool for integrating DataHub into event-driven architectures. It allows you to:
- Respond to real-time metadata changes in your DataHub Metadata Graph.
- Trigger workflows to enhance metadata quality, governance, and collaboration.
- Seamlessly integrate with external systems, like Slack or custom-built applications.
Actions make it easier to respond dynamically to metadata events, replacing legacy integrations (e.g., the AWS EventBridge integration) with a more robust, flexible, and scalable framework.
For a deeper dive, check out the original announcement or get started with the Actions Concepts Guide.
Introducing DataHub Cloud Support
Previously, the Actions Framework required a self-managed DataHub instance. With this release, you can now connect DataHub Actions directly to DataHub Cloud, empowering you to build deeply integrated workflows that connect DataHub to the rest of your data ecosystem seamlessly.
Let’s look at an example to see how easy it is to start building custom Actions with DataHub Cloud.
Quickstart: Building a Custom Action
Scenario
Suppose you want to detect when a particular Glossary Term is applied to a dataset column (for example, a term that marks PII data) so that you can automatically apply a masking policy to that column in Snowflake. Good news: we can use DataHub Actions to achieve this!
Here’s how you can set things up:
Step 1: Install the Required Packages
First, ensure you have the necessary CLI tools installed in your virtual environment:
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip install --upgrade acryl-datahub acryl-datahub-actions
Verify the installation:
datahub --version
datahub actions version
Step 2: Implement Your Action
To create an action, you simply need to implement the “Action” base class provided by the acryl-datahub-actions package.
# pii_column_masking_action.py
import re
from typing import Tuple

from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext


class PiiColumnMaskingAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        # Initialize the action with provided configurations
        print("Configuration loaded:", config_dict)
        return cls(ctx)

    def __init__(self, ctx: PipelineContext):
        self.ctx = ctx

    def act(self, event: EventEnvelope) -> None:
        # Check if the event is of type EntityChangeEvent_v1
        if event.event_type == "EntityChangeEvent_v1":
            # Extract relevant fields directly from the deserialized event
            entity_type = event.event.entityType
            entity_urn = event.event.entityUrn
            category = event.event.category
            operation = event.event.operation
            modifier = event.event.modifier

            # Check for PII glossary term additions on columns
            # (guard against a missing modifier and ignore URN casing)
            if (
                entity_type == "schemaField"
                and category == "GLOSSARY_TERM"
                and operation == "ADD"
                and modifier
                and "pii" in modifier.lower()
            ):
                print(f"Detected PII Term Addition: {modifier} to schema field (column) {entity_urn}")

                # Parse the table name and column name from the schemaField URN
                table_name, column_name = self._parse_schema_field_urn(entity_urn)
                print(f"Table: {table_name}, Column: {column_name}")

                # Mask the column
                self._mask_snowflake_column(table_name, column_name)

    def _parse_schema_field_urn(self, urn: str) -> Tuple[str, str]:
        # Regex to extract table and column from schemaField URN
        pattern = r"urn:li:schemaField:\(urn:li:dataset:\(urn:li:dataPlatform:[^,]+,([^,]+),[^\)]+\),([^\)]+)\)"
        match = re.match(pattern, urn)
        if match:
            table_name = match.group(1)
            column_name = match.group(2)
            return table_name, column_name
        else:
            raise ValueError(f"Unable to parse schemaField URN: {urn}")

    def _mask_snowflake_column(self, table: str, column: str) -> None:
        # Here, you'd connect to your Snowflake and apply a column masking policy.
        # Skipping for brevity.
        pass

    def close(self) -> None:
        # Clean up resources if necessary
        print("Closing PiiColumnMaskingAction")
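The _mask_snowflake_column method is left as a stub above. As a rough sketch of what it might do, the snippet below builds the Snowflake statement that attaches a masking policy to a column. The policy name pii_mask and the helper build_masking_statement are hypothetical, and the policy is assumed to already exist in Snowflake. Identifiers are validated before being interpolated because they originate from event data.

```python
import re

# Hypothetical policy name; the policy must already exist in Snowflake.
DEFAULT_POLICY = "pii_mask"

# Conservative pattern for unquoted Snowflake identifiers.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*$")


def build_masking_statement(table: str, column: str, policy: str = DEFAULT_POLICY) -> str:
    """Return an ALTER TABLE statement that sets a masking policy on a column."""
    # Validate every dot-separated part so event data cannot inject SQL.
    for identifier in (*table.split("."), column, *policy.split(".")):
        if not _IDENTIFIER.match(identifier):
            raise ValueError(f"Unsafe identifier: {identifier!r}")
    return f"ALTER TABLE {table} MODIFY COLUMN {column} SET MASKING POLICY {policy}"


statement = build_masking_statement(
    "datahub_community.datahub_slack.message_file", "id"
)
print(statement)
# ALTER TABLE datahub_community.datahub_slack.message_file MODIFY COLUMN id SET MASKING POLICY pii_mask
```

Inside the action, the resulting statement could then be executed through whichever Snowflake client you already use, for example a cursor from the Snowflake Python connector.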
The PiiColumnMaskingAction code is designed to process incoming Entity Change Events with the following structure:
{
  "event_type": "EntityChangeEvent_v1",
  "event": {
    "entityType": "schemaField",
    "entityUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD),id)",
    "category": "GLOSSARY_TERM",
    "operation": "ADD",
    "modifier": "urn:li:glossaryTerm:pii",
    "auditStamp": {
      "time": 1737593949107,
      "actor": "urn:li:corpuser:john.joyce@acryl.io"
    },
    "parameters": {
      "fieldPath": "id",
      "termUrn": "urn:li:glossaryTerm:pii",
      "parentUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD),id)"
    }
  }
}
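Before wiring anything up to DataHub Cloud, it can help to dry-run the matching and parsing logic against a sample event. The sketch below does that with plain dictionaries and no DataHub packages. The helper name pii_target is hypothetical, and the event is a trimmed copy of the structure shown above.

```python
import json
import re
from typing import Optional, Tuple

# Same pattern the action uses to pull the table and column out of the URN.
SCHEMA_FIELD_URN = re.compile(
    r"urn:li:schemaField:\(urn:li:dataset:\(urn:li:dataPlatform:[^,]+,([^,]+),[^\)]+\),([^\)]+)\)"
)

# Trimmed copy of the sample Entity Change Event.
SAMPLE_EVENT = json.loads("""
{
  "event_type": "EntityChangeEvent_v1",
  "event": {
    "entityType": "schemaField",
    "entityUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,datahub_community.datahub_slack.message_file,PROD),id)",
    "category": "GLOSSARY_TERM",
    "operation": "ADD",
    "modifier": "urn:li:glossaryTerm:pii"
  }
}
""")


def pii_target(envelope: dict) -> Optional[Tuple[str, str]]:
    """Return (table, column) for a PII term addition, or None for other events."""
    if envelope.get("event_type") != "EntityChangeEvent_v1":
        return None
    event = envelope.get("event", {})
    if event.get("category") != "GLOSSARY_TERM" or event.get("operation") != "ADD":
        return None
    # Compare case-insensitively so "PII" and "pii" term URNs both qualify.
    if "pii" not in (event.get("modifier") or "").lower():
        return None
    match = SCHEMA_FIELD_URN.match(event.get("entityUrn", ""))
    if match is None:
        raise ValueError(f"Unable to parse schemaField URN: {event.get('entityUrn')}")
    return match.group(1), match.group(2)


print(pii_target(SAMPLE_EVENT))
# ('datahub_community.datahub_slack.message_file', 'id')
```

Note that the parsed "table" is the full dataset name from the URN (database, schema, and table joined by dots), which is what a Snowflake statement would need anyway.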
Step 3: Define Your Action Configuration
Create a YAML configuration file (e.g., pii_monitor_action.yml) to define your Action:
# pii_monitor_action.yml
name: "pii_monitor_action"
datahub:
  server: "https://<your-organization-name>.acryl.io"
  token: "<your-datahub-access-token>"
source:
  type: "datahub-cloud"
# Filter for Glossary Term Changes to Schema Fields
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    entityType: "schemaField"
    category: "GLOSSARY_TERM"
    operation: "ADD"
    # Use the exact URN of your term (the sample event above uses lowercase "pii")
    modifier: "urn:li:glossaryTerm:pii"
# Finally, invoke your custom code
action:
  # Format: <python_module>:<ClassName> of your custom action
  type: "pii_column_masking_action:PiiColumnMaskingAction"
For information about all the available configuration options, check out the DataHub Cloud Event Source documentation.
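To build some intuition for what the filter block does, here is a simplified model of it in Python. This is not the framework's implementation. It assumes that each field listed in the filter is compared to the event by exact equality and that fields absent from the filter are ignored. The names matches and make_event are illustrative only.

```python
from typing import Any


def matches(expected: Any, actual: Any) -> bool:
    """Return True if every field in `expected` is present and equal in `actual`."""
    if isinstance(expected, dict):
        return isinstance(actual, dict) and all(
            key in actual and matches(value, actual[key])
            for key, value in expected.items()
        )
    return expected == actual


# Mirrors the filter block of the YAML configuration.
EVENT_FILTER = {
    "event_type": "EntityChangeEvent_v1",
    "event": {
        "entityType": "schemaField",
        "category": "GLOSSARY_TERM",
        "operation": "ADD",
        "modifier": "urn:li:glossaryTerm:pii",
    },
}


def make_event(operation: str, modifier: str) -> dict:
    """Build a minimal Entity Change Event envelope for experimentation."""
    return {
        "event_type": "EntityChangeEvent_v1",
        "event": {
            "entityType": "schemaField",
            "category": "GLOSSARY_TERM",
            "operation": operation,
            "modifier": modifier,
            "parameters": {"fieldPath": "id"},  # extra fields are ignored
        },
    }


print(matches(EVENT_FILTER, make_event("ADD", "urn:li:glossaryTerm:pii")))     # True
print(matches(EVENT_FILTER, make_event("REMOVE", "urn:li:glossaryTerm:pii")))  # False
print(matches(EVENT_FILTER, make_event("ADD", "urn:li:glossaryTerm:PII")))     # False
```

Under this exact-match assumption, the casing of the term URN in the filter has to agree with the URN that DataHub actually emits, so it is worth copying the URN from the term's page rather than typing it by hand.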
Step 4: Run the Action
Start the Action pipeline by passing the configuration file to the CLI:
datahub actions -c pii_monitor_action.yml
You should see output confirming that the pipeline is running:
Action Pipeline with name 'pii_monitor_action' is now running.
Whenever the PII Glossary Term is added to a column, the action will log the event details and, once you have filled in _mask_snowflake_column, apply your masking policy! ⚡
Next Steps
With DataHub Actions and DataHub Cloud support, the possibilities for integrating metadata into your organization’s data management workflows are endless.
Here are a few next steps to consider:
- Learn more about the DataHub Actions Framework.
- Check out all of the supported event types and their schemas.
- We’re always looking to improve! Share your experiences and feedback on the DataHub Slack.