diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/GetLoadProducerConsumer.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/GetLoadProducerConsumer.java
index bd67928ce8..bd33b37e06 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/GetLoadProducerConsumer.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/GetLoadProducerConsumer.java
@@ -1,3 +1,16 @@
+/**
+ * Copyright 2024 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
 package com.github.ambry.tools.perf.serverperf;
 
 import com.github.ambry.clustermap.ClusterMap;
@@ -34,9 +47,10 @@ public class GetLoadProducerConsumer implements LoadProducerConsumer {
   private final ServerPerfNetworkQueue networkQueue;
   private final ServerPerformanceConfig config;
   private final ClusterMap clusterMap;
+  private final DataNodeId dataNodeId;
 
-  private final AtomicInteger correlationId = new AtomicInteger();
-  private static final String CLIENT_ID = "ServerReadPerformance";
+  private final AtomicInteger correlationId;
+  private static final String CLIENT_ID = "ServerGETPerformance";
   private static final Logger logger = LoggerFactory.getLogger(GetLoadProducerConsumer.class);
 
   public GetLoadProducerConsumer(ServerPerfNetworkQueue networkQueue, ServerPerformanceConfig config,
@@ -44,11 +58,13 @@ public GetLoadProducerConsumer(ServerPerfNetworkQueue networkQueue, ServerPerfor
     this.networkQueue = networkQueue;
     this.config = config;
     this.clusterMap = clusterMap;
+    dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
+    correlationId = new AtomicInteger();
   }
 
   @Override
   public void produce() throws Exception {
-    final BufferedReader br = new BufferedReader(new FileReader(config.serverPerformanceBlobIdFilePath));
+    final BufferedReader br = new BufferedReader(new FileReader(config.serverPerformanceGetTestBlobIdFilePath));
     String line;
     boolean isShutDown = false;
 
@@ -60,7 +76,6 @@ public void produce() throws Exception {
           new PartitionRequestInfo(blobId.getPartition(), Collections.singletonList(blobId));
       GetRequest getRequest = new GetRequest(correlationId.incrementAndGet(), CLIENT_ID, MessageFormatFlags.Blob,
           Collections.singletonList(partitionRequestInfo), GetOption.Include_All);
-      DataNodeId dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
       ReplicaId replicaId =
           getReplicaFromNode(dataNodeId, getRequest.getPartitionInfoList().get(0).getPartition(), clusterMap);
       String hostname = dataNodeId.getHostname();
@@ -108,7 +123,6 @@ public void consume() throws Exception {
     } catch (Exception e) {
       logger.error("error in load consumer thread", e);
     }
-    logger.info("Load consumer thread is finished");
   }
 
   void processGetResponse(ResponseInfo responseInfo) {
diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/LoadProducerConsumer.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/LoadProducerConsumer.java
index 67daaab2e8..dfd67902aa 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/LoadProducerConsumer.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/LoadProducerConsumer.java
@@ -1,3 +1,16 @@
+/**
+ * Copyright 2024 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
 package com.github.ambry.tools.perf.serverperf;
 
 public interface LoadProducerConsumer {
diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/PutLoadProducerConsumer.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/PutLoadProducerConsumer.java
index d26cc34dbf..896e36d9b1 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/PutLoadProducerConsumer.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/PutLoadProducerConsumer.java
@@ -1,40 +1,152 @@
+/**
+ * Copyright 2024 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
 package com.github.ambry.tools.perf.serverperf;
 
+import com.github.ambry.account.Account;
+import com.github.ambry.account.Container;
 import com.github.ambry.clustermap.ClusterMap;
 import com.github.ambry.clustermap.DataNodeId;
+import com.github.ambry.clustermap.DiskId;
 import com.github.ambry.clustermap.ReplicaId;
+import com.github.ambry.commons.BlobId;
+import com.github.ambry.config.RouterConfig;
+import com.github.ambry.messageformat.BlobProperties;
+import com.github.ambry.messageformat.BlobType;
+import com.github.ambry.network.Port;
+import com.github.ambry.network.RequestInfo;
+import com.github.ambry.protocol.PutRequest;
+import com.github.ambry.protocol.PutResponse;
 import com.github.ambry.tools.perf.serverperf.ServerPerformance.ServerPerformanceConfig;
+import com.github.ambry.utils.NettyByteBufDataInputStream;
+import io.netty.buffer.Unpooled;
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class PutLoadProducerConsumer implements LoadProducerConsumer {
   private final ServerPerfNetworkQueue networkQueue;
   private final ServerPerformanceConfig config;
+  private final RouterConfig routerConfig;
   private final ClusterMap clusterMap;
+  private final DataNodeId dataNodeId;
+
+  private final List<ReplicaId> replicaIdsSelected;
+  private final AtomicInteger correlationId;
+
+  private int totalDataSentBytes;
+
+  private static final String CLIENT_ID = "ServerPUTPerformance";
+
+  private static final Logger logger = LoggerFactory.getLogger(PutLoadProducerConsumer.class);
 
   public PutLoadProducerConsumer(ServerPerfNetworkQueue networkQueue, ServerPerformanceConfig config,
-      ClusterMap clusterMap) {
+      ClusterMap clusterMap, RouterConfig routerConfig) {
     this.networkQueue = networkQueue;
     this.config = config;
     this.clusterMap = clusterMap;
+    this.routerConfig = routerConfig;
+    replicaIdsSelected = new ArrayList<>();
+    dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
+    selectReplica();
+    correlationId = new AtomicInteger();
+    totalDataSentBytes = 0;
   }
 
-  @Override
-  public void produce() throws Exception {
-    DataNodeId dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
-    List<? extends ReplicaId> replicaIds = clusterMap.getReplicaIds(dataNodeId);
+  void selectReplica() {
+    Random random = new Random();
+    List<? extends ReplicaId> allReplicaIds = clusterMap.getReplicaIds(dataNodeId);
 
-    replicaIds.forEach(replicaId -> {
+    Map<DiskId, List<ReplicaId>> diskIdToReplicaIds = new HashMap<>();
+    allReplicaIds.forEach(replicaId -> {
       if (!replicaId.isUnsealed()) {
         return;
       }
+      diskIdToReplicaIds.putIfAbsent(replicaId.getDiskId(), new ArrayList<>());
+      diskIdToReplicaIds.get(replicaId.getDiskId()).add(replicaId);
+    });
 
-      // PutRequest putRequest = new PutRequest(1, BlobId.BlobIdType.NATIVE, clusterMap.getLocalDatacenterId(), )
+    diskIdToReplicaIds.values().forEach(replicaIds -> {
+      replicaIdsSelected.add(replicaIds.get(random.nextInt(replicaIds.size())));
     });
   }
 
   @Override
-  public void consume() throws Exception {
+  public void produce() throws Exception {
+    while (true) {
+      for (ReplicaId replicaId : replicaIdsSelected) {
+        int blobSize = config.serverPerformancePutTestBlobSizeBytes;
+        totalDataSentBytes = totalDataSentBytes + blobSize;
+        if (totalDataSentBytes > config.serverPerformancePutTestDataLimitBytes) {
+          throw new ShutDownException("Shut down producer as size limit for bytes reached");
+        }
+        byte[] blob = new byte[blobSize];
+        byte[] usermetadata = new byte[new Random().nextInt(1024)];
+        BlobProperties props =
+            new BlobProperties(blobSize, CLIENT_ID, Account.UNKNOWN_ACCOUNT_ID, Container.UNKNOWN_CONTAINER_ID, false);
+        props.setTimeToLiveInSeconds(config.serverPerformancePutTestBlobExpirySeconds);
+        BlobId blobId = new BlobId(routerConfig.routerBlobidCurrentVersion, BlobId.BlobIdType.NATIVE,
+            clusterMap.getLocalDatacenterId(), props.getAccountId(), props.getContainerId(), replicaId.getPartitionId(),
+            false, BlobId.BlobDataType.DATACHUNK);
+
+        PutRequest putRequest =
+            new PutRequest(correlationId.incrementAndGet(), CLIENT_ID, blobId, props, ByteBuffer.wrap(usermetadata),
+                Unpooled.wrappedBuffer(blob), props.getBlobSize(), BlobType.DataBlob, null);
+
+        String hostname = dataNodeId.getHostname();
+        Port port = dataNodeId.getPortToConnectTo();
+
+        RequestInfo requestInfo = new RequestInfo(hostname, port, putRequest, replicaId, null);
+
+        try {
+          logger.info("Submitting put request {}, blob size {}", requestInfo.getRequest().getCorrelationId(), blobSize);
+          networkQueue.submit(requestInfo);
+        } catch (ShutDownException e) {
+          throw e;
+        } catch (Exception e) {
+          logger.error("Error while sending request", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void consume() throws Exception {
+    try {
+      networkQueue.poll(responseInfo -> {
+        try {
+          InputStream serverResponseStream = new NettyByteBufDataInputStream(responseInfo.content());
+          PutResponse putResponse = PutResponse.readFrom(new DataInputStream(serverResponseStream));
+          logger.info("received success response for correlation id {}",
+              responseInfo.getRequestInfo().getRequest().getCorrelationId());
+        } catch (Exception e) {
+          logger.error("Error while processing response", e);
+        }
+      });
+    } catch (ShutDownException e) {
+      throw e;
+    } catch (Exception e) {
+      logger.error("Error while consuming", e);
+    }
   }
 }
diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerfNetworkQueue.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerfNetworkQueue.java
index 1229e7f490..2cd56e3666 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerfNetworkQueue.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerfNetworkQueue.java
@@ -196,7 +196,7 @@ void poll(ResponseInfoProcessor responseInfoProcessor) throws Exception {
    *
    * If shutdown is triggered , waits for all responses to be processed and shuts down {@link #executorService}
    * 1. Releases a token so if any thread trying to submit is waiting stops waiting.
-   * 2. Tries to acquire {@link #maxParallelism+1} tokens as it can only acquire all tokens when
+   * 2. Tries to acquire {@link #maxParallelism} +1 tokens as it can only acquire all tokens when
    * all responses get processed
    */
   @Override
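Aside on the javadoc corrected above: the shutdown sequence it describes is a permit-draining pattern. The following is a minimal, self-contained sketch of that idea with illustrative names only (it is not the actual ServerPerfNetworkQueue implementation): each in-flight request holds one permit, a processed response releases it, and shutdown releases one extra permit to unblock any waiting submitter before reacquiring maxParallelism + 1 permits, which can only succeed once every outstanding response has been processed.

```java
import java.util.concurrent.Semaphore;

/**
 * Sketch of the token-draining shutdown described in the javadoc above.
 * All names here are illustrative; they do not mirror ServerPerfNetworkQueue's fields.
 */
class DrainOnShutdownQueue {
  private final int maxParallelism;
  private final Semaphore permits;
  private volatile boolean isShutDown = false;

  DrainOnShutdownQueue(int maxParallelism) {
    this.maxParallelism = maxParallelism;
    this.permits = new Semaphore(maxParallelism);
  }

  /** Blocks once maxParallelism requests are already in flight. */
  void submit(Runnable sendRequest) throws InterruptedException {
    permits.acquire();
    if (isShutDown) {
      // Give the permit back so shutdown can still drain all of them.
      permits.release();
      throw new IllegalStateException("queue is shut down");
    }
    sendRequest.run();
  }

  /** Called once a response has been fully processed; frees one permit. */
  void onResponseProcessed() {
    permits.release();
  }

  /**
   * Step 1: release one extra permit so a submitter blocked in acquire() wakes up.
   * Step 2: acquire maxParallelism + 1 permits, which only succeeds after every
   * in-flight request has released its permit via onResponseProcessed().
   */
  void shutDown() throws InterruptedException {
    isShutDown = true;
    permits.release();
    permits.acquire(maxParallelism + 1);
  }
}
```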
diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerformance.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerformance.java
index 82b96ca133..9e1a365d08 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerformance.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ServerPerformance.java
@@ -20,6 +20,7 @@
 import com.github.ambry.config.ClusterMapConfig;
 import com.github.ambry.config.Config;
 import com.github.ambry.config.Default;
+import com.github.ambry.config.RouterConfig;
 import com.github.ambry.config.VerifiableProperties;
 import com.github.ambry.network.http2.Http2ClientMetrics;
 import com.github.ambry.tools.util.ToolUtils;
@@ -97,13 +98,6 @@ public static class ServerPerformanceConfig {
     @Default("15")
     public final int serverPerformanceOperationsTimeOutSec;
 
-    /**
-     * Path to file from which to read the blob ids
-     */
-    @Config("server.performance.blob.id.file.path")
-    @Default("")
-    public final String serverPerformanceBlobIdFilePath;
-
     /**
      * The hostname of the target server as it appears in the partition layout.
      */
@@ -125,13 +119,32 @@ public static class ServerPerformanceConfig {
     @Default("30")
     public final int serverPerformanceTimeOutSeconds;
 
+    /**
+     * Path to file from which to read the blob ids
+     */
+    @Config("server.performance.get.test.blob.id.file.path")
+    @Default("")
+    public final String serverPerformanceGetTestBlobIdFilePath;
+
+
+    @Config("server.performance.put.test.blob.size.bytes")
+    @Default("4096")
+    public final int serverPerformancePutTestBlobSizeBytes;
+
+    @Config("server.performance.put.test.blob.expiry.seconds")
+    @Default("10")
+    public final int serverPerformancePutTestBlobExpirySeconds;
+
+    @Config("server.performance.put.test.data.limit.bytes")
+    @Default("204800")
+    public final int serverPerformancePutTestDataLimitBytes;
+
     public ServerPerformanceConfig(VerifiableProperties verifiableProperties) {
       serverPerformanceTestType = TestType.valueOf(verifiableProperties.getString("server.performance.test.type"));
       serverPerformanceHardwareLayoutFilePath =
           verifiableProperties.getString("server.performance.hardware.layout.file.path", "");
       serverPerformancePartitionLayoutFilePath =
           verifiableProperties.getString("server.performance.partition.layout.file.path", "");
-      serverPerformanceBlobIdFilePath = verifiableProperties.getString("server.performance.blob.id.file.path", "");
       serverPerformanceHostname = verifiableProperties.getString("server.performance.hostname", "localhost");
       serverPerformancePort = verifiableProperties.getInt("server.performance.port", 6667);
       serverPerformanceMaxParallelRequests =
@@ -140,12 +153,21 @@ public ServerPerformanceConfig(VerifiableProperties verifiableProperties) {
       serverPerformanceTimeOutSeconds = verifiableProperties.getInt("server.performance.time.out.seconds", 30);
       serverPerformanceOperationsTimeOutSec =
           verifiableProperties.getInt("server.performance.operations.time.out.sec", 15);
+      serverPerformanceGetTestBlobIdFilePath =
+          verifiableProperties.getString("server.performance.get.test.blob.id.file.path", "");
+      serverPerformancePutTestBlobSizeBytes =
+          verifiableProperties.getInt("server.performance.put.test.blob.size.bytes", 4096);
+      serverPerformancePutTestBlobExpirySeconds =
+          verifiableProperties.getInt("server.performance.put.test.blob.expiry.seconds", 10);
+      serverPerformancePutTestDataLimitBytes =
+          verifiableProperties.getInt("server.performance.put.test.data.limit.bytes", 204800);
     }
   }
 
   public ServerPerformance(VerifiableProperties verifiableProperties) throws Exception {
     config = new ServerPerformanceConfig(verifiableProperties);
     ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties);
+    RouterConfig routerConfig = new RouterConfig(verifiableProperties);
     ClusterMap clusterMap =
         ((ClusterAgentsFactory) Utils.getObj(clusterMapConfig.clusterMapClusterAgentsFactory, clusterMapConfig,
             config.serverPerformanceHardwareLayoutFilePath,
@@ -158,15 +180,16 @@ public ServerPerformance(VerifiableProperties verifiableProperties) throws Excep
         config.serverPerformanceOperationsTimeOutSec);
     networkQueue.start();
 
-    switch (config.serverPerformanceTestType) {
-      case GET_BLOB:
-        producerConsumer = new GetLoadProducerConsumer(networkQueue, config, clusterMap);
-        break;
-      case PUT_BLOB:
-        producerConsumer = new PutLoadProducerConsumer(networkQueue, config, clusterMap);
-      default:
-        throw new IllegalArgumentException("Unrecognized test type: "+ config.serverPerformanceTestType);
-    }
+    switch (config.serverPerformanceTestType) {
+      case GET_BLOB:
+        producerConsumer = new GetLoadProducerConsumer(networkQueue, config, clusterMap);
+        break;
+      case PUT_BLOB:
+        producerConsumer = new PutLoadProducerConsumer(networkQueue, config, clusterMap, routerConfig);
+        break;
+      default:
+        throw new IllegalArgumentException("Unrecognized test type: " + config.serverPerformanceTestType);
+    }
   }
 
   public static void main(String[] args) throws Exception {
@@ -193,6 +216,7 @@ public void startLoadTest() throws Exception {
     loadProducer.start();
     loadConsumer.start();
     loadProducer.join();
+    timedShutDownLatch.countDown();
     loadConsumer.join();
     shutDownLatch.countDown();
   }
diff --git a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ShutDownException.java b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ShutDownException.java
index ac00024134..0e8f3fb604 100644
--- a/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ShutDownException.java
+++ b/ambry-tools/src/main/java/com/github/ambry/tools/perf/serverperf/ShutDownException.java
@@ -1,3 +1,16 @@
+/**
+ * Copyright 2024 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
 package com.github.ambry.tools.perf.serverperf;
 
 public class ShutDownException extends Exception {
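For reference, a hedged sketch of how the new PUT load test path might be driven end to end. Only the server.performance.put.test.* keys, the RouterConfig wiring, and the public ServerPerformance constructor and startLoadTest() entry points come from this change; every property value, file path, and cluster-map setting below is an illustrative assumption for a local test setup.

```java
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.tools.perf.serverperf.ServerPerformance;
import java.util.Properties;

public class PutLoadTestExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Select the new PUT test and point at the target server (illustrative values).
    props.setProperty("server.performance.test.type", "PUT_BLOB");
    props.setProperty("server.performance.hostname", "localhost");
    props.setProperty("server.performance.port", "6667");
    props.setProperty("server.performance.hardware.layout.file.path", "/tmp/HardwareLayout.json");
    props.setProperty("server.performance.partition.layout.file.path", "/tmp/PartitionLayout.json");
    // Knobs introduced by this change: per-blob size, blob TTL, and total bytes to send.
    props.setProperty("server.performance.put.test.blob.size.bytes", "4096");
    props.setProperty("server.performance.put.test.blob.expiry.seconds", "10");
    props.setProperty("server.performance.put.test.data.limit.bytes", "204800");
    // Cluster-map identity; the exact required keys depend on the deployment (assumed values).
    props.setProperty("clustermap.cluster.name", "perf-test-cluster");
    props.setProperty("clustermap.datacenter.name", "DC1");
    props.setProperty("clustermap.host.name", "localhost");

    // RouterConfig (used for routerBlobidCurrentVersion) and ClusterMapConfig are built
    // from the same properties inside ServerPerformance.
    ServerPerformance serverPerformance = new ServerPerformance(new VerifiableProperties(props));
    serverPerformance.startLoadTest();
  }
}
```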