DG-1924 | Add 'assetsCountToPropagate' and 'assetsCountPropagated' in task vertex. #4032

Open
wants to merge 79 commits into
base: master
Changes from 10 commits
Commits
759f717
Added ‘assetsCountToPropagate’ and ‘assetsCountPropagated’ to the tas…
jnkrmg Jan 23, 2025
65d920d
review comment fixes
abhijeet-atlan Jan 29, 2025
61d58dd
fixing dry for task vertex updation
abhijeet-atlan Jan 30, 2025
dfddb79
fixing dry for task vertex updation
abhijeet-atlan Jan 30, 2025
b4ce7af
removed redundant code for 'add prop', 'text update', 'delete prop', …
abhijeet-atlan Jan 30, 2025
ed847c1
reverted taskregistry
abhijeet-atlan Feb 2, 2025
c257421
removed all repeating code for 6 prop types
abhijeet-atlan Feb 2, 2025
adc6ed1
more fixes based on review comments
abhijeet-atlan Feb 2, 2025
fdca703
refactored 'updateTaskVertexProperty'
abhijeet-atlan Feb 4, 2025
164ec29
updated maven.yaml
abhijeet-atlan Feb 4, 2025
4e8c6df
added docstring to explain functionality to 'updateTaskVertexProperty'
abhijeet-atlan Feb 5, 2025
d046141
Init script
hr2904 Jan 15, 2025
288062c
PR Fixes #1
hr2904 Jan 19, 2025
ae9dec1
added code to send kafka message to topic 'TAG_PROP_EVENTS'
abhijeet-atlan Feb 4, 2025
a13f17e
added abstract method to send kafka message to a partition
abhijeet-atlan Feb 4, 2025
d364c8e
Update maven.yml
abhijeet-atlan Feb 4, 2025
b9b6887
corrected delete handler
abhijeet-atlan Feb 4, 2025
317322e
Merge with master
abhijeet-atlan Feb 5, 2025
7cf3072
Review comments fixes
abhijeet-atlan Feb 5, 2025
ba5d144
Removed dup dependency
abhijeet-atlan Feb 5, 2025
14db31d
changed signature and usage for 'updateTaskVertexProperty'
abhijeet-atlan Feb 5, 2025
102ae40
deleted not used function 'updateTaskVertexProperty'
abhijeet-atlan Feb 5, 2025
64775b4
experiment to check if everything works
abhijeet-atlan Feb 5, 2025
42540a0
removed commented out code
abhijeet-atlan Feb 5, 2025
97bbbaf
fixes for review comment
abhijeet-atlan Feb 13, 2025
42b074a
Merge branch 'master' into tagpropv1master
abhijeet-atlan Feb 13, 2025
c7c8c4e
payload change for kafka message
abhijeet-atlan Feb 24, 2025
6eaf1eb
changed code for prop relationship update
abhijeet-atlan Feb 24, 2025
5caa581
changed topic name
abhijeet-atlan Mar 3, 2025
6d176e7
updated code to add three params to the task vertex:
abhijeet-atlan Mar 3, 2025
6451bfd
Merge branch 'master' into tagpropv1master
abhijeet-atlan Mar 3, 2025
e51846d
updated maven to use cacheV3
abhijeet-atlan Mar 3, 2025
48e07f4
error fix for missing params
abhijeet-atlan Mar 3, 2025
547d561
Merge branch 'tagpropv1master' into tagpropv1.5master
abhijeet-atlan Mar 3, 2025
616dcef
changed obj prop topic partition count to be 10
abhijeet-atlan Mar 3, 2025
06fb81c
Merge pull request #4085 from atlanhq/tagpropv1.5master
abhijeet-atlan Mar 3, 2025
cab6deb
opted for round robin approach for oartition selection
abhijeet-atlan Mar 4, 2025
157e2db
maven update
abhijeet-atlan Mar 4, 2025
06e406a
changes related to kafka
abhijeet-atlan Mar 4, 2025
3936029
changes related to kafka
abhijeet-atlan Mar 4, 2025
de81019
cleanup
suraj5077 Mar 5, 2025
720cda0
cleanup
suraj5077 Mar 5, 2025
1b4dbee
changes related to kafka
abhijeet-atlan Mar 5, 2025
e66c636
fixed build
suraj5077 Mar 5, 2025
0f6af6d
Merge remote-tracking branch 'origin/tagpropv1master' into tagpropv1m…
suraj5077 Mar 5, 2025
73c7cd8
changes related to kafka
abhijeet-atlan Mar 5, 2025
edebe8c
reverting last change
abhijeet-atlan Mar 5, 2025
6961d65
added property 'assetGuid'.
abhijeet-atlan Mar 5, 2025
5f31f32
fixed errors
abhijeet-atlan Mar 5, 2025
8063ff0
fixed a comment typo
hr2904 Mar 5, 2025
b0aadc1
fixed an infinite loop error
hr2904 Mar 6, 2025
5d31524
updated kafka message
abhijeet-atlan Mar 7, 2025
08fa364
Added new bg thread for task updation
suraj5077 Mar 12, 2025
77dd858
Added redis methods for tag-prop
suraj5077 Mar 12, 2025
817418b
added logs for kafka messages
abhijeet-atlan Mar 12, 2025
613db6c
Kafka topic creation for tag-prop topic
suraj5077 Mar 12, 2025
18ccd0f
Merge remote-tracking branch 'origin/tagpropv1master' into tagpropv1m…
suraj5077 Mar 12, 2025
d7f5e47
Added new watcher thread for tag-prop v2
suraj5077 Mar 12, 2025
4e6eb48
updated kafka message
abhijeet-atlan Mar 12, 2025
0d8bdf9
Merge remote-tracking branch 'origin/tagpropv1master' into tagpropv1m…
abhijeet-atlan Mar 12, 2025
386b92c
updated kafka message
abhijeet-atlan Mar 12, 2025
850fffe
TaskExecutor new thread
suraj5077 Mar 12, 2025
65cb0fb
Merge remote-tracking branch 'origin/tagpropv1master' into tagpropv1m…
suraj5077 Mar 12, 2025
6aede5c
Pushed new TagPropagator class, reverted EntityGrahMapper
suraj5077 Mar 12, 2025
22e973a
Merge branch 'master' into tagpropv1master
suraj5077 Mar 12, 2025
2991b78
Refactored TagPropagator class
suraj5077 Mar 12, 2025
75a8e14
Refactored
suraj5077 Mar 12, 2025
bc27ce9
fixed tagprop code
suraj5077 Mar 13, 2025
0b8dcd0
Made DeleteHandler backward compatiple
suraj5077 Mar 13, 2025
754ceb0
fix NPE
suraj5077 Mar 13, 2025
e297cae
added code to update taskvertex with data on redis
abhijeet-atlan Mar 13, 2025
c1de532
Fixed task updater
suraj5077 Mar 14, 2025
c1fa864
Adds redis dlock
suraj5077 Mar 14, 2025
6abecbb
Added dlocking to kafka code
suraj5077 Mar 14, 2025
a4b9046
updated kafka object for delete prop to send classification id
abhijeet-atlan Mar 17, 2025
050c1f3
Fixed race condition in-progress issue and complete status not commit…
hr2904 Mar 18, 2025
c35fde8
redis cleanup
suraj5077 Mar 18, 2025
fda9378
Added post status COMPLETE handling
suraj5077 Mar 18, 2025
7c89b28
Added logs for latency and metric calculations for benchmarking
hr2904 Mar 19, 2025
3 changes: 2 additions & 1 deletion .github/workflows/maven.yml
@@ -28,6 +28,7 @@ on:
- master
- lineageondemand
- makerlogic
- tagpropv1master

jobs:
build:
@@ -67,7 +68,7 @@ jobs:
- name: Build with Maven
run: |
branch_name=${{ env.BRANCH_NAME }}
- if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'lineageondemand' ]]
+ if [[ $branch_name == 'main' || $branch_name == 'master' || $branch_name == 'tagpropv1master' ]]
then
echo "build without dashboard"
chmod +x ./build.sh && ./build.sh build_without_dashboard
@@ -359,6 +359,8 @@ public final class Constants {
public static final String TASK_CLASSIFICATION_ID = encodePropertyKey(TASK_PREFIX + "classificationId");
public static final String TASK_ENTITY_GUID = encodePropertyKey(TASK_PREFIX + "entityGuid");
public static final String TASK_CLASSIFICATION_TYPENAME = encodePropertyKey(TASK_PREFIX + "classificationTypeName");
public static final String TASK_ASSET_COUNT_TO_PROPAGATE = encodePropertyKey(TASK_PREFIX + "assetsCountToPropagate");
public static final String TASK_ASSET_COUNT_PROPAGATED = encodePropertyKey(TASK_PREFIX + "assetsCountPropagated");
public static final String ACTIVE_STATE_VALUE = "ACTIVE";
public static final String TASK_HEADER_ATLAN_AGENT = "x-atlan-agent";
public static final String TASK_HEADER_ATLAN_AGENT_ID = "x-atlan-agent-id";
22 changes: 20 additions & 2 deletions intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java
@@ -49,13 +49,15 @@ public class AtlasPatch implements Serializable {
private long createdTime;
private long updatedTime;
private PatchStatus status;
private long assetsCountToPropagate;
private long assetsCountPropagated;

public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }

public AtlasPatch() { }

public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
- String updatedBy, String createdBy, long createdTime, long updatedTime) {
+ String updatedBy, String createdBy, long createdTime, long updatedTime, long assetsCountToPropagate, long assetsCountPropagated) {
this.id = id;
this.description = patchName;
this.type = type;
@@ -65,6 +67,8 @@ public AtlasPatch(String id, String patchName, String type, String action, Patch
this.createdBy = createdBy;
this.createdTime = createdTime;
this.updatedTime = updatedTime;
this.assetsCountToPropagate = assetsCountToPropagate;
this.assetsCountPropagated = assetsCountPropagated;
}

public String getId() {
@@ -139,6 +143,18 @@ public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -157,7 +173,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
- return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status);
+ return Objects.hash(id, description, type, action, updatedBy, createdBy, createdTime, updatedTime, status, assetsCountToPropagate, assetsCountPropagated);
}

@Override
@@ -173,6 +189,8 @@ public String toString() {
sb.append(", createdTime=").append(createdTime);
sb.append(", updatedTime=").append(updatedTime);
sb.append(", status=").append(status);
sb.append(", assetsCountToPropagate=").append(assetsCountToPropagate);
sb.append(", assetsCountPropagated=").append(assetsCountPropagated);
sb.append('}');

return sb.toString();
22 changes: 22 additions & 0 deletions intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java
@@ -95,6 +95,8 @@ public static Status from(String s) {
private String classificationId;
private String entityGuid;
private String classificationTypeName;
private Long assetsCountToPropagate;
private Long assetsCountPropagated;

public AtlasTask() {
}
@@ -111,6 +113,8 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
this.attemptCount = 0;
this.classificationId = classificationId;
this.entityGuid = entityGuid;
this.assetsCountToPropagate = 0L;
this.assetsCountPropagated = 0L;
}

public String getGuid() {
@@ -239,6 +243,22 @@ public String getEntityGuid() {
return entityGuid;
}

public void setAssetsCountToPropagate(Long assetsCount) {
this.assetsCountToPropagate = assetsCount;
}

public Long getAssetsCountToPropagate() {
return assetsCountToPropagate;
}

public void setAssetsCountPropagated(Long assetsCountPropagated) {
this.assetsCountPropagated = assetsCountPropagated;
}

public Long getAssetsCountPropagated(){
return assetsCountPropagated;
}

@JsonIgnore
public void start() {
this.setStatus(Status.IN_PROGRESS);
@@ -270,6 +290,8 @@ public String toString() {
", attemptCount=" + attemptCount +
", errorMessage='" + errorMessage + '\'' +
", status=" + status +
", assetsCountToPropagate=" + assetsCountToPropagate +
", assetsCountPropagated=" + assetsCountPropagated +
'}';
}
}
@@ -407,6 +407,8 @@ private void initialize(AtlasGraph graph) throws RepositoryException, IndexExcep
createCommonVertexIndex(management, TASK_ENTITY_GUID, UniqueKind.NONE, String.class, SINGLE, false, false, true);
createCommonVertexIndex(management, TASK_ERROR_MESSAGE, UniqueKind.NONE, String.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ATTEMPT_COUNT, UniqueKind.NONE, Integer.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_TO_PROPAGATE, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_ASSET_COUNT_PROPAGATED, UniqueKind.NONE, Long.class, SINGLE, false, false);

createCommonVertexIndex(management, TASK_UPDATED_TIME, UniqueKind.NONE, Long.class, SINGLE, false, false);
createCommonVertexIndex(management, TASK_TIME_TAKEN_IN_SECONDS, UniqueKind.NONE, Long.class, SINGLE, false, false);
@@ -210,6 +210,7 @@ private static AtlasPatch toAtlasPatch(AtlasVertex vertex) {
ret.setUpdatedBy(getEncodedProperty(vertex, MODIFIED_BY_KEY, String.class));
ret.setCreatedTime(getEncodedProperty(vertex, TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setUpdatedTime(getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class));
ret.setAssetsCountToPropagate(getEncodedProperty(vertex, TASK_ASSET_COUNT_TO_PROPAGATE, Long.class));
ret.setStatus(getPatchStatus(vertex));

return ret;
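
A note on the `Long` to `long` assignment in the `AtlasPatch` setter shown earlier: Java auto-unboxing throws a `NullPointerException` when the boxed value is `null`. If the property lookup can return `null` for a vertex that lacks this property (an assumption here, not verified against `getEncodedProperty`), a null-tolerant setter would avoid that. A minimal sketch, with a hypothetical `PatchCounts` class standing in for `AtlasPatch`:

```java
// Hypothetical stand-in for the counter field on AtlasPatch; not part of the PR.
class PatchCounts {
    private long assetsCountToPropagate;

    // Mirrors the setter shape in the diff: assigning a null Long to a
    // primitive long forces unboxing and throws NullPointerException.
    void setUnsafe(Long assetsCount) {
        this.assetsCountToPropagate = assetsCount;
    }

    // Null-tolerant variant: a missing property is treated as 0.
    void setSafe(Long assetsCount) {
        this.assetsCountToPropagate = (assetsCount != null) ? assetsCount : 0L;
    }

    long get() {
        return assetsCountToPropagate;
    }
}
```

Whether 0 is the right default for a missing property is a design choice for the PR authors; the sketch only shows where the exception would come from.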
@@ -60,8 +60,8 @@
import org.slf4j.LoggerFactory;
import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;


import java.util.*;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasClient.DATA_SET_SUPER_TYPE;
@@ -96,7 +96,22 @@ public abstract class DeleteHandlerV1 {
private final TaskManagement taskManagement;
private final AtlasGraph graph;
private final TaskUtil taskUtil;
private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt();

public void updateTaskVertexProperty(String propertyKey, long value, boolean isIncremental, BiConsumer<AtlasTask, Long> taskSetter) {
AtlasTask currentTask = RequestContext.get().getCurrentTask();
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query()
.has(TASK_GUID, currentTask.getGuid())
.vertices().iterator().next();

Long valueFromTaskVertex = currentTaskVertex.getProperty(propertyKey, Long.class);
long valueToPushToTaskVertex = isIncremental ? (valueFromTaskVertex != null ? valueFromTaskVertex : 0L) + value : value;
if (taskSetter != null) {
taskSetter.accept(currentTask, valueToPushToTaskVertex);
}

currentTaskVertex.setProperty(propertyKey, valueToPushToTaskVertex);
}
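
The value computation inside `updateTaskVertexProperty` can be isolated as a pure function, which makes the incremental and absolute modes easy to check without a graph. A sketch under that assumption; `TaskCounterMath` and `nextValue` are hypothetical names that do not appear in the PR:

```java
// Hypothetical helper; mirrors the ternary in updateTaskVertexProperty.
final class TaskCounterMath {
    private TaskCounterMath() { }

    // stored:        value currently on the task vertex, or null if unset
    // value:         new value (absolute mode) or delta (incremental mode)
    // isIncremental: true to add to the stored value, false to overwrite it
    static long nextValue(Long stored, long value, boolean isIncremental) {
        if (!isIncremental) {
            return value;
        }
        long base = (stored != null) ? stored : 0L;
        return base + value;
    }
}
```

For example, `nextValue(null, 5, true)` yields 5, `nextValue(10L, 5, true)` yields 15, and `nextValue(10L, 5, false)` yields 5.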

public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
this.typeRegistry = typeRegistry;
@@ -1220,16 +1235,32 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship
}
}

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_TO_PROPAGATE, graph, addPropagationsMap.size() + removePropagationsMap.size(), false, AtlasTask::setAssetsCountToPropagate);

int propagatedCount = 0;
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);

addTagPropagation(classificationVertex, entitiesToAddPropagation);
propagatedCount++;
if (propagatedCount == CHUNK_SIZE){

if (propagatedCount == CHUNK_SIZE){

Why this check?
propagatedCount is incremented per classification, not per classification<>asset propagation.
I am not sure how this works, but IMO TASK_ASSET_COUNT_PROPAGATED will never get updated if the classification count is less than CHUNK_SIZE.

Author

@abhijeet-atlan abhijeet-atlan Feb 18, 2025

For a classification relationship update to happen, both addTagPropagation and removeTagPropagation are performed. The total number could be in the hundreds; I am keeping that check to update assetsCountPropagated every 100 actions, be it addition or removal.

I doubt this scenario was tested with a large dataset. Can you ensure the asset on end2 of the relationship you are going to update has further columns, and validate this again for the final count as well as the incremental count updates?

Author

This was tested with a dataset that has columns on both ends of the relationship, and the output was correct.

The total lineage count is 5812 for this dataset and one tag was propagated both for hierarchy and lineage. Changing the "propagateTags" from "TWO_TO_ONE" to "NONE" resulted in removal of one tag.

image

You mentioned that "The total lineage count is 5812". Given that, can you please justify how the tested value (1) is correct and why we should not expect 5812 as the value for assetsCountToPropagate?

Author

let's talk in office hours
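
The question in this thread comes down to which unit is being counted. A minimal sketch, using plain strings in place of `AtlasVertex` and hypothetical names throughout, contrasts counting map keys (one per classification, as `addPropagationsMap.size()` does) with counting the entities held in the map values (one per classification-asset pair):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; strings stand in for AtlasVertex instances.
final class PropagationCounts {
    private PropagationCounts() { }

    // One unit per classification vertex (the map's key count).
    static long perClassification(Map<String, List<String>> propagations) {
        return propagations.size();
    }

    // One unit per classification-asset pair (sum of the value list sizes).
    static long perAsset(Map<String, List<String>> propagations) {
        long total = 0;
        for (List<String> entities : propagations.values()) {
            total += entities.size();
        }
        return total;
    }

    // Builds a map with a single classification attached to n entities.
    static Map<String, List<String>> singleClassification(int n) {
        List<String> entities = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            entities.add("entity-" + i);
        }
        Map<String, List<String>> map = new HashMap<>();
        map.put("classification-0", entities);
        return map;
    }
}
```

Assuming all 5812 assets from the test hang off a single classification vertex, `perClassification` returns 1 and `perAsset` returns 5812, which matches the two numbers quoted in the thread. Which of the two the counters are meant to report is for the PR authors to settle.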

taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount - 1, true, AtlasTask::setAssetsCountPropagated);
propagatedCount = 0;
}
}

for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);

removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
propagatedCount++;
if (propagatedCount == CHUNK_SIZE){
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount, true, AtlasTask::setAssetsCountPropagated);
propagatedCount = 0;
}
}
if (propagatedCount != 0){
taskManagement.updateTaskVertexProperty(TASK_ASSET_COUNT_PROPAGATED, graph, propagatedCount, true, AtlasTask::setAssetsCountPropagated);
}
} else {
// update blocked propagated classifications only if there is no change is tag propagation (don't update both)
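
The two loops above share one pattern: count locally, push the count to the task vertex every `CHUNK_SIZE` steps, then push whatever remains. A sketch of that pattern as a small helper follows; `ChunkedProgress` and its `LongConsumer` sink are hypothetical names, not part of the PR. The helper always flushes the full pending count, so the flushed values sum to the number of steps. As written in the diff, the add loop instead pushes `propagatedCount - 1` on each full chunk while the remove loop pushes `propagatedCount`, so the stored total would end up one short for every full chunk in the add loop.

```java
import java.util.function.LongConsumer;

// Hypothetical helper; the sink stands in for the call that increments
// TASK_ASSET_COUNT_PROPAGATED on the task vertex.
final class ChunkedProgress {
    private final int chunkSize;
    private final LongConsumer sink;
    private long pending;

    ChunkedProgress(int chunkSize, LongConsumer sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.chunkSize = chunkSize;
        this.sink = sink;
    }

    // Record one completed unit of work; flush when a chunk is full.
    void step() {
        pending++;
        if (pending == chunkSize) {
            flush();
        }
    }

    // Push any pending count to the sink; call once after the loops finish.
    void flush() {
        if (pending != 0) {
            sink.accept(pending);
            pending = 0;
        }
    }
}
```

With a chunk size of 100 and 250 calls to `step()` followed by one `flush()`, the sink receives 100, 100, and 50.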