Skip to content

Commit

Permalink
Merge pull request #9 from OSGP/feature/FDP-94
Browse files Browse the repository at this point in the history
FDP-94: Add monitoring and small fixes
  • Loading branch information
jasperkamerling authored Dec 7, 2023
2 parents 6515252 + dfed231 commit 3e4fcac
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 99 deletions.
1 change: 1 addition & 0 deletions application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter-engine")
testImplementation("org.junit.jupiter:junit-jupiter-params")
testImplementation("org.mockito:mockito-junit-jupiter")
testImplementation("org.assertj:assertj-core")

// Generate test and integration test reports
jacocoAggregation(project(":application"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.time.Duration


@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ComponentScan(basePackages = ["org.gxf.soapbridge"])
@EmbeddedKafka(topics = ["requests", "responses"])
class EndToEndTest(
@LocalServerPort private val soapPort: Int,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package org.gxf.soapbridge.application.services;

import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.gxf.soapbridge.valueobjects.ProxyServerResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,48 +16,50 @@
@Service
public class ClientCommunicationService {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientCommunicationService.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ClientCommunicationService.class);

/** Service used to cache incoming connections from client applications. */
private final ConnectionCacheService connectionCacheService;
/**
* Service used to cache incoming connections from client applications.
*/
private final ConnectionCacheService connectionCacheService;

/** Service used to sign and/or verify the content of queue messages. */
private final SigningService signingService;
/**
* Service used to sign and/or verify the content of queue messages.
*/
private final SigningService signingService;

public ClientCommunicationService(
final ConnectionCacheService connectionCacheService, final SigningService signingService) {
this.connectionCacheService = connectionCacheService;
this.signingService = signingService;
}
public ClientCommunicationService(
final ConnectionCacheService connectionCacheService, final SigningService signingService) {
this.connectionCacheService = connectionCacheService;
this.signingService = signingService;
}

/**
* Process an incoming queue message. The content of the message has to be verified by the {@link
* SigningService}. Then a response from GXF will set for the pending connection from a client.
*
* @param proxyServerResponseMessage The incoming queue message to process.
*/
public void handleIncomingResponse(final ProxyServerResponseMessage proxyServerResponseMessage) {
final boolean isValid =
signingService.verifyContent(
proxyServerResponseMessage.constructString(),
proxyServerResponseMessage.getSignature());

/**
* Process an incoming queue message. The content of the message has to be verified by the {@link
* SigningService}. Then a response from GXF will set for the pending connection from a client.
*
* @param proxyServerResponseMessage The incoming queue message to process.
*/
public void handleIncomingResponse(final ProxyServerResponseMessage proxyServerResponseMessage) {
final boolean isValid =
signingService.verifyContent(
proxyServerResponseMessage.constructString(),
proxyServerResponseMessage.getSignature());
final Connection connection =
connectionCacheService.findConnection(proxyServerResponseMessage.getConnectionId());

if (connection == null) {
LOGGER.error("No connection found in cache for id: {}", proxyServerResponseMessage.getConnectionId());
return;
}

try {
final Connection connection =
connectionCacheService.findConnection(proxyServerResponseMessage.getConnectionId());
if (connection != null) {
if (isValid) {
LOGGER.debug("Connection valid, set SOAP response");
connection.setSoapResponse(proxyServerResponseMessage.getSoapResponse());
LOGGER.debug("Connection valid, set SOAP response");
connection.setSoapResponse(proxyServerResponseMessage.getSoapResponse());
} else {
LOGGER.error("ProxyServerResponseMessage failed to pass security check.");
connection.setSoapResponse("Security check has failed.");
LOGGER.error("ProxyServerResponseMessage failed to pass security check.");
connection.setSoapResponse("Security check has failed.");
}
} else {
LOGGER.error("No connection found in cache for id.");
}
} catch (final ConnectionNotFoundInCacheException e) {
LOGGER.error("ConnectionNotFoundInCacheException", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package org.gxf.soapbridge.application.services;

import java.util.concurrent.ConcurrentHashMap;

import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import org.gxf.soapbridge.monitoring.MonitoringService;
import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
Expand All @@ -21,6 +24,17 @@ public class ConnectionCacheService {
*/
private static final ConcurrentHashMap<String, Connection> cache = new ConcurrentHashMap<>();

private final MonitoringService monitoringService;

public ConnectionCacheService(MonitoringService monitoringService) {
this.monitoringService = monitoringService;
}

@PostConstruct
public void postConstructor() {
monitoringService.monitorCacheSize(cache);
}

/**
* Creates a connection and puts it in the cache.
*
Expand All @@ -39,19 +53,12 @@ public Connection cacheConnection() {
*
* @param connectionId The key for the {@link Connection} instance obtained by calling {@link
* ConnectionCacheService#cacheConnection()}.
* @return A {@link Connection} instance.
* @throws ConnectionNotFoundInCacheException In case the connection is not present in the {@link
* ConnectionCacheService#cache}.
* @return A {@link Connection} instance. If no connection with the id is present return null.
*/
public Connection findConnection(final String connectionId)
throws ConnectionNotFoundInCacheException {
@Nullable
public Connection findConnection(final String connectionId) {
LOGGER.debug("Trying to find connection with connectionId: {}", connectionId);
final Connection connection = cache.get(connectionId);
if (connection == null) {
throw new ConnectionNotFoundInCacheException(
String.format("Unable to find connection for connectionId: %s", connectionId));
}
return connection;
return cache.get(connectionId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import org.gxf.soapbridge.application.services.ConnectionCacheService;
import org.gxf.soapbridge.application.services.SigningService;
import org.gxf.soapbridge.configuration.properties.SoapConfigurationProperties;
import org.gxf.soapbridge.kafka.senders.ProxyRequestKafkaSender;
import org.gxf.soapbridge.monitoring.MonitoringService;
import org.gxf.soapbridge.soap.clients.Connection;
import org.gxf.soapbridge.soap.exceptions.ConnectionNotFoundInCacheException;
import org.gxf.soapbridge.soap.exceptions.ProxyServerException;
import org.gxf.soapbridge.valueobjects.ProxyServerRequestMessage;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -57,16 +58,20 @@ public class SoapEndpoint implements HttpRequestHandler {
/** Map of time-outs for specific functions. */
private final Map<String, Integer> customTimeOutsMap;

private final MonitoringService monitoringService;

public SoapEndpoint(
final ConnectionCacheService connectionCacheService,
final SoapConfigurationProperties soapConfiguration,
final ProxyRequestKafkaSender proxyRequestsSender,
final SigningService signingService) {
final ConnectionCacheService connectionCacheService,
final SoapConfigurationProperties soapConfiguration,
final ProxyRequestKafkaSender proxyRequestsSender,
final SigningService signingService,
MonitoringService monitoringService) {
this.connectionCacheService = connectionCacheService;
this.soapConfiguration = soapConfiguration;
this.proxyRequestsSender = proxyRequestsSender;
this.signingService = signingService;
customTimeOutsMap = soapConfiguration.getCustomTimeouts();
this.monitoringService = monitoringService;
}

/** Handles incoming SOAP requests. */
Expand All @@ -75,6 +80,7 @@ public void handleRequest(
@NotNull final HttpServletRequest request, @NotNull final HttpServletResponse response)
throws ServletException, IOException {

Instant startTime = Instant.now();
// For debugging, print all headers and parameters.
LOGGER.debug("Start of SoapEndpoint.handleRequest()");
logHeaderValues(request);
Expand All @@ -89,6 +95,7 @@ public void handleRequest(
final String soapPayload = readSoapPayload(request);
if (soapPayload == null) {
LOGGER.error("Unable to read SOAP request, returning 500.");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
return;
}
Expand All @@ -102,6 +109,7 @@ public void handleRequest(
}
if (organisationName == null) {
LOGGER.error("Unable to find client certificate, returning 500.");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
return;
}
Expand All @@ -121,6 +129,7 @@ public void handleRequest(
requestMessage.setSignature(signature);
} catch (final ProxyServerException e) {
LOGGER.error("Unable to sign message or set security key", e);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
return;
Expand All @@ -142,12 +151,14 @@ public void handleRequest(
final boolean responseReceived = newConnection.waitForResponseReceived(timeout);
if (!responseReceived) {
LOGGER.error("No response received within the specified timeout of {} seconds", timeout);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
return;
}
} catch (final InterruptedException e) {
LOGGER.error("Error while waiting for response", e);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
connectionCacheService.removeConnection(connectionId);
Thread.currentThread().interrupt();
Expand All @@ -157,10 +168,12 @@ public void handleRequest(
final String soap = readResponse(connectionId);
if (soap == null) {
LOGGER.error("Unable to read SOAP response: null");
monitoringService.recordConnectionTime(startTime, request.getContextPath(), false);
createErrorResponse(response);
} else {
LOGGER.debug("Request handled, trying to send response...");
createSuccessFulResponse(response, soap);
monitoringService.recordConnectionTime(startTime, request.getContextPath(), true);
}

LOGGER.debug(
Expand Down Expand Up @@ -229,14 +242,16 @@ private Integer shouldUseCustomTimeOut(final String soapPayload) {

private String readResponse(final String connectionId) throws ServletException {
final String soap;
try {
final Connection connection = connectionCacheService.findConnection(connectionId);
soap = connection.getSoapResponse();
connectionCacheService.removeConnection(connectionId);
} catch (final ConnectionNotFoundInCacheException e) {
LOGGER.error("Unexpected error while trying to find a cached connection", e);

final Connection connection = connectionCacheService.findConnection(connectionId);

if (connection == null) {
LOGGER.error("Unexpected error while trying to find a cached connection for id: {}", connectionId);
throw new ServletException("Unable to obtain response");
}

soap = connection.getSoapResponse();
connectionCacheService.removeConnection(connectionId);
return soap;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.gxf.soapbridge.monitoring

import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Timer
import org.springframework.stereotype.Service
import java.time.Duration
import java.time.Instant

@Service
class MonitoringService(
private val registry: MeterRegistry
) {

companion object {
private const val METRIC_PREFIX = "gxf.soap.bridge"
const val CACHE_SIZE_METRIC = "${METRIC_PREFIX}.cache.size"
const val CONNECTION_TIMER_METRIC = "${METRIC_PREFIX}.request.timer"

const val CONNECTION_TIMER_CONTEXT_TAG = "context"
const val CONNECTION_TIMER_SUCCESSFUL_TAG = "successful"

}

/**
* Creates a gauge to monitor the size of a cache.
*
* @param cache The cache to monitor, represented as a Map.
* @return A Gauge object that measures the size of the cache.
*/
fun monitorCacheSize(cache: Map<*, *>) =
Gauge
.builder(CACHE_SIZE_METRIC, cache) { it.size.toDouble() }
.register(registry)

/**
* Records the connection time for a request.
*
* The timer also counts the amount of requests handled.
*
* @param startTime The start time of the request.
* @param context The context of the request.
* @param successful Flag indicating if the request was successful.
*/
fun recordConnectionTime(startTime: Instant, context: String, successful: Boolean) {
val duration = Duration.between(startTime, Instant.now())

Timer
.builder(CONNECTION_TIMER_METRIC)
.description("The time it takes to handle an incoming soap request")
.tag(CONNECTION_TIMER_CONTEXT_TAG, context)
.tag(CONNECTION_TIMER_SUCCESSFUL_TAG, successful.toString())
.register(registry)
.record(duration)
}
}
Loading

0 comments on commit 3e4fcac

Please sign in to comment.