Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jul 19, 2023
1 parent 45fd698 commit 084a498
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 32 deletions.
37 changes: 22 additions & 15 deletions src/org/jgroups/protocols/relay/RELAY2.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,11 @@ public class RELAY2 extends Protocol {

protected volatile RouteStatusListener route_status_listener;

protected final Set<String> site_cache=new HashSet<>(); // to prevent duplicate site-ups
// to prevent duplicate sitesUp()/sitesDown() notifications; this is needed in every member: routes are only
// maintained by site masters (relayer != null)
// todo: replace with topo once JGRP-2706 is in place
@ManagedAttribute(description="A cache maintaining a list of sites that are up")
protected final Set<String> site_cache=new HashSet<>();

/** Number of messages forwarded to the local SiteMaster */
protected final LongAdder forward_to_site_master=new LongAdder();
Expand Down Expand Up @@ -679,21 +683,24 @@ protected boolean handleAdminMessage(Relay2Header hdr, Message msg) {
case SITES_UP:
case SITES_DOWN:
Set<String> tmp_sites=hdr.getSites();
if(route_status_listener != null && tmp_sites != null) {
tmp_sites.remove(this.site);
tmp_sites.remove(this.site);
if(tmp_sites != null) {
if(hdr.type == SITES_UP) {
tmp_sites.removeAll(site_cache);
site_cache.addAll(tmp_sites);
}
if(tmp_sites.isEmpty())
return true;
String[] tmp=tmp_sites.toArray(new String[]{});
if(hdr.type == SITES_UP)
route_status_listener.sitesUp(tmp);
else {
route_status_listener.sitesDown(tmp);
site_cache.removeAll(tmp_sites);
else { // SITES_DOWN
topo.removeAll(tmp_sites);
tmp_sites.retainAll(site_cache);
site_cache.removeAll(tmp_sites);
}

if(route_status_listener != null && !tmp_sites.isEmpty()) {
String[] tmp=tmp_sites.toArray(new String[]{});
if(hdr.type == SITES_UP)
route_status_listener.sitesUp(tmp);
else
route_status_listener.sitesDown(tmp);
}
}
return true;
Expand Down Expand Up @@ -1013,7 +1020,7 @@ protected void sendSiteUnreachableTo(Address src, String target_site) {
}
// send message back to the src node.
Message msg=new EmptyMessage(src).setFlag(Message.Flag.OOB)
.putHeader(id, new Relay2Header(SITE_UNREACHABLE).setSites(target_site));
.putHeader(id, new Relay2Header(SITE_UNREACHABLE).addToSites(target_site));
down(msg);
}

Expand Down Expand Up @@ -1057,11 +1064,11 @@ protected void deliver(Address dest, Address sender, final Message msg) {
}
}

protected void sitesChange(boolean down, String ... sites) {
if(!broadcast_route_notifications || sites == null || sites.length == 0)
protected void sitesChange(boolean down, Set<String> sites) {
if(!broadcast_route_notifications || sites == null || sites.isEmpty())
return;
Relay2Header hdr=new Relay2Header(down? SITES_DOWN : SITES_UP, null, null)
.setSites(sites);
.addToSites(sites);
down_prot.down(new EmptyMessage(null).putHeader(id, hdr)); // .setFlag(Message.Flag.NO_RELAY));
}

Expand Down
20 changes: 13 additions & 7 deletions src/org/jgroups/protocols/relay/Relay2Header.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,17 @@ public Relay2Header(byte type, Address final_dest, Address original_sender) {
public Address getOriginalSender() {return original_sender;}
public Relay2Header setOriginalSender(Address s) {original_sender=s; return this;}
public Set<String> getSites() {return sites;}
public Relay2Header setSites(Set<String> s) {sites=s; return this;}

public Relay2Header setSites(String ... s) {
public Relay2Header addToSites(Collection<String> s) {
if(s != null) {
if(this.sites == null)
this.sites=new HashSet<>(s.size());
this.sites.addAll(s);
}
return this;
}

public Relay2Header addToSites(String ... s) {
if(s != null && s.length > 0) {
if(this.sites == null)
this.sites=new HashSet<>();
Expand Down Expand Up @@ -87,12 +95,10 @@ public Relay2Header addToVisitedSites(Collection<String> list) {
public Set<String> getVisitedSites() {return visited_sites;}

public Relay2Header copy() {
Relay2Header hdr=new Relay2Header(type, final_dest, original_sender);
if(this.sites != null)
hdr.sites=new HashSet<>(this.sites);
if(visited_sites != null) {
Relay2Header hdr=new Relay2Header(type, final_dest, original_sender)
.addToSites(this.sites);
if(visited_sites != null)
hdr.addToVisitedSites(visited_sites);
}
return hdr;
}

Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/protocols/relay/Relayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ public void viewAccepted(View new_view) {
removed_routes.forEach(routes.keySet()::remove);

if(!down.isEmpty())
relay.sitesChange(true, down.toArray(new String[0]));
relay.sitesChange(true, down);
if(!up.isEmpty())
relay.sitesChange(false, up.toArray(new String[0]));
relay.sitesChange(false, up);
}

@Override
Expand Down
6 changes: 3 additions & 3 deletions tests/junit-functional/org/jgroups/tests/Relay2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,14 +399,14 @@ public void testSitesUp() throws Exception {
assert Stream.of(_a,_d,_g).map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.allMatch(RELAY2::isSiteMaster);

Stream.of(_d,_e,_f,_g,_h,_i)
Stream.of(_b,_c,_d,_e,_f,_g,_h,_i)
.map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener()));
.forEach(r -> r.setRouteStatusListener(new MyRouteStatusListener(r.getAddress()).verbose(true)));

// now stop A; B will become new site master and we should get a site-down(NYC), then site-up(NYC)
Util.close(_a);

Util.waitUntil(5000, 500, () -> Stream.of(_d, _e, _f, _g, _h, _i)
Util.waitUntil(5000, 500, () -> Stream.of(_b,_c,_d, _e, _f, _g, _h, _i)
.map(ch -> (RELAY2)ch.getProtocolStack().findProtocol(RELAY2.class))
.peek(r -> System.out.printf("%s: %s\n", r.getAddress(), r.getRouteStatusListener()))
.map(r -> (MyRouteStatusListener)r.getRouteStatusListener())
Expand Down
19 changes: 16 additions & 3 deletions tests/junit-functional/org/jgroups/tests/RelayTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups.tests;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.View;
import org.jgroups.logging.Log;
Expand Down Expand Up @@ -129,24 +130,36 @@ protected static void waitForBridgeView(int expected_size, long timeout, long in
}

protected static class MyRouteStatusListener implements RouteStatusListener {
protected final Address local_addr;
protected final List<String> up=new ArrayList<>(), down=new ArrayList<>();
protected boolean verbose;

protected List<String> up() {return up;}
protected List<String> down() {return down;}
protected MyRouteStatusListener(Address local_addr) {
this.local_addr=local_addr;
}

protected List<String> up() {return up;}
protected List<String> down() {return down;}
protected MyRouteStatusListener verbose(boolean b) {this.verbose=b; return this;}
protected boolean verbose() {return verbose;}

@Override public synchronized void sitesUp(String... sites) {
if(verbose)
System.out.printf("%s: UP(%s)\n", local_addr, Arrays.toString(sites));
up.addAll(Arrays.asList(sites));
}

@Override public synchronized void sitesDown(String... sites) {
if(verbose)
System.out.printf("%s: DOWN(%s)\n", local_addr, Arrays.toString(sites));
down.addAll(Arrays.asList(sites));
}

protected synchronized MyRouteStatusListener clear() {up.clear(); down.clear(); return this;}

@Override
public String toString() {
return String.format("up: %s, down: %s", up, down);
return String.format("down: %s, up: %s", down, up);
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/junit-functional/org/jgroups/tests/SizeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,11 @@ public void testRelay2Header() throws Exception {
_testSize(hdr);

hdr=new Relay2Header(Relay2Header.SITES_UP, null, null)
.setSites("sfo", "lon","nyc");
.addToSites("sfo", "lon", "nyc");
_testSize(hdr);

hdr=new Relay2Header(DATA, dest, null)
.setSites("sfo")
.addToSites("sfo")
.addToVisitedSites(List.of("nyc", "sfc", "lon"));
_testSize(hdr);

Expand Down

0 comments on commit 084a498

Please sign in to comment.