Skip to content

Commit

Permalink
feat(ingest/stateful): omit irrelevant urns for deletion (#11558)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Oct 9, 2024
1 parent 576ae8a commit 26bbe02
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}


Expand Down Expand Up @@ -75,7 +76,10 @@ def auto_stale_entity_removal(

if wu.is_primary_source:
entity_type = guess_entity_type(urn)
if entity_type is not None:
if (
entity_type is not None
and entity_type not in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
):
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\", \"urn:li:query:query1\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query1",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
Expand All @@ -84,5 +101,22 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query1",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,46 @@
}
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query2",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26",
Expand All @@ -66,23 +100,25 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"entityType": "query",
"entityUrn": "urn:li:query:query2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
DataProcessInstanceProperties,
)
from datahub.metadata.schema_classes import AuditStampClass, StatusClass
from datahub.metadata.schema_classes import (
AuditStampClass,
DataPlatformInstanceClass,
StatusClass,
)
from datahub.metadata.urns import DataPlatformUrn, QueryUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
Expand Down Expand Up @@ -71,6 +76,9 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
default=None,
description="Data process instance id to ingest.",
)
query_id_to_ingest: Optional[str] = Field(
default=None, description="Query id to ingest"
)


class DummySource(StatefulIngestionSourceBase):
Expand Down Expand Up @@ -136,6 +144,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
),
).as_workunit()

if self.source_config.query_id_to_ingest:
yield MetadataChangeProposalWrapper(
entityUrn=QueryUrn(self.source_config.query_id_to_ingest).urn(),
aspect=DataPlatformInstanceClass(
platform=DataPlatformUrn("bigquery").urn()
),
).as_workunit()

if self.source_config.report_failure:
self.reporter.report_failure("Dummy error", "Error")

Expand Down Expand Up @@ -188,6 +204,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
},
},
"dpi_id_to_ingest": "job1",
"query_id_to_ingest": "query1",
},
},
"sink": {
Expand All @@ -198,7 +215,11 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):

with mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
) as mock_state:
) as mock_state, mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.STATEFUL_INGESTION_IGNORED_ENTITY_TYPES",
{},
# Second mock is to imitate earlier behavior where entity type check was not present when adding entity to state
):
mock_state.return_value = GenericCheckpointState(serde="utf-8")
pipeline_run1 = None
pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore
Expand Down Expand Up @@ -237,6 +258,8 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
"allow": ["dummy_dataset1", "dummy_dataset2"],
}
pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2"
pipeline_run2_config["source"]["config"]["query_id_to_ingest"] = "query2"

pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name_after_deleted}"
Expand Down Expand Up @@ -288,6 +311,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
# assert report last ingestion state non_deletable entity urns
non_deletable_urns: List[str] = [
"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"urn:li:query:query1",
]
assert sorted(non_deletable_urns) == sorted(
report.last_state_non_deletable_entities
Expand Down

0 comments on commit 26bbe02

Please sign in to comment.