Skip to content

Commit

Permalink
Merge pull request #3576 from atlanhq/dev/task/CJ-165
Browse files Browse the repository at this point in the history
CJ-165 | Add support for bulk in data models
  • Loading branch information
aarshi0301 authored Oct 4, 2024
2 parents a8375c5 + 50c2669 commit f1630f2
Show file tree
Hide file tree
Showing 13 changed files with 526 additions and 296 deletions.
45 changes: 33 additions & 12 deletions common/src/main/java/org/apache/atlas/repository/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,44 @@ public final class Constants {
/***
* DataModel
*/
public static final String MODEL_ENTITY = "ModelEntity";
public static final String MODEL_ATTRIBUTE = "ModelAttribute";
public static final String MODEL_DATA_MODEL = "ModelDataModel";
public static final String MODEL_VERSION = "ModelVersion";
public static final String MODEL_ENTITY_ASSOCIATION = "ModelEntityAssociation";
public static final String MODEL_ATTRIBUTE_ASSOCIATION = "ModelAttributeAssociation";
public static final String MODEL_QUALIFIED_NAME_PATTERN = "modelVersionAgnosticQualifiedName";
public static final String MODEL_NAMESPACE = "modelNamespace";
public static final String MODEL_EXPIRED_AT_SYSTEM_DATE = "modelExpiredAtSystemDate";
public static final String MODEL_EXPIRED_AT_BUSINESS_DATE = "modelExpiredAtBusinessDate";
public static final String MODEL_SYSTEM_DATE = "modelSystemDate";
public static final String MODEL_BUSINESS_DATE = "modelBusinessDate";
public static final String MODEL_MODEL_VERSION_RELATION = "model_data_model_model_versions";
public static final String RELATED_DATA_MODEL = "modelDataModel";
public static final String RELATED_MODEL_VERSIONS = "modelVersions";
public static final String MODEL_VERSIONS_MODEL_VERSION_ENTITY_RELATION = "model_versions_model_version_entities";
public static final String MODEL_VERSION_ENTITIES = "modelVersionEntities";
public static final String MODEL_ENTITY_MAPPED_FROM_ENTITIES = "modelEntityMappedFromEntities";
public static final String MODEL_ENTITY_MAPPED_TO_ENTITIES = "modelEntityMappedToEntities";
public static final String MODEL_ENTITY_MODEL_ATTRIBUTES_RELATION = "model_attribute_entities_model_entity_attributes";
public static final String MODEL_ENTITY_ATTRIBUTES = "modelEntityAttributes";
public static final String MODEL_ATTRIBUTE_ENTITIES = "modelAttributeEntities";
public static final String MODEL_ATTRIBUTE_MAPPED_TO_ATTRIBUTES = "modelAttributeMappedToAttributes";
public static final String MODEL_ATTRIBUTE_MAPPED_FROM_ATTRIBUTES = "modelAttributeMappedFromAttributes";
public static final String MODEL_ENTITY_RELATED_FROM_ENTITIES = "modelEntityRelatedFromEntities";
public static final String MODEL_ENTITY_RELATED_TO_ENTITIES = "modelEntityRelatedToEntities";
public static final String MODEL_ATTRIBUTE_RELATED_FROM_ATTRIBUTES = "modelAttributeRelatedFromAttributes";
public static final String MODEL_ATTRIBUTE_RELATED_TO_ATTRIBUTES = "modelAttributeRelatedToAttributes";
public static String MODEL_ENTITY_ASSOCIATION_TO = "modelEntityAssociationTo";
public static String MODEL_ENTITY_ASSOCIATION_FROM = "modelEntityAssociationFrom";
public static String MODEL_ATTRIBUTE_ASSOCIATION_FROM = "modelAttributeAssociationFrom";
public static String MODEL_ATTRIBUTE_ASSOCIATION_TO = "modelAttributeAssociationTo";


public static final String ATLAS_DM_ENTITY_TYPE = "DMEntity";

public static final String ATLAS_DM_ATTRIBUTE_TYPE = "DMAttribute";
public static final String ATLAS_DM_DATA_MODEL = "DMDataModel";

public static final String ATLAS_DM_VERSION_TYPE = "DMVersion";

public static final String ATLAS_DM_ENTITY_ASSOCIATION_TYPE= "DMEntityAssociation";
public static final String ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE= "DMAttributeAssociation";

public static final String ATLAS_DM_QUALIFIED_NAME_PREFIX = "dMQualifiedNamePrefix";
public static final String ATLAS_DM_NAMESPACE = "dMDataModelNamespace";
public static final String ATLAS_DM_EXPIRED_AT_SYSTEM_DATE = "dMDataModelExpiredAtSystemDate";
public static final String ATLAS_DM_EXPIRED_AT_BUSINESS_DATE = "dMDataModelExpiredAtBusinessDate";
public static final String ATLAS_DM_SYSTEM_DATE = "dMDataModelSystemDate";
public static final String ATLAS_DM_BUSINESS_DATE = "dMDataModelBusinessDate";


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@
import org.apache.atlas.type.TemplateToken;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -116,10 +114,10 @@ public void validateAndNormalize(AtlasEntity entity) throws AtlasBaseException {
// by dMQualifiedNamePrefix which is not a unique attribute
// This can return multiple entity/attribute that match this prefix vale
// we have to return latest entity/attribute
if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) ||
entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)){
if (entity.getTypeName().equals(Constants.MODEL_ENTITY) ||
entity.getTypeName().equals(Constants.MODEL_ATTRIBUTE)){

String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX);
String qualifiedNamePrefix = (String) entity.getAttributes().get(Constants.MODEL_QUALIFIED_NAME_PATTERN);
if (qualifiedNamePrefix.isEmpty()){
throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST);
}
Expand Down Expand Up @@ -521,10 +519,10 @@ private void processDynamicAttributes(AtlasEntity entity) throws AtlasBaseExcept
}

private void validateAttributesForDataModel(AtlasEntity entity) throws AtlasBaseException {
if (entity.getTypeName().equals(Constants.ATLAS_DM_ENTITY_TYPE) ||
entity.getTypeName().equals(Constants.ATLAS_DM_ATTRIBUTE_TYPE)) {
if (entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == null ||
entity.getAttributes().get(Constants.ATLAS_DM_QUALIFIED_NAME_PREFIX) == "") {
if (entity.getTypeName().equals(Constants.MODEL_ENTITY) ||
entity.getTypeName().equals(Constants.MODEL_ATTRIBUTE)) {
if (entity.getAttributes().get(Constants.MODEL_QUALIFIED_NAME_PATTERN) == null ||
entity.getAttributes().get(Constants.MODEL_QUALIFIED_NAME_PATTERN) == "") {
throw new AtlasBaseException(AtlasErrorCode.QUALIFIED_NAME_PREFIX_NOT_EXIST);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
Expand Down Expand Up @@ -72,9 +73,7 @@
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.CategoryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.GlossaryPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.glossary.TermPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMAttributePreprocessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityAssociationPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.DMEntityPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.model.*;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.LinkPreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.resource.ReadmePreProcessor;
import org.apache.atlas.repository.store.graph.v2.preprocessor.sql.QueryCollectionPreProcessor;
Expand Down Expand Up @@ -1645,6 +1644,11 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase
AtlasEntityType entityType;
List<PreProcessor> preProcessors;

List<AtlasEntity> mergedChanges = new ArrayList<>();
mergedChanges.addAll(new ArrayList<>(context.getUpdatedEntities()));
mergedChanges.addAll(new ArrayList<>(context.getCreatedEntities()));
createExclusionSet(mergedChanges, context);

List<AtlasEntity> copyOfCreated = new ArrayList<>(context.getCreatedEntities());
for (AtlasEntity entity : copyOfCreated) {
entityType = context.getType(entity.getGuid());
Expand All @@ -1669,11 +1673,7 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase
List<AtlasEntity> copyOfAppendRelationshipAttributes = new ArrayList<>(context.getUpdatedEntitiesForAppendRelationshipAttribute());
for (AtlasEntity entity : copyOfAppendRelationshipAttributes) {
entityType = context.getType(entity.getGuid());
if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) ||
entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE)
){
if (dataModelEntityTypes().contains(entityType)) {
preProcessors = getPreProcessor(entityType.getTypeName());
for (PreProcessor processor : preProcessors) {
processor.processAttributes(entity, context, UPDATE);
Expand All @@ -1684,11 +1684,7 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase
List<AtlasEntity> copyOfRemoveRelationshipAttributes = new ArrayList<>(context.getEntitiesUpdatedWithRemoveRelationshipAttribute());
for (AtlasEntity entity : copyOfRemoveRelationshipAttributes) {
entityType = context.getType(entity.getGuid());
if( entityType.getTypeName().equals(ATLAS_DM_ENTITY_TYPE)
|| entityType.getTypeName().equals(ATLAS_DM_ATTRIBUTE_TYPE) ||
entityType.getTypeName().equals(ATLAS_DM_ENTITY_ASSOCIATION_TYPE) ||
entity.getTypeName().equals(ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE)
){
if( dataModelEntityTypes().contains(entityType)){
preProcessors = getPreProcessor(entityType.getTypeName());
for (PreProcessor processor : preProcessors) {
processor.processAttributes(entity, context, UPDATE);
Expand All @@ -1697,6 +1693,28 @@ private void executePreProcessor(EntityMutationContext context) throws AtlasBase
}
}

private void createExclusionSet(Collection<AtlasEntity> updatedEntities, EntityMutationContext entityMutationContext) {
for (AtlasEntity entity : updatedEntities) {
String qualifiedNamePrefix = (String) entity.getAttribute(MODEL_QUALIFIED_NAME_PATTERN);
if (entity.getTypeName().equals(MODEL_ENTITY)) {
entityMutationContext.updateModelEntitiesSet(qualifiedNamePrefix);
} else if (entity.getTypeName().equals(MODEL_ATTRIBUTE)) {
entityMutationContext.updateModelAttributesSet(qualifiedNamePrefix);
int lastIndex = qualifiedNamePrefix.lastIndexOf("/");
String entityQualifiedNamePrefix = qualifiedNamePrefix.substring(0, lastIndex);
entityMutationContext.updateModelEntitiesSet(entityQualifiedNamePrefix);
}
}
}

private Set dataModelEntityTypes() {
Set<String> dataModelEntityTypes = new HashSet<>();
dataModelEntityTypes.add(MODEL_ENTITY);
dataModelEntityTypes.add(MODEL_ATTRIBUTE);
dataModelEntityTypes.add(MODEL_ENTITY_ASSOCIATION);
dataModelEntityTypes.add(MODEL_ATTRIBUTE_ASSOCIATION);
return dataModelEntityTypes;
}
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws AtlasBaseException {
MetricRecorder metric = RequestContext.get().startMetricRecord("preCreateOrUpdate");

Expand Down Expand Up @@ -1757,7 +1775,7 @@ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, Entit
if (RequestContext.get().isImportInProgress() && AtlasTypeUtil.isAssignedGuid(entity.getGuid())) {
vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid());
} else {
vertex = entityGraphMapper.createVertex(entity);
vertex = entityGraphMapper.createVertex(entity);
}

discoveryContext.addResolvedGuid(guid, vertex);
Expand All @@ -1768,6 +1786,13 @@ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, Entit

entity.setGuid(generatedGuid);

if (entity.getTypeName().equals(MODEL_DATA_MODEL)){
AtlasGraphUtilsV2.setProperty(vertex, QUALIFIED_NAME, entity.getAttribute(QUALIFIED_NAME));
context.cacheModel((String) entity.getAttribute(QUALIFIED_NAME), generatedGuid);
}else if (entity.getTypeName().equals(MODEL_ENTITY)){
context.cacheModelEntity((String) entity.getAttribute(MODEL_QUALIFIED_NAME_PATTERN), new ModelResponse(entity, vertex));
}

requestContext.recordEntityGuidUpdate(entity, guid);

context.addCreated(guid, entity, entityType, vertex);
Expand Down Expand Up @@ -2039,17 +2064,17 @@ public List<PreProcessor> getPreProcessor(String typeName) {

case PROCESS_ENTITY_TYPE:
preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this));
case ATLAS_DM_ENTITY_TYPE:
case MODEL_ENTITY:
preProcessors.add(new DMEntityPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ATTRIBUTE_TYPE:
case MODEL_ATTRIBUTE:
preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ENTITY_ASSOCIATION_TYPE:
case MODEL_ENTITY_ASSOCIATION:
preProcessors.add(new DMEntityAssociationPreProcessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
break;
case ATLAS_DM_ATTRIBUTE_ASSOCIATION_TYPE:
preProcessors.add(new DMAttributePreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
case MODEL_ATTRIBUTE_ASSOCIATION:
preProcessors.add(new DMAttributeAssociationPreprocessor(typeRegistry, entityRetriever, entityGraphMapper, atlasRelationshipStore));
}

// The default global pre-processor for all AssetTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,9 @@ public static AtlasVertex findLatestEntityAttributeVerticesByType(String typenam
AtlasGraph graph= getGraphInstance();
AtlasGraphQuery query = graph.query()
.has(ENTITY_TYPE_PROPERTY_KEY, typename)
.has(ATLAS_DM_QUALIFIED_NAME_PREFIX, dMQualifiedNamePrefix)
.has(ATLAS_DM_EXPIRED_AT_SYSTEM_DATE, 0)
.has(ATLAS_DM_EXPIRED_AT_BUSINESS_DATE, 0);
.has(MODEL_QUALIFIED_NAME_PATTERN, dMQualifiedNamePrefix)
.has(MODEL_EXPIRED_AT_SYSTEM_DATE, 0)
.has(MODEL_EXPIRED_AT_BUSINESS_DATE, 0);

Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex vertex = results.hasNext() ? results.next() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,18 @@ private AtlasEdge mapObjectIdValueUsingRelationship(AttributeMutationContext ctx
return null;
}

throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
AtlasEntity dataModelEntity = entityRetriever.toAtlasEntity(guid);
String entityType = dataModelEntity.getTypeName();
if (entityType.equals(MODEL_ENTITY) || entityType.equals(MODEL_ATTRIBUTE)) {
attributeVertex = entityRetriever.getEntityVertex(guid);

if (attributeVertex == null) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
}

} else {
throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, (ctx.getValue() == null ? null : ctx.getValue().toString()));
}
}

AtlasType type = typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
Expand Down
Loading

0 comments on commit f1630f2

Please sign in to comment.