Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG1924: Param changes in task vertex for add tags propagation to increase observability. #3878

Open
wants to merge 7 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ on:
- beta
- development
- master
- dg1908
- revert3773dg1924
- ns/fix/delta-refresh
- taskdg1924
hr2904 marked this conversation as resolved.
Show resolved Hide resolved
jnkrmg marked this conversation as resolved.
Show resolved Hide resolved

jobs:
build:
Expand Down Expand Up @@ -66,8 +64,9 @@ jobs:

- name: Build with Maven
run: |
branch_name=${{ env.BRANCH_NAME }}
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]]

branch_name=${{ steps.get_branch.outputs.branch }}
if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'taskdg1924' ]]
then
echo "build without dashboard"
chmod +x ./build.sh && ./build.sh build_without_dashboard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ public final class Constants {
public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId");
public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid");
public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName");
public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate");
public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated");
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";
Expand Down
22 changes: 20 additions & 2 deletions intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ public class AtlasPatch implements Serializable {
private long createdTime;
private long updatedTime;
private PatchStatus status;
private long assetsCountToPropagate;
private long assetsCountPropagated;

public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }

public AtlasPatch() { }

public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
String updatedBy, String createdBy, long createdTime, long updatedTime) {
String updatedBy, String createdBy, long createdTime, long updatedTime, long assetsCountToPropagate, long assetsCountPropagated) {
this.id = id;
this.description = patchName;
this.type = type;
Expand All @@ -65,6 +67,8 @@ public AtlasPatch(String id, String patchName, String type, String action, Patch
this.createdBy = createdBy;
this.createdTime = createdTime;
this.updatedTime = updatedTime;
this.assetsCountToPropagate = assetsCountToPropagate;
this.assetsCountPropagated = assetsCountPropagated;
}

public String getId() {
Expand Down Expand Up @@ -139,6 +143,18 @@ public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -157,7 +173,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status);
return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status, assetsCountToPropagate, assetsCountPropagated);
}

@Override
Expand All @@ -173,6 +189,8 @@ public String toString() {
sb.append(", createdTime=").append(createdTime);
sb.append(", updatedTime=").append(updatedTime);
sb.append(", status=").append(status);
sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate);
sb.append(", assetsCountPropagated=").append(assetsCountPropagated);
sb.append('}');

return sb.toString();
Expand Down
23 changes: 23 additions & 0 deletions intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public static Status from(String s) {
private String classificationId;
private String entityGuid;
private String classificationTypeName;
private Long assetsCountToPropagate;
private Long assetsCountPropagated;

public AtlasTask() {
}
Expand All @@ -111,8 +113,11 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
this.attemptCount = 0;
this.classificationId = classificationId;
this.entityGuid = entityGuid;
this.assetsCountToPropagate = 0L;
this.assetsCountPropagated = 0L;
}


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space. Though minor but good to change

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

public String getGuid() {
return guid;
}
Expand Down Expand Up @@ -239,6 +244,22 @@ public String getEntityGuid() {
return entityGuid;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public void setAssetsCountPropagated(Long assetsCountPropagated) {
this.assetsCountPropagated = assetsCountPropagated;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@JsonIgnore
public void start() {
this.setStatus(Status.IN_PROGRESS);
Expand Down Expand Up @@ -270,6 +291,8 @@ public String toString() {
", attemptCount=" + attemptCount +
", errorMessage='" + errorMessage + '\'' +
", status=" + status +
", assetsCountToPropagate=" + assetsCountToPropagate +
", assetsCountPropagated=" + assetsCountPropagated +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false);

createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setAssetsCountToPropagate(getEncodedProperty(vertex, TASK_ASSET_COUNT_TO_PROPAGATE, Long.class));
ret.setStatus(getPatchStatus(vertex));

return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@

import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.TimeUnit;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove this if not being used in the code. found at 2 places, being imported but not used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -95,6 +96,7 @@
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS;
import static org.apache.atlas.model.tasks.AtlasTask.Status.PENDING;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
Expand Down Expand Up @@ -3417,6 +3419,7 @@ public void addClassifications(final EntityMutationContext context, String guid,

public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid, Boolean previousRestrictPropagationThroughLineage,Boolean previousRestrictPropagationThroughHierarchy) throws AtlasBaseException {
try {

if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);

Expand Down Expand Up @@ -3459,7 +3462,7 @@ public List<String> propagateClassification(String entityGuid, String classifica
List<String> edgeLabelsToCheck = CLASSIFICATION_PROPAGATION_MODE_LABELS_MAP.get(propagationMode);
Boolean toExclude = propagationMode == CLASSIFICATION_PROPAGATION_MODE_RESTRICT_LINEAGE ? true:false;
List<AtlasVertex> impactedVertices = entityRetriever.getIncludedImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId, edgeLabelsToCheck,toExclude);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix the indents please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (CollectionUtils.isEmpty(impactedVertices)) {
LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);

Expand All @@ -3475,6 +3478,16 @@ public List<String> propagateClassification(String entityGuid, String classifica
}

public List<String> processClassificationPropagationAddition(List<AtlasVertex> verticesToPropagate, AtlasVertex classificationVertex) throws AtlasBaseException{

hr2904 marked this conversation as resolved.
Show resolved Hide resolved
// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size());

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a method instead of using this 1 liner please

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for all the comments asking to make the graph call into a method instead of a one liner, me and janaki has discussed this and I will be doing that in version 2. Thats part of the code cleanup.

currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

AtlasPerfMetrics.MetricRecorder classificationPropagationMetricRecorder = RequestContext.get().startMetricRecord("processClassificationPropagationAddition");
List<String> propagatedEntitiesGuids = new ArrayList<>();
int impactedVerticesSize = verticesToPropagate.size();
Expand Down Expand Up @@ -3505,9 +3518,12 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v

propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids);

offset += CHUNK_SIZE;

transactionInterceptHelper.intercept();
int finishedTaskCount = toIndex - offset;

offset += CHUNK_SIZE;
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());

} while (offset < impactedVerticesSize);
} catch (AtlasBaseException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
import java.util.HashMap;
import java.util.Map;

import static org.apache.atlas.model.tasks.AtlasTask.Status.*;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.IN_PROGRESS;

import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;

public abstract class ClassificationTask extends AbstractTask {
Expand Down Expand Up @@ -100,9 +103,7 @@ public AtlasTask.Status perform() throws AtlasBaseException {

try {
setStatus(IN_PROGRESS);

run(params);

setStatus(COMPLETE);
} catch (AtlasBaseException e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,12 @@ protected AtlasTask getTaskDef() {
return this.task;
}

protected void setAssetsCountToPropagate(Long assetsCount) {
task.setAssetsCountToPropagate(assetsCount);;
}

public Long getAssetsCountToPropagate() {
return this.task.getAssetsCountToPropagate();
}
public abstract Status perform() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ public List<AtlasTask> createAtlasTasks(List<AtlasTask> tasks) throws AtlasBaseE
task.setCreatedTime(new Date());
task.setStatusPending();
task.setAttemptCount(0);
task.setAssetsCountToPropagate(0L);
task.setAssetsCountPropagated(0L);
task.setGuid(UUID.randomUUID().toString());
task.setCreatedBy(RequestContext.getCurrentUser());

Expand Down Expand Up @@ -268,6 +270,8 @@ public AtlasVertex createTaskVertex(AtlasTask task) {

setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(ret, Constants.TASK_ASSET_COUNT_TO_PROPAGATE, task.getAssetsCountToPropagate());
setEncodedProperty(ret, Constants.TASK_ASSET_COUNT_PROPAGATED, task.getAssetsCountPropagated());
setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());

LOG.info("Creating task vertex: {}: {}, {}: {}, {}: {} ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exceptio

AbstractTask runnableTask = factory.create(task);

registry.inProgress(taskVertex, task);
registry.inProgress(taskVertex, task, runnableTask);

runnableTask.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.tasks.AtlasTask;

import java.util.List;
Expand All @@ -26,7 +27,7 @@ public interface TaskFactory {
* @param atlasTask
* @return
*/
AbstractTask create(AtlasTask atlasTask);
AbstractTask create(AtlasTask atlasTask) throws AtlasException;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious to know why are we allowing to throw error here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been removed now, an implementation was complaining about missing exception. Corrected that in this PR, forgot to remove throwing this exception. Thanks


List<String> getSupportedTypes();
}
Loading
Loading