
Working with Azure Data Factory Types and Data Factory Lineage

Let's say you want to instantiate your own Azure Data Factory pipelines and activities in Purview. You can create your own entities with the built-in ADF types; you just need to follow the qualified name patterns and relationship attributes.

ADF Types and Relationships

First, let's understand the built-in types and their relationship attributes.
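As a quick reference, here's a sketch of the three built-in types used in this post and the relationship attribute each one expects. This is a summary of what the code below uses, not an exhaustive type definition:

```python
# Built-in ADF types used in this post, mapped to the relationship
# attribute that links each entity to its container.
ADF_TYPE_RELATIONSHIPS = {
    "adf_pipeline":       {"dataFactory": "guid of the existing data factory"},
    "adf_copy_activity":  {"parent": "the adf_pipeline entity"},
    "adf_copy_operation": {"parent": "the adf_copy_activity entity"},
}

for type_name, rels in ADF_TYPE_RELATIONSHIPS.items():
    print(type_name, "->", list(rels))
```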

ADF Qualified Names

Another important consideration is the qualified name pattern at each of these levels.
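To make the pattern concrete, here's a sketch of the qualified names at each level, derived from the entities we create below. The angle-bracket segments are illustrative placeholders:

```python
# Qualified names mirror the ARM resource id of the data factory,
# with each child level appending a path (or fragment) segment.
factory_qn  = ("/subscriptions/<sub-id>/resourceGroups/<rg-name>/"
               "providers/Microsoft.DataFactory/factories/<factory-name>")
pipeline_qn = factory_qn + "/pipelines/<pipeline-name>"     # name lower-cased
activity_qn = pipeline_qn + "/activities/<activity-name>"   # name lower-cased
# The copy operation appends the sink's qualified name and type as fragments.
copy_op_qn  = activity_qn + "#<sink-qualified-name>#<sink-type>"
print(copy_op_qn)
```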

Creating Entities with ADF Types

In the scenario below, we will create a pipeline, copy activity, and a copy operation for an existing data factory entity.

First, there's some setup: fill in the existing data factory's guid, update the resource path (used in the qualified name patterns), and set the pipeline, activity, and copy operation names, along with the copy operation's sink qualified name and output type.

DATA_FACTORY_GUID = "xxxx-yyy-zzz-123-456"
RESOURCE_PATH = ("/subscriptions/XXX-YYY-ZZZ/" +
 "resourceGroups/RG_NAME/" + 
 "providers/Microsoft.DataFactory/" + 
 "factories/ADF_NAME/pipelines"
)
PIPELINE_NAME = "MY_CUSTOM_PIPELINE"
ACTIVITY_NAME = "MY_CUSTOM_ACTIVITY"
COPY_ACTIVITY_NAME = "COPY_OP"
OP_SINK_QUALIFIED_NAME = "somedatasource"
OP_OUTPUT_TYPE = "some_type"

import json

from pyapacheatlas.core import AtlasEntity, AtlasProcess
from pyapacheatlas.core.util import GuidTracker

gt = GuidTracker()

# Create the Pipeline
adf_pipeline = AtlasEntity(
    PIPELINE_NAME,
    "adf_pipeline",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}",
    str(gt.get_guid())
)
# Associate the pipeline with the existing data factory
adf_pipeline.addRelationship(dataFactory={"guid": DATA_FACTORY_GUID})

# Create the Activity with a relationship to the pipeline
adf_activity = AtlasProcess(
    ACTIVITY_NAME,
    "adf_copy_activity",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}",
    guid = str(gt.get_guid()),
    inputs=[],
    outputs=[]
)
adf_activity.addRelationship(parent=adf_pipeline)

# Create the copy operation with a relationship to the activity
adf_copy_op = AtlasProcess(
    COPY_ACTIVITY_NAME,
    "adf_copy_operation",
    (
        f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}"
        f"#{OP_SINK_QUALIFIED_NAME}#{OP_OUTPUT_TYPE}"
    ),
    guid = str(gt.get_guid()),
    # Hard-coded guids in this case, but you could reference entities
    # you're creating in the same batch, or use a qualified name and type.
    inputs=[{"guid": "123-abc-def"}],
    outputs=[{"guid": "456-ghi-jkl"}]
)
adf_copy_op.addRelationship(parent=adf_activity)

# Perform the upload (assumes `client` is an authenticated PurviewClient)
results = client.upload_entities([adf_pipeline, adf_activity, adf_copy_op])
print(json.dumps(results, indent=2))
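Once the upload succeeds, you can fetch the pipeline back by qualified name to confirm it landed with its relationships. A minimal sketch, assuming `client` is an authenticated PurviewClient and using the client's get-entity-by-qualified-name call:

```python
def verify_pipeline_upload(client, resource_path, pipeline_name):
    """Fetch the uploaded pipeline back by qualified name and type."""
    return client.get_entity(
        qualifiedName=f"{resource_path}/{pipeline_name.lower()}",
        typeName="adf_pipeline",
    )
```

Inspect the returned entity's `relationshipAttributes` to confirm the data factory link was applied.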

Recap

Azure Data Factory lineage in Purview is subject to change and improvement over time.

Be sure to test your implementation in an environment where you have permission to delete entities and type definitions in case things go awry.