Skip to content

Commit

Permalink
adding put blob test
Browse files Browse the repository at this point in the history
  • Loading branch information
manbearpig1996 committed Feb 6, 2025
1 parent c3599e7 commit b027b09
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.tools.perf.serverperf;

import com.github.ambry.clustermap.ClusterMap;
Expand Down Expand Up @@ -34,21 +47,24 @@ public class GetLoadProducerConsumer implements LoadProducerConsumer {
private final ServerPerfNetworkQueue networkQueue;
private final ServerPerformanceConfig config;
private final ClusterMap clusterMap;
private final DataNodeId dataNodeId;

private final AtomicInteger correlationId = new AtomicInteger();
private static final String CLIENT_ID = "ServerReadPerformance";
private final AtomicInteger correlationId;
private static final String CLIENT_ID = "ServerGETPerformance";
private static final Logger logger = LoggerFactory.getLogger(GetLoadProducerConsumer.class);

public GetLoadProducerConsumer(ServerPerfNetworkQueue networkQueue, ServerPerformanceConfig config,
ClusterMap clusterMap) {
this.networkQueue = networkQueue;
this.config = config;
this.clusterMap = clusterMap;
dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
correlationId = new AtomicInteger();
}

@Override
public void produce() throws Exception {
final BufferedReader br = new BufferedReader(new FileReader(config.serverPerformanceBlobIdFilePath));
final BufferedReader br = new BufferedReader(new FileReader(config.serverPerformanceGetTestBlobIdFilePath));
String line;
boolean isShutDown = false;

Expand All @@ -60,7 +76,6 @@ public void produce() throws Exception {
new PartitionRequestInfo(blobId.getPartition(), Collections.singletonList(blobId));
GetRequest getRequest = new GetRequest(correlationId.incrementAndGet(), CLIENT_ID, MessageFormatFlags.Blob,
Collections.singletonList(partitionRequestInfo), GetOption.Include_All);
DataNodeId dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
ReplicaId replicaId =
getReplicaFromNode(dataNodeId, getRequest.getPartitionInfoList().get(0).getPartition(), clusterMap);
String hostname = dataNodeId.getHostname();
Expand Down Expand Up @@ -108,7 +123,6 @@ public void consume() throws Exception {
} catch (Exception e) {
logger.error("error in load consumer thread", e);
}
logger.info("Load consumer thread is finished");
}

void processGetResponse(ResponseInfo responseInfo) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.tools.perf.serverperf;

public interface LoadProducerConsumer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,152 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.tools.perf.serverperf;

import com.github.ambry.account.Account;
import com.github.ambry.account.Container;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.DiskId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.messageformat.BlobType;
import com.github.ambry.network.Port;
import com.github.ambry.network.RequestInfo;
import com.github.ambry.protocol.PutRequest;
import com.github.ambry.protocol.PutResponse;
import com.github.ambry.tools.perf.serverperf.ServerPerformance.ServerPerformanceConfig;
import com.github.ambry.utils.NettyByteBufDataInputStream;
import io.netty.buffer.Unpooled;
import java.io.DataInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PutLoadProducerConsumer implements LoadProducerConsumer {
private final ServerPerfNetworkQueue networkQueue;
private final ServerPerformanceConfig config;
private final RouterConfig routerConfig;
private final ClusterMap clusterMap;
private final DataNodeId dataNodeId;

private final List<ReplicaId> replicaIdsSelected;
private final AtomicInteger correlationId;

private int totalDataSentBytes;

private static final String CLIENT_ID = "ServerPUTPerformance";

private static final Logger logger = LoggerFactory.getLogger(PutLoadProducerConsumer.class);

public PutLoadProducerConsumer(ServerPerfNetworkQueue networkQueue, ServerPerformanceConfig config,
ClusterMap clusterMap) {
ClusterMap clusterMap, RouterConfig routerConfig) {
this.networkQueue = networkQueue;
this.config = config;
this.clusterMap = clusterMap;
this.routerConfig = routerConfig;
replicaIdsSelected = new ArrayList<>();
dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
selectReplica();
correlationId = new AtomicInteger();
totalDataSentBytes = 0;
}

@Override
public void produce() throws Exception {
DataNodeId dataNodeId = clusterMap.getDataNodeId(config.serverPerformanceHostname, config.serverPerformancePort);
List<? extends ReplicaId> replicaIds = clusterMap.getReplicaIds(dataNodeId);
void selectReplica() {
Random random = new Random();
List<? extends ReplicaId> allReplicaIds = clusterMap.getReplicaIds(dataNodeId);

replicaIds.forEach(replicaId -> {
Map<DiskId, List<ReplicaId>> diskIdToReplicaIds = new HashMap<>();
allReplicaIds.forEach(replicaId -> {
if (!replicaId.isUnsealed()) {
return;
}
diskIdToReplicaIds.putIfAbsent(replicaId.getDiskId(), new ArrayList<>());
diskIdToReplicaIds.get(replicaId.getDiskId()).add(replicaId);
});

// PutRequest putRequest = new PutRequest(1, BlobId.BlobIdType.NATIVE, clusterMap.getLocalDatacenterId(), )
diskIdToReplicaIds.values().forEach(replicaIds -> {
replicaIdsSelected.add(replicaIds.get(random.nextInt(replicaIds.size())));
});
}

@Override
public void consume() throws Exception {
public void produce() throws Exception {
while (true) {
for (ReplicaId replicaId : replicaIdsSelected) {
int blobSize = config.serverPerformancePutTestBlobSizeBytes;
totalDataSentBytes = totalDataSentBytes + blobSize;
if (totalDataSentBytes > config.serverPerformancePutTestDataLimitBytes) {
throw new ShutDownException("Shut down producer as size limit for bytes reached");
}

byte[] blob = new byte[blobSize];
byte[] usermetadata = new byte[new Random().nextInt(1024)];
BlobProperties props =
new BlobProperties(blobSize, CLIENT_ID, Account.UNKNOWN_ACCOUNT_ID, Container.UNKNOWN_CONTAINER_ID, false);
props.setTimeToLiveInSeconds(config.serverPerformancePutTestBlobExpirySeconds);
BlobId blobId = new BlobId(routerConfig.routerBlobidCurrentVersion, BlobId.BlobIdType.NATIVE,
clusterMap.getLocalDatacenterId(), props.getAccountId(), props.getContainerId(), replicaId.getPartitionId(),
false, BlobId.BlobDataType.DATACHUNK);

PutRequest putRequest =
new PutRequest(correlationId.incrementAndGet(), CLIENT_ID, blobId, props, ByteBuffer.wrap(usermetadata),
Unpooled.wrappedBuffer(blob), props.getBlobSize(), BlobType.DataBlob, null);

String hostname = dataNodeId.getHostname();
Port port = dataNodeId.getPortToConnectTo();

RequestInfo requestInfo = new RequestInfo(hostname, port, putRequest, replicaId, null);

try {
logger.info("Submitting put request {} , blob size {}", requestInfo.getRequest().getCorrelationId(), 4);
networkQueue.submit(requestInfo);
} catch (ShutDownException e) {
throw e;
} catch (Exception e) {
logger.error("Error while sending request", e);
}
}
}
}

@Override
public void consume() throws Exception {
try {
networkQueue.poll(responseInfo -> {
try {
InputStream serverResponseStream = new NettyByteBufDataInputStream(responseInfo.content());
PutResponse putResponse = PutResponse.readFrom(new DataInputStream(serverResponseStream));
logger.info("received success response for correlation id {}",
responseInfo.getRequestInfo().getRequest().getCorrelationId());
} catch (Exception e) {
logger.error("Error while processing response", e);
}
});
} catch (ShutDownException e) {
throw e;
} catch (Exception e) {
logger.error("Error while consuming", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void poll(ResponseInfoProcessor responseInfoProcessor) throws Exception {
*
* If shutdown is triggered , waits for all responses to be processed and shuts down {@link #executorService}
* 1. Releases a token so if any thread trying to submit is waiting stops waiting.
* 2. Tries to acquire {@link #maxParallelism+1} tokens as it can only acquire all tokens when
* 2. Tries to acquire {@link #maxParallelism} +1 tokens as it can only acquire all tokens when
* all responses get processed
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.Config;
import com.github.ambry.config.Default;
import com.github.ambry.config.RouterConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.network.http2.Http2ClientMetrics;
import com.github.ambry.tools.util.ToolUtils;
Expand Down Expand Up @@ -97,13 +98,6 @@ public static class ServerPerformanceConfig {
@Default("15")
public final int serverPerformanceOperationsTimeOutSec;

/**
* Path to file from which to read the blob ids
*/
@Config("server.performance.blob.id.file.path")
@Default("")
public final String serverPerformanceBlobIdFilePath;

/**
* The hostname of the target server as it appears in the partition layout.
*/
Expand All @@ -125,13 +119,32 @@ public static class ServerPerformanceConfig {
@Default("30")
public final int serverPerformanceTimeOutSeconds;

/**
* Path to file from which to read the blob ids
*/
@Config("server.performance.get.test.blob.id.file.path")
@Default("")
public final String serverPerformanceGetTestBlobIdFilePath;


@Config("server.performance.put.test.blob.size.bytes")
@Default("4096")
public final int serverPerformancePutTestBlobSizeBytes;

@Config("server.performance.put.test.blob.expiry.seconds")
@Default("10")
public final int serverPerformancePutTestBlobExpirySeconds;

@Config("server.performance.put.test.data.limit.bytes")
@Default("204800")
public final int serverPerformancePutTestDataLimitBytes;

public ServerPerformanceConfig(VerifiableProperties verifiableProperties) {
serverPerformanceTestType = TestType.valueOf(verifiableProperties.getString("server.performance.test.type"));
serverPerformanceHardwareLayoutFilePath =
verifiableProperties.getString("server.performance.hardware.layout.file.path", "");
serverPerformancePartitionLayoutFilePath =
verifiableProperties.getString("server.performance.partition.layout.file.path", "");
serverPerformanceBlobIdFilePath = verifiableProperties.getString("server.performance.blob.id.file.path", "");
serverPerformanceHostname = verifiableProperties.getString("server.performance.hostname", "localhost");
serverPerformancePort = verifiableProperties.getInt("server.performance.port", 6667);
serverPerformanceMaxParallelRequests =
Expand All @@ -140,12 +153,21 @@ public ServerPerformanceConfig(VerifiableProperties verifiableProperties) {
serverPerformanceTimeOutSeconds = verifiableProperties.getInt("server.performance.time.out.seconds", 30);
serverPerformanceOperationsTimeOutSec =
verifiableProperties.getInt("server.performance.operations.time.out.sec", 15);
serverPerformanceGetTestBlobIdFilePath =
verifiableProperties.getString("server.performance.get.test.blob.id.file.path", "");
serverPerformancePutTestBlobSizeBytes =
verifiableProperties.getInt("server.performance.put.test.blob.size.bytes", 4096);
serverPerformancePutTestBlobExpirySeconds =
verifiableProperties.getInt("server.performance.put.test.blob.expiry.seconds", 10);
serverPerformancePutTestDataLimitBytes =
verifiableProperties.getInt("server.performance.put.test.data.limit.bytes", 204800);
}
}

public ServerPerformance(VerifiableProperties verifiableProperties) throws Exception {
config = new ServerPerformanceConfig(verifiableProperties);
ClusterMapConfig clusterMapConfig = new ClusterMapConfig(verifiableProperties);
RouterConfig routerConfig = new RouterConfig(verifiableProperties);
ClusterMap clusterMap =
((ClusterAgentsFactory) Utils.getObj(clusterMapConfig.clusterMapClusterAgentsFactory, clusterMapConfig,
config.serverPerformanceHardwareLayoutFilePath,
Expand All @@ -158,15 +180,16 @@ public ServerPerformance(VerifiableProperties verifiableProperties) throws Excep
config.serverPerformanceOperationsTimeOutSec);
networkQueue.start();

switch (config.serverPerformanceTestType) {
case GET_BLOB:
producerConsumer = new GetLoadProducerConsumer(networkQueue, config, clusterMap);
break;
case PUT_BLOB:
producerConsumer = new PutLoadProducerConsumer(networkQueue, config, clusterMap);
default:
throw new IllegalArgumentException("Unrecognized test type: "+ config.serverPerformanceTestType);
}
switch (config.serverPerformanceTestType) {
case GET_BLOB:
producerConsumer = new GetLoadProducerConsumer(networkQueue, config, clusterMap);
break;
case PUT_BLOB:
producerConsumer = new PutLoadProducerConsumer(networkQueue, config, clusterMap, routerConfig);
break;
default:
throw new IllegalArgumentException("Unrecognized test type: " + config.serverPerformanceTestType);
}
}

public static void main(String[] args) throws Exception {
Expand All @@ -193,6 +216,7 @@ public void startLoadTest() throws Exception {
loadProducer.start();
loadConsumer.start();
loadProducer.join();
timedShutDownLatch.countDown();
loadConsumer.join();
shutDownLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.tools.perf.serverperf;

public class ShutDownException extends Exception {
Expand Down

0 comments on commit b027b09

Please sign in to comment.