Skip to content

Commit ed9ff81

Browse files
committed
[Improvement-16985][Registry] SubscribeListener support set scope
1 parent 25108c8 commit ed9ff81

File tree

12 files changed

+270
-226
lines changed

12 files changed

+270
-226
lines changed

Diff for: dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/AbstractClusterSubscribeListener.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public void notify(Event event) {
3030
try {
3131
// make sure the event is processed in order
3232
synchronized (this) {
33-
Event.Type type = event.type();
34-
T server = parseServerFromHeartbeat(event.data());
33+
Event.Type type = event.getType();
34+
T server = parseServerFromHeartbeat(event.getEventData());
3535
if (server == null) {
3636
log.error("Unknown cluster change event: {}", event);
3737
return;
@@ -58,6 +58,11 @@ public void notify(Event event) {
5858
}
5959
}
6060

61+
@Override
62+
public SubscribeScope getSubscribeScope() {
63+
return SubscribeScope.CHILDREN_ONLY;
64+
}
65+
6166
abstract T parseServerFromHeartbeat(String serverHeartBeatJson);
6267

6368
public abstract void onServerAdded(T serverHeartBeat);

Diff for: dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java

+13-98
Original file line numberDiff line numberDiff line change
@@ -17,115 +17,30 @@
1717

1818
package org.apache.dolphinscheduler.registry.api;
1919

20+
import lombok.AllArgsConstructor;
21+
import lombok.Builder;
22+
import lombok.Getter;
23+
import lombok.ToString;
24+
25+
@Getter
26+
@ToString
27+
@Builder
28+
@AllArgsConstructor
2029
public class Event {
2130

22-
// The prefix which is watched
23-
private String key;
31+
// The path which is watched
32+
private String watchedPath;
2433
// The full path where the event was generated
25-
private String path;
34+
private String eventPath;
2635
// The value corresponding to the path
27-
private String data;
36+
private String eventData;
2837
// The event type {ADD, REMOVE, UPDATE}
2938
private Type type;
3039

31-
public Event(String key, String path, String data, Type type) {
32-
this.key = key;
33-
this.path = path;
34-
this.data = data;
35-
this.type = type;
36-
}
37-
38-
public Event() {
39-
}
40-
41-
public static EventBuilder builder() {
42-
return new EventBuilder();
43-
}
44-
45-
public String key() {
46-
return this.key;
47-
}
48-
49-
public String path() {
50-
return this.path;
51-
}
52-
53-
public String data() {
54-
return this.data;
55-
}
56-
57-
public Type type() {
58-
return this.type;
59-
}
60-
61-
public Event key(String key) {
62-
this.key = key;
63-
return this;
64-
}
65-
66-
public Event path(String path) {
67-
this.path = path;
68-
return this;
69-
}
70-
71-
public Event data(String data) {
72-
this.data = data;
73-
return this;
74-
}
75-
76-
public Event type(Type type) {
77-
this.type = type;
78-
return this;
79-
}
80-
81-
public String toString() {
82-
return "Event(key=" + this.key() + ", path=" + this.path() + ", data=" + this.data() + ", type=" + this.type()
83-
+ ")";
84-
}
85-
8640
public enum Type {
8741
ADD,
8842
REMOVE,
8943
UPDATE
9044
}
9145

92-
public static class EventBuilder {
93-
94-
private String key;
95-
private String path;
96-
private String data;
97-
private Type type;
98-
99-
EventBuilder() {
100-
}
101-
102-
public EventBuilder key(String key) {
103-
this.key = key;
104-
return this;
105-
}
106-
107-
public EventBuilder path(String path) {
108-
this.path = path;
109-
return this;
110-
}
111-
112-
public EventBuilder data(String data) {
113-
this.data = data;
114-
return this;
115-
}
116-
117-
public EventBuilder type(Type type) {
118-
this.type = type;
119-
return this;
120-
}
121-
122-
public Event build() {
123-
return new Event(key, path, data, type);
124-
}
125-
126-
public String toString() {
127-
return "Event.EventBuilder(key=" + this.key + ", path=" + this.path + ", data=" + this.data + ", type="
128-
+ this.type + ")";
129-
}
130-
}
13146
}

Diff for: dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,23 @@
1919

2020
public interface SubscribeListener {
2121

22-
void notify(Event event);
22+
void notify(final Event event);
23+
24+
SubscribeScope getSubscribeScope();
25+
26+
enum SubscribeScope {
27+
/**
28+
* Only watch the path itself
29+
*/
30+
PATH_ONLY,
31+
/**
32+
* Only watch the children of the path
33+
*/
34+
CHILDREN_ONLY,
35+
/**
36+
* Watch the path and all its children and the parent path
37+
*/
38+
ALL
39+
40+
}
2341
}

Diff for: dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2323
import org.apache.dolphinscheduler.registry.api.Event;
2424
import org.apache.dolphinscheduler.registry.api.Registry;
25+
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
2526

2627
import java.util.List;
2728

@@ -56,16 +57,25 @@ public AbstractHAServer(final Registry registry, final String selectorPath, fina
5657

5758
@Override
5859
public void start() {
59-
registry.subscribe(selectorPath, event -> {
60-
if (Event.Type.REMOVE.equals(event.type())) {
61-
if (serverIdentify.equals(event.data())) {
62-
statusChange(ServerStatus.STAND_BY);
63-
} else {
64-
if (participateElection()) {
65-
statusChange(ServerStatus.ACTIVE);
60+
registry.subscribe(selectorPath, new SubscribeListener() {
61+
62+
@Override
63+
public void notify(Event event) {
64+
if (Event.Type.REMOVE.equals(event.getType())) {
65+
if (serverIdentify.equals(event.getEventData())) {
66+
statusChange(ServerStatus.STAND_BY);
67+
} else {
68+
if (participateElection()) {
69+
statusChange(ServerStatus.ACTIVE);
70+
}
6671
}
6772
}
6873
}
74+
75+
@Override
76+
public SubscribeScope getSubscribeScope() {
77+
return SubscribeScope.PATH_ONLY;
78+
}
6979
});
7080

7181
if (participateElection()) {

Diff for: dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

+44-25
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Optional;
3536
import java.util.concurrent.ConcurrentHashMap;
3637
import java.util.concurrent.ExecutionException;
3738
import java.util.concurrent.TimeUnit;
@@ -158,7 +159,24 @@ public void subscribe(String path, SubscribeListener listener) {
158159
watcherMap.computeIfAbsent(path,
159160
$ -> client.getWatchClient().watch(watchKey, watchOption, watchResponse -> {
160161
for (WatchEvent event : watchResponse.getEvents()) {
161-
listener.notify(new EventAdaptor(event, path));
162+
final String eventPath = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
163+
switch (listener.getSubscribeScope()) {
164+
case PATH_ONLY:
165+
if (eventPath.equals(path)) {
166+
listener.notify(toEvent(event, path));
167+
}
168+
break;
169+
case CHILDREN_ONLY:
170+
if (!eventPath.equals(path)) {
171+
listener.notify(toEvent(event, path));
172+
}
173+
break;
174+
case ALL:
175+
listener.notify(toEvent(event, path));
176+
break;
177+
default:
178+
throw new RegistryException("Unknown event scope: " + listener.getSubscribeScope());
179+
}
162180
}
163181
}));
164182
} catch (Exception e) {
@@ -373,30 +391,31 @@ private static ByteSequence byteSequence(String val) {
373391
return ByteSequence.from(val, StandardCharsets.UTF_8);
374392
}
375393

376-
static final class EventAdaptor extends Event {
377-
378-
public EventAdaptor(WatchEvent event, String key) {
379-
key(key);
380-
381-
switch (event.getEventType()) {
382-
case PUT:
383-
if (event.getPrevKV().getKey().isEmpty()) {
384-
type(Type.ADD);
385-
} else {
386-
type(Type.UPDATE);
387-
}
388-
break;
389-
case DELETE:
390-
type(Type.REMOVE);
391-
break;
392-
default:
393-
break;
394-
}
395-
KeyValue keyValue = event.getKeyValue();
396-
if (keyValue != null) {
397-
path(keyValue.getKey().toString(StandardCharsets.UTF_8));
398-
data(keyValue.getValue().toString(StandardCharsets.UTF_8));
399-
}
394+
private Event toEvent(final WatchEvent watchEvent, final String watchedPath) {
395+
Event.Type eventType = null;
396+
switch (watchEvent.getEventType()) {
397+
case PUT:
398+
if (watchEvent.getPrevKV().getKey().isEmpty()) {
399+
eventType = Event.Type.ADD;
400+
} else {
401+
eventType = Event.Type.UPDATE;
402+
}
403+
break;
404+
case DELETE:
405+
eventType = Event.Type.REMOVE;
406+
break;
407+
default:
408+
break;
400409
}
410+
final KeyValue keyValue = watchEvent.getKeyValue();
411+
return Event.builder()
412+
.type(eventType)
413+
.watchedPath(watchedPath)
414+
.eventPath(Optional.ofNullable(keyValue).map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
415+
.orElse(null))
416+
.eventData(Optional.ofNullable(keyValue).map(kv -> kv.getValue().toString(StandardCharsets.UTF_8))
417+
.orElse(null))
418+
.build();
401419
}
420+
402421
}

Diff for: dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,24 @@ public void testSubscribe() {
8181
final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
8282
final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
8383

84-
SubscribeListener subscribeListener = event -> {
85-
System.out.println("Receive event: " + event);
86-
if (event.type() == Event.Type.ADD) {
87-
subscribeAdded.compareAndSet(false, true);
84+
final SubscribeListener subscribeListener = new SubscribeListener() {
85+
86+
@Override
87+
public void notify(Event event) {
88+
if (event.type() == Event.Type.ADD) {
89+
subscribeAdded.compareAndSet(false, true);
90+
}
91+
if (event.type() == Event.Type.REMOVE) {
92+
subscribeRemoved.compareAndSet(false, true);
93+
}
94+
if (event.type() == Event.Type.UPDATE) {
95+
subscribeUpdated.compareAndSet(false, true);
96+
}
8897
}
89-
if (event.type() == Event.Type.REMOVE) {
90-
subscribeRemoved.compareAndSet(false, true);
91-
}
92-
if (event.type() == Event.Type.UPDATE) {
93-
subscribeUpdated.compareAndSet(false, true);
98+
99+
@Override
100+
public SubscribeScope getSubscribeScope() {
101+
return SubscribeScope.PATH_ONLY;
94102
}
95103
};
96104
String key = "/nodes/master" + System.nanoTime();

0 commit comments

Comments
 (0)