Skip to content

Commit

Permalink
Update RCF to v3.8 and Enable Auto AD with 'Alert Once' Option
Browse files Browse the repository at this point in the history
This PR adds support for automatic Anomaly Detection (AD) tuning and the 'Alert Once' option introduced in RCF 3.8.

Testing done:

1. Deserialization Test:
* Verified model deserialization from 3.0-rc3.
* Ensured consistent scoring using the rc3 checkpoint and rc3 dependency on identical data.

2. Backward Compatibility Test:
* Executed a mixed cluster with versions 2.10 and 3.0.
* Validated that older detectors still produce results without throwing any exceptions in a blue-green simulation scenario.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 14, 2023
1 parent cadc9bb commit b3fe699
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 22 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.timeseries.util.ExceptionUtil;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
Expand Down Expand Up @@ -375,17 +376,18 @@ private void trainModelFromDataSegments(
// overlapping x3, x4, and only store x5, x6.
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
.anomalyRate(1 - this.thresholdMinPvalue);
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true);

if (rcfSeed > 0) {
rcfBuilder.randomSeed(rcfSeed);
}
ThresholdedRandomCutForest trcf = new ThresholdedRandomCutForest(rcfBuilder);

while (!dataPoints.isEmpty()) {
trcf.process(dataPoints.poll(), 0);
}

EntityModel model = entityState.getModel();
if (model == null) {
model = new EntityModel(entity, new ArrayDeque<>(), null);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand Down Expand Up @@ -532,6 +533,10 @@ private void trainModelForStep(
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(detector.getShingleSize())
.anomalyRate(1 - thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
Arrays.stream(dataPoints).forEach(s -> trcf.process(s, 0));

Expand Down Expand Up @@ -622,6 +627,10 @@ public List<ThresholdingResult> getPreviewResults(double[][] dataPoints, int shi
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
return Arrays.stream(dataPoints).map(point -> {
AnomalyDescriptor descriptor = trcf.process(point, 0);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/opensearch/ad/task/ADBatchTaskCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
Expand Down Expand Up @@ -80,6 +81,10 @@ protected ADBatchTaskCache(ADTask adTask) {
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();

this.thresholdModelTrained = false;
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/org/opensearch/ad/MemoryTrackerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class MemoryTrackerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -109,6 +110,9 @@ public void setUp() throws Exception {
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();

detector = mock(AnomalyDetector.class);
Expand Down Expand Up @@ -152,6 +156,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(shingleSize)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603708, tracker.estimateTRCFModelSize(rcf2));
assertTrue(tracker.isHostingAllowed(detectorId, rcf2));
Expand All @@ -171,6 +178,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(false)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1685208, tracker.estimateTRCFModelSize(rcf3));

Expand All @@ -188,6 +198,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(521304, tracker.estimateTRCFModelSize(rcf4));

Expand All @@ -205,6 +218,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(2)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(467340, tracker.estimateTRCFModelSize(rcf5));

Expand All @@ -222,6 +238,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(4)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603676, tracker.estimateTRCFModelSize(rcf6));

Expand All @@ -239,6 +258,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(16)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(401481, tracker.estimateTRCFModelSize(rcf7));

Expand All @@ -256,6 +278,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(32)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040432, tracker.estimateTRCFModelSize(rcf8));

Expand All @@ -273,6 +298,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(64)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040688, tracker.estimateTRCFModelSize(rcf9));

Expand All @@ -290,6 +318,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(65)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
expectThrows(IllegalArgumentException.class, () -> tracker.estimateTRCFModelSize(rcf10));
}
Expand Down
146 changes: 141 additions & 5 deletions src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
Expand Down Expand Up @@ -256,6 +257,10 @@ private ThresholdedRandomCutForest createTRCF() {
.precision(Precision.FLOAT_32)
.randomSeed(seed)
.boundingBoxCacheFraction(0)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
for (double[] point : data) {
forest.process(point, 0);
Expand Down Expand Up @@ -910,8 +915,6 @@ public void testFromEntityModelCheckpointBWC() throws FileNotFoundException, IOE
assertEquals(1, forest.getDimensions());
assertEquals(10, forest.getNumberOfTrees());
assertEquals(256, forest.getSampleSize());
// there are at least 10 scores in the checkpoint
assertTrue(trcf.getThresholder().getCount() > 10);

Random random = new Random(0);
for (int i = 0; i < 100; i++) {
Expand Down Expand Up @@ -979,8 +982,6 @@ public void testFromEntityModelCheckpointNoThreshold() throws FileNotFoundExcept
assertEquals(1, forest.getDimensions());
assertEquals(10, forest.getNumberOfTrees());
assertEquals(256, forest.getSampleSize());
// there are no scores in the checkpoint
assertEquals(0, trcf.getThresholder().getCount());
}

public void testFromEntityModelCheckpointWithEntity() throws Exception {
Expand Down Expand Up @@ -1082,11 +1083,12 @@ public void testDeserializeTRCFModel() throws Exception {
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(descriptor.getAnomalyGrade(), grade.get(i), 1e-9);
}
}

Expand All @@ -1098,4 +1100,138 @@ public void testShouldSave() {
// 1658863778000L + 6 hrs < Instant.now
assertTrue(checkpointDao.shouldSave(Instant.ofEpochMilli(1658863778000L), false, Duration.ofHours(6), clock));
}

// This test checks that, given a checkpoint created by RCF-3.0-rc3 ("rcf_3_0_rc3_single_stream.json")
// and the same sample data, RCF-3.0-rc3 and the current RCF version (this test was originally
// created when 3.0-rc3 was in use) produce the same anomaly scores.
// The expected scores below were produced by AD running with the RCF-3.0-rc3 dependency,
// while this test runs with the most recent RCF dependency pulled in by this project.
public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception {
    // The model in rcf_3_0_rc3_single_stream.json is a checkpoint created by RCF-3.0-rc3.
    URI uri = ClassLoader.getSystemResource("org/opensearch/ad/ml/rcf_3_0_rc3_single_stream.json").toURI();
    String filePath = Paths.get(uri).toString();
    String json = Files.readString(Paths.get(filePath), Charset.defaultCharset());
    // For the parsing of .toTrcf to work I had to manually change "\u003d" in code back to =.
    // In the byte array it doesn't seem like this is an issue but whenever reading the byte array response into a file it
    // converts "=" to "\u003d" https://groups.google.com/g/google-gson/c/JDHUo9DWyyM?pli=1
    // I also needed to bypass the trcf as it wasn't being read as a key value but instead part of the string
    Map map = gson.fromJson(json, Map.class);
    String model = (String) ((Map) ((Map) ((ArrayList) ((Map) map.get("hits")).get("hits")).get(0)).get("_source")).get("modelV2");
    ThresholdedRandomCutForest forest = checkpointDao.toTrcf(model);

    // single-stream model uses external shingling: each sample carries the whole shingle
    List<double[]> coldStartData = new ArrayList<>();
    double[] sample1 = new double[] { 64, 58, 59, 60, 61, 62, 63, 57.0 };
    double[] sample2 = new double[] { 58, 59, 60, 61, 62, 63, 57.0, 1.0 };
    double[] sample3 = new double[] { 59, 60, 61, 62, 63, 57.0, 1.0, -19.0 };
    double[] sample4 = new double[] { 60, 61, 62, 63, 57.0, 1.0, -19.0, 13.0 };
    double[] sample5 = new double[] { 61, 62, 63, 57.0, 1.0, -19.0, 13.0, 41.0 };

    coldStartData.add(sample1);
    coldStartData.add(sample2);
    coldStartData.add(sample3);
    coldStartData.add(sample4);
    coldStartData.add(sample5);

    // These scores were generated with the sample data on RCF-3.0-rc3 and are compared
    // to the scores generated by the RCF version currently imported by this project.
    List<Double> scores = new ArrayList<>();
    scores.add(3.3830441158587066);
    scores.add(2.825961659490065);
    scores.add(2.4685871670647384);
    scores.add(2.3123460886413647);
    scores.add(2.1401987653477135);

    // RCF 3.8 has a number of improvements on thresholder and predictor corrector,
    // so we only compare scores, not anomaly grades.
    for (int i = 0; i < coldStartData.size(); i++) {
        // Process each point exactly once: a second process call on the same point would
        // advance the model state and shift every subsequent score away from the
        // recorded rc3 reference values.
        AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
        assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
    }
}

// This test checks that, given a checkpoint created by RCF-3.0-rc3 ("rcf_3_0_rc3_hc.json")
// and the same sample data, RCF-3.0-rc3 and the current RCF version (this test was originally
// created when 3.0-rc3 was in use) produce the same anomaly scores.
// The expected scores below were produced by AD running with the RCF-3.0-rc3 dependency,
// while this test runs with the most recent RCF dependency pulled in by this project.
public void testDeserialize_rcf3_rc3_hc_model() throws Exception {
    // The model in rcf_3_0_rc3_hc.json is a checkpoint created by RCF-3.0-rc3.
    URI uri = ClassLoader.getSystemResource("org/opensearch/ad/ml/rcf_3_0_rc3_hc.json").toURI();
    String filePath = Paths.get(uri).toString();
    String json = Files.readString(Paths.get(filePath), Charset.defaultCharset());
    // For the parsing of .toTrcf to work I had to manually change "\u003d" in code back to =.
    // In the byte array it doesn't seem like this is an issue but whenever reading the byte array response into a file it
    // converts "=" to "\u003d" https://groups.google.com/g/google-gson/c/JDHUo9DWyyM?pli=1
    // I also needed to bypass the trcf as it wasn't being read as a key value but instead part of the string
    Map map = gson.fromJson(json, Map.class);
    String model = (String) ((Map) ((Map) ((ArrayList) ((Map) map.get("hits")).get("hits")).get(0)).get("_source")).get("modelV2");
    // Strip the field-name prefix before ':' and the trailing quote/brace left from the raw string.
    model = model.split(":")[1];
    model = model.substring(1, model.length() - 2);
    // Simulate JSON parsing by replacing Unicode escape sequences with the actual characters.
    model = unescapeJavaString(model);

    ThresholdedRandomCutForest forest = checkpointDao.toTrcf(model);

    // hc model uses internal shingling: each sample is a single (unshingled) observation window
    List<double[]> coldStartData = new ArrayList<>();
    double[] sample1 = new double[] { 53, 54, 55, 56, 57.0 };
    double[] sample2 = new double[] { 54, 55, 56, 57.0, 1.0 };
    double[] sample3 = new double[] { 55, 56, 57.0, 1.0, -19.0 };
    double[] sample4 = new double[] { 56, 57.0, 1.0, -19.0, 13.0 };
    double[] sample5 = new double[] { 57.0, 1.0, -19.0, 13.0, 41.0 };

    coldStartData.add(sample1);
    coldStartData.add(sample2);
    coldStartData.add(sample3);
    coldStartData.add(sample4);
    coldStartData.add(sample5);

    // These scores were generated with the sample data on RCF-3.0-rc3 and are compared
    // to the scores generated by the RCF version currently imported by this project.
    List<Double> scores = new ArrayList<>();
    scores.add(1.86645896573027);
    scores.add(1.8760247712797833);
    scores.add(1.6809181763279901);
    scores.add(1.7126716645678555);
    scores.add(1.323776514074674);

    // RCF 3.8 has a number of improvements on thresholder and predictor corrector,
    // so we only compare scores, not anomaly grades.
    for (int i = 0; i < coldStartData.size(); i++) {
        // Process each point exactly once: a second process call on the same point would
        // advance the model state and shift every subsequent score away from the
        // recorded rc3 reference values.
        AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
        assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
    }
}

/**
 * Undoes the Unicode escaping that Gson applies when a serialized TRCF checkpoint is
 * written out as JSON text (for example, the six-character sequence backslash-u-0-0-3-d
 * becomes the single character '=').
 *
 * <p>Only two escape forms are interpreted: backslash-u followed by four hex digits, and
 * an escaped backslash (two consecutive backslashes, which collapse to one). Any other
 * character after a backslash is left alone — the backslash is emitted and scanning
 * resumes at the following character. A lone backslash at the end of the input is
 * emitted verbatim.
 *
 * @param st the escaped string; must not be null
 * @return the string with the supported escape sequences decoded
 */
public static String unescapeJavaString(String st) {
    StringBuilder out = new StringBuilder(st.length());
    int idx = 0;
    while (idx < st.length()) {
        char current = st.charAt(idx);
        if (current != '\\') {
            out.append(current);
            idx++;
            continue;
        }
        // Peek at the character after the backslash; a trailing backslash is treated
        // the same as an escaped backslash and emitted as-is.
        char next = (idx + 1 < st.length()) ? st.charAt(idx + 1) : '\\';
        if (next == 'u') {
            // Decode the four hex digits following backslash-u into a single char.
            out.append((char) Integer.parseInt(st.substring(idx + 2, idx + 6), 16));
            idx += 6;
        } else if (next == '\\') {
            out.append('\\');
            idx += 2;
        } else {
            // Unrecognized escape: keep the backslash and re-examine the next
            // character on the following iteration.
            out.append(current);
            idx++;
        }
    }
    return out.toString();
}
}
Loading

0 comments on commit b3fe699

Please sign in to comment.