Merge branch 'release-1.147.1'

ichupin committed Nov 27, 2023
2 parents 2cec54d + 6ca8d97, commit 67a3ed8

Showing 20 changed files with 197 additions and 29 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,19 @@
# Changelog

## v1.147.1 (27/11/2023)

### Bug Fixes:
- [#5467](https://github.com/telstra/open-kilda/pull/5467) Fix KildaEntryCacheService cache initialization on service activation.

### Improvements:
- [#5477](https://github.com/telstra/open-kilda/pull/5477) [TEST]: Additional logs: Topology rebuild issue [**tests**]
- [#5478](https://github.com/telstra/open-kilda/pull/5478) [TEST]: Improvement: Flows(regular, y-, ha-) leftovers cleanup [**tests**]
- [#5482](https://github.com/telstra/open-kilda/pull/5482) [TEST] Debug enabled for flaky Flow Monitoring spec [**tests**]

For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.147.0...v1.147.1).

---

## v1.147.0 (21/11/2023)

### Bug Fixes:
KildaEntryCacheService.java

@@ -71,6 +71,7 @@
import org.openkilda.wfm.topology.stats.model.YFlowDescriptor;
import org.openkilda.wfm.topology.stats.model.YFlowSubDescriptor;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
@@ -110,6 +111,11 @@ public KildaEntryCacheService(PersistenceManager persistenceManager, KildaEntryC
this.active = false;
}

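    /**
     * Returns a defensive copy of the switch+meter cache so tests can assert on
     * its contents without mutating service state.
     */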
@VisibleForTesting
HashSetValuedHashMap<MeterCacheKey, KildaEntryDescriptor> getSwitchAndMeterToFlow() {
return new HashSetValuedHashMap<>(switchAndMeterToFlow);
}

/**
     * Activates the service and fills the cache.
*/
@@ -370,7 +376,7 @@ private void processHaPath(HaFlowPath haPath, CacheAddUpdateHandler cacheHandler
cacheHandler, subPath.getHaFlowId(),
FlowPathMapper.INSTANCE.mapToPathNodes(subPath.getHaFlowPath().getHaFlow(), subPath),
subPath.getCookie(),
-                subPath.getMeterId(),
+                subPath.isForward() ? haPath.getSharedPointMeterId() : subPath.getMeterId(),
subPath.getCookie().getDirection() == FlowPathDirection.FORWARD
? subPath.getHaFlowPath().getYPointGroupId() : null,
subPath.getCookie().getDirection() == FlowPathDirection.REVERSE
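The one-line change above is the heart of the bug fix in this release: a forward HA sub-path is metered at the HA flow's shared point, so cache entries must be keyed by the shared-point meter rather than the sub-path's own meter; reverse sub-paths keep their own meter. A minimal sketch of that selection rule, assuming the getter names shown in this diff (the helper itself is hypothetical, not part of the change):

    // Hypothetical helper illustrating the meter-selection rule introduced above.
    private static MeterId resolveSubPathMeterId(HaFlowPath haPath, FlowPath subPath) {
        // Forward sub-paths share the meter installed at the HA flow's shared point;
        // reverse sub-paths carry a meter of their own.
        return subPath.isForward() ? haPath.getSharedPointMeterId() : subPath.getMeterId();
    }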
@@ -434,14 +440,14 @@ private static KildaEntryDescriptor newEndpointPathEntry(
* @param cookie the FlowSegmentCookie associated with the flow
* @param meterId the MeterId associated with the flow
* @param path the list of PathNodePayload representing the flow path
-     * @param ypointGroupId the GroupId associated with the y-point switch
+     * @param yPointGroupId the GroupId associated with the y-point switch
* @param yPointMeterId the MeterId associated with the y-point switch
* @param haSubFlowId the ID of the HA subflow
* @param yPointSwitchId the SwitchId of the y-point switch
*/
private void processHaCookies(
KildaEntryDescriptorHandler cacheHandler, String haFlowId,
-            FlowSegmentCookie cookie, MeterId meterId, List<PathNodePayload> path, GroupId ypointGroupId,
+            FlowSegmentCookie cookie, MeterId meterId, List<PathNodePayload> path, GroupId yPointGroupId,
MeterId yPointMeterId, String haSubFlowId, SwitchId yPointSwitchId) {

List<SwitchId> switchIds = path.stream().map(PathNodePayload::getSwitchId).collect(Collectors.toList());
@@ -461,7 +467,7 @@ private void processHaCookies(
//y-point
if (sw.equals(yPointSwitchId)) {
cacheHandler.handle(newHaFlowDescriptor(sw, HA_FLOW_Y_POINT, haFlowId, modifiedCookie, null,
-                    ypointGroupId, yPointMeterId, resolveHaSubFlowIdForYPoint(cookie, haSubFlowId)));
+                    yPointGroupId, yPointMeterId, resolveHaSubFlowIdForYPoint(cookie, haSubFlowId)));
} else if (i > 0 && i < path.size() - 1) {
            // do not attach meter info to transit switches
cacheHandler.handle(newCommonHaFlowDescriptor(sw, TRANSIT, haFlowId, modifiedCookie,
@@ -546,7 +552,8 @@ private void refreshCache() {
cookieToFlow, switchAndMeterToFlow, switchAndGroupToFlow);
}

-    private void clearCache() {
+    @VisibleForTesting
+    void clearCache() {
cookieToFlow = createNewCookieCacheInstance();
switchAndMeterToFlow = createNewMeterCacheInstance();
switchAndGroupToFlow = createNewGroupCacheInstance();
KildaEntryCacheServiceTest.java

@@ -18,6 +18,7 @@
import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
@@ -36,21 +37,29 @@
import org.openkilda.messaging.info.stats.RemoveFlowPathInfo;
import org.openkilda.messaging.info.stats.UpdateFlowPathInfo;
import org.openkilda.model.Flow;
import org.openkilda.model.FlowEncapsulationType;
import org.openkilda.model.FlowEndpoint;
import org.openkilda.model.FlowMirrorPoints;
import org.openkilda.model.FlowPath;
import org.openkilda.model.FlowPathDirection;
import org.openkilda.model.FlowStatus;
import org.openkilda.model.GroupId;
import org.openkilda.model.HaFlow;
import org.openkilda.model.HaFlowPath;
import org.openkilda.model.HaSubFlow;
import org.openkilda.model.MeterId;
import org.openkilda.model.MirrorDirection;
import org.openkilda.model.MirrorGroup;
import org.openkilda.model.MirrorGroupType;
import org.openkilda.model.PathComputationStrategy;
import org.openkilda.model.PathId;
import org.openkilda.model.Switch;
import org.openkilda.model.SwitchId;
import org.openkilda.model.YFlow;
import org.openkilda.model.cookie.CookieBase.CookieType;
import org.openkilda.model.cookie.FlowSegmentCookie;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.persistence.ferma.repositories.FermaModelUtils;
import org.openkilda.persistence.repositories.FlowRepository;
import org.openkilda.persistence.repositories.HaFlowRepository;
import org.openkilda.persistence.repositories.RepositoryFactory;
@@ -63,6 +72,8 @@
import org.openkilda.wfm.topology.stats.model.EndpointFlowDescriptor;
import org.openkilda.wfm.topology.stats.model.FlowStatsAndDescriptor;
import org.openkilda.wfm.topology.stats.model.KildaEntryDescriptor;
import org.openkilda.wfm.topology.stats.model.MeasurePoint;
import org.openkilda.wfm.topology.stats.model.MeterCacheKey;
import org.openkilda.wfm.topology.stats.model.MeterStatsAndDescriptor;
import org.openkilda.wfm.topology.stats.model.StatVlanDescriptor;
import org.openkilda.wfm.topology.stats.model.StatsAndDescriptor;
@@ -71,7 +82,9 @@
import org.openkilda.wfm.topology.stats.model.YFlowDescriptor;
import org.openkilda.wfm.topology.stats.model.YFlowSubDescriptor;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -83,6 +96,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@ExtendWith(MockitoExtension.class)
public class KildaEntryCacheServiceTest {
@@ -873,6 +887,32 @@ public void serviceActivationAfterDeactivationTest() {
verify(yFlowRepository, times(2)).findAll();
}

@Test
    void whenHaFlowExists_activateService_populatesCache() {
service.clearCache();
HashSetValuedHashMap<MeterCacheKey, KildaEntryDescriptor> cache = service.getSwitchAndMeterToFlow();
assertTrue(cache.isEmpty());

try {
prepareHaFlowRepository();

service.deactivate();
service.activate();

List<String> measurePoints = service.getSwitchAndMeterToFlow().values().stream()
.map(KildaEntryDescriptor::getMeasurePoint)
.map(MeasurePoint::toString)
.sorted()
.collect(Collectors.toList());

assertEquals(2, measurePoints.size());
assertEquals("HA_FLOW_Y_POINT", measurePoints.get(0));
assertEquals("INGRESS", measurePoints.get(1));
} finally {
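            // leave the service with an empty cache so later tests start clean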
service.clearCache();
}
}

private void assertCookieCache(
List<FlowStatsAndDescriptor> statsEntries, FlowSegmentCookie cookie,
KildaEntryDescriptor expectedDescriptor) {
@@ -964,4 +1004,54 @@ private MeterStatsData getMeterStatsDataDstSwitch() {
new MeterStatsEntry(REVERSE_METER_ID, 0, 0),
new MeterStatsEntry(PROTECTED_REVERSE_METER_ID, 0, 0)));
}

private HaFlow createHaFlowWithDistinctYSwitch() {
Switch sharedSwitch = Switch.builder().switchId(new SwitchId(1)).build();
Switch yPointSwitch = Switch.builder().switchId(new SwitchId(2)).build();
Switch endpointSwitchA = Switch.builder().switchId(new SwitchId(3)).build();
Switch endpointSwitchB = Switch.builder().switchId(new SwitchId(4)).build();

HaFlow haFlow = FermaModelUtils.buildHaFlow("HA-flow ID", sharedSwitch, 1, 1, 1,
12, 13, 14, FlowEncapsulationType.VXLAN, 1, "description",
PathComputationStrategy.COST, FlowStatus.UP, false, false, false,
false, false);

HaSubFlow haSubFlow1 = FermaModelUtils.buildHaSubFlow("sub1", endpointSwitchA, 2, 2, 2, "");
HaSubFlow haSubFlow2 = FermaModelUtils.buildHaSubFlow("sub1", endpointSwitchB, 3, 3, 3, "");

haFlow.setHaSubFlows(Lists.newArrayList(haSubFlow1, haSubFlow2));

HaFlowPath forward = FermaModelUtils.buildHaFlowPath(new PathId("1"), 1, FlowSegmentCookie.builder()
.direction(FlowPathDirection.FORWARD).build(),
new MeterId(12), new MeterId(13), sharedSwitch, sharedSwitch.getSwitchId(), GroupId.MIN_FLOW_GROUP_ID);
forward.setHaSubFlows(Lists.newArrayList(haSubFlow1, haSubFlow2));

FlowPath forwardPath = FermaModelUtils.buildPath(new PathId("1"), forward, sharedSwitch, endpointSwitchA);
forwardPath.setHaSubFlow(haSubFlow1);
forwardPath.setCookie(FORWARD_PATH_COOKIE);
forwardPath.setSegments(FermaModelUtils.buildSegments(new PathId("1"),
sharedSwitch, yPointSwitch, endpointSwitchA));
forward.setSubPaths(Collections.singletonList(forwardPath));
haFlow.setForwardPath(forward);

HaFlowPath reverse = FermaModelUtils.buildHaFlowPath(new PathId("2"), 1, FlowSegmentCookie.builder()
.direction(FlowPathDirection.REVERSE).build(),
new MeterId(15), new MeterId(16), sharedSwitch, sharedSwitch.getSwitchId(), GroupId.MIN_FLOW_GROUP_ID);
reverse.setHaSubFlows(Lists.newArrayList(haSubFlow1, haSubFlow2));

FlowPath reversePath = FermaModelUtils.buildPath(new PathId("2"), reverse, sharedSwitch, endpointSwitchA);
reversePath.setCookie(REVERSE_PATH_COOKIE);
reversePath.setHaSubFlow(haSubFlow1);
reversePath.setSegments(FermaModelUtils.buildSegments(new PathId("2"),
endpointSwitchA, yPointSwitch, sharedSwitch));
reverse.setSubPaths(Collections.singletonList(reversePath));

haFlow.setReversePath(reverse);

return haFlow;
}

private void prepareHaFlowRepository() {
when(haFlowRepository.findAll()).thenReturn(Collections.singletonList(createHaFlowWithDistinctYSwitch()));
}
}
TopologyConfig.groovy

@@ -59,6 +59,7 @@ class TopologyConfig {
@Scope("specThread")
TopologyDefinition getTopologyDefinition(TopologyPool topologyPool) throws IOException {
def topo = topologyPool.take()
log.info("Assigned topology: " + topo)
topo.setParentPool(topologyPool)
return topo
}
EnvExtension.groovy

@@ -134,14 +134,15 @@ class EnvExtension extends AbstractGlobalExtension implements SpringContextListe
northbound.deleteLink(new LinkParametersDto(it.source.switchId.toString(), it.source.portNo,
it.destination.switchId.toString(), it.destination.portNo))
}
-        Wrappers.wait(WAIT_OFFSET / 2) {
+        Wrappers.wait(WAIT_OFFSET) {
assert northbound.getAllLinks().empty
}
log.info("Deleting all switches")
northbound.getAllSwitches().each { northbound.deleteSwitch(it.switchId, false) }
Wrappers.wait(WAIT_OFFSET / 2) { assert northbound.getAllSwitches().empty }

List<TopologyDefinition> topologies = [topology, topologyPool.topologies].flatten()
log.info("Creating topologies (${topologies.size()})")
def unblocked = false
topologies.each { topo ->
def lab = labService.createLab(topo) //can't create in parallel, hangs on Jenkins. why?
HaFlowHelper.groovy

@@ -101,7 +101,7 @@ class HaFlowHelper {
ep
}
return HaFlowCreatePayload.builder()
-                .haFlowId(flowHelperV2.generateFlowId())
+                .haFlowId(flowHelperV2.generateFlowId() + "_haflow")
.sharedEndpoint(se)
.subFlows(subFlows)
.maximumBandwidth(1000)
@@ -150,6 +150,7 @@
}

return HaFlowCreatePayload.builder()
.haFlowId(flowHelperV2.generateFlowId() + "_haflow")
.sharedEndpoint(HaFlowSharedEndpoint.builder().switchId(sw.dpId).portNumber(sePort).vlanId(seVlan).build())
.subFlows(subFlows)
.maximumBandwidth(1000)
Wrappers.groovy

@@ -3,6 +3,8 @@ package org.openkilda.functionaltests.helpers
import groovy.transform.InheritConstructors
import groovy.util.logging.Slf4j

import static java.lang.System.currentTimeMillis

@Slf4j
class Wrappers {

@@ -67,8 +69,8 @@
* @param closure code to repeat in a loop
*/
static void timedLoop(double timeout, Closure closure) {
-        long endTime = System.currentTimeMillis() + (long) (timeout * 1000)
-        while (System.currentTimeMillis() < endTime) {
+        long endTime = currentTimeMillis() + (long) (timeout * 1000)
+        while (currentTimeMillis() < endTime) {
closure.call()
}
}
@@ -84,13 +86,16 @@
* @return True if wait was successful, throws WaitTimeoutException otherwise
*/
static boolean wait(double timeout, double retryInterval = 0.5, Closure closure) {
-        long endTime = System.currentTimeMillis() + (long) (timeout * 1000)
+        long endTime = currentTimeMillis() + (long) (timeout * 1000)
long sleepTime = (long) (retryInterval * 1000)
Throwable thrown = null
-        while (System.currentTimeMillis() < endTime) {
+        while (currentTimeMillis() < endTime) {
try {
def result = closure.call()
if (result || result == null) {
log.debug("Closure: ${closure.toString()}\n" +
"Took ${(currentTimeMillis() - (endTime - (long) (timeout * 1000))) * 1000} seconds\n" +
"Limit was ${timeout} seconds")
return true
} else {
thrown = null
@@ -112,12 +117,12 @@
* @return result of the given closure
*/
static def benchmark(name, closure) {
-        def start = System.currentTimeMillis()
+        def start = currentTimeMillis()
def result
try {
result = closure.call()
} finally {
-            def now = System.currentTimeMillis()
+            def now = currentTimeMillis()
log.debug("$name took " + (now - start) + "ms")
}
return result
YFlowHelper.groovy

@@ -50,6 +50,8 @@ class YFlowHelper {
NorthboundService northbound
@Autowired
PathHelper pathHelper
@Autowired
FlowHelperV2 flowHelperV2

def random = new Random()
def faker = new Faker()
@@ -81,6 +83,7 @@
ep
}
return YFlowCreatePayload.builder()
.yFlowId(flowHelperV2.generateFlowId() + "_yflow")
.sharedEndpoint(se)
.subFlows(subFlows)
.maximumBandwidth(1000)
@@ -133,6 +136,7 @@
}

return YFlowCreatePayload.builder()
.yFlowId(flowHelperV2.generateFlowId() + "_yflow")
.sharedEndpoint(YFlowSharedEndpoint.builder().switchId(sw.dpId).portNumber(sePort).build())
.subFlows(subFlows)
.maximumBandwidth(1000)
HaFlowBuilder.groovy

@@ -103,6 +103,6 @@ class HaFlowBuilder {

private String generateFlowId() {
return new SimpleDateFormat("ddMMMHHmmss_SSS", Locale.US).format(new Date()) + "_" +
-                faker.food().ingredient().toLowerCase().replaceAll(/\W/, "") + faker.number().digits(4)
+                faker.food().ingredient().toLowerCase().replaceAll(/\W/, "") + faker.number().digits(4) + "_haflow"
}
}