diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 982ad0fe19..78332a1edb 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -203,6 +203,28 @@ public final class Constants { public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS}; + /*** + * DataModel + */ + + public static final String ATLAS_DM_ENTITY_TYPE = "DMEntity"; + + public static final String ATLAS_DM_ATTRIBUTE_TYPE = "DMAttribute"; + public static final String ATLAS_DM_DATA_MODEL = "DMDataModel"; + + public static final String ATLAS_DM_VERSION_TYPE = "DMVersion"; + + public static final String ATLAS_DM_ENTITY_ASSOCIATION_TYPE= "DMEntityAssociation"; + public static final String ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE= "DMAttributeAssociation"; + + public static final String ATLAS_DM_QUALIFIED_NAME_PREFIX = "dMQualifiedNamePrefix"; + public static final String ATLAS_DM_NAMESPACE = "dMDataModelNamespace"; + public static final String ATLAS_DM_EXPIRED_AT_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; + public static final String ATLAS_DM_EXPIRED_AT_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; + public static final String ATLAS_DM_SYSTEM_DATE = "dMDataModelSystemDate"; + public static final String ATLAS_DM_BUSINESS_DATE = "dMDataModelBusinessDate"; + + /** * The homeId field is used when saving into Atlas a copy of an object that is being imported from another * repository. The homeId will be set to a String that identifies the other repository. The specific format @@ -374,6 +396,8 @@ public final class Constants { public static final String ASSET_POLICY_GUIDS = "assetPolicyGUIDs"; public static final String ASSET_POLICIES_COUNT = "assetPoliciesCount"; + + /* * All supported file-format extensions for Bulk Imports through file upload */ diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 1ad37b2564..8bee2815be 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -291,8 +291,15 @@ public enum AtlasErrorCode { TASK_TYPE_NOT_SUPPORTED(400, "ATLAS-400-00-112", "Task type {0} is not supported"), PERSONA_POLICY_ASSETS_LIMIT_EXCEEDED(400, "ATLAS-400-00-113", "Exceeded limit of maximum allowed assets across policies for a Persona: Limit: {0}, assets: {1}"), - ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}"); - + ADMIN_LIST_SHOULD_NOT_BE_EMPTY(400, "ATLAS-400-00-114", "Admin list should not be empty for type {0}"), + DATA_MODEL_VERSION_NOT_EXIST(400, "ATLAS-400-00-115", "Model version {0} does not exist"), + DATA_ENTITY_NOT_EXIST(400, "ATLAS-400-00-116", "DataEntity {0} does not exist"), + DATA_MODEL_NOT_EXIST(400, "ATLAS-400-00-117", "DataModel {0} does not exist"), + QUALIFIED_NAME_PREFIX_NOT_EXIST(400, "ATLAS-400-00-118", "dMQualifiedNamePrefix is mandatory for DMEntity/DMAttribute"), + NO_TYPE_EXISTS_FOR_QUALIFIED_NAME_PREFIX (400,"ATLAS-400-00-119", "No DMEntity/DMAttribute exists for dMQualifiedNamePrefix : {0}"), + NAME_NAMESPACE_NOT_EXIST (400, "ATLAS-400-00-120", "name/namespace are mandatory for DMEntity/DMAttribute"), + QUALIFIED_NAME_PREFIX_OR_TYPE_NOT_FOUND(400, "ATLAS-400-00-121", "qualifiedName/entityType are mandatory"), + INVALID_ENTITY_TYPE(400, "ATLAS-400-00-122", "Invalid entity type"); private String errorCode; private String errorMessage; private Response.Status httpCode; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java index 040822dccc..669d7819ed 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java @@ -24,7 +24,9 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityResolver; @@ -40,9 +42,11 @@ import org.apache.atlas.type.TemplateToken; import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -103,8 +107,36 @@ public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException { validateLabels(entity.getLabels()); + type.validateValue(entity, entity.getTypeName(), messages); + + + // DMEntity and DMAttributeType are requested for update + // by dMQualifiedNamePrefix which is not a unique attribute + // This can return multiple entity/attribute that match this prefix vale + // we have to return latest entity/attribute + if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) || + entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)){ + + String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX); + if (qualifiedNamePrefix.isEmpty()){ + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + // AtlasVertex vertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entity.getTypeName(), qualifiedNamePrefix); + AtlasVertex vertex= discoveryContext.getResolvedEntityVertex(entity.getGuid()); + if (vertex == null) { + // no entity exists with this qualifiedName, set qualifiedName and let entity be created + entity.setAttribute(Constants.QUALIFIED_NAME, qualifiedNamePrefix + "_" + RequestContext.get().getRequestTime()); + return; + } + + // if guidFromVertex is found let entity be updated + entity.setGuid(AtlasGraphUtilsV2.getIdFromVertex(vertex)); + type.getNormalizedValue(entity); + return; + } + if (!messages.isEmpty()) { throw new AtlasBaseException(AtlasErrorCode.INSTANCE_CRUD_INVALID_PARAMS, messages); } @@ -161,6 +193,8 @@ protected void discover() throws AtlasBaseException { throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "found null entity"); } + validateAttributesForDataModel(entity); + processDynamicAttributes(entity); walkEntityGraph(entity); @@ -485,4 +519,15 @@ private void processDynamicAttributes(AtlasEntity entity) throws AtlasBaseExcept } } } + + private void validateAttributesForDataModel(AtlasEntity entity) throws AtlasBaseException { + if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) || + entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)) { + if (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == null || + entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == "") { + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + } + + } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 224f04970b..a7b85635d3 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -67,6 +67,9 @@ import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMAttributePreprocessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityAssociationPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor; import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor; @@ -110,7 +113,6 @@ import static org.apache.atlas.type.Constants.*; - @Component public class AtlasEntityStoreV2 implements AtlasEntityStore { private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class); @@ -120,16 +122,16 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { private static final String ATTR_MEANINGS = "meanings"; - private final AtlasGraph graph; - private final DeleteHandlerDelegate deleteDelegate; - private final RestoreHandlerV1 restoreHandlerV1; - private final AtlasTypeRegistry typeRegistry; + private final AtlasGraph graph; + private final DeleteHandlerDelegate deleteDelegate; + private final RestoreHandlerV1 restoreHandlerV1; + private final AtlasTypeRegistry typeRegistry; private final IAtlasEntityChangeNotifier entityChangeNotifier; - private final EntityGraphMapper entityGraphMapper; - private final EntityGraphRetriever entityRetriever; - private boolean storeDifferentialAudits; - private final GraphHelper graphHelper; - private final TaskManagement taskManagement; + private final EntityGraphMapper entityGraphMapper; + private final EntityGraphRetriever entityRetriever; + private boolean storeDifferentialAudits; + private final GraphHelper graphHelper; + private final TaskManagement taskManagement; private EntityDiscoveryService discovery; private final AtlasRelationshipStore atlasRelationshipStore; private final FeatureFlagStore featureFlagStore; @@ -143,15 +145,15 @@ public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore, IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier) { - this.graph = graph; - this.deleteDelegate = deleteDelegate; - this.restoreHandlerV1 = restoreHandlerV1; - this.typeRegistry = typeRegistry; + this.graph = graph; + this.deleteDelegate = deleteDelegate; + this.restoreHandlerV1 = restoreHandlerV1; + this.typeRegistry = typeRegistry; this.entityChangeNotifier = entityChangeNotifier; - this.entityGraphMapper = entityGraphMapper; - this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry); + this.entityGraphMapper = entityGraphMapper; + this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry); this.storeDifferentialAudits = STORE_DIFFERENTIAL_AUDITS.getBoolean(); - this.graphHelper = new GraphHelper(graph); + this.graphHelper = new GraphHelper(graph); this.taskManagement = taskManagement; this.atlasRelationshipStore = atlasRelationshipStore; this.featureFlagStore = featureFlagStore; @@ -283,8 +285,8 @@ public AtlasEntitiesWithExtInfo getByIds(List guids, boolean isMinExtInf AtlasEntitiesWithExtInfo ret = entityRetriever.toAtlasEntitiesWithExtInfo(guids, isMinExtInfo); - if(ret != null){ - for(String guid : guids) { + if (ret != null) { + for (String guid : guids) { AtlasEntity entity = ret.getEntity(guid); try { AtlasAuthorizationUtils.verifyAccess(new AtlasEntityAccessRequest(typeRegistry, AtlasPrivilege.ENTITY_READ, new AtlasEntityHeader(entity)), "read entity: guid=", guid); @@ -316,7 +318,7 @@ public AtlasEntitiesWithExtInfo getByIds(List guids, boolean isMinExtInf @Override @GraphTransaction - public AtlasEntitiesWithExtInfo getEntitiesByUniqueAttributes(AtlasEntityType entityType, List> uniqueAttributes , boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { + public AtlasEntitiesWithExtInfo getEntitiesByUniqueAttributes(AtlasEntityType entityType, List> uniqueAttributes, boolean isMinExtInfo, boolean ignoreRelationships) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> getEntitiesByUniqueAttributes({}, {})", entityType.getTypeName(), uniqueAttributes); } @@ -375,7 +377,7 @@ public AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType entityType, @Override @GraphTransaction public AtlasEntityHeader getAtlasEntityHeaderWithoutAuthorization(String guid, String qualifiedName, String typeName) throws AtlasBaseException { - return extractEntityHeader( guid, qualifiedName, typeName); + return extractEntityHeader(guid, qualifiedName, typeName); } @Override @@ -407,6 +409,7 @@ public AtlasEntityHeader getEntityHeaderByUniqueAttributes(AtlasEntityType entit /** * Check state of entities in the store + * * @param request AtlasCheckStateRequest * @return AtlasCheckStateResult * @throws AtlasBaseException @@ -437,7 +440,7 @@ public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean @Override @GraphTransaction - public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean replaceClassifications, + public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean replaceClassifications, boolean replaceBusinessAttributes, boolean isOverwriteBusinessAttributes) throws AtlasBaseException { return createOrUpdate(entityStream, false, replaceClassifications, replaceBusinessAttributes, isOverwriteBusinessAttributes); } @@ -503,7 +506,7 @@ public EntityMutationResponse updateByUniqueAttributes(AtlasEntityType entityTyp throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entity to update."); } - String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, entityType, uniqAttributes); + String guid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, entityType, uniqAttributes); AtlasEntity entity = updatedEntityInfo.getEntity(); entity.setGuid(guid); @@ -521,9 +524,9 @@ public EntityMutationResponse updateEntityAttributeByGuid(String guid, String at LOG.debug("==> updateEntityAttributeByGuid({}, {}, {})", guid, attrName, attrValue); } - AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); - AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); - AtlasAttribute attr = entityType.getAttribute(attrName); + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + AtlasEntityType entityType = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); + AtlasAttribute attr = entityType.getAttribute(attrName); AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, entity, "update entity ByUniqueAttributes : guid=" + guid); @@ -535,7 +538,7 @@ public EntityMutationResponse updateEntityAttributeByGuid(String guid, String at } } - AtlasType attrType = attr.getAttributeType(); + AtlasType attrType = attr.getAttributeType(); AtlasEntity updateEntity = new AtlasEntity(); updateEntity.setGuid(guid); @@ -575,7 +578,7 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce } Collection deletionCandidates = new ArrayList<>(); - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid); + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid); if (vertex != null) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); @@ -593,7 +596,7 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce EntityMutationResponse ret = deleteVertices(deletionCandidates); - if(ret.getDeletedEntities()!=null) + if (ret.getDeletedEntities() != null) processTermEntityDeletion(ret.getDeletedEntities()); // Notify the change listeners @@ -637,7 +640,7 @@ public EntityMutationResponse deleteByIds(final List guids) throws Atlas EntityMutationResponse ret = deleteVertices(deletionCandidates); - if(ret.getDeletedEntities() != null) + if (ret.getDeletedEntities() != null) processTermEntityDeletion(ret.getDeletedEntities()); // Notify the change listeners @@ -730,7 +733,7 @@ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityTyp } Collection deletionCandidates = new ArrayList<>(); - AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(graph, entityType, uniqAttributes); + AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(graph, entityType, uniqAttributes); if (vertex != null) { AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); @@ -749,7 +752,7 @@ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityTyp EntityMutationResponse ret = deleteVertices(deletionCandidates); - if(ret.getDeletedEntities()!=null) + if (ret.getDeletedEntities() != null) processTermEntityDeletion(ret.getDeletedEntities()); // Notify the change listeners @@ -816,19 +819,19 @@ public EntityMutationResponse deleteByUniqueAttributes(List objec return ret; } - private void processTermEntityDeletion(List deletedEntities) throws AtlasBaseException{ - for(AtlasEntityHeader entity:deletedEntities){ - if(ATLAS_GLOSSARY_TERM_ENTITY_TYPE.equals(entity.getTypeName())){ + private void processTermEntityDeletion(List deletedEntities) throws AtlasBaseException { + for (AtlasEntityHeader entity : deletedEntities) { + if (ATLAS_GLOSSARY_TERM_ENTITY_TYPE.equals(entity.getTypeName())) { - String termQualifiedName = entity.getAttribute(QUALIFIED_NAME).toString(); - String termName = entity.getAttribute(NAME).toString(); - String guid = entity.getGuid(); - Boolean isHardDelete = DeleteType.HARD.name().equals(entity.getDeleteHandler()); + String termQualifiedName = entity.getAttribute(QUALIFIED_NAME).toString(); + String termName = entity.getAttribute(NAME).toString(); + String guid = entity.getGuid(); + Boolean isHardDelete = DeleteType.HARD.name().equals(entity.getDeleteHandler()); - if(checkEntityTermAssociation(termQualifiedName)){ - if(DEFERRED_ACTION_ENABLED && taskManagement!=null){ + if (checkEntityTermAssociation(termQualifiedName)) { + if (DEFERRED_ACTION_ENABLED && taskManagement != null) { createAndQueueTask(termName, termQualifiedName, guid, isHardDelete); - }else{ + } else { updateMeaningsNamesInEntitiesOnTermDelete(termName, termQualifiedName, guid); } } @@ -836,11 +839,11 @@ private void processTermEntityDeletion(List deletedEntities) } } - private boolean checkEntityTermAssociation(String termQName) throws AtlasBaseException{ + private boolean checkEntityTermAssociation(String termQName) throws AtlasBaseException { List entityHeader; try { - entityHeader = discovery.searchUsingTermQualifiedName(0, 1, termQName,null, null); + entityHeader = discovery.searchUsingTermQualifiedName(0, 1, termQName, null, null); } catch (AtlasBaseException e) { throw e; } @@ -852,10 +855,10 @@ private boolean checkEntityTermAssociation(String termQName) throws AtlasBaseExc public void updateMeaningsNamesInEntitiesOnTermDelete(String termName, String termQName, String termGuid) throws AtlasBaseException { int from = 0; - Set attributes = new HashSet(){{ + Set attributes = new HashSet() {{ add(ATTR_MEANINGS); }}; - Set relationAttributes = new HashSet(){{ + Set relationAttributes = new HashSet() {{ add(STATE_PROPERTY_KEY); add(NAME); }}; @@ -890,13 +893,13 @@ public void updateMeaningsNamesInEntitiesOnTermDelete(String termName, String te } - public void createAndQueueTask(String termName, String termQName, String termGuid, Boolean isHardDelete){ + public void createAndQueueTask(String termName, String termQName, String termGuid, Boolean isHardDelete) { String taskType = isHardDelete ? UPDATE_ENTITY_MEANINGS_ON_TERM_HARD_DELETE : UPDATE_ENTITY_MEANINGS_ON_TERM_SOFT_DELETE; String currentUser = RequestContext.getCurrentUser(); Map taskParams = MeaningsTask.toParameters(termName, termQName, termGuid); AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams); - if(!isHardDelete){ + if (!isHardDelete) { AtlasVertex termVertex = AtlasGraphUtilsV2.findByGuid(termGuid); AtlasGraphUtilsV2.addEncodedProperty(termVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid()); } @@ -907,7 +910,7 @@ public void createAndQueueTask(String termName, String termQName, String termGui @Override @GraphTransaction - public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map uniqAttributes) throws AtlasBaseException{ + public String getGuidByUniqueAttributes(AtlasEntityType entityType, Map uniqAttributes) throws AtlasBaseException { return AtlasGraphUtilsV2.getGuidByUniqueAttributes(graph, entityType, uniqAttributes); } @@ -962,7 +965,7 @@ public void addClassifications(final String guid, final List> bm : businessAttrbutes.entrySet()) { @@ -1226,7 +1229,7 @@ public void addOrUpdateBusinessAttributesByDisplayName(String guid, Map attributes = new HashMap<>(); + Map attributes = new HashMap<>(); for (Map.Entry incomingAttrs : bm.getValue().entrySet()) { AtlasAttribute atlasAttribute = bmType.getAllAttributes().get(incomingAttrs.getKey()); @@ -1284,8 +1287,8 @@ public void removeBusinessAttributes(String guid, Map labels) throws AtlasBaseException validateLabels(labels); - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); - Set addedLabels = Collections.emptySet(); - Set removedLabels = Collections.emptySet(); + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); + Set addedLabels = Collections.emptySet(); + Set removedLabels = Collections.emptySet(); if (CollectionUtils.isEmpty(entityHeader.getLabels())) { addedLabels = labels; } else if (CollectionUtils.isEmpty(labels)) { removedLabels = entityHeader.getLabels(); } else { - addedLabels = new HashSet(CollectionUtils.subtract(labels, entityHeader.getLabels())); + addedLabels = new HashSet(CollectionUtils.subtract(labels, entityHeader.getLabels())); removedLabels = new HashSet(CollectionUtils.subtract(entityHeader.getLabels(), labels)); } @@ -1374,7 +1377,7 @@ public void removeLabels(String guid, Set labels) throws AtlasBaseExcept throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_REMOVE_LABEL, entityHeader); for (String label : labels) { @@ -1413,7 +1416,7 @@ public void addLabels(String guid, Set labels) throws AtlasBaseException throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); } - AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(entityVertex); AtlasEntityAccessRequestBuilder requestBuilder = new AtlasEntityAccessRequestBuilder(typeRegistry, AtlasPrivilege.ENTITY_ADD_LABEL, entityHeader); for (String label : labels) { @@ -1464,17 +1467,17 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) { MetricRecorder checkForUnchangedEntities = RequestContext.get().startMetricRecord("checkForUnchangedEntities"); - List entitiesToSkipUpdate = new ArrayList<>(); - AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), !replaceClassifications, !replaceBusinessAttributes); - RequestContext reqContext = RequestContext.get(); + List entitiesToSkipUpdate = new ArrayList<>(); + AtlasEntityComparator entityComparator = new AtlasEntityComparator(typeRegistry, entityRetriever, context.getGuidAssignments(), !replaceClassifications, !replaceBusinessAttributes); + RequestContext reqContext = RequestContext.get(); for (AtlasEntity entity : context.getUpdatedEntities()) { if (entity.getStatus() == AtlasEntity.Status.DELETED) {// entity status could be updated during import continue; } - AtlasVertex storedVertex = context.getVertex(entity.getGuid()); - AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, storedVertex, !storeDifferentialAudits); + AtlasVertex storedVertex = context.getVertex(entity.getGuid()); + AtlasEntityDiffResult diffResult = entityComparator.getDiffResult(entity, storedVertex, !storeDifferentialAudits); if (diffResult.hasDifference()) { if (storeDifferentialAudits) { @@ -1510,7 +1513,7 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean AtlasEntityHeader entityHeaderWithClassifications = entityRetriever.toAtlasEntityHeaderWithClassifications(entity.getGuid()); AtlasEntityHeader entityHeader = new AtlasEntityHeader(entity); - if(CollectionUtils.isNotEmpty(entityHeaderWithClassifications.getClassifications())) { + if (CollectionUtils.isNotEmpty(entityHeaderWithClassifications.getClassifications())) { entityHeader.setClassifications(entityHeaderWithClassifications.getClassifications()); } @@ -1521,7 +1524,7 @@ private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean if (skipAuthBaseConditions && (skipAuthMeaningsUpdate || skipAuthStarredDetailsUpdate)) { //do nothing, only diff is relationshipAttributes.meanings or starred, allow update } else { - AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, entityHeader,"update entity: type=" + entity.getTypeName()); + AtlasAuthorizationUtils.verifyUpdateEntityAccess(typeRegistry, entityHeader, "update entity: type=" + entity.getTypeName()); } } } @@ -1572,37 +1575,72 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase for (AtlasEntity entity : copyOfCreated) { entityType = context.getType(entity.getGuid()); preProcessors = getPreProcessor(entityType.getTypeName()); - for(PreProcessor processor : preProcessors){ + for (PreProcessor processor : preProcessors) { processor.processAttributes(entity, context, CREATE); } } List copyOfUpdated = new ArrayList<>(context.getUpdatedEntities()); - for (AtlasEntity entity: copyOfUpdated) { + for (AtlasEntity entity : copyOfUpdated) { entityType = context.getType(entity.getGuid()); preProcessors = getPreProcessor(entityType.getTypeName()); - for(PreProcessor processor : preProcessors){ + for (PreProcessor processor : preProcessors) { processor.processAttributes(entity, context, UPDATE); } } + + + List copyOfAppendRelationshipAttributes = new ArrayList<>(context.getUpdatedEntitiesForAppendRelationshipAttribute()); + for (AtlasEntity entity : copyOfAppendRelationshipAttributes) { + entityType = context.getType(entity.getGuid()); + if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) || + entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE) + ){ + preProcessors = getPreProcessor(entityType.getTypeName()); + for (PreProcessor processor : preProcessors) { + processor.processAttributes(entity, context, UPDATE); + } + } + } + + List copyOfRemoveRelationshipAttributes = new ArrayList<>(context.getEntitiesUpdatedWithRemoveRelationshipAttribute()); + for (AtlasEntity entity : copyOfRemoveRelationshipAttributes) { + entityType = context.getType(entity.getGuid()); + if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) + || entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) || + entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) || + entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE) + ){ + preProcessors = getPreProcessor(entityType.getTypeName()); + for (PreProcessor processor : preProcessors) { + processor.processAttributes(entity, context, UPDATE); + } + } + } } private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException { MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate"); this.graph.setEnableCache(RequestContext.get().isCacheEnabled()); - EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(graph, typeRegistry, entityStream, entityGraphMapper); + EntityGraphDiscovery graphDiscoverer = new AtlasEntityGraphDiscoveryV2(graph, typeRegistry, entityStream, entityGraphMapper); EntityGraphDiscoveryContext discoveryContext = graphDiscoverer.discoverEntities(); - EntityMutationContext context = new EntityMutationContext(discoveryContext); - RequestContext requestContext = RequestContext.get(); + EntityMutationContext context = new EntityMutationContext(discoveryContext); + RequestContext requestContext = RequestContext.get(); Map referencedGuids = discoveryContext.getReferencedGuids(); for (Map.Entry element : referencedGuids.entrySet()) { String guid = element.getKey(); + + // guid negative + // entity == userInput AtlasEntity entity = entityStream.getByGuid(guid); if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + // entity pass in user request with attributes if (entityType == null) { throw new AtlasBaseException(element.getValue(), AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); } @@ -1640,8 +1678,11 @@ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, Entit context.addUpdated(guid, entity, entityType, vertex); } else { + graphDiscoverer.validateAndNormalize(entity); + + // Handle create flow here //Create vertices which do not exist in the repository if (RequestContext.get().isImportInProgress() && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) { vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid()); @@ -1771,7 +1812,7 @@ private void autoUpdateStarredDetailsAttributes(AtlasEntity entity, AtlasVertex private void addUserToStarredAttributes(String requestUser, long requestTime, Set starredBy, Set starredDetailsList) { //Check and update starredBy Attribute - if (!starredBy.contains(requestUser)){ + if (!starredBy.contains(requestUser)) { starredBy.add(requestUser); } @@ -1793,7 +1834,7 @@ private void addUserToStarredAttributes(String requestUser, long requestTime, Se private void removeUserFromStarredAttributes(String requestUser, Set starredBy, Set starredDetailsList) { //Check and update starredBy Attribute - if (starredBy.contains(requestUser)){ + if (starredBy.contains(requestUser)) { starredBy.remove(requestUser); } @@ -1925,6 +1966,17 @@ public List getPreProcessor(String typeName) { case STAKEHOLDER_TITLE_ENTITY_TYPE: preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever)); break; + case ATLAS_DM_ENTITY_TYPE: + preProcessors.add(new DMEntityPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ATTRIBUTE_TYPE: + preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ENTITY_ASSOCIATION_TYPE: + preProcessors.add(new DMEntityAssociationPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); + break; + case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE: + preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore)); } // The default global pre-processor for all AssetTypes @@ -1935,7 +1987,8 @@ public List getPreProcessor(String typeName) { private AtlasVertex getResolvedEntityVertex(EntityGraphDiscoveryContext context, AtlasEntity entity) throws AtlasBaseException { AtlasObjectId objectId = getAtlasObjectId(entity); - AtlasVertex ret = context.getResolvedEntityVertex(entity.getGuid()); + AtlasVertex ret = context.getResolvedEntityVertex(entity.getGuid()); + if (ret != null) { context.addResolvedIdByUniqAttribs(objectId, ret); @@ -1977,7 +2030,7 @@ private EntityMutationResponse deleteVertices(Collection deletionCa String typeName = getTypeName(vertex); List preProcessors = getPreProcessor(typeName); - for(PreProcessor processor : preProcessors){ + for (PreProcessor processor : preProcessors) { processor.processDelete(vertex); } @@ -2027,7 +2080,7 @@ private EntityMutationResponse deleteVertices(Collection deletionCa private EntityMutationResponse restoreVertices(Collection restoreCandidates) throws AtlasBaseException { EntityMutationResponse response = new EntityMutationResponse(); - RequestContext req = RequestContext.get(); + RequestContext req = RequestContext.get(); restoreHandlerV1.restoreEntities(restoreCandidates); @@ -2040,7 +2093,7 @@ private EntityMutationResponse restoreVertices(Collection restoreCa private EntityMutationResponse purgeVertices(Collection purgeCandidates) throws AtlasBaseException { EntityMutationResponse response = new EntityMutationResponse(); - RequestContext req = RequestContext.get(); + RequestContext req = RequestContext.get(); req.setDeleteType(DeleteType.HARD); req.setPurgeRequested(true); @@ -2078,15 +2131,15 @@ private void validateAndNormalize(AtlasClassification classification) throws Atl * @param classifications list of classifications to be associated */ private void validateEntityAssociations(String guid, List classifications) throws AtlasBaseException { - List entityClassifications = getClassificationNames(guid); - String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(graph, guid); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + List entityClassifications = getClassificationNames(guid); + String entityTypeName = AtlasGraphUtilsV2.getTypeNameFromGuid(graph, guid); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); Set processedTagTypeNames = new HashSet<>(); - List copyList = new ArrayList<>(classifications); + List copyList = new ArrayList<>(classifications); for (AtlasClassification classification : copyList) { - if (processedTagTypeNames.contains(classification.getTypeName())){ + if (processedTagTypeNames.contains(classification.getTypeName())) { classifications.remove(classification); } else { String newClassification = classification.getTypeName(); @@ -2108,7 +2161,7 @@ private void validateEntityAssociations(String guid, List c } private List getClassificationNames(String guid) throws AtlasBaseException { - List ret = null; + List ret = null; List classifications = retrieveClassifications(guid); if (CollectionUtils.isNotEmpty(classifications)) { @@ -2188,12 +2241,12 @@ public BulkImportResponse bulkCreateOrUpdateBusinessAttributes(InputStream input throw new AtlasBaseException(AtlasErrorCode.FILE_NAME_NOT_FOUND, fileName); } - List fileData = FileUtils.readFileData(fileName, inputStream); + List fileData = FileUtils.readFileData(fileName, inputStream); Map attributesToAssociate = getBusinessMetadataDefList(fileData, ret); for (AtlasEntity entity : attributesToAssociate.values()) { Map> businessAttributes = entity.getBusinessAttributes(); - String guid = entity.getGuid(); + String guid = entity.getGuid(); try { addOrUpdateBusinessAttributes(guid, businessAttributes, true); @@ -2216,7 +2269,8 @@ public List getAccessors(List atlas for (AtlasAccessorRequest accessorRequest : atlasAccessorRequestList) { try { AtlasAccessorResponse result = null; - AtlasPrivilege action = AtlasPrivilege.valueOf(accessorRequest.getAction());; + AtlasPrivilege action = AtlasPrivilege.valueOf(accessorRequest.getAction()); + ; switch (action) { case ENTITY_READ: @@ -2334,13 +2388,13 @@ private AtlasEntityHeader extractEntityHeader(String guid, String qualifiedName, } private Map getBusinessMetadataDefList(List fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException { - Map ret = new HashMap<>(); - Map vertexCache = new HashMap<>(); - List failedMsgList = new ArrayList<>(); + Map ret = new HashMap<>(); + Map vertexCache = new HashMap<>(); + List failedMsgList = new ArrayList<>(); for (int lineIndex = 0; lineIndex < fileData.size(); lineIndex++) { - String[] record = fileData.get(lineIndex); - int lineIndexToLog = lineIndex + 2; + String[] record = fileData.get(lineIndex); + int lineIndexToLog = lineIndex + 2; boolean missingFields = record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX || StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) || @@ -2348,13 +2402,13 @@ private Map getBusinessMetadataDefList(List fileD StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) || StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]); - if (missingFields){ + if (missingFields) { failedMsgList.add("Line #" + lineIndexToLog + ": missing fields. " + Arrays.toString(record)); continue; } - String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; + String typeName = record[FileUtils.TYPENAME_COLUMN_INDEX]; AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); if (entityType == null) { @@ -2363,10 +2417,10 @@ private Map getBusinessMetadataDefList(List fileD continue; } - String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; - String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; + String uniqueAttrValue = record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]; + String bmAttribute = record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]; String bmAttributeValue = record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]; - String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; + String uniqueAttrName = AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; if (record.length > FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX && StringUtils.isNotBlank(record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX])) { uniqueAttrName = record[FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX]; @@ -2386,8 +2440,8 @@ private Map getBusinessMetadataDefList(List fileD continue; } - String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue; - AtlasVertex vertex = vertexCache.get(vertexKey); + String vertexKey = uniqueAttribute.getVertexPropertyName() + "_" + uniqueAttrValue; + AtlasVertex vertex = vertexCache.get(vertexKey); if (vertex == null) { vertex = AtlasGraphUtilsV2.findByTypeAndUniquePropertyName(graph, typeName, uniqueAttribute.getVertexUniquePropertyName(), uniqueAttrValue); @@ -2404,7 +2458,7 @@ private Map getBusinessMetadataDefList(List fileD AtlasBusinessAttribute businessAttribute = entityType.getBusinesAAttribute(bmAttribute); if (businessAttribute == null) { - failedMsgList.add("Line #" + lineIndexToLog + ": invalid business-metadata '"+ bmAttribute + "' for entity type '" + entityType.getTypeName() + "'"); + failedMsgList.add("Line #" + lineIndexToLog + ": invalid business-metadata '" + bmAttribute + "' for entity type '" + entityType.getTypeName() + "'"); continue; } @@ -2413,12 +2467,12 @@ private Map getBusinessMetadataDefList(List fileD if (businessAttribute.getAttributeType().getTypeCategory() == TypeCategory.ARRAY) { AtlasArrayType arrayType = (AtlasArrayType) businessAttribute.getAttributeType(); - List arrayValue; + List arrayValue; if (arrayType.getElementType() instanceof AtlasEnumType) { - arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex+1); + arrayValue = AtlasGraphUtilsV2.assignEnumValues(bmAttributeValue, (AtlasEnumType) arrayType.getElementType(), failedMsgList, lineIndex + 1); } else { - arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex+1); + arrayValue = assignMultipleValues(bmAttributeValue, arrayType.getElementTypeName(), failedMsgList, lineIndex + 1); } attrValue = arrayValue; @@ -2431,8 +2485,8 @@ private Map getBusinessMetadataDefList(List fileD entity.setBusinessAttribute(businessAttribute.getDefinedInType().getTypeName(), businessAttribute.getName(), attrValue); } else { - AtlasEntity entity = new AtlasEntity(); - String guid = GraphHelper.getGuid(vertex); + AtlasEntity entity = new AtlasEntity(); + String guid = GraphHelper.getGuid(vertex); Map> businessAttributes = entityRetriever.getBusinessMetadata(vertex); entity.setGuid(guid); @@ -2497,14 +2551,14 @@ private List assignMultipleValues(String bmAttributeValues, String elementTypeNa return null; } - private boolean missingFieldsCheck(String[] record, BulkImportResponse bulkImportResponse, int lineIndex){ + private boolean missingFieldsCheck(String[] record, BulkImportResponse bulkImportResponse, int lineIndex) { boolean missingFieldsCheck = (record.length < FileUtils.UNIQUE_ATTR_NAME_COLUMN_INDEX) || StringUtils.isBlank(record[FileUtils.TYPENAME_COLUMN_INDEX]) || StringUtils.isBlank(record[FileUtils.UNIQUE_ATTR_VALUE_COLUMN_INDEX]) || StringUtils.isBlank(record[FileUtils.BM_ATTR_NAME_COLUMN_INDEX]) || StringUtils.isBlank(record[FileUtils.BM_ATTR_VALUE_COLUMN_INDEX]); - if(missingFieldsCheck){ + if (missingFieldsCheck) { LOG.error("Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex); String failedTermMsgs = "Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex; @@ -2550,7 +2604,7 @@ public void repairHasLineage(AtlasHasLineageRequests requests) throws AtlasBaseE edge = graphHelper.getEdge(processVertex, assetVertex, request.getLabel()); } else { LOG.warn("Skipping since vertex is null for processGuid {} and asset Guid {}" - ,request.getProcessGuid(),request.getEndGuid() ); + , request.getProcessGuid(), request.getEndGuid()); } } catch (RepositoryException re) { throw new AtlasBaseException(AtlasErrorCode.HAS_LINEAGE_GET_EDGE_FAILED, re); @@ -2605,7 +2659,7 @@ public void repairHasLineageWithAtlasEdges(Set inputOutputEdges) { for (AtlasEdge atlasEdge : inputOutputEdges) { if (getStatus(atlasEdge) != ACTIVE) { - LOG.warn("Edge id {} is not Active, so skipping " , getRelationshipGuid(atlasEdge)); + LOG.warn("Edge id {} is not Active, so skipping ", getRelationshipGuid(atlasEdge)); continue; } @@ -2684,7 +2738,7 @@ public void repairMeaningAttributeForTerms(List termGuid) { AtlasVertex termVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); - if(termVertex!= null && ATLAS_GLOSSARY_TERM_ENTITY_TYPE.equals(getTypeName(termVertex)) && + if (termVertex != null && ATLAS_GLOSSARY_TERM_ENTITY_TYPE.equals(getTypeName(termVertex)) && GraphHelper.getStatus(termVertex) == AtlasEntity.Status.ACTIVE) { Iterable edges = termVertex.getEdges(AtlasEdgeDirection.OUT, Constants.TERM_ASSIGNMENT_LABEL); // Get entity to tagged with term. @@ -2694,7 +2748,7 @@ public void repairMeaningAttributeForTerms(List termGuid) { if (GraphHelper.getStatus(edge) == AtlasEntity.Status.ACTIVE) { AtlasVertex entityVertex = edge.getInVertex(); if (entityVertex != null & getStatus(entityVertex) == AtlasEntity.Status.ACTIVE) { - if(!RequestContext.get().getProcessGuidIds().contains(getGuid(entityVertex))) { + if (!RequestContext.get().getProcessGuidIds().contains(getGuid(entityVertex))) { repairMeanings(entityVertex); } } @@ -2745,10 +2799,11 @@ private void repairMeanings(AtlasVertex assetVertex) { RequestContext.get().addProcessGuidIds(getGuid(assetVertex)); - LOG.info("Updated asset {} with term {} ", getGuid(assetVertex) , StringUtils.join(termNameList, ",")); + LOG.info("Updated asset {} with term {} ", getGuid(assetVertex), StringUtils.join(termNameList, ",")); } } + @Override public void repairAccesscontrolAlias(String guid) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("repairAlias"); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 42d30d39ca..4dbe3629c7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -51,19 +51,7 @@ import java.text.SimpleDateFormat; import java.util.*; -import static org.apache.atlas.repository.Constants.ATLAS_GLOSSARY_ENTITY_TYPE; -import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAMES_KEY; -import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.GLOSSARY_TERMS_EDGE_LABEL; -import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; -import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; -import static org.apache.atlas.repository.Constants.NAME; -import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY; -import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; -import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY; -import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.*; import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.ASC; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.DESC; @@ -708,6 +696,19 @@ public static Iterator findActiveEntityVerticesByType(AtlasGraph gr return query.vertices().iterator(); } + public static AtlasVertex findLatestEntityAttributeVerticesByType(String typename, String dMQualifiedNamePrefix) { + AtlasGraph graph= getGraphInstance(); + AtlasGraphQuery query = graph.query() + .has(ENTITY_TYPE_PROPERTY_KEY, typename) + .has(ATLAS_DM_QUALIFIED_NAME_PREFIX, dMQualifiedNamePrefix) + .has(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0) + .has(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + + Iterator results = query.vertices().iterator(); + AtlasVertex vertex = results.hasNext() ? results.next() : null; + return vertex; + } + public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException { return relationshipTypeHasInstanceEdges(getGraphInstance(), typeName); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java index 72c3200809..fe209140ff 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityMutationContext.java @@ -30,8 +30,8 @@ public class EntityMutationContext { private final EntityGraphDiscoveryContext context; private final List entitiesCreated = new ArrayList<>(); private final List entitiesUpdated = new ArrayList<>(); - private final List entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>(); - private final List entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>(); + private List entitiesUpdatedWithAppendRelationshipAttribute = new ArrayList<>(); + private List entitiesUpdatedWithRemoveRelationshipAttribute = new ArrayList<>(); private final Map entityVsType = new HashMap<>(); private final Map entityVsVertex = new HashMap<>(); private final Map guidAssignments = new HashMap<>(); @@ -59,11 +59,11 @@ public void addCreated(String internalGuid, AtlasEntity entity, AtlasEntityType } } - public void setUpdatedWithRelationshipAttributes(AtlasEntity entity){ + public void setUpdatedWithRelationshipAttributes(AtlasEntity entity) { entitiesUpdatedWithAppendRelationshipAttribute.add(entity); } - public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity){ + public void setUpdatedWithRemoveRelationshipAttributes(AtlasEntity entity) { entitiesUpdatedWithRemoveRelationshipAttribute.add(entity); } @@ -84,6 +84,39 @@ public void addUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType } } + public void removeUpdated(String internalGuid, AtlasEntity entity, AtlasEntityType type, AtlasVertex atlasVertex) { + if (entityVsVertex.containsKey(internalGuid)) { // if the entity was already created/updated + entitiesUpdated.remove(entity); + entityVsType.remove(entity.getGuid(), type); + entityVsVertex.remove(entity.getGuid(), atlasVertex); + +// if (!StringUtils.equals(internalGuid, entity.getGuid())) { +// guidAssignments.put(internalGuid, entity.getGuid()); +// entityVsVertex.put(internalGuid, atlasVertex); +// } + } + } + + public void removeUpdatedWithRelationshipAttributes(AtlasEntity entity) { + Iterator entities = entitiesUpdatedWithAppendRelationshipAttribute.iterator(); + while (entities.hasNext()) { + String guid = entities.next().getGuid(); + if (guid.equals(entity.getGuid())) { + entities.remove(); + } + } + } + + public void removeUpdatedWithDeleteRelationshipAttributes(AtlasEntity entity) { + Iterator entities = entitiesUpdatedWithRemoveRelationshipAttribute.iterator(); + while (entities.hasNext()) { + String guid = entities.next().getGuid(); + if (guid.equals(entity.getGuid())) { + entities.remove(); + } + } + } + public void addEntityToRestore(AtlasVertex vertex) { if (entitiesToRestore == null) { entitiesToRestore = new ArrayList<>(); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java index d6a2d717d9..788be55ce7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java @@ -22,6 +22,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; @@ -69,7 +70,33 @@ public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryC throw new AtlasBaseException(element.getValue(), AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName()); } - vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, entity.getAttributes()); + // ------- + + if ( + ((entity.getAttributes().get(Constants.QUALIFIED_NAME) == null) && (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX)!=null)) + && + ((entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE)) || (entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)))) { + + String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX); + if (qualifiedNamePrefix.isEmpty()){ + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); + } + vertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entity.getTypeName(), qualifiedNamePrefix); + + if (vertex == null) { + // no entity exists with this qualifiedName, set qualifiedName and let entity be created + entity.setAttribute(Constants.QUALIFIED_NAME, qualifiedNamePrefix + "_" + RequestContext.get().getRequestTime()); + return context; + } + + // if guidFromVertex is found let entity be updated + // entity.setGuid(AtlasGraphUtilsV2.getIdFromVertex(vertex)); + // else find qualifiedName and set qualifiedName : as it is mandatory + context.addResolvedGuid(guid, vertex); + }else { + vertex = AtlasGraphUtilsV2.findByUniqueAttributes(this.graph, entityType, entity.getAttributes()); + } + } else if (!isAssignedGuid) { // for local-guids, entity must be in the stream throw new AtlasBaseException(element.getValue(), AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java new file mode 100644 index 0000000000..2b39630150 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/AbstractModelPreProcessor.java @@ -0,0 +1,772 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasRelationship; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.QUALIFIED_NAME; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public abstract class AbstractModelPreProcessor implements PreProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractModelPreProcessor.class); + private static final String ATTRIBUTE_TYPE = "DMAttribute"; + + protected final AtlasTypeRegistry typeRegistry; + + protected final EntityGraphRetriever entityRetriever; + + protected EntityGraphMapper entityGraphMapper; + protected AtlasRelationshipStore atlasRelationshipStore; + + + public AbstractModelPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + this.typeRegistry = typeRegistry; + this.entityRetriever = entityRetriever; + this.entityGraphMapper = entityGraphMapper; + this.atlasRelationshipStore = atlasRelationshipStore; + } + + + protected void setModelDates(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_SYSTEM_DATE, value); + newEntity.setAttribute(ATLAS_DM_BUSINESS_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_SYSTEM_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_BUSINESS_DATE, value); + } + + protected void setModelExpiredAtDates(AtlasEntity oldEntity, AtlasVertex oldVertex, Object value) { + oldEntity.setAttribute(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, value); + oldEntity.setAttribute(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(oldVertex, ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, value); + AtlasGraphUtilsV2.setEncodedProperty(oldVertex, ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, value); + } + + protected void setQualifiedName(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(QUALIFIED_NAME, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, QUALIFIED_NAME, value); + } + + protected void setName(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(NAME, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, NAME, value); + } + + protected void setQualifiedNamePrefix(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_QUALIFIED_NAME_PREFIX, value); + } + + protected void setNamespace(AtlasEntity newEntity, AtlasVertex newVertex, Object value) { + newEntity.setAttribute(ATLAS_DM_NAMESPACE, value); + AtlasGraphUtilsV2.setEncodedProperty(newVertex, ATLAS_DM_NAMESPACE, value); + } + + protected ModelResponse createEntity(String qualifiedName, String name, String entityType, String namespace, EntityMutationContext context) throws AtlasBaseException { + String guid = UUID.randomUUID().toString(); + AtlasEntity entity = new AtlasEntity(entityType); + entity.setAttribute(NAME, name); + entity.setAttribute(VERSION_PROPERTY_KEY, 0); + entity.setAttribute(QUALIFIED_NAME, qualifiedName); + entity.setAttribute(ATLAS_DM_NAMESPACE, namespace); + entity.setAttribute(ATLAS_DM_BUSINESS_DATE, RequestContext.get().getRequestTime()); + entity.setAttribute(ATLAS_DM_SYSTEM_DATE, RequestContext.get().getRequestTime()); + if (entityType.equals(ATLAS_DM_ENTITY_TYPE) || entityType.equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + String prefix = qualifiedName.substring(0, qualifiedName.indexOf("_")); + entity.setAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX, prefix); + } + AtlasVertex versionVertex = entityGraphMapper.createVertexWithGuid(entity, guid); + context.getDiscoveryContext().addResolvedGuid(guid, versionVertex); + entity.setGuid(guid); + return new ModelResponse(entity, versionVertex); + } + + public ModelResponse replicateModelVersion(String modelGuid, String modelQualifiedName, long now) throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo dataModel = entityRetriever.toAtlasEntityWithExtInfo(modelGuid, false); + List existingModelVersions = (List) dataModel.getEntity().getRelationshipAttributes().get("dMVersions"); + + String modelVersion = "v1"; + AtlasRelatedObjectId existingModelVersionObj = null; + + if (CollectionUtils.isEmpty(existingModelVersions)) { + return new ModelResponse(null, null); + } + + int existingVersionNumber = existingModelVersions.size(); + modelVersion = "v" + (++existingVersionNumber); + + // get active model version + for (AtlasRelatedObjectId modelVersionObj : existingModelVersions) { + AtlasEntity modelVersionEntity = entityRetriever.toAtlasEntity(modelVersionObj.getGuid()); + Date expiredAtBusinessDate = (Date) modelVersionEntity.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + Date expiredAtSystemDate = (Date) modelVersionEntity.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + existingModelVersionObj = modelVersionObj; + } + + if (existingModelVersionObj == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_VERSION_NOT_EXIST); + } + + AtlasEntity existingModelVersionEntity = entityRetriever.toAtlasEntityWithExtInfo(existingModelVersionObj.getGuid()).getEntity(); + AtlasVertex existingModelVersionVertex = entityRetriever.getEntityVertex(existingModelVersionObj.getGuid()); + + + AtlasVertex copyModelVertex = entityGraphMapper.createVertex(existingModelVersionEntity); + AtlasEntity copyModelVersion = entityRetriever.toAtlasEntity(copyModelVertex); + copyAllAttributes(existingModelVersionEntity, copyModelVersion, now); + setModelDates(copyModelVersion, copyModelVertex, now); + setQualifiedName(copyModelVersion, copyModelVertex, modelQualifiedName + "/" + modelVersion); + setName(copyModelVersion, copyModelVertex, modelVersion); + setNamespace(copyModelVersion, copyModelVertex, dataModel.getEntity().getAttribute(ATLAS_DM_NAMESPACE)); + setModelExpiredAtDates(existingModelVersionEntity, existingModelVersionVertex, now); + return new ModelResponse(existingModelVersionEntity, copyModelVersion, existingModelVersionVertex, copyModelVertex); + } + + public ModelResponse replicateModelEntity(AtlasEntity existingEntity, AtlasVertex existingEntityVertex, String entityQualifiedNamePrefix, long epoch) throws AtlasBaseException { + AtlasVertex copyEntityVertex = entityGraphMapper.createVertex(existingEntity); + AtlasEntity copyEntity = entityRetriever.toAtlasEntity(copyEntityVertex); + copyAllAttributes(existingEntity, copyEntity, epoch); + String entityQualifiedName = entityQualifiedNamePrefix + "_" + epoch; + setQualifiedName(copyEntity, copyEntityVertex, entityQualifiedName); + setModelDates(copyEntity, copyEntityVertex, epoch); + setName(copyEntity, copyEntityVertex, existingEntity.getAttribute(NAME)); + setNamespace(copyEntity, copyEntityVertex, existingEntity.getAttribute(ATLAS_DM_NAMESPACE)); + setQualifiedNamePrefix(copyEntity, copyEntityVertex, existingEntity.getAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX)); + setModelExpiredAtDates(existingEntity, existingEntityVertex, epoch); + return new ModelResponse(existingEntity, copyEntity, existingEntityVertex, copyEntityVertex); + } + + protected ModelResponse replicateModelAttribute(AtlasEntity existingAttribute, AtlasVertex existingAttributeVertex, String attributeQualifiedNamePrefix, long epoch) throws AtlasBaseException { + AtlasVertex copyAttributeVertex = entityGraphMapper.createVertex(existingAttribute); + AtlasEntity copyAttributeEntity = entityRetriever.toAtlasEntity(copyAttributeVertex); + copyAllAttributes(existingAttribute, copyAttributeEntity, epoch); + String attributeQualifiedName = attributeQualifiedNamePrefix + "_" + epoch; + setQualifiedName(copyAttributeEntity, copyAttributeVertex, attributeQualifiedName); + setModelDates(copyAttributeEntity, copyAttributeVertex, epoch); + setName(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(NAME)); + setNamespace(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(ATLAS_DM_NAMESPACE)); + setQualifiedNamePrefix(copyAttributeEntity, copyAttributeVertex, existingAttribute.getAttribute(ATLAS_DM_QUALIFIED_NAME_PREFIX)); + setModelExpiredAtDates(existingAttribute, existingAttributeVertex, epoch); + return new ModelResponse(existingAttribute, copyAttributeEntity, existingAttributeVertex, copyAttributeVertex); + } + + public void createModelVersionModelEntityRelationship(AtlasVertex modelVersionVertex, + AtlasVertex modelEntityVertex) throws AtlasBaseException { + AtlasRelationship modelVersionEntityRelation = new AtlasRelationship("d_m_version_d_m_entities"); + modelVersionEntityRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionEntityRelation.setEnd1(new AtlasObjectId( + GraphHelper.getGuid(modelVersionVertex), + GraphHelper.getTypeName(modelVersionVertex))); + modelVersionEntityRelation.setEnd2(new AtlasObjectId( + GraphHelper.getGuid(modelEntityVertex), + GraphHelper.getTypeName(modelEntityVertex))); + atlasRelationshipStore.create(modelVersionEntityRelation); + } + + protected void createModelVersionModelEntityRelationship(AtlasVertex modelVersionVertex, + List existingEntities) throws AtlasBaseException { + if (CollectionUtils.isEmpty(existingEntities)) { + return; + } + AtlasRelationship modelVersionEntityRelation = new AtlasRelationship("d_m_version_d_m_entities"); + modelVersionEntityRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionEntityRelation.setEnd1(new AtlasObjectId( + GraphHelper.getGuid(modelVersionVertex), + GraphHelper.getTypeName(modelVersionVertex))); + for (AtlasRelatedObjectId existingEntity : existingEntities) { + AtlasEntity entity = entityRetriever.toAtlasEntity(existingEntity.getGuid()); + Date expiredAtBusinessDate = (Date) entity.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + Date expiredAtSystemDate = (Date) entity.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + modelVersionEntityRelation.setEnd2(new AtlasObjectId( + existingEntity.getGuid(), + existingEntity.getTypeName() + )); + atlasRelationshipStore.create(modelVersionEntityRelation); + } + } + + protected void createModelEntityModelAttributeRelation(AtlasVertex entity, List existingEntityAttributes) throws AtlasBaseException { + if (CollectionUtils.isEmpty(existingEntityAttributes)) { + return; + } + AtlasRelationship modelEntityAttributeRelation = new AtlasRelationship("d_m_entity_d_m_attributes"); + modelEntityAttributeRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelEntityAttributeRelation.setEnd1( + new AtlasObjectId( + GraphHelper.getGuid(entity), + GraphHelper.getTypeName(entity))); + for (AtlasRelatedObjectId existingEntityAttribute : existingEntityAttributes) { + AtlasEntity entityAttribute = entityRetriever.toAtlasEntity(existingEntityAttribute.getGuid()); + Date expiredAtBusinessDate = (Date) entityAttribute.getAttributes().get(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE); + Date expiredAtSystemDate = (Date) entityAttribute.getAttributes().get(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE); + if (expiredAtBusinessDate != null && expiredAtBusinessDate.getTime() > 0 || expiredAtSystemDate != null && expiredAtSystemDate.getTime() > 0) { + continue; + } + modelEntityAttributeRelation.setEnd2( + new AtlasObjectId( + existingEntityAttribute.getGuid(), + existingEntityAttribute.getTypeName())); + atlasRelationshipStore.create(modelEntityAttributeRelation); + } + } + + protected void createModelEntityModelAttributeRelation(AtlasVertex entity, AtlasVertex attribute) throws AtlasBaseException { + AtlasRelationship modelEntityAttributeRelation = new AtlasRelationship("d_m_entity_d_m_attributes"); + modelEntityAttributeRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelEntityAttributeRelation.setEnd1( + new AtlasObjectId( + GraphHelper.getGuid(entity), + GraphHelper.getTypeName(entity))); + modelEntityAttributeRelation.setEnd2( + new AtlasObjectId( + GraphHelper.getGuid(attribute), + GraphHelper.getTypeName(attribute))); + atlasRelationshipStore.create(modelEntityAttributeRelation); + } + + + public void createModelModelVersionRelation(String modelGuid, String latestModelVersionGuid) throws AtlasBaseException { + AtlasRelationship modelVersionModelRelation = new AtlasRelationship("d_m_data_model_d_m_versions"); + modelVersionModelRelation.setStatus(AtlasRelationship.Status.ACTIVE); + modelVersionModelRelation.setEnd1( + new AtlasObjectId( + modelGuid, ATLAS_DM_DATA_MODEL)); + modelVersionModelRelation.setEnd2( + new AtlasObjectId(latestModelVersionGuid, ATLAS_DM_VERSION_TYPE)); + atlasRelationshipStore.create(modelVersionModelRelation); + } + + protected void copyAllAttributes(AtlasEntity source, AtlasEntity destination, long epochNow) { + if (source == null || destination == null) { + throw new IllegalArgumentException("Source and destination entities must not be null."); + } + + if (source.getAttributes() != null) { + destination.setAttributes(new HashMap<>(source.getAttributes())); + } else { + destination.setAttributes(new HashMap<>()); + } + + + if (CollectionUtils.isNotEmpty(source.getMeanings())) { + destination.setMeanings(new ArrayList<>(source.getMeanings())); + } else { + destination.setMeanings(new ArrayList<>()); + } + + long requestTime = RequestContext.get().getRequestTime(); + destination.setCreateTime(new Date(requestTime)); + destination.setUpdateTime(new Date(requestTime)); + + + if (source.getCustomAttributes() != null) { + destination.setCustomAttributes(new HashMap<>(source.getCustomAttributes())); + } else { + destination.setCustomAttributes(new HashMap<>()); // Set empty map if source custom attributes are null + } + + if (CollectionUtils.isNotEmpty(source.getClassifications())) { + destination.setClassifications(new ArrayList<>(source.getClassifications())); + } else { + destination.setClassifications(new ArrayList<>()); // Set empty list if source classifications are null or empty + } + + String entityType = source.getTypeName(); + + if (MapUtils.isNotEmpty(source.getRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getRelationshipAttributes(), destination, entityType); + destination.setRelationshipAttributes(relationAttributes); + } + if (MapUtils.isNotEmpty(source.getAppendRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getAppendRelationshipAttributes(), destination, entityType); + destination.setAppendRelationshipAttributes(relationAttributes); + } + if (MapUtils.isNotEmpty(source.getRemoveRelationshipAttributes())) { + Map relationAttributes = copyRelationshipAttributes(source.getRemoveRelationshipAttributes(), destination, entityType); + destination.setRemoveRelationshipAttributes(relationAttributes); + } + + } + + private Map copyRelationshipAttributes(Map sourceAttributes, AtlasEntity destination, String entityType) { + Map destinationAttributes = new HashMap<>(); + + if (MapUtils.isEmpty(sourceAttributes)) { + return destinationAttributes; + } + + Set allowedRelations = allowedRelationshipsForEntityType(entityType); + + for (String attribute : sourceAttributes.keySet()) { + if (allowedRelations.contains(attribute)) { + destinationAttributes.put(attribute, sourceAttributes.get(attribute)); + } + } + + return destinationAttributes; + } + + public static void replaceAttributes(Map existingAttributes, Map diffAttributes) { + if (MapUtils.isEmpty(diffAttributes)) { + return; + } + // Temporary map to hold new key-value pairs during replacement + Map tempMap = new HashMap<>(); + + // Iterate over the original map + for (Map.Entry entry : existingAttributes.entrySet()) { + String originalKey = entry.getKey(); + Object value = entry.getValue(); + + // Check if the second map contains a key for replacement + if (diffAttributes.containsKey(originalKey)) { + Object newValue = diffAttributes.get(originalKey); // Get the new key from second map + tempMap.put(originalKey, newValue); // Put the new key in the temp map + } else { + tempMap.put(originalKey, value); // No replacement, keep the original key + } + } + + // Clear the original map and put all the updated entries + existingAttributes.clear(); + existingAttributes.putAll(tempMap); + } + + protected void applyDiffs(AtlasEntity sourceEntity, AtlasEntity destinationEntity, String typeName) { + RequestContext reqContext = RequestContext.get(); + AtlasEntity diffEntity = reqContext.getDifferentialEntity(sourceEntity.getGuid()); + if (diffEntity == null) { + return; + } + boolean diffExistsForSameType = diffEntity.getTypeName().equals(typeName); + if (!diffExistsForSameType) { + return; + } + replaceAttributes(destinationEntity.getAttributes(), diffEntity.getAttributes()); + } + + protected void unsetExpiredDates(AtlasEntity latestEntity, AtlasVertex latestVertex) { + latestEntity.setAttribute(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0); + latestEntity.setAttribute(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + AtlasGraphUtilsV2.setEncodedProperty(latestVertex, ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0); + AtlasGraphUtilsV2.setEncodedProperty(latestVertex, ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0); + } + + protected Set allowedRelationshipsForEntityType(String entityType) { + Set allowedRelationships = new HashSet<>(); + switch (entityType) { + case ATLAS_DM_ENTITY_TYPE: + allowedRelationships.add("dMMappedToEntities"); + allowedRelationships.add("dMMappedFromEntities"); + allowedRelationships.add("dMRelatedFromEntities"); + allowedRelationships.add("dMRelatedToEntities"); + break; + case ATLAS_DM_ATTRIBUTE_TYPE: + allowedRelationships.add("dMMappedFromAttributes"); + allowedRelationships.add("dMMappedToAttributes"); + allowedRelationships.add("dMRelatedFromAttributes"); + allowedRelationships.add("dMRelatedToAttributes"); + break; + case ATLAS_DM_ENTITY_ASSOCIATION_TYPE: + allowedRelationships.add("dMEntityTo"); + allowedRelationships.add("dMEntityFrom"); + break; + case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE: + allowedRelationships.add("dMAttributeTo"); + allowedRelationships.add("dMAttributeFrom"); + break; + } + return allowedRelationships; + } + + protected ModelResponse updateDMEntity(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return new ModelResponse(entity, vertex); + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + AtlasEntity.AtlasEntityWithExtInfo existingEntity = entityRetriever.toAtlasEntityWithExtInfo(vertex, false); + List existingEntityAttributes = (List) existingEntity.getEntity().getRelationshipAttributes().get("dMAttributes"); + + + // get model qualifiedName with qualifiedNamePrefix + String qualifiedNamePrefix = (String) entity.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + String modelVersion = "v1"; + + + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + // model is not replicated successfully + if (modelVersionResponse.getReplicaEntity() == null) { + String namespace = (String) entity.getAttributes().get(ATLAS_DM_NAMESPACE); + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + AtlasEntity existingVersion = modelVersionResponse.getExistingEntity(); + + // create entity e1 ---> e1' + ModelResponse modelEntityResponse = replicateModelEntity(existingEntity.getEntity(), vertex, qualifiedNamePrefix, now); + AtlasVertex copyEntityVertex = modelEntityResponse.getReplicaVertex(); + AtlasEntity copyEntity = modelEntityResponse.getReplicaEntity(); + applyDiffs(entity, copyEntity, ATLAS_DM_ENTITY_TYPE); + unsetExpiredDates(copyEntity, copyEntityVertex); + + // create model-modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // create modelVersion-modelEntity relationship with new entity + createModelVersionModelEntityRelationship(latestModelVersionVertex, copyEntityVertex); + + // create modelVersion-modelEntity relation with old entities which are not expired + if (existingVersion != null) { + List existingEntities = (List) existingVersion.getRelationshipAttributes().get("dMEntities"); + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + } + // create modelEntity-modelAttributeRelationship + createModelEntityModelAttributeRelation(copyEntityVertex, existingEntityAttributes); + + + /** + * update context + */ + // previousEntity and previousModelVersion + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + AtlasEntityType modelVersionType = typeRegistry.getEntityTypeByName(modelVersionResponse.getReplicaEntity().getTypeName()); + + context.addCreated(copyEntity.getGuid(), copyEntity, entityType, copyEntityVertex); + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, modelVersionType, latestModelVersionVertex); + + //remove existing entity from context so it is not updated + context.removeUpdated(entity.getGuid(), entity, + entityType, vertex); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + modelVertex); + + return new ModelResponse(copyEntity, copyEntityVertex); + } + + protected Map processRelationshipAttributesForEntity(AtlasEntity entity, Map relationshipAttributes, EntityMutationContext context) throws AtlasBaseException { + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + ; + ModelResponse modelResponseRelatedEntity = null; + String guid = ""; + Set allowedRelations = allowedRelationshipsForEntityType(entity.getTypeName()); + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + if (!allowedRelations.contains(attribute)) { + continue; + } + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMEntity( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + //relationAttribute.put("guid", modelResponseRelatedEntity.getCopyEntity()); + destMap = new HashMap<>(relationAttribute); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + destMap.put("guid", guid); + //destMap.put(QUALIFIED_NAME, ) + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMEntity( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + + Map destMap = new HashMap<>(attributeList); + destMap.put("guid", guid); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + appendAttributesDestination.put(attribute, destMap); + } + } + } + } + return appendAttributesDestination; + } + + protected ModelResponse replicateDMAssociation(AtlasEntity existingEntity, AtlasVertex existingEntityVertex, long epoch) throws AtlasBaseException { + AtlasVertex copyEntityVertex = entityGraphMapper.createVertex(existingEntity); + AtlasEntity copyEntity = entityRetriever.toAtlasEntity(copyEntityVertex); + copyAllAttributes(existingEntity, copyEntity, epoch); + setModelDates(copyEntity, copyEntityVertex, epoch); + setModelDates(copyEntity, copyEntityVertex, epoch); + setModelExpiredAtDates(existingEntity, existingEntityVertex, epoch); + return new ModelResponse(existingEntity, copyEntity, existingEntityVertex, copyEntityVertex); + } + + protected Map processRelationshipAttributesForAttribute(AtlasEntity entity, Map relationshipAttributes, EntityMutationContext context) throws AtlasBaseException { + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + ; + ModelResponse modelResponseRelatedEntity = null; + String guid = ""; + Set allowedRelations = allowedRelationshipsForEntityType(entity.getTypeName()); + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + if (!allowedRelations.contains(attribute)) { + continue; + } + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMAttribute( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + //relationAttribute.put("guid", modelResponseRelatedEntity.getCopyEntity()); + destMap = new HashMap<>(relationAttribute); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + destMap.put("guid", guid); + //destMap.put(QUALIFIED_NAME, ) + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + modelResponseRelatedEntity = updateDMAttribute( + entityRetriever.toAtlasEntity(guid), + entityRetriever.getEntityVertex(guid), + context); + + Map destMap = new HashMap<>(attributeList); + destMap.put("guid", guid); + guid = modelResponseRelatedEntity.getReplicaEntity().getGuid(); + context.getDiscoveryContext().addResolvedGuid(guid, modelResponseRelatedEntity.getReplicaVertex()); + appendAttributesDestination.put(attribute, destMap); + } + } + } + } + return appendAttributesDestination; + } + + protected ModelResponse updateDMAttribute(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + if (!entityAttribute.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return new ModelResponse(entityAttribute, vertexAttribute); + } + + String attributeName = (String) entityAttribute.getAttribute(NAME); + + if (StringUtils.isEmpty(attributeName) || isNameInvalid(attributeName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME, attributeName); + } + + long now = RequestContext.get().getRequestTime(); + + + // get entity qualifiedName with qualifiedNamePrefix + String attributeQualifiedNamePrefix = (String) entityAttribute.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = attributeQualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = attributeQualifiedNamePrefix.substring(0, lastIndex); + String namespace = (String) entityAttribute.getAttributes().get(ATLAS_DM_NAMESPACE); + String modelVersion = "v1"; + + ModelResponse modelENtityResponse = null; + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + + // get model qualifiedName with qualifiedNamePrefix + lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + String modelGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), + attrValues); + + List existingAttributes = null; + + if (latestEntityVertex != null) { + modelENtityResponse = replicateModelEntity( + entityRetriever.toAtlasEntity(latestEntityVertex), + latestEntityVertex, + entityQualifiedNamePrefix, + now + ); + modelVersion = "v2"; + if (modelENtityResponse.getExistingEntity() != null && modelENtityResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingAttributes = (List) modelENtityResponse.getExistingEntity().getAttributes().get("dMAttributes"); + } + } else { + int lastSlashIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String entityName = entityQualifiedNamePrefix.substring(lastSlashIndex + 1); + modelENtityResponse = createEntity( + entityQualifiedNamePrefix + "_" + now, + entityName, + ATLAS_DM_ENTITY_TYPE, + namespace, + context + ); + } + + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + if (modelVersionResponse.getReplicaEntity() == null) { + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + List existingEntities = null; + + if (modelVersionResponse.getExistingEntity() != null && modelVersionResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingEntities = (List) modelVersionResponse.getExistingEntity().getRelationshipAttributes().get("dMEntities"); + } + + AtlasEntity existingEntityAttributeWithExtInfo = entityRetriever.toAtlasEntityWithExtInfo(entityAttribute.getGuid(), false).getEntity(); + + // create attribute a1 ---> a1' + ModelResponse modelAttributeResponse = replicateModelAttribute( + existingEntityAttributeWithExtInfo, + entityRetriever.getEntityVertex(entityAttribute.getGuid()), + attributeQualifiedNamePrefix, + now); + + AtlasVertex copyAttributeVertex = modelAttributeResponse.getReplicaVertex(); + AtlasEntity copyAttribute = modelAttributeResponse.getReplicaEntity(); + applyDiffs(entityAttribute, copyAttribute, ATLAS_DM_ATTRIBUTE_TYPE); + unsetExpiredDates(copyAttribute, copyAttributeVertex); + + // create model-modelVersion relationship + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // create modelVersion-entity relationship [with new entity] + createModelVersionModelEntityRelationship(latestModelVersionVertex, modelENtityResponse.getReplicaVertex()); + + // create modelVersion-entity relationship [with existing entities] + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + + + // create entity - attribute relation [with new attribute] + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), copyAttributeVertex); + + // create entity - attribute relation [with existing attributes] + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), existingAttributes); + + AtlasEntityType attributeType = typeRegistry.getEntityTypeByName(entityAttribute.getTypeName()); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(modelENtityResponse.getReplicaEntity().getTypeName()); + AtlasEntityType modelVersionType = typeRegistry.getEntityTypeByName(latestModelVersionEntity.getTypeName()); + + context.addCreated(copyAttribute.getGuid(), copyAttribute, attributeType, copyAttributeVertex); + context.addCreated(modelENtityResponse.getReplicaEntity().getGuid(), modelENtityResponse.getReplicaEntity(), + entityType, modelENtityResponse.getReplicaVertex()); + context.addCreated(latestModelVersionEntity.getGuid(), + latestModelVersionEntity, modelVersionType, latestModelVersionVertex); + + context.removeUpdated(entityAttribute.getGuid(), entityAttribute, + entityType, vertexAttribute); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + return new ModelResponse(copyAttribute, copyAttributeVertex); + } + +} + + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java new file mode 100644 index 0000000000..5430d98451 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributeAssociationPreprocessor.java @@ -0,0 +1,102 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMAttributeAssociationPreprocessor extends AbstractModelPreProcessor{ + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMAttributeAssociationPreprocessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMAttributeAssociation(entity, vertex, context); + break; + case UPDATE: + updateDMAttributeAssociation(entity, vertex, context); + } + } + + private void createDMAttributeAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + entity.setRelationshipAttributes( + processRelationshipAttributesForAttribute(entity, entity.getRelationshipAttributes(), context)); + } + private void updateDMAttributeAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = replicateDMAssociation(entity, vertex, now); + AtlasEntity copyEntity = modelResponse.getReplicaEntity(); + AtlasVertex copyVertex = modelResponse.getReplicaVertex(); + applyDiffs(entity, copyEntity, ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE); + unsetExpiredDates(copyEntity, copyVertex); + + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entity, entity.getAppendRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponse.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponse.getReplicaEntity()); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java new file mode 100644 index 0000000000..c933f02519 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMAttributePreprocessor.java @@ -0,0 +1,182 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMAttributePreprocessor extends AbstractModelPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMAttributePreprocessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMAttribute(entity, vertex, context); + break; + case UPDATE: + updateDMAttributes(entity, vertex, context); + } + } + + private void createDMAttribute(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + if (!entityAttribute.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + return; + } + if (CollectionUtils.isEmpty(context.getCreatedEntities())) { + return; + } + + String entityName = (String) entityAttribute.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + long now = RequestContext.get().getRequestTime(); + + + // get entity qualifiedName with qualifiedNamePrefix + String attributeQualifiedNamePrefix = (String) entityAttribute.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = attributeQualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = attributeQualifiedNamePrefix.substring(0, lastIndex); + String namespace = (String) entityAttribute.getAttributes().get(ATLAS_DM_NAMESPACE); + String modelVersion = "v1"; + + ModelResponse modelENtityResponse = null; + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); + + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + String modelGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), + attrValues); + + List existingAttributes = null; + + // + if (latestEntityVertex != null) { + modelENtityResponse = replicateModelEntity( + entityRetriever.toAtlasEntity(latestEntityVertex), + latestEntityVertex, + entityQualifiedNamePrefix, + now + ); + modelVersion = "v2"; + if (modelENtityResponse.getExistingEntity() != null && modelENtityResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingAttributes = (List) modelENtityResponse.getExistingEntity().getRelationshipAttributes().get("dMAttributes"); + } + } else { + int lastSlashIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + + // Extract the substring after the last "/" + String name = entityQualifiedNamePrefix.substring(lastSlashIndex + 1); + modelENtityResponse = createEntity( + entityQualifiedNamePrefix + "_" + now, + name, + ATLAS_DM_ENTITY_TYPE, + namespace, + context + ); + } + + List existingEntities = null; + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + if (modelVersionResponse.getReplicaEntity() == null) { + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + modelVersion), + modelVersion, + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + + // model --- modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // modelVersion --- entity relation + createModelVersionModelEntityRelationship(latestModelVersionVertex, modelENtityResponse.getReplicaVertex()); + + // modelVersion --- entitiesOfExistingModelVersion + if (modelVersionResponse.getExistingEntity() != null && modelVersionResponse.getExistingEntity().getRelationshipAttributes() != null) { + existingEntities = (List) modelVersionResponse.getExistingEntity().getRelationshipAttributes().get("dMEntities"); + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + } + + // entity --- attributes of existingEntity relation + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), existingAttributes); + + // latest entity ---- new attribute relation + createModelEntityModelAttributeRelation(modelENtityResponse.getReplicaVertex(), vertexAttribute); + + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), latestModelVersionVertex); + + context.addCreated(modelENtityResponse.getReplicaEntity().getGuid(), modelENtityResponse.getReplicaEntity(), + typeRegistry.getEntityTypeByName(ATLAS_DM_ENTITY_TYPE), modelENtityResponse.getReplicaVertex()); + + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + entityAttribute.setRelationshipAttributes(processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getRelationshipAttributes(), context)); + } + + private void updateDMAttributes(AtlasEntity entityAttribute, AtlasVertex vertexAttribute, EntityMutationContext context) throws AtlasBaseException { + ModelResponse modelResponseParentEntity = updateDMAttribute(entityAttribute, vertexAttribute, context); + // case when a mapping is added + if (entityAttribute.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getAppendRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setAppendRelationshipAttributes(new HashMap<>(appendRelationshipAttributes)); + context.removeUpdatedWithRelationshipAttributes(entityAttribute); + } + + if (entityAttribute.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForAttribute(entityAttribute, entityAttribute.getRemoveRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entityAttribute); + } + } + +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java new file mode 100644 index 0000000000..1e549b4090 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityAssociationPreProcessor.java @@ -0,0 +1,103 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +public class DMEntityAssociationPreProcessor extends AbstractModelPreProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(DMAttributePreprocessor.class); + + public DMEntityAssociationPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + switch (operation) { + case CREATE: + createDMEntityAssociation(entity, vertex, context); + break; + case UPDATE: + updateDMEntityAssociation(entity, vertex, context); + } + } + + private void createDMEntityAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + entity.setRelationshipAttributes( + processRelationshipAttributesForEntity(entity, entity.getRelationshipAttributes(), context)); + } + private void updateDMEntityAssociation(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = replicateDMAssociation(entity, vertex, now); + AtlasEntity copyEntity = modelResponse.getReplicaEntity(); + AtlasVertex copyVertex = modelResponse.getReplicaVertex(); + applyDiffs(entity, copyEntity, ATLAS_DM_ENTITY_ASSOCIATION_TYPE); + unsetExpiredDates(copyEntity, copyVertex); + + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getAppendRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponse.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponse.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponse.getReplicaEntity()); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java new file mode 100644 index 0000000000..3c7149dfbd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/DMEntityPreProcessor.java @@ -0,0 +1,157 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasRelatedObjectId; +import org.apache.atlas.model.instance.AtlasStruct; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasRelationshipStore; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.isNameInvalid; + +@Component +public class DMEntityPreProcessor extends AbstractModelPreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DMEntityPreProcessor.class); + + + public DMEntityPreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, EntityGraphMapper entityGraphMapper, AtlasRelationshipStore atlasRelationshipStore) { + super(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore); + } + + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + //Handle name & qualifiedName + if (LOG.isDebugEnabled()) { + LOG.debug("ModelPreProcessor.processAttributes: pre processing {}, {}", + entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + + + switch (operation) { + case CREATE: + createDMEntity(entity, vertex, context); + break; + case UPDATE: + updateDMEntities(entity, vertex, context); + break; + } + } + + private void createDMEntity(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + if (!entity.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)) { + return; + } + if (CollectionUtils.isEmpty(context.getCreatedEntities())) { + return; + } + + String entityName = (String) entity.getAttribute(NAME); + + if (StringUtils.isEmpty(entityName) || isNameInvalid(entityName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_DISPLAY_NAME); + } + long now = RequestContext.get().getRequestTime(); + + // get model qualifiedName with qualifiedNamePrefix + String qualifiedNamePrefix = (String) entity.getAttributes().get(ATLAS_DM_QUALIFIED_NAME_PREFIX); + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); + + ModelResponse modelVersionResponse = replicateModelVersion(modelGuid, modelQualifiedName, now); + + // create modelVersion + if (modelVersionResponse.getReplicaEntity() == null) { + String namespace = (String) entity.getAttributes().get(ATLAS_DM_NAMESPACE); + modelVersionResponse = createEntity( + (modelQualifiedName + "/" + "v1"), + "v1", + ATLAS_DM_VERSION_TYPE, + namespace, + context); + } + + AtlasEntity latestModelVersionEntity = modelVersionResponse.getReplicaEntity(); + AtlasVertex latestModelVersionVertex = modelVersionResponse.getReplicaVertex(); + + + // model --- modelVersion relation + createModelModelVersionRelation(modelGuid, latestModelVersionEntity.getGuid()); + + // modelVersion --- entity relation + createModelVersionModelEntityRelationship(latestModelVersionVertex, vertex); + + if (modelVersionResponse.getExistingEntity() != null) { + List existingEntities = (List) modelVersionResponse.getExistingEntity() + .getRelationshipAttributes() + .get("dMEntities"); + // modelVersion --- entitiesOfExistingModelVersion + createModelVersionModelEntityRelationship(latestModelVersionVertex, existingEntities); + + } + context.addCreated(latestModelVersionEntity.getGuid(), latestModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), latestModelVersionVertex); + // resolve references + context.getDiscoveryContext(). + addResolvedGuid( + modelGuid, + entityRetriever.getEntityVertex(modelGuid)); + + entity.setRelationshipAttributes( + processRelationshipAttributesForEntity(entity, entity.getRelationshipAttributes(), context)); + } + + private void updateDMEntities(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context) throws AtlasBaseException { + ModelResponse modelResponseParentEntity = updateDMEntity(entity, vertex, context); + + // case when a mapping is added + if (entity.getAppendRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getAppendRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setAppendRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithRelationshipAttributes(entity); + context.setUpdatedWithRelationshipAttributes(modelResponseParentEntity.getReplicaEntity()); + } + + if (entity.getRemoveRelationshipAttributes() != null) { + Map appendRelationshipAttributes = processRelationshipAttributesForEntity(entity, entity.getRemoveRelationshipAttributes(), context); + modelResponseParentEntity.getReplicaEntity().setRemoveRelationshipAttributes(appendRelationshipAttributes); + context.removeUpdatedWithDeleteRelationshipAttributes(entity); + context.setUpdatedWithRemoveRelationshipAttributes(modelResponseParentEntity.getReplicaEntity()); + } + } +} + + + diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java new file mode 100644 index 0000000000..d6fd46afc3 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/model/ModelResponse.java @@ -0,0 +1,41 @@ +package org.apache.atlas.repository.store.graph.v2.preprocessor.model; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.graphdb.AtlasVertex; + +public class ModelResponse { + + private AtlasEntity existingEntity; + private AtlasEntity replicaEntity; + private AtlasVertex existingVertex; + private AtlasVertex replicaVertex; + + public ModelResponse(AtlasEntity existingEntity, AtlasEntity replicaEntity, + AtlasVertex existingVertex, AtlasVertex replicaVertex) { + this.existingEntity = existingEntity; + this.replicaEntity = replicaEntity; + this.existingVertex = existingVertex; + this.replicaVertex = replicaVertex; + } + + public ModelResponse(AtlasEntity replicaEntity, AtlasVertex replicaVertex) { + this.replicaEntity = replicaEntity; + this.replicaVertex = replicaVertex; + } + + public AtlasEntity getExistingEntity() { + return existingEntity; + } + + public AtlasEntity getReplicaEntity() { + return replicaEntity; + } + + public AtlasVertex getExistingVertex() { + return existingVertex; + } + + public AtlasVertex getReplicaVertex() { + return replicaVertex; + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java index dd83cfc77f..206b552ec6 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/ModelREST.java @@ -1,9 +1,5 @@ package org.apache.atlas.web.rest; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.RequestContext; @@ -12,10 +8,23 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.IndexSearchParams; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.AbstractModelPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor; +import org.apache.atlas.repository.store.graph.v2.preprocessor.model.ModelResponse; import org.apache.atlas.searchlog.SearchLoggingManagement; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.util.ModelUtil; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -29,10 +38,9 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import java.util.Arrays; -import java.util.Base64; -import java.util.HashSet; -import java.util.Set; +import java.util.*; + +import static org.apache.atlas.repository.Constants.*; @Path("model") @Singleton @@ -41,43 +49,41 @@ @Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON}) public class ModelREST { - private static final String BUSINESS_DATE = "dMDataModelBusinessDate"; - private static final String EXPIRED_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; - private static final String LESSER_THAN_EQUAL_TO = "lte"; - private static final String SYSTEM_DATE = "dMDataModelSystemDate"; - private static final String EXPIRED_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; - private static final String NAMESPACE = "dMDataModelNamespace"; - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.DiscoveryREST"); + + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.ModelREST"); private static final Logger LOG = LoggerFactory.getLogger(DiscoveryREST.class); @Context private HttpServletRequest httpServletRequest; private final boolean enableSearchLogging; - private final AtlasTypeRegistry typeRegistry; + private final AbstractModelPreProcessor entityPreProcessor; private final AtlasDiscoveryService discoveryService; private final SearchLoggingManagement loggerManagement; + private final EntityGraphMapper entityGraphMapper; + private final EntityGraphRetriever graphRetriever; + + //private final Entity private static final String INDEXSEARCH_TAG_NAME = "indexsearch"; private static final Set TRACKING_UTM_TAGS = new HashSet<>(Arrays.asList("ui_main_list", "ui_popup_searchbar")); private static final String UTM_TAG_FROM_PRODUCT = "project_webapp"; @Inject - public ModelREST(AtlasTypeRegistry typeRegistry, AtlasDiscoveryService discoveryService, - SearchLoggingManagement loggerManagement) { + public ModelREST(AtlasTypeRegistry typeRegistry, AtlasDiscoveryService discoveryService, SearchLoggingManagement loggerManagement, EntityGraphMapper entityGraphMapper, EntityGraphRetriever graphRetriever, DMEntityPreProcessor entityPreProcessor) { this.typeRegistry = typeRegistry; this.discoveryService = discoveryService; this.loggerManagement = loggerManagement; + this.entityGraphMapper = entityGraphMapper; + this.graphRetriever = graphRetriever; + this.entityPreProcessor = entityPreProcessor; this.enableSearchLogging = AtlasConfiguration.ENABLE_SEARCH_LOGGER.getBoolean(); } @Path("/search") @POST @Timed - public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, - @QueryParam("businessDate") String businessDate, - @QueryParam("systemDate") String systemDate, - @Context HttpServletRequest servletRequest, IndexSearchParams parameters) throws AtlasBaseException { + public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, @QueryParam("businessDate") String businessDate, @QueryParam("systemDate") String systemDate, @Context HttpServletRequest servletRequest, IndexSearchParams parameters) throws AtlasBaseException { Servlets.validateQueryParamLength("namespace", namespace); Servlets.validateQueryParamLength("businessDate", businessDate); @@ -91,10 +97,7 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, parameters = parameters == null ? new IndexSearchParams() : parameters; - String queryStringUsingFiltersAndUserDSL = createQueryStringUsingFiltersAndUserDSL(namespace, - businessDate, - systemDate, - parameters.getQuery()); + String queryStringUsingFiltersAndUserDSL = ModelUtil.createQueryStringUsingFiltersAndUserDSL(namespace, businessDate, systemDate, parameters.getQuery()); if (StringUtils.isEmpty(queryStringUsingFiltersAndUserDSL)) { AtlasBaseException abe = new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid model search query"); @@ -111,19 +114,15 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, if (result == null) { return null; } - - return result; } catch (AtlasBaseException abe) { - if (enableSearchLogging && parameters.isSaveSearchLog() - ) { + if (enableSearchLogging && parameters.isSaveSearchLog()) { // logSearchLog(parameters, servletRequest, abe, System.currentTimeMillis() - startTime); } throw abe; } catch (Exception e) { AtlasBaseException abe = new AtlasBaseException(e.getMessage(), e.getCause()); - if (enableSearchLogging && parameters.isSaveSearchLog() - ) { + if (enableSearchLogging && parameters.isSaveSearchLog()) { //logSearchLog(parameters, servletRequest, abe, System.currentTimeMillis() - startTime); } throw abe; @@ -149,148 +148,184 @@ public AtlasSearchResult dataSearch(@QueryParam("namespace") String namespace, } } - /*** - * combines user query/dsl along with business parameters - * - * creates query as following : - * {"query":{"bool":{"must":[{"bool":{"filter":[{"match":{"namespace":"{namespace}"}},{"bool":{"must":[{"range":{"businessDate":{"lte":"businessDate"}}},{"bool":{"should":[{"range":{"expiredAtBusinessDate":{"gt":"{businessDate}"}}},{"bool":{"must_not":[{"exists":{"field":"expiredAtBusiness"}}]}}],"minimum_should_match":1}}]}}]}},{"wrapper":{"query":"user query"}}]}}} - * @param namespace - * @param businessDate - * @param dslString - * @return - */ - private String createQueryStringUsingFiltersAndUserDSL(final String namespace, - final String businessDate, - final String systemDate, - final String dslString) { + @DELETE + @Path("/entity") + @Timed + public EntityMutationResponse deleteByQualifiedNamePrefix(@QueryParam("qualifiedNamePrefix") String qualifiedNamePrefix, + @QueryParam("entityType") String entityType, + @Context HttpServletRequest servletRequest) throws AtlasBaseException { + + Servlets.validateQueryParamLength("qualifiedNamePrefix", qualifiedNamePrefix); + Servlets.validateQueryParamLength("entityType", entityType); + AtlasPerfTracer perf = null; + + EntityGraphDiscoveryContext graphDiscoveryContext = new EntityGraphDiscoveryContext(typeRegistry, null); + EntityMutationContext entityMutationContext = new EntityMutationContext(graphDiscoveryContext); + try { - AtlasPerfMetrics.MetricRecorder addBusinessFiltersToSearchQueryMetric = RequestContext.get().startMetricRecord("createQueryStringUsingFiltersAndUserDSL"); - // Create an ObjectMapper instance - ObjectMapper objectMapper = new ObjectMapper(); - - // Create the root 'query' node - ObjectNode rootNode = objectMapper.createObjectNode(); - ObjectNode queryNode = objectMapper.createObjectNode(); - ObjectNode boolNode = objectMapper.createObjectNode(); - ArrayNode mustArray = objectMapper.createArrayNode(); - - // Create the first 'bool' object inside 'must' - ObjectNode firstBoolNode = objectMapper.createObjectNode(); - ObjectNode filterBoolNode = objectMapper.createObjectNode(); - ArrayNode filterArray = objectMapper.createArrayNode(); - - // Create 'match' object - ObjectNode matchNode = objectMapper.createObjectNode(); - matchNode.put(NAMESPACE.concat(".keyword"), namespace); - - // Add 'match' object to filter - ObjectNode matchWrapper = objectMapper.createObjectNode(); - matchWrapper.set("term", matchNode); - filterArray.add(matchWrapper); - - // add 'businessDateValidation' - ObjectNode businessDateWrapper = dateValidation(businessDate, true, objectMapper); - filterArray.add(businessDateWrapper); - - // add 'systemDateValidation' - if (!StringUtils.isEmpty(systemDate)) { - ObjectNode systemDateWrapper = dateValidation(systemDate, false, objectMapper); - filterArray.add(systemDateWrapper); + if (StringUtils.isEmpty(qualifiedNamePrefix) || StringUtils.isEmpty(entityType)) { + throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST); } - // Add filter to firstBool - filterBoolNode.set("filter", filterArray); - firstBoolNode.set("bool", filterBoolNode); - - // Add firstBool to must array - mustArray.add(firstBoolNode); - - // process user query - if (!StringUtils.isEmpty(dslString)) { - JsonNode node = new ObjectMapper().readTree(dslString); - JsonNode userQueryNode = node.get("query"); - ObjectNode wrapperNode = objectMapper.createObjectNode(); - String userQueryString = userQueryNode.toString(); - String userQueryBase64 = Base64.getEncoder().encodeToString(userQueryString.getBytes()); - wrapperNode.put("query", userQueryBase64); - // Add wrapper to must array - ObjectNode wrapperWrapper = objectMapper.createObjectNode(); - wrapperWrapper.set("wrapper", wrapperNode); - mustArray.add(wrapperWrapper); + // check with chris how the label look like + // accordingly capitalize letters + AtlasEntityType atlasEntityType = typeRegistry.getEntityTypeByName(entityType); + + if (atlasEntityType == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_TYPE); } + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(entityType, qualifiedNamePrefix); + + if (latestEntityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.NO_TYPE_EXISTS_FOR_QUALIFIED_NAME_PREFIX, qualifiedNamePrefix); + } - // Add must array to bool node - boolNode.set("must", mustArray); + String modelQualifiedName; + + if (entityType.equals(ATLAS_DM_ENTITY_TYPE)) { + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + modelQualifiedName = qualifiedNamePrefix.substring(0, lastIndex); + String entityGuid = AtlasGraphUtilsV2.getIdFromVertex(latestEntityVertex); + replicateModelVersionAndExcludeEntity(modelQualifiedName, entityGuid, entityMutationContext); + + } else if (entityType.equals(ATLAS_DM_ATTRIBUTE_TYPE)) { + int lastIndex = qualifiedNamePrefix.lastIndexOf("/"); + String entityQualifiedNamePrefix = qualifiedNamePrefix.substring(0, lastIndex); + String attributeGuid = AtlasGraphUtilsV2.getIdFromVertex(latestEntityVertex); + replicateModelVersionAndEntityAndExcludeAttribute(entityQualifiedNamePrefix, + attributeGuid, "", entityMutationContext); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_ENTITY_TYPE); + } + return entityGraphMapper.mapAttributesAndClassifications(entityMutationContext, + false, false, false, false); + } finally { + AtlasPerfTracer.log(perf); + } + } - // Add bool to query - queryNode.set("bool", boolNode); + private void replicateModelVersionAndEntityAndExcludeAttribute(final String entityQualifiedNamePrefix, String deleteAttributeGuid, String deleteEntityGuid, EntityMutationContext entityMutationContext) throws AtlasBaseException { + int lastIndex = entityQualifiedNamePrefix.lastIndexOf("/"); + String modelQualifiedName = entityQualifiedNamePrefix.substring(0, lastIndex); - rootNode.set("query", queryNode); + // get entity + // replicate entity + AtlasVertex latestEntityVertex = AtlasGraphUtilsV2.findLatestEntityAttributeVerticesByType(ATLAS_DM_ENTITY_TYPE, entityQualifiedNamePrefix); + AtlasEntity latestEntity = graphRetriever.toAtlasEntity(latestEntityVertex); - // Print the JSON representation of the query - return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode); - } catch (Exception e) { - LOG.error("Error -> createQueryStringUsingFiltersAndUserDSL!", e); + if (latestEntityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_ENTITY_NOT_EXIST); } - return ""; + + long now = RequestContext.get().getRequestTime(); + + ModelResponse modelResponse = entityPreProcessor.replicateModelEntity(latestEntity, + latestEntityVertex, entityQualifiedNamePrefix, now); + + AtlasEntity replicaEntity = modelResponse.getReplicaEntity(); + AtlasVertex replicaVertex = modelResponse.getReplicaVertex(); + + // exclude attribute from entity + Map relationshipAttributes = excludeEntityFromRelationshipAttribute(deleteAttributeGuid, + replicaEntity.getRelationshipAttributes()); + replicaEntity.setRelationshipAttributes(relationshipAttributes); + + + //replicate modelVersion + ModelResponse modelVersionResponse = replicateModelVersionAndExcludeEntity( + modelQualifiedName, "", entityMutationContext); + + // create entity-modelVersion relationship + entityPreProcessor.createModelVersionModelEntityRelationship( + modelVersionResponse.getReplicaVertex(), + replicaVertex); + + entityMutationContext.addCreated(replicaEntity.getGuid(), + replicaEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_ENTITY_TYPE), + replicaVertex); + } - private ObjectNode dateValidation(final String date, final boolean isBusinessDate, ObjectMapper objectMapper) { + private ModelResponse replicateModelVersionAndExcludeEntity(final String modelQualifiedName, String deleteEntityGuid, EntityMutationContext entityMutationContext) throws AtlasBaseException { + Map attrValues = new HashMap<>(); + attrValues.put(QUALIFIED_NAME, modelQualifiedName); + + AtlasVertex modelVertex = AtlasGraphUtilsV2.findByUniqueAttributes( + typeRegistry.getEntityTypeByName(ATLAS_DM_DATA_MODEL), attrValues); + + if (modelVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.DATA_MODEL_NOT_EXIST); + } + + String modelGuid = AtlasGraphUtilsV2.getIdFromVertex(modelVertex); - String condition = LESSER_THAN_EQUAL_TO, dateType = BUSINESS_DATE, expiredDateType = EXPIRED_BUSINESS_DATE; + long now = RequestContext.get().getRequestTime(); - if (!isBusinessDate) { - dateType = SYSTEM_DATE; - expiredDateType = EXPIRED_SYSTEM_DATE; + ModelResponse modelResponse = entityPreProcessor.replicateModelVersion(modelGuid, modelQualifiedName, now); + AtlasEntity replicaModelVersionEntity = modelResponse.getReplicaEntity(); + AtlasVertex replicaModelVersionVertex = modelResponse.getReplicaVertex(); + String modelVersionGuid = replicaModelVersionEntity.getGuid(); + + Map relationshipAttributes = excludeEntityFromRelationshipAttribute(deleteEntityGuid, + replicaModelVersionEntity.getRelationshipAttributes()); + replicaModelVersionEntity.setRelationshipAttributes(relationshipAttributes); + entityPreProcessor.createModelModelVersionRelation(modelGuid, modelVersionGuid); + + entityMutationContext.addCreated(modelVersionGuid, replicaModelVersionEntity, + typeRegistry.getEntityTypeByName(ATLAS_DM_VERSION_TYPE), replicaModelVersionVertex); + + entityMutationContext.getDiscoveryContext().addResolvedGuid(modelGuid, modelVertex); + return modelResponse; + } + + private Map excludeEntityFromRelationshipAttribute(String entityGuid, Map relationshipAttributes) throws AtlasBaseException { + if (StringUtils.isEmpty(entityGuid)) { + return relationshipAttributes; + } + Map appendAttributesDestination = new HashMap<>(); + if (relationshipAttributes != null) { + Map appendAttributesSource = (Map) relationshipAttributes; + + String guid = ""; + + for (String attribute : appendAttributesSource.keySet()) { + + if (appendAttributesSource.get(attribute) instanceof List) { + + List> destList = new ArrayList<>(); + Map destMap = null; + + List> attributeList = (List>) appendAttributesSource.get(attribute); + + for (Map relationAttribute : attributeList) { + guid = (String) relationAttribute.get("guid"); + + if (guid.equals(entityGuid)) { + continue; + } + + destMap = new HashMap<>(relationAttribute); + destList.add(destMap); + } + appendAttributesDestination.put(attribute, destList); + } else { + if (appendAttributesSource.get(attribute) instanceof Map) { + LinkedHashMap attributeList = (LinkedHashMap) appendAttributesSource.get(attribute); + guid = (String) attributeList.get("guid"); + + // update end2 + if (guid.equals(entityGuid)) { + continue; + } + + Map destMap = new HashMap<>(attributeList); + appendAttributesDestination.put(attribute, destMap); + } + } + } } - // Create the nested 'bool' object inside filter - ObjectNode nestedBoolNode = objectMapper.createObjectNode(); - ArrayNode nestedMustArray = objectMapper.createArrayNode(); - ObjectNode rangeBusinessDateNode = objectMapper.createObjectNode(); - rangeBusinessDateNode.put(condition, date); - - // Add 'range' object to nestedMust - ObjectNode rangeBusinessDateWrapper = objectMapper.createObjectNode(); - rangeBusinessDateWrapper.set("range", objectMapper.createObjectNode().set(dateType, rangeBusinessDateNode)); - nestedMustArray.add(rangeBusinessDateWrapper); - - - // Create 'bool' object for 'should' - ObjectNode shouldBoolNodeWrapper = objectMapper.createObjectNode(); - ObjectNode shouldBoolNode = objectMapper.createObjectNode(); - ArrayNode shouldArray = objectMapper.createArrayNode(); - - // Create 'range' object for 'expiredAtBusinessDate' - ObjectNode rangeExpiredAtNode = objectMapper.createObjectNode(); - rangeExpiredAtNode.put("gt", date); - - // Add 'range' object to should array - ObjectNode rangeExpiredAtWrapper = objectMapper.createObjectNode(); - rangeExpiredAtWrapper.set("range", objectMapper.createObjectNode().set(expiredDateType, rangeExpiredAtNode)); - shouldArray.add(rangeExpiredAtWrapper); - - // add 'term' object to should array - ObjectNode termNode = objectMapper.createObjectNode(); - termNode.put(expiredDateType, 0); - ObjectNode termNodeWrapper = objectMapper.createObjectNode(); - termNodeWrapper.set("term", termNode); - shouldArray.add(termNodeWrapper); - - // Add 'should' to should array - shouldBoolNode.set("should", shouldArray); - shouldBoolNode.put("minimum_should_match", 1); - shouldBoolNodeWrapper.set("bool", shouldBoolNode); - - // Add shouldBoolNodeWrapper to nestedMust - nestedMustArray.add(shouldBoolNodeWrapper); - - // Add nestedMust to nestedBool - nestedBoolNode.set("must", nestedMustArray); - - // Add nestedBool to filter - ObjectNode nestedBoolWrapper = objectMapper.createObjectNode(); - nestedBoolWrapper.set("bool", nestedBoolNode); - return nestedBoolWrapper; + return appendAttributesDestination; } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java b/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java new file mode 100644 index 0000000000..d8e95d1b40 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/util/ModelUtil.java @@ -0,0 +1,173 @@ +package org.apache.atlas.web.util; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.atlas.RequestContext; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.web.rest.DiscoveryREST; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Base64; + +public class ModelUtil { + + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.ModelUtil"); + private static final Logger LOG = LoggerFactory.getLogger(DiscoveryREST.class); + + private static final String BUSINESS_DATE = "dMDataModelBusinessDate"; + private static final String EXPIRED_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate"; + private static final String LESSER_THAN_EQUAL_TO = "lte"; + private static final String SYSTEM_DATE = "dMDataModelSystemDate"; + private static final String EXPIRED_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate"; + private static final String NAMESPACE = "dMDataModelNamespace"; + + /*** + * combines user query/dsl along with business parameters + * + * creates query as following : + * {"query":{"bool":{"must":[{"bool":{"filter":[{"match":{"namespace":"{namespace}"}},{"bool":{"must":[{"range":{"businessDate":{"lte":"businessDate"}}},{"bool":{"should":[{"range":{"expiredAtBusinessDate":{"gt":"{businessDate}"}}},{"bool":{"must_not":[{"exists":{"field":"expiredAtBusiness"}}]}}],"minimum_should_match":1}}]}}]}},{"wrapper":{"query":"user query"}}]}}} + * @param namespace + * @param businessDate + * @param dslString + * @return + */ + public static String createQueryStringUsingFiltersAndUserDSL(final String namespace, + final String businessDate, + final String systemDate, + final String dslString) { + try { + AtlasPerfMetrics.MetricRecorder addBusinessFiltersToSearchQueryMetric = RequestContext.get().startMetricRecord("createQueryStringUsingFiltersAndUserDSL"); + // Create an ObjectMapper instance + ObjectMapper objectMapper = new ObjectMapper(); + + // Create the root 'query' node + ObjectNode rootNode = objectMapper.createObjectNode(); + ObjectNode queryNode = objectMapper.createObjectNode(); + ObjectNode boolNode = objectMapper.createObjectNode(); + ArrayNode mustArray = objectMapper.createArrayNode(); + + // Create the first 'bool' object inside 'must' + ObjectNode firstBoolNode = objectMapper.createObjectNode(); + ObjectNode filterBoolNode = objectMapper.createObjectNode(); + ArrayNode filterArray = objectMapper.createArrayNode(); + + // Create 'match' object + ObjectNode matchNode = objectMapper.createObjectNode(); + matchNode.put(NAMESPACE.concat(".keyword"), namespace); + + // Add 'match' object to filter + ObjectNode matchWrapper = objectMapper.createObjectNode(); + matchWrapper.set("term", matchNode); + filterArray.add(matchWrapper); + + // add 'businessDateValidation' + ObjectNode businessDateWrapper = dateValidation(businessDate, true, objectMapper); + filterArray.add(businessDateWrapper); + + // add 'systemDateValidation' + if (!StringUtils.isEmpty(systemDate)) { + ObjectNode systemDateWrapper = dateValidation(systemDate, false, objectMapper); + filterArray.add(systemDateWrapper); + } + + // Add filter to firstBool + filterBoolNode.set("filter", filterArray); + firstBoolNode.set("bool", filterBoolNode); + + // Add firstBool to must array + mustArray.add(firstBoolNode); + + // process user query + if (!StringUtils.isEmpty(dslString)) { + JsonNode node = new ObjectMapper().readTree(dslString); + JsonNode userQueryNode = node.get("query"); + ObjectNode wrapperNode = objectMapper.createObjectNode(); + String userQueryString = userQueryNode.toString(); + String userQueryBase64 = Base64.getEncoder().encodeToString(userQueryString.getBytes()); + wrapperNode.put("query", userQueryBase64); + // Add wrapper to must array + ObjectNode wrapperWrapper = objectMapper.createObjectNode(); + wrapperWrapper.set("wrapper", wrapperNode); + mustArray.add(wrapperWrapper); + } + + + // Add must array to bool node + boolNode.set("must", mustArray); + + // Add bool to query + queryNode.set("bool", boolNode); + + rootNode.set("query", queryNode); + + // Print the JSON representation of the query + return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode); + } catch (Exception e) { + LOG.error("Error -> createQueryStringUsingFiltersAndUserDSL!", e); + } + return ""; + } + + private static ObjectNode dateValidation(final String date, final boolean isBusinessDate, ObjectMapper objectMapper) { + + String condition = LESSER_THAN_EQUAL_TO, dateType = BUSINESS_DATE, expiredDateType = EXPIRED_BUSINESS_DATE; + + if (!isBusinessDate) { + dateType = SYSTEM_DATE; + expiredDateType = EXPIRED_SYSTEM_DATE; + } + // Create the nested 'bool' object inside filter + ObjectNode nestedBoolNode = objectMapper.createObjectNode(); + ArrayNode nestedMustArray = objectMapper.createArrayNode(); + ObjectNode rangeBusinessDateNode = objectMapper.createObjectNode(); + rangeBusinessDateNode.put(condition, date); + + // Add 'range' object to nestedMust + ObjectNode rangeBusinessDateWrapper = objectMapper.createObjectNode(); + rangeBusinessDateWrapper.set("range", objectMapper.createObjectNode().set(dateType, rangeBusinessDateNode)); + nestedMustArray.add(rangeBusinessDateWrapper); + + + // Create 'bool' object for 'should' + ObjectNode shouldBoolNodeWrapper = objectMapper.createObjectNode(); + ObjectNode shouldBoolNode = objectMapper.createObjectNode(); + ArrayNode shouldArray = objectMapper.createArrayNode(); + + // Create 'range' object for 'expiredAtBusinessDate' + ObjectNode rangeExpiredAtNode = objectMapper.createObjectNode(); + rangeExpiredAtNode.put("gt", date); + + // Add 'range' object to should array + ObjectNode rangeExpiredAtWrapper = objectMapper.createObjectNode(); + rangeExpiredAtWrapper.set("range", objectMapper.createObjectNode().set(expiredDateType, rangeExpiredAtNode)); + shouldArray.add(rangeExpiredAtWrapper); + + // add 'term' object to should array + ObjectNode termNode = objectMapper.createObjectNode(); + termNode.put(expiredDateType, 0); + ObjectNode termNodeWrapper = objectMapper.createObjectNode(); + termNodeWrapper.set("term", termNode); + shouldArray.add(termNodeWrapper); + + // Add 'should' to should array + shouldBoolNode.set("should", shouldArray); + shouldBoolNode.put("minimum_should_match", 1); + shouldBoolNodeWrapper.set("bool", shouldBoolNode); + + // Add shouldBoolNodeWrapper to nestedMust + nestedMustArray.add(shouldBoolNodeWrapper); + + // Add nestedMust to nestedBool + nestedBoolNode.set("must", nestedMustArray); + + // Add nestedBool to filter + ObjectNode nestedBoolWrapper = objectMapper.createObjectNode(); + nestedBoolWrapper.set("bool", nestedBoolNode); + return nestedBoolWrapper; + } +}