Skip to content

Commit

Permalink
Add way to reference existing DataHub Tag from a bigquery label
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es committed Oct 7, 2024
1 parent 0187fc6 commit 6baabc6
Showing 1 changed file with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import re
from base64 import b32decode
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Type, Union, cast

Expand Down Expand Up @@ -89,12 +90,13 @@
HiveColumnToAvroConverter,
get_schema_fields_for_hive_column,
)
from datahub.utilities.mapping import Constants
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.ratelimiter import RateLimiter
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

# Marker prefix for BigQuery label values that carry a modified-base32 encoded
# payload rather than a plain label value; such values are decoded by
# `modified_base32decode` instead of being turned into a `key:value` tag.
ENCODED_TAG_PREFIX = "urn_li_encoded_tag_"

logger: logging.Logger = logging.getLogger(__name__)
# Handle table snapshots
# See https://cloud.google.com/bigquery/docs/table-snapshots-intro.
Expand Down Expand Up @@ -708,6 +710,8 @@ def gen_table_dataset_workunits(
tags_to_add.extend(
[
make_tag_urn(f"""{k}:{v}""")
if not v.startswith("urn_li_encoded_tag_")
else self.modified_base32decode(v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_table_label_as_tag, k)
]
Expand All @@ -733,7 +737,9 @@ def gen_view_dataset_workunits(
tags_to_add = None
if table.labels and self.config.capture_view_label_as_tag:
tags_to_add = [
make_tag_urn(f"{k}:{v}")
make_tag_urn(f"""{k}:{v}""")
if not v.startswith("urn_li_encoded_tag_")
else self.modified_base32decode(v)
for k, v in table.labels.items()
if is_tag_allowed(self.config.capture_view_label_as_tag, k)
]
Expand Down Expand Up @@ -785,6 +791,18 @@ def gen_snapshot_dataset_workunits(
custom_properties=custom_properties,
)

def modified_base32decode(self, text_to_decode: str) -> str:
    """Decode a label value that holds a modified-base32 encoded string.

    When we sync from DataHub to BigQuery, tags are encoded as modified
    base32 strings. BigQuery labels only support lowercase letters,
    international characters, numbers, or underscores, so the encoding is
    lowercased and its padding character ``=`` is replaced with ``_``.
    This method reverses that transformation.

    Args:
        text_to_decode: The label value to decode.

    Returns:
        ``text_to_decode`` unchanged when it does not start with
        ``ENCODED_TAG_PREFIX``; otherwise the UTF-8 text obtained by
        base32-decoding everything after the prefix.

    Raises:
        binascii.Error: If the text after the prefix is not valid base32
            once upper-cased and re-padded.
        UnicodeDecodeError: If the decoded bytes are not valid UTF-8.
    """
    if not text_to_decode.startswith(ENCODED_TAG_PREFIX):
        return text_to_decode

    # Strip only the leading marker. `str.replace` would also remove any later
    # occurrence of the prefix and silently alter a malformed payload.
    payload = text_to_decode[len(ENCODED_TAG_PREFIX) :]

    # Undo the label-safe modifications: restore upper case and `=` padding.
    standard_base32 = payload.upper().replace("_", "=")
    return b32decode(standard_base32.encode("utf-8")).decode("utf-8")

def gen_dataset_workunits(
self,
table: Union[BigqueryTable, BigqueryView, BigqueryTableSnapshot],
Expand Down Expand Up @@ -922,11 +940,6 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
break
else:
tags = []
if col.is_partition_column:
tags.append(
TagAssociationClass(make_tag_urn(Constants.TAG_PARTITION_KEY))
)

if col.cluster_column_position is not None:
tags.append(
TagAssociationClass(
Expand All @@ -944,6 +957,7 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
type=SchemaFieldDataType(
self.BIGQUERY_FIELD_TYPE_MAPPINGS.get(col.data_type, NullType)()
),
isPartitioningKey=col.is_partition_column,
nativeDataType=col.data_type,
description=col.comment,
nullable=col.is_nullable,
Expand Down

0 comments on commit 6baabc6

Please sign in to comment.