Let's say you want to instantiate your own Azure Data Factory pipelines and activities. We can create our own entities with the built-in types! We just need to follow the qualified name patterns and wire up the right relationship attributes.
First, let's understand the built-in types and their relationship attributes.
- azure_data_factory represents a Data Factory that you've scanned.
  - pipelines relationship attribute is an array of adf_pipeline.
- adf_pipeline represents an Azure Data Factory pipeline inside an Azure Data Factory.
  - dataFactory relationship attribute points to one azure_data_factory.
  - subProcesses relationship attribute is an array of adf_copy_activity (or other ADF supported types).
- adf_copy_activity represents a Copy Activity within a pipeline.
  - parent relationship attribute points to one adf_pipeline.
  - subProcesses relationship attribute is an array of adf_copy_operation (or other ADF supported types).
  - runInstances relationship attribute is an array of adf_copy_activity_run (or other ADF supported types).
- adf_copy_operation represents the actual process that will contain inputs and outputs.
  - parent relationship attribute points to one adf_copy_activity.
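On the wire, these relationship attributes are just part of the entity JSON. Here is a minimal sketch of what an adf_copy_activity payload might carry; the shape follows the Apache Atlas entity model, and the guids, name, and qualified name below are hypothetical placeholders:

```python
# Rough shape of an Atlas entity payload with a relationship attribute.
# All values here are made-up placeholders for illustration.
activity_entity = {
    "typeName": "adf_copy_activity",
    "guid": "-1001",  # temporary guid assigned client-side
    "attributes": {
        "name": "CopyBlobToSql",
        "qualifiedName": "/subscriptions/xxx/.../activities/copyblobtosql",
    },
    "relationshipAttributes": {
        # parent points at exactly one adf_pipeline
        "parent": {"guid": "-1000"},
    },
}

print(activity_entity["relationshipAttributes"]["parent"]["guid"])
```

Helpers like addRelationship in pyapacheatlas (used later in this article) populate the relationshipAttributes section for you.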
Another important consideration is the qualified name pattern at each of these levels.
- azure_data_factory pattern: /subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RG_NAME}/providers/Microsoft.DataFactory/factories/{DATA_FACTORY_NAME}
- adf_pipeline pattern: {azure_data_factory_pattern}/pipelines/{PIPELINE_NAME}
- adf_copy_activity pattern: {adf_pipeline_pattern}/activities/{ACTIVITY_NAME}
- adf_copy_operation pattern: {adf_copy_activity_pattern}#{output_qualified_name}#{output_type_name}
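To see how the patterns nest, here is a quick sketch that composes each level with f-strings. The subscription, resource group, factory, and sink values are hypothetical placeholders:

```python
# Hypothetical identifiers, only to illustrate how the patterns nest.
SUBSCRIPTION_ID = "00000000-0000-0000-0000-000000000000"
RG_NAME = "my-resource-group"
DATA_FACTORY_NAME = "my-factory"

factory_qn = (
    f"/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/{RG_NAME}"
    f"/providers/Microsoft.DataFactory/factories/{DATA_FACTORY_NAME}"
)
# Each level extends the previous level's qualified name.
pipeline_qn = f"{factory_qn}/pipelines/copypipeline"
activity_qn = f"{pipeline_qn}/activities/copyblob"
# The copy operation appends the sink's qualified name and type name,
# separated by '#' characters.
operation_qn = (
    f"{activity_qn}"
    "#https://example.blob.core.windows.net/container/out.csv"
    "#azure_blob_path"
)

print(operation_qn)
```

Note that each entity's qualified name fully contains its parent's, which is what lets Purview group these assets together in the UI.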
In the scenario below, we will create a pipeline, a copy activity, and a copy operation for an existing data factory entity.
First, there's some setup: fill in the existing data factory's guid, update the resource path (used in the qualified name patterns), and set the pipeline, activity, and copy operation names, along with the output's qualified name and type.
import json

from pyapacheatlas.core import AtlasEntity, AtlasProcess
from pyapacheatlas.core.util import GuidTracker

# Assumes `client` is an already-authenticated PurviewClient.

DATA_FACTORY_GUID = "xxxx-yyy-zzz-123-456"
RESOURCE_PATH = (
    "/subscriptions/XXX-YYY-ZZZ/" +
    "resourceGroups/RG_NAME/" +
    "providers/Microsoft.DataFactory/" +
    "factories/ADF_NAME/pipelines"
)
PIPELINE_NAME = "MY_CUSTOM_PIPELINE"
ACTIVITY_NAME = "MY_CUSTOM_ACTIVITY"
COPY_ACTIVITY_NAME = "COPY_OP"
OP_SINK_QUALIFIED_NAME = "somedatasource"
OP_OUTPUT_TYPE = "some_type"

gt = GuidTracker()
# Create the Pipeline
adf_pipeline = AtlasEntity(
    PIPELINE_NAME,
    "adf_pipeline",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}",
    str(gt.get_guid())
)
# Associate the pipeline with the existing data factory
adf_pipeline.addRelationship(dataFactory={"guid": DATA_FACTORY_GUID})
# Create the Activity with a relationship to the pipeline
adf_activity = AtlasProcess(
    ACTIVITY_NAME,
    "adf_copy_activity",
    f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/activities/{ACTIVITY_NAME.lower()}",
    guid=str(gt.get_guid()),
    inputs=[],
    outputs=[]
)
adf_activity.addRelationship(parent=adf_pipeline)
# Create the copy operation with a relationship to the activity
adf_copy_op = AtlasProcess(
    COPY_ACTIVITY_NAME,
    "adf_copy_operation",
    (
        f"{RESOURCE_PATH}/{PIPELINE_NAME.lower()}/"
        f"activities/{ACTIVITY_NAME.lower()}"
        f"#{OP_SINK_QUALIFIED_NAME}#{OP_OUTPUT_TYPE}"
    ),
    guid=str(gt.get_guid()),
    # Hard-coded guids here, but you could instead reference entities
    # you're creating in the same batch, or use a qualified name and type.
    inputs=[{"guid": "123-abc-def"}],
    outputs=[{"guid": "456-ghi-jkl"}]
)
adf_copy_op.addRelationship(parent=adf_activity)
# Perform the upload
results = client.upload_entities([adf_pipeline, adf_activity, adf_copy_op])
print(json.dumps(results, indent=2))
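If the upload succeeds, the response follows the Apache Atlas bulk entity API shape, which includes a guidAssignments map from the temporary negative guids (produced by GuidTracker) to the real guids the service assigned. A small sketch of pulling those out, using an illustrative response fragment rather than real output:

```python
# Hedged sketch: extract the temporary-to-real guid mapping from an
# upload response (shape per the Apache Atlas bulk entity API).
def assigned_guids(results):
    """Return {temporary_guid: real_guid} from an upload response."""
    return results.get("guidAssignments", {})

# Illustrative response fragment, not real output:
sample = {"guidAssignments": {"-1000": "11111111-2222-3333-4444-555555555555"}}
print(assigned_guids(sample))
```

Capturing these real guids is handy if you want to reference, update, or delete the new entities in a follow-up call.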
Azure Data Factory lineage in Purview is subject to change and improvement over time.
Be sure to test your implementation in an environment where you have permission to delete entities and type definitions in case things go awry.