Skip to content

Commit

Permalink
Added ‘assetsCountToPropagate’ and ‘assetsCountPropagated’ to the tas…
Browse files Browse the repository at this point in the history
…k vertex. The former will be updated with the total count of propagations to be done once the planning phase is complete and the task begins execution. The latter will be updated as the task progresses, reflecting the count of completed propagations at any given point.
  • Loading branch information
jnkrmg authored and abhijeet-atlan committed Jan 28, 2025
1 parent 400f814 commit 398085d
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ on:
- master
- lineageondemand
- makerlogic
- tagpropv1master

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ public final class Constants {
public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId");
public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid");
public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName");
public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate");
public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated");
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";
Expand Down
22 changes: 20 additions & 2 deletions intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ public class AtlasPatch implements Serializable {
private long createdTime;
private long updatedTime;
private PatchStatus status;
private long assetsCountToPropagate;
private long assetsCountPropagated;

public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }

public AtlasPatch() { }

public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
String updatedBy, String createdBy, long createdTime, long updatedTime) {
String updatedBy, String createdBy, long createdTime, long updatedTime, long assetsCountToPropagate, long assetsCountPropagated) {
this.id = id;
this.description = patchName;
this.type = type;
Expand All @@ -65,6 +67,8 @@ public AtlasPatch(String id, String patchName, String type, String action, Patch
this.createdBy = createdBy;
this.createdTime = createdTime;
this.updatedTime = updatedTime;
this.assetsCountToPropagate = assetsCountToPropagate;
this.assetsCountPropagated = assetsCountPropagated;
}

public String getId() {
Expand Down Expand Up @@ -139,6 +143,18 @@ public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -157,7 +173,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status);
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status, assetsCountToPropagate, assetsCountPropagated);
}

@Override
Expand All @@ -173,6 +189,8 @@ public String toString() {
sb.append(", createdTime=").append(createdTime);
sb.append(", updatedTime=").append(updatedTime);
sb.append(", status=").append(status);
sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate);
sb.append(", assetsCountPropagated=").append(assetsCountPropagated);
sb.append('}');

return sb.toString();
Expand Down
22 changes: 22 additions & 0 deletions intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public static Status from(String s) {
private String classificationId;
private String entityGuid;
private String classificationTypeName;
private Long assetsCountToPropagate;
private Long assetsCountPropagated;

public AtlasTask() {
}
Expand All @@ -111,6 +113,8 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
this.attemptCount = 0;
this.classificationId = classificationId;
this.entityGuid = entityGuid;
this.assetsCountToPropagate = 0L;
this.assetsCountPropagated = 0L;
}

public String getGuid() {
Expand Down Expand Up @@ -239,6 +243,22 @@ public String getEntityGuid() {
return entityGuid;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public void setAssetsCountPropagated(Long assetsCountPropagated) {
this.assetsCountPropagated = assetsCountPropagated;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@JsonIgnore
public void start() {
this.setStatus(Status.IN_PROGRESS);
Expand Down Expand Up @@ -270,6 +290,8 @@ public String toString() {
", attemptCount=" + attemptCount +
", errorMessage='" + errorMessage + '\'' +
", status=" + status +
", assetsCountToPropagate=" + assetsCountToPropagate +
", assetsCountPropagated=" + assetsCountPropagated +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false);

createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setAssetsCountToPropagate(getEncodedProperty(vertex, TASK_ASSET_COUNT_TO_PROPAGATE, Long.class));
ret.setStatus(getPatchStatus(vertex));

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1220,23 +1220,52 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship
}
}

// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) addPropagationsMap.size() + removePropagationsMap.size());

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
//commiting to graph
graph.commit();

//total propagated count
int propagatedCount = 0;
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);

addTagPropagation(classificationVertex, entitiesToAddPropagation);
propagatedCount++;
if (propagatedCount == 100){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount - 1);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
propagatedCount = 0;
}
}

for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);

removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
propagatedCount++;
if (propagatedCount == 100){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
propagatedCount = 0;
}
}
if (propagatedCount != 0){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
}
} else {
// update blocked propagated classifications only if there is no change is tag propagation (don't update both)
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
}
}


public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
if (blockedClassifications != null) {
List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);
Expand Down
Loading

0 comments on commit 398085d

Please sign in to comment.