Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add user in heartbeat of RssShuffleManager #1876

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public Thread newThread(Runnable r) {
scheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
client.sendAppHeartbeat(appId, heartbeatTimeout);
client.sendAppHeartbeat(appId, "user", heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ public SendShuffleDataResult sendShuffleData(
}

@Override
public void sendAppHeartbeat(String appId, long timeoutMs) {}
public void sendAppHeartbeat(String appId, String user, long timeoutMs) {}

@Override
public void registerApplicationInfo(String appId, long timeoutMs, String user) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ public SendShuffleDataResult sendShuffleData(
}

@Override
public void sendAppHeartbeat(String appId, long timeoutMs) {}
public void sendAppHeartbeat(String appId, String user, long timeoutMs) {}

@Override
public void registerApplicationInfo(String appId, long timeoutMs, String user) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private void startHeartbeat() {
heartBeatScheduledExecutorService.scheduleAtFixedRate(
() -> {
try {
shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout);
shuffleWriteClient.sendAppHeartbeat(appId, user, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ private synchronized void startHeartbeat() {
() -> {
try {
String appId = id.get();
shuffleWriteClient.sendAppHeartbeat(appId, heartbeatTimeout);
shuffleWriteClient.sendAppHeartbeat(appId, user, heartbeatTimeout);
LOG.info("Finish send heartbeat to coordinator and servers");
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, Config
appMaster.heartBeatExecutorService.scheduleAtFixedRate(
() -> {
try {
appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
appMaster
.getShuffleWriteClient()
.sendAppHeartbeat(strAppAttemptId, "user", heartbeatTimeout);
if (LOG.isDebugEnabled()) {
LOG.debug("Finish send heartbeat to coordinator and servers");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public SendShuffleDataResult sendShuffleData(
}

@Override
public void sendAppHeartbeat(String appId, long timeoutMs) {}
public void sendAppHeartbeat(String appId, String user, long timeoutMs) {}

@Override
public void registerApplicationInfo(String appId, long timeoutMs, String user) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ SendShuffleDataResult sendShuffleData(
List<ShuffleBlockInfo> shuffleBlockInfoList,
Supplier<Boolean> needCancelRequest);

void sendAppHeartbeat(String appId, long timeoutMs);
void sendAppHeartbeat(String appId, String user, long timeoutMs);

void registerApplicationInfo(String appId, long timeoutMs, String user);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,8 @@ public void registerApplicationInfo(String appId, long timeoutMs, String user) {
}

@Override
public void sendAppHeartbeat(String appId, long timeoutMs) {
RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, timeoutMs);
public void sendAppHeartbeat(String appId, String user, long timeoutMs) {
RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, user, timeoutMs);
Set<ShuffleServerInfo> allShuffleServers = getAllShuffleServers(appId);

ThreadUtils.executeTasks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ public void registerApplicationInfo(String appId, String user) {
}
}

public void refreshAppId(String appId) {
String user = appIdToUser.get(appId);
public void refreshAppId(String appId, String user) {
// compatible with lower version clients
if (user == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to handle the case user == null? Because there may be existing legacy clients.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to handle the case user == null? Because there may be existing legacy clients.

When the user field is not specified in legacy clients, the value will be empty instead of null. So if coordinator restarts, it actually calls registerApplicationInfo(appId, "").

registerApplicationInfo(appId, "");
if (appIdToUser.get(appId) == null) {
registerApplicationInfo(appId, user);
} else {
user = appIdToUser.get(appId);
Map<String, AppInfo> appAndTime = currentUserAndApp.get(user);
AppInfo appInfo = appAndTime.get(appId);
long currentTimeMs = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public void reportClientOperation(
public void appHeartbeat(
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> responseObserver) {
String appId = request.getAppId();
coordinatorServer.getApplicationManager().refreshAppId(appId);
String user = request.getUser();
coordinatorServer.getApplicationManager().refreshAppId(appId, user);
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from application: {}", appId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void clearWithoutRemoteStorageTest() throws Exception {
// NPE shouldn't happen when clear the resource
String testApp = "application_clearWithoutRemoteStorageTest";
applicationManager.registerApplicationInfo(testApp, "user");
applicationManager.refreshAppId(testApp);
applicationManager.refreshAppId(testApp, "user");
// just set a value != 0, it should be reset to 0 if everything goes well
CoordinatorMetrics.gaugeRunningAppNum.set(100.0);
assertEquals(1, applicationManager.getAppIds().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void selectStorageTest() throws Exception {
applicationManager.incRemoteStorageCounter(remotePath1);
String testApp1 = "application_test_" + 1;
applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
applicationManager.refreshAppId(testApp1, "user");
// in this case, ensure that all the paths are read and written normally
applicationManager.getRemoteStoragePathRankValue().get(remotePath1).getCostTime().set(0);
applicationManager.getRemoteStoragePathRankValue().get(remotePath2).getCostTime().set(0);
Expand All @@ -116,7 +116,7 @@ public void selectStorageTest() throws Exception {
// refresh app1, got remotePath2, then remove remotePath2,
// it should be existed in counter until it expired
applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
applicationManager.refreshAppId(testApp1, "user");
assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath());
remoteStoragePath = remotePath1;
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
Expand Down Expand Up @@ -160,7 +160,7 @@ public void storageCounterMulThreadTest() throws Exception {
for (int i = 0; i < 1000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand All @@ -172,7 +172,7 @@ public void storageCounterMulThreadTest() throws Exception {
for (int i = 1000; i < 2000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand All @@ -184,7 +184,7 @@ public void storageCounterMulThreadTest() throws Exception {
for (int i = 2000; i < 3000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void selectStorageTest() throws Exception {
applicationManager.incRemoteStorageCounter(remoteStorage1);
String testApp1 = "application_test_" + 1;
applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
applicationManager.refreshAppId(testApp1, "user");
selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, System.currentTimeMillis());
// Ensure that the `System.currentTimeMillis()` corresponding to remoteStorage1 is greater than
// that of
Expand Down Expand Up @@ -138,7 +138,7 @@ public void selectStorageTest() throws Exception {
// refresh app1, got remotePath2, then remove remotePath2,
// it should be existed in counter until it expired
applicationManager.registerApplicationInfo(testApp1, "user");
applicationManager.refreshAppId(testApp1);
applicationManager.refreshAppId(testApp1, "user");
assertEquals(remoteStorage2, applicationManager.pickRemoteStorage(testApp1).getPath());
remoteStoragePath = remoteStorage1;
applicationManager.refreshRemoteStorage(remoteStoragePath, "");
Expand Down Expand Up @@ -185,7 +185,7 @@ public void selectStorageMulThreadTest() throws Exception {
for (int i = 0; i < 1000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand All @@ -197,7 +197,7 @@ public void selectStorageMulThreadTest() throws Exception {
for (int i = 1000; i < 2000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand All @@ -209,7 +209,7 @@ public void selectStorageMulThreadTest() throws Exception {
for (int i = 2000; i < 3000; i++) {
String appId = testApp1 + i;
applicationManager.registerApplicationInfo(appId, "user");
applicationManager.refreshAppId(appId);
applicationManager.refreshAppId(appId, "user");
applicationManager.pickRemoteStorage(appId);
}
cdl.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.UnsafeByteOperations;
import org.apache.commons.lang3.StringUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -188,9 +189,9 @@ public void clearResourceTest() throws Exception {
ShuffleDataDistributionType.NORMAL,
-1);
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest1", 500L, "user");
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 500L);
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", "user", 500L);
shuffleWriteClient.registerApplicationInfo("application_clearResourceTest2", 500L, "user");
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2", 500L);
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest2", "user", 500L);

RssRegisterShuffleRequest rrsr =
new RssRegisterShuffleRequest(
Expand All @@ -210,7 +211,7 @@ public void clearResourceTest() throws Exception {
() -> {
int i = 0;
while (i < 20) {
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", 500L);
shuffleWriteClient.sendAppHeartbeat("application_clearResourceTest1", "user", 500L);
i++;
try {
Thread.sleep(1000);
Expand Down Expand Up @@ -783,7 +784,8 @@ public void rpcMetricsTest() throws Exception {
.getCounterMap()
.get(ShuffleServerGrpcMetrics.APP_HEARTBEAT_METHOD)
.get();
grpcShuffleServerClient.sendHeartBeat(new RssAppHeartBeatRequest(appId, 10000));
grpcShuffleServerClient.sendHeartBeat(
new RssAppHeartBeatRequest(appId, StringUtils.EMPTY, 10000));
newValue =
grpcShuffleServers
.get(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) {
@Override
public RssAppHeartBeatResponse sendAppHeartBeat(RssAppHeartBeatRequest request) {
RssProtos.AppHeartBeatRequest rpcRequest =
RssProtos.AppHeartBeatRequest.newBuilder().setAppId(request.getAppId()).build();
RssProtos.AppHeartBeatRequest.newBuilder()
.setAppId(request.getAppId())
.setUser(request.getUser())
.build();
RssProtos.AppHeartBeatResponse rpcResponse =
blockingStub
.withDeadlineAfter(request.getTimeoutMs(), TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) {
throw new RssException("Send commit to host[" + host + "], port[" + port + "] failed");
}

private AppHeartBeatResponse doSendHeartBeat(String appId, long timeout) {
AppHeartBeatRequest request = AppHeartBeatRequest.newBuilder().setAppId(appId).build();
private AppHeartBeatResponse doSendHeartBeat(String appId, String user, long timeout) {
AppHeartBeatRequest request =
AppHeartBeatRequest.newBuilder().setAppId(appId).setUser(user).build();
return blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request);
}

Expand Down Expand Up @@ -653,7 +654,7 @@ public RssSendCommitResponse sendCommit(RssSendCommitRequest request) {
@Override
public RssAppHeartBeatResponse sendHeartBeat(RssAppHeartBeatRequest request) {
AppHeartBeatResponse appHeartBeatResponse =
doSendHeartBeat(request.getAppId(), request.getTimeoutMs());
doSendHeartBeat(request.getAppId(), request.getUser(), request.getTimeoutMs());
if (appHeartBeatResponse.getStatus() != RssProtos.StatusCode.SUCCESS) {
String msg =
"Can't send heartbeat to "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@
public class RssAppHeartBeatRequest {

private final String appId;
private final String user;
private final long timeoutMs;

public RssAppHeartBeatRequest(String appId, long timeoutMs) {
public RssAppHeartBeatRequest(String appId, String user, long timeoutMs) {
this.appId = appId;
this.user = user;
this.timeoutMs = timeoutMs;
}

public String getAppId() {
return appId;
}

public String getUser() {
return user;
}

public long getTimeoutMs() {
return timeoutMs;
}
Expand Down
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ service CoordinatorServer {

message AppHeartBeatRequest {
string appId = 1;
string user = 2;
}

message AppHeartBeatResponse {
Expand Down
Loading