Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Restcomm#150 : Resubmissions of requests to alternate peers now possible until all peers exhausted, while respecting Weighted Round Robin balancing algorithm #156

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
.classpath
.project
.settings
.checkstyle

# IntelliJ IDEA #
#################
Expand All @@ -29,3 +30,7 @@ target
Icon?
ehthumbs.db
Thumbs.db

# Testsuite generated *_sctp.xml files #
########################################
testsuite/tests/*_sctp.xml
28 changes: 15 additions & 13 deletions .project
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>diameter-parent</name>
<comment/>
<projects/>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments/>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
<name>diameter-parent</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,39 @@ public interface IRouter {
*/
void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable table) throws InternalException, RouteException;


/**
* Indicates whether this router implementation is able to resubmit requests to an alternative peer,
* for which a Busy or Unable to Deliver Answer has already been received from one peer.<br /><br />
*
* <strong>Note: </strong> Returning <code>true</code> from this method when the router implementation has not been designed to
* handle resubmitting such requests can result in a request being resubmitted perpetually.
*
* @return <code>false</code> by default. <code>true</code> when and only when the router implementation has specific logic to handle
* submitting requests which have received a Busy or Unable to Deliver Answer from one peer, to an alternative peer, and to avoid
* perpetual re-submission of such requests.
*/
boolean canProcessBusyOrUnableToDeliverAnswer();


/**
* Called when a 3002 or 3004 is received for a request. This method attempts to resubmit the request to an alternative peer.
*
* @param request
* @param table
*/
void processBusyOrUnableToDeliverAnswer(IRequest request, IPeerTable table) throws InternalException, RouteException;


/**
* Based on Redirect entries or any other factors, this method changes route information.
* @param message
* @return
* @throws RouteException
* @throws AvpDataException
*/


boolean updateRoute(IRequest message) throws RouteException, AvpDataException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,17 @@ private boolean isRedirectAnswer(Avp avpResCode, IMessage answer) {
}
}

private boolean isBusyOrUnableToDeliverAnswer(Avp avpResCode, IMessage answer) {
try {
// E-bit set indicating a protocol error, and Result Code one of 3002 or 3004
return (answer.getFlags() & 0x20) != 0 && avpResCode != null
&& (avpResCode.getInteger32() == ResultCode.TOO_BUSY || avpResCode.getInteger32() == ResultCode.UNABLE_TO_DELIVER);
}
catch (AvpDataException e) {
return false;
}
}

@Override
public IStatistic getStatistic() {
return statistic;
Expand Down Expand Up @@ -480,6 +491,24 @@ private IMessage processRedirectAnswer(IMessage request, IMessage answer) {
return answer;
}

private IMessage processBusyOrUnableToDeliverAnswer(IMessage request, IMessage answer) {
if (router.canProcessBusyOrUnableToDeliverAnswer()) {
try {
logger.debug("Message with [sessionId={}] received a Busy or Unable to Deliver Answer and will be resubmitted.", request.getSessionId());
router.processBusyOrUnableToDeliverAnswer(request, table);
return null;
}
catch (Throwable exc) {
// Any error when attempting a resubmit to an alternative peer simply results in the original
// Busy or Unable to Deliver Answer being returned
if (logger.isErrorEnabled()) {
logger.error("Failed to reprocess busy or unable to deliver response - all peers exhausted?", exc);
}
}
}
return answer;
}

@Override
public void connect() throws InternalException, IOException, IllegalDiameterStateException {
if (getState(PeerState.class) != PeerState.DOWN) {
Expand Down Expand Up @@ -1034,6 +1063,12 @@ public boolean receiveMessage(IMessage message) {
//if return value is not null, there was some error, lets try to invoke listener if it exists...
isProcessed = message == null;
}
avpResCode = message.getAvps().getAvp(RESULT_CODE);
if (isBusyOrUnableToDeliverAnswer(avpResCode, message)) {
message = processBusyOrUnableToDeliverAnswer(request, message);
// if return value is not null, there was some error, lets try to invoke listener if it exists...
isProcessed = message == null;
}

if (message != null) {
if (request.getEventListener() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public IPeer getPeer(IMessage message, IPeerTable manager) throws RouteException
}

// Balancing
IPeer peer = selectPeer(availablePeers);
IPeer peer = selectPeer(message, availablePeers);
if (peer == null) {
throw new RouteException("Unable to find valid connection to peer[" + destHost + "] in realm[" + destRealm + "]");
}
Expand Down Expand Up @@ -639,6 +639,33 @@ public void processRedirectAnswer(IRequest request, IAnswer answer, IPeerTable t
}
}

/**
* This method should always return false unless specifically designed to handle
* submitting requests which have received a Busy or Unable to Deliver Answer from one peer, to an alternative peer, and to avoid
* perpetual re-submission of such requests.
*
* @return <code>false</code>
*/
@Override
public boolean canProcessBusyOrUnableToDeliverAnswer() {
return false;
}

public void processBusyOrUnableToDeliverAnswer(IRequest request, IPeerTable table) throws InternalException, RouteException {
try {
table.sendMessage((IMessage) request);
}
catch (AvpDataException exc) {
throw new InternalException(exc);
}
catch (IllegalDiameterStateException e) {
throw new InternalException(e);
}
catch (IOException e) {
throw new InternalException(e);
}
}

/**
*
*/
Expand Down Expand Up @@ -797,6 +824,10 @@ protected IPeer selectPeer(List<IPeer> availablePeers) {
return p;
}

protected IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
return selectPeer(availablePeers);
}

// protected void redirectProcessing(IMessage message, final String destRealm, final String destHost) throws AvpDataException {
// String userName = null;
// // get Session id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@

package org.jdiameter.client.impl.router;

import java.util.Arrays;
import java.util.List;

import org.jdiameter.api.Configuration;
import org.jdiameter.api.MetaData;
import org.jdiameter.api.PeerState;
import org.jdiameter.client.api.IContainer;
import org.jdiameter.client.api.IMessage;
import org.jdiameter.client.api.controller.IPeer;
import org.jdiameter.client.api.controller.IRealmTable;
import org.jdiameter.common.api.concurrent.IConcurrentFactory;
import org.jdiameter.common.api.statistic.IStatistic;
import org.jdiameter.common.api.statistic.IStatisticRecord;
import org.jdiameter.server.api.IRouter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import org.jdiameter.server.api.IRouter;

/**
* Weighted Least-Connections router implementation<br/><br/>
*
Expand Down Expand Up @@ -102,13 +103,13 @@ public WeightedLeastConnectionsRouter(IContainer container, IConcurrentFactory c
* <pre>
* {@code
* for (m = 0; m < n; m++) {
* if (W(Sm) > 0) {
* for (i = m+1; i < n; i++) {
* if (C(Sm)*W(Si) > C(Si)*W(Sm))
* m = i;
* if (W(Sm) > 0) {
* for (i = m+1; i < n; i++) {
* if (C(Sm)*W(Si) > C(Si)*W(Sm))
* m = i;
* }
* return Sm;
* }
* return Sm;
* }
* }
* return NULL;
* }
Expand All @@ -125,6 +126,19 @@ public WeightedLeastConnectionsRouter(IContainer container, IConcurrentFactory c
*/
@Override
public IPeer selectPeer(List<IPeer> availablePeers) {
return selectPeer(null, availablePeers);
}

/**
* Return peer with least connections
*
* @param message the message to be sent
* @param availablePeers list of peers that are in {@link PeerState#OKAY OKAY} state
* @return the selected peer according to algorithm
*
*/
@Override
public IPeer selectPeer(IMessage message, List<IPeer> availablePeers) {
int peerSize = availablePeers != null ? availablePeers.size() : 0;

// Return none if empty, or first if only one member found
Expand Down
Loading