From 26bbe02e4408c0727d7b6c89bdc2cac6af5f39f2 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Wed, 9 Oct 2024 21:16:58 +0530 Subject: [PATCH] feat(ingest/stateful): omit irrelevant urns for deletion (#11558) --- .../state/stale_entity_removal_handler.py | 6 ++- .../state/golden_test_checkpoint_state.json | 2 +- ...n_test_checkpoint_state_after_deleted.json | 6 +-- .../state/golden_test_stateful_ingestion.json | 34 ++++++++++++++ ...test_stateful_ingestion_after_deleted.json | 46 +++++++++++++++++-- .../state/test_stateful_ingestion.py | 28 ++++++++++- 6 files changed, 110 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py index c73472f1b8041..9d77e13a0f3c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py @@ -29,6 +29,7 @@ STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = { "dataProcessInstance", + "query", } @@ -75,7 +76,10 @@ def auto_stale_entity_removal( if wu.is_primary_source: entity_type = guess_entity_type(urn) - if entity_type is not None: + if ( + entity_type is not None + and entity_type not in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES + ): stale_entity_removal_handler.add_entity_to_state(entity_type, urn) else: stale_entity_removal_handler.add_urn_to_skip(urn) diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json index ce03804279097..22f63da8ecb95 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state.json @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\", \"urn:li:query:query1\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json index 6a00e67a2ca21..a155c4cf1dbbb 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_checkpoint_state_after_deleted.json @@ -8,8 +8,8 @@ "json": { "timestampMillis": 1586847600000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "pipelineName": "dummy_stateful", "platformInstanceId": "", @@ -17,7 +17,7 @@ "state": { "formatVersion": "1.0", "serde": "utf-8", - "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}" + "payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}" }, "runId": "dummy-test-stateful-ingestion" } diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json index adf11a2833914..5cb8576594db3 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion.json @@ -69,6 +69,23 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query1", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", @@ -84,5 +101,22 @@ "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query1", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json index e4893642d61ae..5300743f23ca8 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/golden_test_stateful_ingestion_after_deleted.json @@ -47,12 +47,46 @@ } } }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "query", + "entityUrn": "urn:li:query:query2", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:bigquery" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "dummy-test-stateful-ingestion", + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26", @@ -66,23 +100,25 @@ "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", - "lastRunId": "no-run-id-provided" + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)", + "entityType": "query", + "entityUrn": "urn:li:query:query2", "changeType": "UPSERT", "aspectName": "status", "aspect": { "json": { - "removed": true + "removed": false } }, "systemMetadata": { "lastObserved": 1586847600000, "runId": "dummy-test-stateful-ingestion", - "lastRunId": "no-run-id-provided" + "lastRunId": "no-run-id-provided", + "pipelineName": "dummy_stateful" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py index e3a2a6cccea79..66564dc856aba 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/state/test_stateful_ingestion.py @@ -29,7 +29,12 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import ( DataProcessInstanceProperties, ) -from datahub.metadata.schema_classes import AuditStampClass, StatusClass +from datahub.metadata.schema_classes import ( + AuditStampClass, + DataPlatformInstanceClass, + StatusClass, +) +from datahub.metadata.urns import DataPlatformUrn, QueryUrn from datahub.utilities.urns.dataset_urn import DatasetUrn from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( @@ -71,6 +76,9 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): default=None, description="Data process instance id to ingest.", ) + query_id_to_ingest: Optional[str] = Field( + default=None, description="Query id to ingest" + ) class DummySource(StatefulIngestionSourceBase): @@ -136,6 +144,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ), ).as_workunit() + if self.source_config.query_id_to_ingest: + yield MetadataChangeProposalWrapper( + entityUrn=QueryUrn(self.source_config.query_id_to_ingest).urn(), + aspect=DataPlatformInstanceClass( + platform=DataPlatformUrn("bigquery").urn() + ), + ).as_workunit() + if self.source_config.report_failure: self.reporter.report_failure("Dummy error", "Error") @@ -188,6 +204,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): }, }, "dpi_id_to_ingest": "job1", + "query_id_to_ingest": "query1", }, }, "sink": { @@ -198,7 +215,11 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): with mock.patch( "datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj" - ) as mock_state: + ) as mock_state, mock.patch( + "datahub.ingestion.source.state.stale_entity_removal_handler.STATEFUL_INGESTION_IGNORED_ENTITY_TYPES", + {}, + # Second mock is to imitate earlier behavior where entity type check was not present when adding entity to state + ): mock_state.return_value = GenericCheckpointState(serde="utf-8") pipeline_run1 = None pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore @@ -237,6 +258,8 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): "allow": ["dummy_dataset1", "dummy_dataset2"], } pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2" + pipeline_run2_config["source"]["config"]["query_id_to_ingest"] = "query2" + pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name_after_deleted}" @@ -288,6 +311,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time): # assert report last ingestion state non_deletable entity urns non_deletable_urns: List[str] = [ "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619", + "urn:li:query:query1", ] assert sorted(non_deletable_urns) == sorted( report.last_state_non_deletable_entities