-
Notifications
You must be signed in to change notification settings - Fork 34
Send GoalStateV2 from DPM, NCM and ACA #704
Changes from 59 commits
04cbb1e
4c100e5
e38bcf9
067a988
dd5dafd
aa76562
8ae37db
f0462ae
976883f
ef7a2d2
670dcc6
59a65f0
1178f61
5a98d10
e4e0792
ae15be0
9c88b19
9be2bb9
733872a
b3154ae
e2bed17
7c5013e
ac7acf8
bcc0716
5b7e69f
6b1fd3d
43af049
db5f112
8722772
dda2678
6bf77ac
410e50c
ca741c7
7352b24
2df153c
40a3693
2d6f74d
e76fb18
e0a2fbb
8ae12cf
0ebf1f0
fa34330
72a4b1e
7c4a8b7
a2b2397
76e08f2
c5c1f0d
d3c5e0c
55885e1
a8a7673
736bf1f
b80033d
7103bb3
a4462fe
94417f0
0f5949f
a0d40af
3f08872
140456b
722e197
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,8 @@ spec: | |
replicas: 1 | ||
template: | ||
metadata: | ||
annotations: | ||
linkerd.io/inject: enabled | ||
labels: | ||
app: netwconfigmanager | ||
spec: | ||
|
@@ -69,7 +71,8 @@ spec: | |
imagePullPolicy: IfNotPresent | ||
command: ["java", "-jar", "/app/AlcorNetworkConfigManager-0.1.0.jar", "--spring.config.location=/etc/ncm/application.properties"] | ||
ports: | ||
- containerPort: 8080 | ||
- containerPort: 50001 | ||
protocol: TCP | ||
volumeMounts: | ||
- name: ncm-volume | ||
mountPath: /etc/ncm | ||
|
@@ -84,10 +87,10 @@ metadata: | |
labels: | ||
name: netwconfigmanager-service | ||
spec: | ||
type: NodePort | ||
type: ClusterIP | ||
ports: | ||
- port: 9014 | ||
targetPort: 8080 | ||
nodePort: 30014 | ||
- port: 50001 | ||
protocol: TCP | ||
targetPort: 50001 | ||
Comment on lines
+92
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @DavidLiu506 Do we need to include port spec for restful here too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we can add restful configure after restful api fix.
Comment on lines
+92
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this change block port 9014 for REST APIs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for grpc configure, we can add another service for restful. |
||
selector: | ||
app: netwconfigmanager |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,9 +22,7 @@ | |
import com.futurewei.alcor.dataplane.config.Config; | ||
import com.futurewei.alcor.dataplane.entity.MulticastGoalStateV2; | ||
import com.futurewei.alcor.dataplane.entity.UnicastGoalStateV2; | ||
import com.futurewei.alcor.schema.GoalStateProvisionerGrpc; | ||
import com.futurewei.alcor.schema.Goalstate; | ||
import com.futurewei.alcor.schema.Goalstateprovisioner; | ||
import com.futurewei.alcor.schema.*; | ||
import io.grpc.ConnectivityState; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.ManagedChannelBuilder; | ||
|
@@ -37,6 +35,8 @@ | |
|
||
import java.util.*; | ||
import java.util.concurrent.*; | ||
import java.util.logging.Level; | ||
import java.util.stream.Collectors; | ||
|
||
@Service("grpcDataPlaneClient") | ||
@ConditionalOnProperty(prefix = "protobuf.goal-state-message", name = "version", havingValue = "102") | ||
|
@@ -56,21 +56,27 @@ public class DataPlaneClientImplV2 implements DataPlaneClient<UnicastGoalStateV2 | |
// when a channel is set up, send this amount of default GoalStates for warmup. | ||
private int numberOfWarmupsPerChannel; | ||
|
||
private String netwconfigmanagerHost; | ||
|
||
// prints out UUID and time, when sending a GoalState to any of the monitorHosts | ||
private ArrayList<String> monitorHosts; | ||
|
||
private ConcurrentHashMap<String, ArrayList<GrpcChannelStub>> hostIpGrpcChannelStubMap; | ||
|
||
@Override | ||
public List<String> sendGoalStates(List<UnicastGoalStateV2> unicastGoalStates) throws Exception { | ||
Goalstate.GoalStateV2.Builder goalStateBuilder = Goalstate.GoalStateV2.newBuilder(); | ||
final CountDownLatch finishLatch = new CountDownLatch(1); | ||
List<String> results = new ArrayList<>(); | ||
for (UnicastGoalStateV2 unicastGoalState : unicastGoalStates) { | ||
String resp = doSendGoalState(unicastGoalState); | ||
if (resp != null) { | ||
results.add(resp); | ||
} | ||
goalStateBuilder = getGoalState(goalStateBuilder, unicastGoalState); | ||
} | ||
doSendGoalState(goalStateBuilder.build(), finishLatch, results); | ||
|
||
if (!finishLatch.await(1, TimeUnit.MINUTES)) { | ||
LOG.warn("Send goal states can not finish within 1 minutes"); | ||
return Arrays.asList("Send goal states can not finish within 1 minutes"); | ||
} | ||
Comment on lines
+76
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @DavidLiu506 Do we know which goalstate or which ncm cannot sent to? If we do, can we put the id or uri in the log and return results? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, can we put 1 mins as a variable in application.properties? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NCM managed by linked, there only one goal state v2 send to NCM. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Talked with James, NCM already have ACA response information. DPM don't need to know it. |
||
return results; | ||
} | ||
|
||
|
@@ -92,6 +98,10 @@ public DataPlaneClientImplV2(Config globalConfig, ArrayList<String> monitorHosts | |
numberOfWarmupsPerChannel = 0; | ||
} | ||
|
||
if (netwconfigmanagerHost == null || netwconfigmanagerHost.isEmpty()) { | ||
netwconfigmanagerHost = globalConfig.netwconfigmanagerHost; | ||
} | ||
|
||
this.monitorHosts = monitorHosts; | ||
LOG.info("Printing out all monitorHosts"); | ||
for(String host : this.monitorHosts){ | ||
|
@@ -100,7 +110,7 @@ public DataPlaneClientImplV2(Config globalConfig, ArrayList<String> monitorHosts | |
LOG.info("Done printing out all monitorHosts"); | ||
this.numberOfGrpcChannelPerHost = globalConfig.numberOfGrpcChannelPerHost; | ||
this.numberOfWarmupsPerChannel = globalConfig.numberOfWarmupsPerChannel; | ||
this.hostAgentPort = 50001; | ||
this.hostAgentPort = 9016; | ||
xieus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
this.executor = new ThreadPoolExecutor(100, | ||
200, | ||
|
@@ -173,7 +183,8 @@ private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp) | |
ArrayList<GrpcChannelStub> arr = new ArrayList<>(); | ||
for (int i = 0; i < numberOfGrpcChannelPerHost; i++) { | ||
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp); | ||
warmUpChannelStub(channelStub, hostIp); | ||
// Using Linkerd load balance | ||
//warmUpChannelStub(channelStub, hostIp); | ||
xieus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
arr.add(channelStub); | ||
} | ||
long end = System.currentTimeMillis(); | ||
|
@@ -220,11 +231,9 @@ public void onCompleted() { | |
return; | ||
} | ||
|
||
private String doSendGoalState(UnicastGoalStateV2 unicastGoalState) { | ||
String hostIp = unicastGoalState.getHostIp(); | ||
LOG.info("Setting up a channel to ACA on: " + hostIp); | ||
private String doSendGoalState(Goalstate.GoalStateV2 goalStateV2, CountDownLatch finishLatch, List<String> replies) { | ||
String hostIp = netwconfigmanagerHost; | ||
long start = System.currentTimeMillis(); | ||
|
||
GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp); | ||
long chan_established = System.currentTimeMillis(); | ||
LOG.info("[doSendGoalState] Established channel, elapsed Time in milli seconds: " + (chan_established - start)); | ||
|
@@ -239,6 +248,12 @@ private String doSendGoalState(UnicastGoalStateV2 unicastGoalState) { | |
public void onNext(Goalstateprovisioner.GoalStateOperationReply reply) { | ||
LOG.info("Receive response from ACA@" + hostIp + " | " + reply.toString()); | ||
result.put(hostIp, reply.getOperationStatusesList()); | ||
if (reply.getOperationStatusesList().stream().filter(item -> item.getOperationStatus().equals(Common.OperationStatus.FAILURE)).collect(Collectors.toList()).size() > 0) { | ||
replies.add(reply.toString()); | ||
while (finishLatch.getCount() > 0) { | ||
finishLatch.countDown(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -249,22 +264,13 @@ public void onError(Throwable t) { | |
@Override | ||
public void onCompleted() { | ||
LOG.info("Complete receiving message from ACA@" + hostIp); | ||
finishLatch.countDown(); | ||
} | ||
}; | ||
|
||
StreamObserver<Goalstate.GoalStateV2> requestObserver = asyncStub.pushGoalStatesStream(responseObserver); | ||
try { | ||
Goalstate.GoalStateV2 goalState = unicastGoalState.getGoalState(); | ||
LOG.info("Sending GS to Host " + hostIp + " as follows | " + goalState.toString()); | ||
requestObserver.onNext(goalState); | ||
if (unicastGoalState.getGoalState().getNeighborStatesCount() == 1 && monitorHosts.contains(hostIp)) { | ||
long sent_gs_time = System.currentTimeMillis(); | ||
// If there's only one neighbor state and it is trying to send it to aca_node_one, the IP of which is now | ||
// hardcoded) this send goalstate action is probably caused by on-demand workflow, need to record when it | ||
// sends this goalState so what we can look into this and the ACA log to see how much time was spent. | ||
String neighbor_id = unicastGoalState.getGoalState().getNeighborStatesMap().keySet().iterator().next(); | ||
LOG.info("Sending neighbor ID: " + neighbor_id + " at: " + sent_gs_time); | ||
} | ||
requestObserver.onNext(goalStateV2); | ||
} catch (RuntimeException e) { | ||
// Cancel RPC | ||
LOG.warn("[doSendGoalState] Sending GS, but error happened | " + e.getMessage()); | ||
|
@@ -275,10 +281,7 @@ public void onCompleted() { | |
LOG.info("Sending GS to Host " + hostIp + " is completed"); | ||
|
||
// comment out onCompleted so that the same channel/stub and keep sending next time. | ||
// requestObserver.onCompleted(); | ||
|
||
// shutdown(channel); | ||
|
||
requestObserver.onCompleted(); | ||
return null; | ||
} | ||
|
||
|
@@ -291,4 +294,115 @@ public GrpcChannelStub(ManagedChannel channel, GoalStateProvisionerGrpc.GoalStat | |
this.stub = stub; | ||
} | ||
} | ||
|
||
private Goalstate.GoalStateV2.Builder getGoalState(Goalstate.GoalStateV2.Builder goalStateBuilder, UnicastGoalStateV2 unicastGoalStateV2) { | ||
Goalstate.GoalStateV2 goalStateV2 = unicastGoalStateV2.getGoalState(); | ||
Goalstate.HostResources.Builder hostResourceBuilder = Goalstate.HostResources.newBuilder(); | ||
if (goalStateV2.getSubnetStatesCount() > 0) { | ||
goalStateV2.getSubnetStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType subnetResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.SUBNET) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(subnetResourceIdType); | ||
}); | ||
goalStateBuilder.putAllSubnetStates(goalStateV2.getSubnetStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getDhcpStatesCount() > 0) { | ||
goalStateV2.getDhcpStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType dhcpResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.DHCP) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(dhcpResourceIdType); | ||
}); | ||
goalStateBuilder.putAllDhcpStates(goalStateV2.getDhcpStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getPortStatesCount() > 0) { | ||
goalStateV2.getPortStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType portResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.PORT) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(portResourceIdType); | ||
}); | ||
goalStateBuilder.putAllPortStates(goalStateV2.getPortStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getSecurityGroupStatesCount() > 0) { | ||
goalStateV2.getSecurityGroupStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType securityGroupResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.SECURITYGROUP) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(securityGroupResourceIdType); | ||
}); | ||
goalStateBuilder.putAllSecurityGroupStates(goalStateV2.getSecurityGroupStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getNeighborStatesCount() > 0) { | ||
goalStateV2.getNeighborStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType neighborGroupResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.NEIGHBOR) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(neighborGroupResourceIdType); | ||
}); | ||
goalStateBuilder.putAllNeighborStates(goalStateV2.getNeighborStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getRouterStatesCount() > 0) { | ||
goalStateV2.getRouterStatesMap().entrySet().forEach(entry -> { | ||
Goalstate.ResourceIdType routerResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.ROUTER) | ||
.setId(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()) | ||
.build(); | ||
hostResourceBuilder.addResources(routerResourceIdType); | ||
|
||
|
||
if (goalStateBuilder.containsRouterStates(unicastGoalStateV2.getHostIp() + "/" + entry.getKey())) { | ||
Router.RouterConfiguration.Builder routerConfigurationBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).getConfiguration().toBuilder(); | ||
routerConfigurationBuilder.addAllSubnetRoutingTables(entry.getValue().getConfiguration().getSubnetRoutingTablesList()); | ||
Router.RouterState.Builder routerStateBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).toBuilder(); | ||
routerStateBuilder.setConfiguration(routerConfigurationBuilder); | ||
} else { | ||
goalStateBuilder.putRouterStates(unicastGoalStateV2.getHostIp() + "/" + entry.getKey(), entry.getValue()); | ||
} | ||
}); | ||
} | ||
|
||
if (goalStateV2.getVpcStatesCount() > 0) { | ||
goalStateV2.getVpcStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType vpcResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.VPC) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(vpcResourceIdType); | ||
}); | ||
goalStateBuilder.putAllVpcStates(goalStateV2.getVpcStatesMap()); | ||
} | ||
|
||
if (goalStateV2.getGatewayStatesCount() > 0) { | ||
goalStateV2.getGatewayStatesMap().keySet().forEach(key -> { | ||
Goalstate.ResourceIdType gatewayResourceIdType = Goalstate.ResourceIdType.newBuilder() | ||
.setType(Common.ResourceType.GATEWAY) | ||
.setId(key) | ||
.build(); | ||
hostResourceBuilder.addResources(gatewayResourceIdType); | ||
}); | ||
goalStateBuilder.putAllGatewayStates(goalStateV2.getGatewayStatesMap()); | ||
} | ||
|
||
if (goalStateBuilder.containsHostResources(unicastGoalStateV2.getHostIp())) { | ||
Goalstate.HostResources.Builder hostResourceBuilder1 = Goalstate.HostResources.newBuilder(); | ||
hostResourceBuilder1.addAllResources(hostResourceBuilder.getResourcesList()); | ||
hostResourceBuilder1.addAllResources(goalStateBuilder.getHostResourcesMap().get(unicastGoalStateV2.getHostIp()).getResourcesList()); | ||
goalStateBuilder.putHostResources(unicastGoalStateV2.getHostIp(), hostResourceBuilder1.build()); | ||
} else { | ||
goalStateBuilder.putHostResources(unicastGoalStateV2.getHostIp(), hostResourceBuilder.build()); | ||
} | ||
return goalStateBuilder; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@DavidLiu506 Why we need to change 8080 to 50001 here? Is this for grpc connection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is for grpc connection.