Skip to content

Commit e05da93

Browse files
committed
Incorporate review suggestions
Signed-off-by: Tomas Longo <[email protected]>
1 parent 965b93d commit e05da93

File tree

5 files changed

+69
-55
lines changed

5 files changed

+69
-55
lines changed

data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer,
266266
if (serverConfiguration.getMaxRequestLength() != null) {
267267
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
268268
}
269+
269270
final int threads = serverConfiguration.getThreadCount();
270271
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
271272
sb.blockingTaskExecutor(blockingTaskExecutor, true);
@@ -309,4 +310,4 @@ private List<ServerInterceptor> getAuthenticationInterceptor(
309310
}
310311
return Collections.singletonList(authenticationInterceptor);
311312
}
312-
}
313+
}

data-prepper-plugins/otel-logs-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies {
1313
implementation project(':data-prepper-plugins:blocking-buffer')
1414
implementation project(':data-prepper-plugins:otel-proto-common')
1515
implementation project(':data-prepper-plugins:http-common')
16+
implementation project(':data-prepper-plugins:http-source-common' )
1617
implementation libs.commons.codec
1718
implementation project(':data-prepper-plugins:armeria-common')
1819
testImplementation project(':data-prepper-api').sourceSets.test.output

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
import com.linecorp.armeria.server.grpc.GrpcService;
1616
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
1717
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
18+
import com.linecorp.armeria.server.throttling.ThrottlingService;
1819

1920
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
2021
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
2122
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
23+
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
24+
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
2225
import org.opensearch.dataprepper.metrics.PluginMetrics;
2326
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
2427
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
@@ -54,6 +57,7 @@
5457
import java.util.List;
5558
import java.util.Map;
5659
import java.util.Optional;
60+
import java.util.concurrent.BlockingQueue;
5761
import java.util.concurrent.ExecutionException;
5862
import java.util.concurrent.ScheduledThreadPoolExecutor;
5963

@@ -66,6 +70,8 @@
6670
public class OTelLogsSource implements Source<Record<Object>> {
6771
private static final Logger LOG = LoggerFactory.getLogger(OTelLogsSource.class);
6872
static final String SERVER_CONNECTIONS = "serverConnections";
73+
static final String HEALTH_CHECK_PATH = "/health";
74+
static final int MAX_PENDING_REQUESTS = 1024;
6975

7076
private final OTelLogsSourceConfig oTelLogsSourceConfig;
7177
private final String pipelineName;
@@ -108,15 +114,15 @@ public void start(Buffer<Record<Object>> buffer) {
108114
final SessionProtocol protocol = oTelLogsSourceConfig.isSsl() ? SessionProtocol.HTTPS : SessionProtocol.HTTP;
109115
final ServerBuilder serverBuilder = Server.builder().port(oTelLogsSourceConfig.getPort(), protocol);
110116
if (server == null) {
111-
configureServer(serverBuilder);
117+
server = createServer(serverBuilder, buffer);
112118

113-
final String transformedGrpcPath = oTelLogsSourceConfig.getPath().replace("${pipelineName}", pipelineName);
114-
configureGrpcService(serverBuilder, buffer, transformedGrpcPath);
119+
// final String transformedGrpcPath = oTelLogsSourceConfig.getPath().replace("${pipelineName}", pipelineName);
120+
// configureGrpcService(serverBuilder, buffer, transformedGrpcPath);
121+
//
122+
// final String transformedHttpPath = oTelLogsSourceConfig.getHttpPath().replace("${pipelineName}", pipelineName);
123+
// configureHttpService(serverBuilder, buffer, transformedHttpPath);
115124

116-
final String transformedHttpPath = oTelLogsSourceConfig.getHttpPath().replace("${pipelineName}", pipelineName);
117-
configureHttpService(serverBuilder, buffer, transformedHttpPath);
118-
119-
server = serverBuilder.build();
125+
// server = serverBuilder.build();
120126
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
121127
}
122128
try {
@@ -134,7 +140,7 @@ public void start(Buffer<Record<Object>> buffer) {
134140
LOG.info("Started otel_logs_source...");
135141
}
136142

137-
private void configureServer(ServerBuilder serverBuilder) {
143+
private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
138144
serverBuilder.disableServerHeader();
139145
if (oTelLogsSourceConfig.isSsl()) {
140146
LOG.info("Creating http source with SSL/TLS enabled.");
@@ -163,18 +169,32 @@ private void configureServer(ServerBuilder serverBuilder) {
163169
serverBuilder.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes());
164170
}
165171
final int threadCount = oTelLogsSourceConfig.getThreadCount();
166-
serverBuilder.blockingTaskExecutor(new ScheduledThreadPoolExecutor(threadCount), true);
172+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threadCount);
173+
serverBuilder.blockingTaskExecutor(executor, true);
167174

168175
if (oTelLogsSourceConfig.hasHealthCheck()) {
169176
LOG.info("HTTP source health check is enabled");
170-
serverBuilder.service("/health", HealthCheckService.builder().longPolling(0).build());
177+
serverBuilder.service(HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
171178
}
179+
180+
configureGrpcService(serverBuilder, buffer);
181+
configureHttpService(serverBuilder, buffer, executor.getQueue());
182+
183+
return serverBuilder.build();
172184
}
173185

174-
private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer, String path) {
186+
private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer, BlockingQueue<Runnable> blockingQueue) {
187+
final String path = oTelLogsSourceConfig.getHttpPath().replace("${pipelineName}", pipelineName);
188+
175189
final ArmeriaHttpService armeriaHttpService = new ArmeriaHttpService(buffer, pluginMetrics, 100);
176190
final RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo();
177191
final HttpExceptionHandler httpExceptionHandler = new HttpExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
192+
193+
final int maxPendingRequests = MAX_PENDING_REQUESTS;
194+
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(maxPendingRequests, blockingQueue);
195+
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
196+
serverBuilder.decorator(path, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
197+
178198
if (CompressionOption.NONE.equals(oTelLogsSourceConfig.getCompression())) {
179199
serverBuilder.annotatedService(path, armeriaHttpService, httpExceptionHandler);
180200
} else {
@@ -186,7 +206,8 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Obj
186206
}
187207
}
188208

189-
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer, String path) {
209+
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
210+
190211
final GrpcServiceBuilder grpcServiceBuilder = GrpcService
191212
.builder()
192213
.useClientTimeoutHeader(false)
@@ -210,6 +231,7 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
210231
grpcServiceBuilder.enableUnframedRequests(true);
211232
}
212233

234+
final String path = oTelLogsSourceConfig.getPath().replace("${pipelineName}", pipelineName);
213235
final CreateServer.GRPCServiceConfig<?, ?> grpcServiceConfig = new CreateServer.GRPCServiceConfig<>(oTelLogsGrpcService);
214236
grpcServiceBuilder.addService(
215237
path,

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/http/ArmeriaHttpService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import org.opensearch.dataprepper.exceptions.BadRequestException;
88
import org.opensearch.dataprepper.exceptions.BufferWriteException;
9+
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.opensearch.dataprepper.model.buffer.Buffer;
1112
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
@@ -72,15 +73,15 @@ private void processRequest(final ExportLogsServiceRequest request) {
7273
try {
7374
logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now());
7475
} catch (Exception e) {
75-
LOG.error("Failed to parse the request {} due to:", request, e);
76+
LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse the request with error {}. Request body: {}", e, request);
7677
throw new BadRequestException(e.getMessage(), e);
7778
}
7879

79-
final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
8080
try {
8181
if (buffer.isByteBuffer()) {
8282
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
8383
} else {
84+
final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
8485
buffer.writeAll(records, bufferWriteTimeoutInMillis);
8586
}
8687
} catch (Exception e) {

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77

88
import static com.linecorp.armeria.common.HttpStatus.INSUFFICIENT_STORAGE;
99
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
10+
import static com.linecorp.armeria.common.HttpStatus.REQUEST_ENTITY_TOO_LARGE;
1011
import static org.hamcrest.MatcherAssert.assertThat;
1112
import static org.hamcrest.Matchers.equalTo;
1213
import static org.hamcrest.Matchers.hasItem;
1314
import static org.hamcrest.Matchers.is;
1415
import static org.hamcrest.Matchers.not;
1516
import static org.hamcrest.Matchers.nullValue;
1617
import static org.junit.jupiter.api.Assertions.assertEquals;
18+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
1719
import static org.junit.jupiter.api.Assertions.assertThrows;
1820
import static org.junit.jupiter.api.Assertions.assertTrue;
1921
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -161,17 +163,20 @@ public void beforeEach() {
161163
lenient().when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authenticationProvider);
162164
pipelineDescription = mock(PipelineDescription.class);
163165
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
164-
SOURCE = new OTelLogsSource(createDefaultConfig(), pluginMetrics, pluginFactory, pipelineDescription);
165166
}
166167

167168
@AfterEach
168169
public void afterEach() {
169170
SOURCE.stop();
170171
}
171172

172-
private void configureObjectUnderTest() {
173-
SOURCE = new OTelLogsSource(createDefaultConfig(), pluginMetrics, pluginFactory, pipelineDescription);
174-
assertTrue(SOURCE.getDecoder() instanceof OTelLogsDecoder);
173+
private void configureSource() {
174+
configureSource(createDefaultConfig());
175+
}
176+
177+
private void configureSource(OTelLogsSourceConfig config) {
178+
SOURCE = new OTelLogsSource(config, pluginMetrics, pluginFactory, pipelineDescription);
179+
assertInstanceOf(OTelLogsDecoder.class, SOURCE.getDecoder());
175180
}
176181

177182
private RequestHeadersBuilder getDefaultRequestHeadersBuilder() {
@@ -186,10 +191,9 @@ private RequestHeadersBuilder getDefaultRequestHeadersBuilder() {
186191
@ParameterizedTest
187192
@MethodSource("getPathParams")
188193
void httpRequest_writesToBuffer_returnsSuccessfulResponse(String givenPath, String resolvedRequestPath) throws Exception {
189-
OTelLogsSource source = new OTelLogsSource(createDefaultConfigBuilder()
190-
.httpPath(givenPath)
191-
.build(), pluginMetrics, pluginFactory, pipelineDescription);
192-
source.start(buffer);
194+
OTelLogsSourceConfig config = createDefaultConfigBuilder().httpPath(givenPath).build();
195+
configureSource(config);
196+
SOURCE.start(buffer);
193197
ExportLogsServiceRequest request = createExportLogsRequest();
194198

195199
WebClient.of().execute(
@@ -201,13 +205,12 @@ void httpRequest_writesToBuffer_returnsSuccessfulResponse(String givenPath, Stri
201205
.join();
202206

203207
verify(buffer).writeAll(any(), anyInt());
204-
source.stop();
205208
}
206209

207210
@Test
208211
void httpsRequest_requestIsProcessed_writesToBufferAndReturnsSuccessfulResponse() throws Exception {
209-
OTelLogsSource source = new OTelLogsSource(createBuilderForConfigWithSsl().build(), pluginMetrics, pluginFactory, pipelineDescription);
210-
source.start(buffer);
212+
configureSource(createBuilderForConfigWithSsl().build());
213+
SOURCE.start(buffer);
211214
ExportLogsServiceRequest request = createExportLogsRequest();
212215

213216
WebClient.builder()
@@ -221,7 +224,6 @@ void httpsRequest_requestIsProcessed_writesToBufferAndReturnsSuccessfulResponse(
221224
.join();
222225

223226
verify(buffer).writeAll(any(), anyInt());
224-
source.stop();
225227
}
226228

227229
private static Stream<Arguments> getPathParams() {
@@ -233,6 +235,7 @@ private static Stream<Arguments> getPathParams() {
233235

234236
@Test
235237
void httpRequest_oneConnectionIsEstablished_metricsReflectCorrectConnectionCount() throws InvalidProtocolBufferException {
238+
configureSource();
236239
SOURCE.start(buffer);
237240

238241
WebClient.of().execute(getDefaultRequestHeadersBuilder().build(), createJsonHttpPayload())
@@ -249,12 +252,8 @@ void httpRequest_oneConnectionIsEstablished_metricsReflectCorrectConnectionCount
249252

250253
@Test
251254
void httpRequest_payloadIsCompressed_returns200() throws IOException {
252-
OTelLogsSource source = new OTelLogsSource(
253-
createDefaultConfigBuilder().compression(CompressionOption.GZIP).build(),
254-
pluginMetrics,
255-
pluginFactory,
256-
pipelineDescription);
257-
source.start(buffer);
255+
configureSource( createDefaultConfigBuilder().compression(CompressionOption.GZIP).build());
256+
SOURCE.start(buffer);
258257

259258
WebClient.of().execute(getDefaultRequestHeadersBuilder()
260259
.add(HttpHeaderNames.CONTENT_ENCODING, "gzip")
@@ -263,9 +262,6 @@ void httpRequest_payloadIsCompressed_returns200() throws IOException {
263262
.aggregate()
264263
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.OK, throwable))
265264
.join();
266-
267-
268-
source.stop();
269265
}
270266

271267
@ParameterizedTest
@@ -274,8 +270,8 @@ void httpRequest_withBasicAuth_returnsAppropriateResponse(String givenUsername,
274270
final HttpBasicAuthenticationConfig basicAuthConfig = new HttpBasicAuthenticationConfig(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD);
275271
final HttpBasicArmeriaHttpAuthenticationProvider authProvider = new HttpBasicArmeriaHttpAuthenticationProvider(basicAuthConfig);
276272
when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authProvider);
277-
final OTelLogsSource source = new OTelLogsSource(createConfigBuilderWithBasicAuth().build(), pluginMetrics, pluginFactory, pipelineDescription);
278-
source.start(buffer);
273+
configureSource(createConfigBuilderWithBasicAuth().build());
274+
SOURCE.start(buffer);
279275

280276
final String encodedCredentials = Base64.getEncoder().encodeToString(String.format("%s:%s", givenUsername, givenPassword).getBytes(StandardCharsets.UTF_8));
281277
WebClient.of().execute(getDefaultRequestHeadersBuilder()
@@ -287,7 +283,6 @@ void httpRequest_withBasicAuth_returnsAppropriateResponse(String givenUsername,
287283
.join();
288284

289285
verify(buffer, expectedBufferWrites).writeAll(any(), anyInt());
290-
source.stop();
291286
}
292287

293288
private static Stream<Arguments> getBasicAuthTestData() {
@@ -300,13 +295,8 @@ private static Stream<Arguments> getBasicAuthTestData() {
300295
@ParameterizedTest
301296
@MethodSource("getHealthCheckParams")
302297
void healthCheckRequest_requestIsProcesses_returnsStatusCodeAccordingToConfig(boolean givenHealthCheckConfig, HttpStatus expectedStatus) throws IOException {
303-
final OTelLogsSource source = new OTelLogsSource(
304-
createDefaultConfigBuilder().healthCheck(givenHealthCheckConfig).build(),
305-
pluginMetrics,
306-
pluginFactory,
307-
pipelineDescription
308-
);
309-
source.start(buffer);
298+
configureSource(createDefaultConfigBuilder().healthCheck(givenHealthCheckConfig).build());
299+
SOURCE.start(buffer);
310300

311301
WebClient.of().execute(getDefaultRequestHeadersBuilder()
312302
.path("/health")
@@ -316,8 +306,6 @@ void healthCheckRequest_requestIsProcesses_returnsStatusCodeAccordingToConfig(bo
316306
.aggregate()
317307
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, expectedStatus, throwable))
318308
.join();
319-
320-
source.stop();
321309
}
322310

323311
private static Stream<Arguments> getHealthCheckParams() {
@@ -329,6 +317,7 @@ private static Stream<Arguments> getHealthCheckParams() {
329317

330318
@Test
331319
void testStartWithEmptyBuffer() {
320+
configureSource();
332321
assertThrows(IllegalStateException.class, () -> SOURCE.start(null));
333322
}
334323

@@ -337,7 +326,7 @@ void testStartWithEmptyBuffer() {
337326
void httpRequest_writingToBufferThrowsAnException_correctHttpStatusIsReturned(
338327
final Class<Exception> bufferExceptionClass,
339328
final HttpStatus expectedStatus) throws Exception {
340-
configureObjectUnderTest();
329+
configureSource();
341330
SOURCE.start(buffer);
342331
doThrow(bufferExceptionClass)
343332
.when(buffer)
@@ -353,17 +342,17 @@ void httpRequest_writingToBufferThrowsAnException_correctHttpStatusIsReturned(
353342
}
354343

355344
@Test
356-
void httpRequest_requestBodyIsTooLarge_returns507() throws InvalidProtocolBufferException {
357-
OTelLogsSource source = new OTelLogsSource(createDefaultConfigBuilder().maxRequestLength(ByteCount.ofBytes(4)).build(), pluginMetrics, pluginFactory, pipelineDescription);
358-
source.start(buffer);
345+
void httpRequest_requestBodyIsTooLarge_returns413() throws InvalidProtocolBufferException {
346+
configureSource(createDefaultConfigBuilder().maxRequestLength(ByteCount.ofBytes(4)).build());
347+
SOURCE.start(buffer);
359348

360349
WebClient.of()
361350
.execute(getDefaultRequestHeadersBuilder().build(), createJsonHttpPayload())
362351
.aggregate()
363-
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, INSUFFICIENT_STORAGE, throwable))
352+
.whenComplete((response, throwable) -> {
353+
assertSecureResponseWithStatusCode(response, REQUEST_ENTITY_TOO_LARGE, throwable);
354+
})
364355
.join();
365-
366-
source.stop();
367356
}
368357

369358
static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {

0 commit comments

Comments
 (0)