Skip to content

Commit

Permalink
Issue: RestComm#150 : Diameter error codes 3002 and 3004 are not work…
Browse files Browse the repository at this point in the history
…ing as expected
  • Loading branch information
BarisAkkas-kcom committed Feb 11, 2020
1 parent d7a3185 commit 0e6d72b
Show file tree
Hide file tree
Showing 6 changed files with 640 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,24 @@ public interface IRouter {
*/
void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable table) throws InternalException, RouteException;


/**
* Called when 3004 is received for request. This method update cc-request-number and sends the message back to stack.
* @param request
* @param table
*/
void processSecondAttempt(IRequest request, IPeerTable table) throws InternalException, RouteException;


/**
* Based on Redirect entries or any other factors, this method changes route information.
* @param message
* @return
* @throws RouteException
* @throws AvpDataException
*/


boolean updateRoute(IRequest message) throws RouteException, AvpDataException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,22 @@ private IMessage processRedirectAnswer(IMessage request, IMessage answer) {
return answer;
}

private boolean processSecondAttempt(IMessage request) {
try {
int request_number = request.getAvps().getAvp(Avp.CC_REQUEST_NUMBER).getInteger32();
logger.debug("increase the request number to : " + request_number+1);
request.getAvps().removeAvp(Avp.CC_REQUEST_NUMBER);
request.getAvps().addAvp(Avp.CC_REQUEST_NUMBER, request_number + 1);
router.processSecondAttempt(request, table);
}
catch (Throwable exc) {
// Incorrect redirect message
logger.debug("Failed to process second attempt!", exc);
return false;
}
return true;
}

@Override
public void connect() throws InternalException, IOException, IllegalDiameterStateException {
if (getState(PeerState.class) != PeerState.DOWN) {
Expand Down Expand Up @@ -1037,7 +1053,23 @@ public boolean receiveMessage(IMessage message) {

if (message != null) {
if (request.getEventListener() != null) {
request.getEventListener().receivedSuccessMessage(request, message);
try {
if (avpResCode.getUnsigned32() == ResultCode.TOO_BUSY){
logger.debug("RESPONSE IS AN 3004 SO TRYING AGAIN...");
logger.debug("Message with this sessionId will be retried : " + message.getSessionId());
if (!processSecondAttempt(request)){
request.getEventListener().receivedSuccessMessage(request, message);
}
} else {
request.getEventListener().receivedSuccessMessage(request, message);
}
} catch (AvpDataException e) {
e.printStackTrace();
logger.error("Unable to get Result Code");
if (statistic.isEnabled()) {
statistic.getRecordByName(IStatisticRecord.Counters.NetGenRejectedResponse.name()).inc();
}
}
}
else {
logger.debug("Unable to call answer listener for request {} because listener is not set", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public IPeer getPeer(IMessage message, IPeerTable manager) throws RouteException
}

// Balancing
IPeer peer = selectPeer(availablePeers);
IPeer peer = selectPeer(message, availablePeers);
if (peer == null) {
throw new RouteException("Unable to find valid connection to peer[" + destHost + "] in realm[" + destRealm + "]");
}
Expand Down Expand Up @@ -639,6 +639,21 @@ public void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable t
}
}

public void processSecondAttempt(IRequest request, IPeerTable table) throws InternalException, RouteException {
try {
table.sendMessage((IMessage) request);
}
catch (AvpDataException exc) {
throw new InternalException(exc);
}
catch (IllegalDiameterStateException e) {
throw new InternalException(e);
}
catch (IOException e) {
throw new InternalException(e);
}
}

/**
*
*/
Expand Down Expand Up @@ -787,7 +802,7 @@ public void destroy() {
requestEntryMap = null;
}

protected IPeer selectPeer(List<IPeer> availablePeers) {
protected IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
IPeer p = null;
for (IPeer c : availablePeers) {
if (p == null || c.getRating() >= p.getRating()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.jdiameter.api.Configuration;
import org.jdiameter.api.MetaData;
import org.jdiameter.api.PeerState;
import org.jdiameter.api.StatisticRecord;
import org.jdiameter.client.api.IContainer;
import org.jdiameter.client.api.IMessage;
import org.jdiameter.client.api.controller.IPeer;
import org.jdiameter.client.api.controller.IRealmTable;
import org.jdiameter.common.api.concurrent.IConcurrentFactory;
Expand Down Expand Up @@ -124,7 +126,7 @@ public WeightedLeastConnectionsRouter(IContainer container, IConcurrentFactory c
* @return the selected peer according to algorithm
*/
@Override
public IPeer selectPeer(List<IPeer> availablePeers) {
public IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
int peerSize = availablePeers != null ? availablePeers.size() : 0;

// Return none if empty, or first if only one member found
Expand Down Expand Up @@ -173,6 +175,16 @@ protected long getNumConnections(IPeer peer) {
return 0;
}

// logger.debug("peer.getUri() : " + peer.getUri());
// logger.debug("AppGenRequestPerSecond : " + getRecord(IStatisticRecord.Counters.AppGenRequestPerSecond.name()+'.'+peer.getUri(), stats));
// logger.debug("NetGenRequestPerSecond : " + getRecord(IStatisticRecord.Counters.NetGenRequestPerSecond.name()+'.'+peer.getUri(), stats));
// logger.debug("AppGenRejectedResponse : " + getRecord(IStatisticRecord.Counters.AppGenRejectedResponse.name()+'.'+peer.getUri(), stats));
// logger.debug("NetGenRejectedResponse : " + getRecord(IStatisticRecord.Counters.NetGenRejectedResponse.name()+'.'+peer.getUri(), stats));

for (StatisticRecord rec : stats.getRecords()){
logger.debug(rec.getName() + " : " + rec.getValueAsLong());
}

// Requests per second initiated by Local Peer + Request initiated by Remote peer
String uri = peer.getUri() == null ? "local" : peer.getUri().toString();
long requests = getRecord(IStatisticRecord.Counters.AppGenRequestPerSecond.name()+'.'+uri, stats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,42 @@

package org.jdiameter.client.impl.router;

import org.jdiameter.api.AvpDataException;
import org.jdiameter.api.Configuration;
import org.jdiameter.api.MetaData;
import org.jdiameter.api.PeerState;
import org.jdiameter.client.api.IContainer;
import org.jdiameter.client.api.IMessage;
import org.jdiameter.client.api.controller.IPeer;
import org.jdiameter.client.api.controller.IRealmTable;
import org.jdiameter.common.api.concurrent.IConcurrentFactory;

import org.jdiameter.server.api.IRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

import static org.jdiameter.api.Avp.CC_REQUEST_NUMBER;

/**
* Weighted round-robin router implementation
*
* @see <a href="http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling">http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling</a>
* @author <a href="mailto:[email protected]">Nils Sowen</a>
* @see
* <a href="http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling">http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling</a>
*/
public class WeightedRoundRobinRouter extends RouterImpl implements IRouter{
public class WeightedRoundRobinRouter extends RouterImpl implements IRouter {

private static final Logger logger = LoggerFactory.getLogger(WeightedRoundRobinRouter.class);

private int lastSelectedPeer = -1;
private int currentWeight = 0;
private ConcurrentHashMap<String, HashSet<IPeer>> messageToPeer = new ConcurrentHashMap<String, HashSet<IPeer>>();
private int timeout = 30000;

protected WeightedRoundRobinRouter(IRealmTable table, Configuration config) {
super(null, null, table, config, null);
Expand Down Expand Up @@ -108,24 +122,44 @@ public WeightedRoundRobinRouter(IContainer container, IConcurrentFactory concurr
* <p>
* This method is internally synchronized due to concurrent modifications to lastSelectedPeer and currentWeight.
* Please consider this when relying on heavy throughput.
*
* <p>
* Please note: if the list of availablePeers changes between calls (e.g. if a peer becomes active or inactive),
* the balancing algorithm is disturbed and might be distributed uneven.
* This is likely to happen if peers are flapping.
*
* @param availablePeers list of peers that are in {@link PeerState#OKAY OKAY} state
* @return the selected peer according to algorithm
* @see <a href="http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling">http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling</a>
* @see
* <a href="http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling">http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling</a>
*/
@Override
public IPeer selectPeer(List<IPeer> availablePeers) {

public IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
IPeer selectedPeer = null;
int peerSize = availablePeers != null ? availablePeers.size() : 0;

logger.debug("peerSize " + peerSize);
// Return none if empty, or first if only one member found
if (peerSize <= 0) {
return null;
}

if (message.getPeer() != null) {
addRecordToMap(message, message.getPeer());
}

long request_number = 0;
try {
request_number = message.getAvps().getAvp(CC_REQUEST_NUMBER).getUnsigned32();
logger.debug("request_number later : " + request_number);
} catch (AvpDataException e) {
e.printStackTrace();
}

if (request_number == peerSize) {
logger.debug("No peers left for this message : " + message.getSessionId() + ", returning...");
return null;
}

if (peerSize == 1) {
return availablePeers.iterator().next();
}
Expand All @@ -136,11 +170,17 @@ public IPeer selectPeer(List<IPeer> availablePeers) {
for (IPeer peer : availablePeers) {
maxWeight = Math.max(maxWeight, peer.getRating());
gcd = (gcd == null) ? peer.getRating() : gcd(gcd, peer.getRating());
logger.debug("peer.getRating() " + peer.getRating());
}

// logger.debug("maxWeight " + maxWeight);
// logger.debug("gcd " + gcd);
// logger.debug("lastSelectedPeer " + lastSelectedPeer);
// logger.debug("currentWeight " + currentWeight);

// Find best matching candidate. Synchronized here due to consistent changes on member variables
synchronized (this) {
for ( ;; ) {
for (; ; ) {
lastSelectedPeer = (lastSelectedPeer + 1) % peerSize;
if (lastSelectedPeer == 0) {
currentWeight = currentWeight - gcd;
Expand All @@ -154,12 +194,75 @@ public IPeer selectPeer(List<IPeer> availablePeers) {
}
IPeer candidate = availablePeers.get(lastSelectedPeer);
if (candidate.getRating() >= currentWeight) {
return availablePeers.get(lastSelectedPeer);
selectedPeer = availablePeers.get(lastSelectedPeer);
if (message.getPeer() != null) {
logger.debug("Moving selected peer to next peer as it looks like to be the 2nd attempt of this message and last peer of this message");
lastSelectedPeer = selectPeerForSecondAttempt(lastSelectedPeer, availablePeers, message);
if (lastSelectedPeer < 0) {
logger.debug("No unattempted peers left for this message : " + message.getSessionId() + ", returning...");
return null;
} else {
selectedPeer = availablePeers.get(lastSelectedPeer);
}
}
return selectedPeer;
}
}
}
}

private int selectPeerForSecondAttempt(int selectedPeerIndex, List<IPeer> availablePeers, IMessage message) {

logger.debug("Checking peer history of message : " + message.getSessionId());

if (!messageToPeer.containsKey(message.getSessionId())) {
logger.debug("messageToPeer does not contain selected peer so return it");
return selectedPeerIndex;
}
for (int i = 0; i < availablePeers.size(); i++) {
IPeer candidate = availablePeers.get(selectedPeerIndex);
if (messageToPeer.get(message.getSessionId()).contains(candidate)) {
logger.debug("this peer has been tried before : " + candidate.getUri() + ", skipping to next peer");
selectedPeerIndex = selectedPeerIndex < availablePeers.size() - 1 ? selectedPeerIndex + 1 : 0;
continue;
} else {
logger.debug("This peer : " + candidate.getUri() + " hasn't been tried for this message : " + message.getSessionId());
return selectedPeerIndex;
}
}
logger.debug("All peers are tried, returning -1");
messageToPeer.remove(message.getSessionId());
return -1;
}

private synchronized void addRecordToMap(final IMessage message, IPeer peer) {
if (messageToPeer.containsKey(message.getSessionId())) {
messageToPeer.get(message.getSessionId()).add(peer);
} else {
HashSet<IPeer> peerSet = new HashSet<IPeer>();
peerSet.add(peer);
messageToPeer.put(message.getSessionId(), peerSet);

new Timer().schedule(new TimerTask() {
@Override
public void run() {
actionAfterTimeout(message.getSessionId());
}
}, timeout);
}
}

void actionAfterTimeout(String key) {
logger.debug("messageToPeer.size() before : " + messageToPeer.size());
HashSet<IPeer> peerSet = messageToPeer.remove(key);
if (peerSet != null) {
logger.debug("peerSet with size : " + peerSet.size() + " has been removed for message : " + key);
} else {
logger.debug("Nothing removed! : " + key);
}
logger.debug("messageToPeer.size() after : " + messageToPeer.size());
}

/**
* Return greatest common divisor for two integers
* https://en.wikipedia.org/wiki/Greatest_common_divisor#Using_Euclid.27s_algorithm
Expand Down
Loading

0 comments on commit 0e6d72b

Please sign in to comment.