Skip to content

Commit

Permalink
Issue 3495: Split metadata scalability test into two separate tests (pravega#3496)
Browse files Browse the repository at this point in the history

* Splits metadata scalability test into two separate tests so that they are individually not resource hungry and test intended scenarios.

Signed-off-by: Shivesh Ranjan <[email protected]>
  • Loading branch information
shiveshr authored and fpj committed Mar 28, 2019
1 parent f321595 commit 911df70
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public final class Config {
private static final Property<String> PROPERTY_TLS_TRUST_STORE = Property.named("auth.tlsTrustStore", "");
private static final Property<String> PROPERTY_TLS_KEY_FILE = Property.named("auth.tlsKeyFile", "");
private static final Property<String> PROPERTY_TOKEN_SIGNING_KEY = Property.named("auth.tokenSigningKey", "");
private static final Property<String> PROPERTY_ZK_URL = Property.named("zk.url", "localhost:2121");
private static final Property<String> PROPERTY_ZK_URL = Property.named("zk.url", "localhost:2181");
private static final Property<Integer> PROPERTY_ZK_RETRY_MILLIS = Property.named("zk.retryIntervalMillis", 5000);
private static final Property<Integer> PROPERTY_ZK_MAX_RETRY_COUNT = Property.named("maxRetries", 5);
private static final Property<Integer> PROPERTY_ZK_SESSION_TIMEOUT_MILLIS = Property.named("sessionTimeoutMillis", 10000);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.test.system;

import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import io.pravega.test.system.framework.SystemTestRunner;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* This test creates a stream with 10k segments and then rapidly scales it 10 times.
* Then it performs truncation a random number of times.
*/
@Slf4j
@RunWith(SystemTestRunner.class)
public class MetadataScalabilityLargeNumSegmentsTest extends MetadataScalabilityTest {
    private static final String STREAM_NAME = "metadataScalabilitySegments";
    private static final int NUM_SEGMENTS = 10000;
    private static final StreamConfiguration CONFIG = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(NUM_SEGMENTS)).build();
    private static final int SCALES_TO_PERFORM = 10;

    // Tracks how many scales have been requested so far; each call to
    // getScaleInput picks the next segment (by counter value) to seal.
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    String getStreamName() {
        return STREAM_NAME;
    }

    @Override
    StreamConfiguration getStreamConfig() {
        return CONFIG;
    }

    @Override
    int getScalesToPerform() {
        return SCALES_TO_PERFORM;
    }

    /**
     * Chooses one segment out of the current segments and selects its matching range as the input for next scale.
     * On the i-th invocation, the segment whose segment number is congruent to {@code i - 1}
     * modulo {@link #NUM_SEGMENTS} is sealed and replaced by a single new segment covering
     * the key-space range {@code [(i-1)/NUM_SEGMENTS, i/NUM_SEGMENTS)}.
     *
     * @param sortedCurrentSegments sorted current segments
     * @return scale input for next scale: the pair of (segments to seal, new key ranges)
     */
    @Override
    Pair<List<Long>, Map<Double, Double>> getScaleInput(ArrayList<Segment> sortedCurrentSegments) {
        int i = counter.incrementAndGet();
        List<Long> segmentsToSeal = sortedCurrentSegments.stream()
                .filter(x -> i - 1 == StreamSegmentNameUtils.getSegmentNumber(x.getSegmentId()) % NUM_SEGMENTS)
                .map(Segment::getSegmentId).collect(Collectors.toList());
        Map<Double, Double> newRanges = new HashMap<>();
        double delta = 1.0 / NUM_SEGMENTS;
        newRanges.put(delta * (i - 1), delta * i);

        return new ImmutablePair<>(segmentsToSeal, newRanges);
    }

    /**
     * Creates a stream with {@link #NUM_SEGMENTS} segments, performs
     * {@link #SCALES_TO_PERFORM} scales, then seals and deletes the stream.
     */
    @Test
    public void largeNumSegmentsScalability() {
        testState = new TestState(false);

        ControllerImpl controller = getController();

        List<List<Segment>> listOfEpochs = scale(controller);
        // TODO: uncomment truncation as part of #3478
        // truncation(controller, listOfEpochs);
        sealAndDeleteStream(controller);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.test.system;

import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ControllerImpl;
import io.pravega.test.system.framework.SystemTestRunner;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* This test creates a stream with 10 segments and then rapidly scales it 1010 times.
* Then it performs truncation a random number of times.
*/
@Slf4j
@RunWith(SystemTestRunner.class)
public class MetadataScalabilityLargeScalesTest extends MetadataScalabilityTest {
    private static final String STREAM_NAME = "metadataScalabilityScale";
    private static final int NUM_SEGMENTS = 10;
    private static final StreamConfiguration CONFIG = StreamConfiguration.builder()
            .scalingPolicy(ScalingPolicy.fixed(NUM_SEGMENTS)).build();
    private static final int SCALES_TO_PERFORM = 1010;

    // Lazily populated, identical set of 10 key-space ranges reused by every
    // scale; access is guarded by @Synchronized on getNewRanges().
    private final Map<Double, Double> newRanges = new HashMap<>();

    @Override
    String getStreamName() {
        return STREAM_NAME;
    }

    @Override
    StreamConfiguration getStreamConfig() {
        return CONFIG;
    }

    @Override
    int getScalesToPerform() {
        return SCALES_TO_PERFORM;
    }

    /**
     * Scale all the segments in the current epoch and replace them with new identical 10 segments.
     *
     * @param sortedCurrentSegments segments in current epoch
     * @return scale input for next scale to perform: the pair of (segments to seal, new key ranges)
     */
    @Override
    Pair<List<Long>, Map<Double, Double>> getScaleInput(ArrayList<Segment> sortedCurrentSegments) {
        return new ImmutablePair<>(getSegmentsToSeal(sortedCurrentSegments), getNewRanges());
    }

    /**
     * Returns the ids of all segments in the given epoch; every segment is sealed on each scale.
     */
    private List<Long> getSegmentsToSeal(ArrayList<Segment> sorted) {
        return sorted.stream()
                .map(Segment::getSegmentId).collect(Collectors.toList());
    }

    /**
     * Builds (once) and returns {@link #NUM_SEGMENTS} equal key-space ranges covering [0.0, 1.0).
     * The last range's upper bound is pinned to exactly 1.0 to avoid floating-point drift.
     */
    @Synchronized
    private Map<Double, Double> getNewRanges() {
        if (newRanges.isEmpty()) {
            double delta = 1.0 / NUM_SEGMENTS;
            for (int i = 0; i < NUM_SEGMENTS; i++) {
                double low = delta * i;
                double high = i == NUM_SEGMENTS - 1 ? 1.0 : delta * (i + 1);

                newRanges.put(low, high);
            }
        }
        return newRanges;
    }

    /**
     * Creates a stream with {@link #NUM_SEGMENTS} segments, performs
     * {@link #SCALES_TO_PERFORM} scales, truncates the stream at several
     * epochs, then seals and deletes it.
     */
    @Test
    public void largeNumScalesScalability() {
        testState = new TestState(false);

        ControllerImpl controller = getController();

        List<List<Segment>> listOfEpochs = scale(controller);
        truncation(controller, listOfEpochs);
        sealAndDeleteStream(controller);
    }
}
Loading

0 comments on commit 911df70

Please sign in to comment.