Skip to content

Commit

Permalink
Merge branch 'release-1.118.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
dpoltavets committed Mar 23, 2022
2 parents 6c7ed3f + 3a609de commit 7c2e819
Show file tree
Hide file tree
Showing 56 changed files with 449 additions and 170 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Changelog

## v1.118.0 (24/03/2022)

### Features:
- [#4730](https://github.com/telstra/open-kilda/pull/4730) Added HistoryTopology to save flow history events
- [#4733](https://github.com/telstra/open-kilda/pull/4733) GRPC stub: return errors on invalid requests (Issue: [#4728](https://github.com/telstra/open-kilda/issues/4728))

### Bug Fixes:
- [#4724](https://github.com/telstra/open-kilda/pull/4724) Fixed inner vlan change in case of partial update of default flow (Issue: [#4411](https://github.com/telstra/open-kilda/issues/4411)) [**storm-topologies**]
- [#4725](https://github.com/telstra/open-kilda/pull/4725) Fixed lag creation with duplicate port numbers (Issue: [#4644](https://github.com/telstra/open-kilda/issues/4644)) [**storm-topologies**]

### Improvements:
- [#4679](https://github.com/telstra/open-kilda/pull/4679) Bumped version of github api and jwt for changelog generator
- [#4715](https://github.com/telstra/open-kilda/pull/4715) DeleteAllFlowsV1 endpoint correlation-id [**northbound**]


For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.117.0...v1.118.0).

### Affected Components:
grpc, nbworker, history, nb, flow-hs, swmanager

---

## v1.117.0 (21/03/2022)

### Features:
Expand Down
5 changes: 5 additions & 0 deletions confd/conf.d/history-topology.topology_definition.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[template]
src = "history-topology/history-topology.tmpl"
dest = "src-java/history-topology/history-storm-topology/src/main/resources/history-topology.yaml"
keys = [ "/" ]
mode = "0644"
15 changes: 15 additions & 0 deletions confd/templates/history-topology/history-topology.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# topology configuration
config:
topology.parallelism: {{ getv "/kilda_storm_history_parallelism" }}
topology.workers: {{ getv "/kilda_storm_parallelism_workers_count" }}
topology.spouts.parallelism: 1

# spout definitions
spouts:
- id: "zookeeper.spout"
parallelism: 1

# bolt definitions
bolts:
- id: "zookeeper.bolt"
parallelism: 1
1 change: 1 addition & 0 deletions confd/vars/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ kilda_storm_flow_hs_parallelism: 2
kilda_storm_flow_hs_reroute_hub_count_multiplier: 2
kilda_storm_flowhs_workers: 1
kilda_storm_parallelism_workers_count: 1
kilda_storm_history_parallelism: 2

kilda_storm_flow_monitoring_parallelism: 2

Expand Down
5 changes: 5 additions & 0 deletions docker/wfm/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ COPY BUILD/flowmonitoring-storm-topology/build.gradle /app/flowmonitor
COPY BUILD/flowmonitoring-storm-topology/resources/flowmonitoring-topology.yaml /app/flowmonitoring-storm-topology/topology-definition.yaml


#history
COPY BUILD/history-storm-topology/libs/* /app/history-storm-topology/libs/
COPY BUILD/history-storm-topology/build.gradle /app/history-storm-topology/
COPY BUILD/history-storm-topology/resources/history-topology.yaml /app/history-storm-topology/topology-definition.yaml


#isllatency
COPY BUILD/isllatency-storm-topology/libs/* /app/isllatency-storm-topology/libs/
Expand Down
6 changes: 3 additions & 3 deletions docker/wfm/app/Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# WFM topology management

TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control
BLUE_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control
GREEN_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control
TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history
BLUE_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history
GREEN_TOPOLOGY := network flowhs flowmonitoring stats isllatency opentsdb portstate nbworker ping reroute swmanager floodlightrouter connecteddevices server42-control history

STORM := /opt/storm/bin/storm

Expand Down
8 changes: 8 additions & 0 deletions docker/zookeeper/bin/create-zknode-and-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ done
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/connecteddevices/blue/expected_state ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/connecteddevices/blue/build-version "v3r\$i0n"

# add history topology zkNodes
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/signal "START"
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/state ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/expected_state ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/history/blue/build-version "v3r\$i0n"

# add flowhs topology zkNodes
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/flowhs ""
/opt/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181 create /${KILDA_ZKNODE}/flowhs/blue ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.openkilda.model.MeterId;
import org.openkilda.model.PathComputationStrategy;
import org.openkilda.model.SwitchId;
import org.openkilda.model.cookie.Cookie;
import org.openkilda.model.cookie.FlowSegmentCookie;

import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -44,8 +44,8 @@ public class FlowDumpData implements Serializable {
private int destinationVlan;
private int sourceInnerVlan;
private int destinationInnerVlan;
private Cookie forwardCookie;
private Cookie reverseCookie;
private FlowSegmentCookie forwardCookie;
private FlowSegmentCookie reverseCookie;
private MeterId forwardMeterId;
private MeterId reverseMeterId;
private String forwardPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@

package org.openkilda.wfm.share.history.model;

import org.openkilda.messaging.info.InfoData;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Value;

import java.io.Serializable;

@Value
@Builder
public class FlowHistoryHolder implements Serializable {
@EqualsAndHashCode(callSuper = false)
public class FlowHistoryHolder extends InfoData {
private final String taskId;

private final FlowDumpData flowDumpData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import org.openkilda.rulemanager.RuleManagerConfig;
import org.openkilda.wfm.LaunchEnvironment;
import org.openkilda.wfm.kafka.AbstractMessageSerializer;
import org.openkilda.wfm.kafka.MessageSerializer;
import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig;
import org.openkilda.wfm.share.history.bolt.HistoryBolt;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt;
import org.openkilda.wfm.share.hubandspoke.CoordinatorSpout;
import org.openkilda.wfm.share.hubandspoke.HubBolt;
Expand Down Expand Up @@ -161,7 +161,7 @@ public StormTopology createTopology() {
flowMonitoringTopologyOutput(tb);
statsTopologyOutput(tb);

history(tb, persistenceManager);
history(tb);

zkSpout(tb);
zkBolt(tb);
Expand Down Expand Up @@ -1028,28 +1028,25 @@ private void statsTopologyOutput(TopologyBuilder topologyBuilder) {
Stream.HUB_TO_STATS_TOPOLOGY_SENDER.name());
}

private void history(TopologyBuilder topologyBuilder, PersistenceManager persistenceManager) {
HistoryBolt historyBolt = new HistoryBolt(persistenceManager);
Fields grouping = HistoryBolt.newInputGroupingFields();
declareBolt(topologyBuilder, historyBolt, ComponentId.HISTORY_BOLT.name())
.fieldsGrouping(ComponentId.FLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(ComponentId.FLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(ComponentId.FLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(ComponentId.FLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(ComponentId.FLOW_PATH_SWAP_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(ComponentId.FLOW_SWAP_ENDPOINTS_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.YFLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.YFLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.YFLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping)
.fieldsGrouping(
ComponentId.YFLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_BOLT.name(), grouping);
private void history(TopologyBuilder topologyBuilder) {
KafkaBolt<String, Message> kafkaBolt = makeKafkaBolt(
topologyConfig.getKafkaHistoryTopic(), MessageSerializer.class);
declareBolt(topologyBuilder, kafkaBolt, ComponentId.HISTORY_BOLT.name())
.shuffleGrouping(ComponentId.FLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_PATH_SWAP_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_SWAP_ENDPOINTS_HUB.name(),
Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_CREATE_MIRROR_POINT_HUB.name(),
Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.FLOW_DELETE_MIRROR_POINT_HUB.name(),
Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.YFLOW_CREATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.YFLOW_UPDATE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.YFLOW_REROUTE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name())
.shuffleGrouping(ComponentId.YFLOW_DELETE_HUB.name(), Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name());
}

@Override
Expand Down Expand Up @@ -1158,7 +1155,7 @@ public enum Stream {
ROUTER_TO_YFLOW_VALIDATION_HUB,

HUB_TO_SPEAKER_WORKER,
HUB_TO_HISTORY_BOLT,
HUB_TO_HISTORY_TOPOLOGY_SENDER,
HUB_TO_METRICS_BOLT,

SPEAKER_WORKER_TO_HUB_CREATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ default String getKafkaPingTopic() {
return getKafkaTopics().getPingTopic();
}

default String getKafkaHistoryTopic() {
return getKafkaTopics().getTopoHistoryTopic();
}

default String getKafkaFlowHsServer42StormNotifyTopic() {
return getKafkaTopics().getFlowHsServer42StormNotifyTopic();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package org.openkilda.wfm.topology.flowhs.bolts;

import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER;
Expand Down Expand Up @@ -44,7 +44,6 @@
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig;
import org.openkilda.wfm.share.flow.resources.FlowResourcesManager;
import org.openkilda.wfm.share.history.bolt.HistoryBolt;
import org.openkilda.wfm.share.history.model.FlowHistoryHolder;
import org.openkilda.wfm.share.hubandspoke.HubBolt;
import org.openkilda.wfm.share.utils.KeyProvider;
Expand Down Expand Up @@ -164,8 +163,10 @@ public void sendNorthboundResponse(@NonNull Message message) {

@Override
public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) {
emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple(
historyHolder, getCommandContext()));
InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(),
getCommandContext().getCorrelationId());
emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(),
new Values(historyHolder.getTaskId(), message));
}

@Override
Expand Down Expand Up @@ -221,7 +222,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(ZkStreams.ZK.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package org.openkilda.wfm.topology.flowhs.bolts;

import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_FLOW_MONITORING_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_METRICS_BOLT;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_PING_SENDER;
Expand All @@ -41,7 +41,6 @@
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig;
import org.openkilda.wfm.share.flow.resources.FlowResourcesManager;
import org.openkilda.wfm.share.history.bolt.HistoryBolt;
import org.openkilda.wfm.share.history.model.FlowHistoryHolder;
import org.openkilda.wfm.share.hubandspoke.HubBolt;
import org.openkilda.wfm.share.utils.KeyProvider;
Expand Down Expand Up @@ -150,8 +149,10 @@ public void sendNorthboundResponse(@NonNull Message message) {

@Override
public void sendHistoryUpdate(@NonNull FlowHistoryHolder historyHolder) {
emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple(
historyHolder, getCommandContext()));
InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(),
getCommandContext().getCorrelationId());
emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(),
new Values(historyHolder.getTaskId(), message));
}

@Override
Expand Down Expand Up @@ -209,7 +210,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_PING_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_SERVER42_CONTROL_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(ZkStreams.ZK.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package org.openkilda.wfm.topology.flowhs.bolts;

import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_BOLT;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_NB_RESPONSE_SENDER;
import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.HUB_TO_SPEAKER_WORKER;
import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;
Expand All @@ -25,6 +25,7 @@
import org.openkilda.floodlight.api.response.SpeakerFlowSegmentResponse;
import org.openkilda.messaging.Message;
import org.openkilda.messaging.command.flow.FlowMirrorPointCreateRequest;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.pce.AvailableNetworkFactory;
import org.openkilda.pce.PathComputer;
import org.openkilda.pce.PathComputerConfig;
Expand All @@ -33,7 +34,6 @@
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.flow.resources.FlowResourcesConfig;
import org.openkilda.wfm.share.flow.resources.FlowResourcesManager;
import org.openkilda.wfm.share.history.bolt.HistoryBolt;
import org.openkilda.wfm.share.history.model.FlowHistoryHolder;
import org.openkilda.wfm.share.hubandspoke.HubBolt;
import org.openkilda.wfm.share.utils.KeyProvider;
Expand Down Expand Up @@ -140,8 +140,10 @@ public void sendNorthboundResponse(Message message) {

@Override
public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {
emit(Stream.HUB_TO_HISTORY_BOLT.name(), getCurrentTuple(), HistoryBolt.newInputTuple(
historyHolder, getCommandContext()));
InfoMessage message = new InfoMessage(historyHolder, getCommandContext().getCreateTime(),
getCommandContext().getCorrelationId());
emitWithContext(Stream.HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), getCurrentTuple(),
new Values(historyHolder.getTaskId(), message));
}

@Override
Expand All @@ -161,7 +163,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream(HUB_TO_SPEAKER_WORKER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_NB_RESPONSE_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_BOLT.name(), HistoryBolt.INPUT_FIELDS);
declarer.declareStream(HUB_TO_HISTORY_TOPOLOGY_SENDER.name(), MessageKafkaTranslator.STREAM_FIELDS);
declarer.declareStream(ZkStreams.ZK.toString(),
new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT));
}
Expand Down
Loading

0 comments on commit 7c2e819

Please sign in to comment.