
Update RCF to v3.8 and Enable Auto AD with 'Alert Once' Option #979

Merged 3 commits on Aug 15, 2023
6 changes: 3 additions & 3 deletions build.gradle
@@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'
-implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.0-rc3'
-implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
-implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'
+implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
+implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
+implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
@@ -61,6 +61,7 @@
import org.opensearch.timeseries.util.ExceptionUtil;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
@@ -375,17 +376,18 @@ private void trainModelFromDataSegments(
// overlapping x3, x4, and only store x5, x6.
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
-.anomalyRate(1 - this.thresholdMinPvalue);
+.anomalyRate(1 - this.thresholdMinPvalue)
+.transformMethod(TransformMethod.NORMALIZE)
+.alertOnce(true)
+.autoAdjust(true);

if (rcfSeed > 0) {
rcfBuilder.randomSeed(rcfSeed);
}
ThresholdedRandomCutForest trcf = new ThresholdedRandomCutForest(rcfBuilder);

while (!dataPoints.isEmpty()) {
trcf.process(dataPoints.poll(), 0);
}

EntityModel model = entityState.getModel();
if (model == null) {
model = new EntityModel(entity, new ArrayDeque<>(), null);
9 changes: 9 additions & 0 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
@@ -51,6 +51,7 @@

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

@@ -532,6 +533,10 @@ private void trainModelForStep(
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(detector.getShingleSize())
.anomalyRate(1 - thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)

Reviewer: :)

.alertOnce(true)
.autoAdjust(true)

Reviewer: AlertOnce and autoAdjust can interfere. It's OK to have both true; we will just see fewer anomalies. If either of these is changed to false, then we should consider changing the other (in particular, if autoAdjust is false, it's unclear whether we still want alertOnce; maybe, maybe not).

Collaborator (author): Are you saying there are dependencies between these two options? How do we decide whether to set one to false if the other is false?

Reviewer: Well, "alertOnce" is a directive :) That is, if there is a "run" of continuous anomalies, we alert once and then stop (until some internal check indicates that at least one normal point was seen). In the autoAdjust case, we learn and try to suppress anomalies if both the observed and expected values are within some noise range. So if a level shift is quickly followed by another spike or dip, autoAdjust has some chance of catching it. But all of this is based on unsupervised estimation, so it's hard to guarantee behavior. In general, starting from (false, false), autoAdjust = true should reduce anomalies, and alertOnce = true reduces them even further. But there may be applications where users prefer many continuous anomalies (for example, to detect a level shift!).
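The "alert once per run" semantics described above can be sketched as a tiny state machine, assuming a simplified setting where each point is already labeled anomalous or normal. This is a toy illustration of the idea only, not RCF's actual predictor-corrector logic:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch: alert on the first anomalous point of a run, suppress the rest of
// the run, and re-arm only after at least one normal point is seen.
public class AlertOnceSketch {
    public static List<Integer> alertedIndices(boolean[] anomalous) {
        List<Integer> alerts = new ArrayList<>();
        boolean armed = true; // re-armed whenever a normal point is observed
        for (int i = 0; i < anomalous.length; i++) {
            if (anomalous[i]) {
                if (armed) {
                    alerts.add(i);  // first anomaly of the run: alert
                    armed = false;  // suppress the remainder of the run
                }
            } else {
                armed = true;
            }
        }
        return alerts;
    }
}
```

With input {normal, anomaly, anomaly, anomaly, normal, anomaly}, only indices 1 and 5 are alerted; the run at indices 2-3 is suppressed.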

.internalShinglingEnabled(false)
.build();
Arrays.stream(dataPoints).forEach(s -> trcf.process(s, 0));

@@ -622,6 +627,10 @@ public List<ThresholdingResult> getPreviewResults(double[][] dataPoints, int shi
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
return Arrays.stream(dataPoints).map(point -> {
AnomalyDescriptor descriptor = trcf.process(point, 0);
5 changes: 5 additions & 0 deletions src/main/java/org/opensearch/ad/task/ADBatchTaskCache.java
@@ -30,6 +30,7 @@
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
@@ -80,6 +81,10 @@ protected ADBatchTaskCache(ADTask adTask) {
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();

this.thresholdModelTrained = false;
31 changes: 31 additions & 0 deletions src/test/java/org/opensearch/ad/MemoryTrackerTests.java
@@ -33,6 +33,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class MemoryTrackerTests extends OpenSearchTestCase {
@@ -109,6 +110,9 @@ public void setUp() throws Exception {
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();

detector = mock(AnomalyDetector.class);
@@ -152,6 +156,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(shingleSize)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603708, tracker.estimateTRCFModelSize(rcf2));
assertTrue(tracker.isHostingAllowed(detectorId, rcf2));
@@ -171,6 +178,9 @@
.internalShinglingEnabled(false)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1685208, tracker.estimateTRCFModelSize(rcf3));

@@ -188,6 +198,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(521304, tracker.estimateTRCFModelSize(rcf4));

@@ -205,6 +218,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(2)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(467340, tracker.estimateTRCFModelSize(rcf5));

@@ -222,6 +238,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(4)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603676, tracker.estimateTRCFModelSize(rcf6));

@@ -239,6 +258,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(16)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(401481, tracker.estimateTRCFModelSize(rcf7));

@@ -256,6 +278,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(32)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040432, tracker.estimateTRCFModelSize(rcf8));

@@ -273,6 +298,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(64)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040688, tracker.estimateTRCFModelSize(rcf9));

@@ -290,6 +318,9 @@
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(65)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
expectThrows(IllegalArgumentException.class, () -> tracker.estimateTRCFModelSize(rcf10));
}
146 changes: 141 additions & 5 deletions src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java
@@ -111,6 +111,7 @@

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
@@ -256,6 +257,10 @@ private ThresholdedRandomCutForest createTRCF() {
.precision(Precision.FLOAT_32)
.randomSeed(seed)
.boundingBoxCacheFraction(0)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)

Reviewer: Is this for existing models? Single-stream? Creating externally shingled models will take up more memory (and more queries).

Collaborator (author): We only enabled internal shingling for real-time HC detectors. All other detectors use external shingling, for historical reasons.

Reviewer: Makes sense. Thanks.

.build();
for (double[] point : data) {
forest.process(point, 0);
@@ -910,8 +915,6 @@ public void testFromEntityModelCheckpointBWC() throws FileNotFoundException, IOE
assertEquals(1, forest.getDimensions());
assertEquals(10, forest.getNumberOfTrees());
assertEquals(256, forest.getSampleSize());
// there are at least 10 scores in the checkpoint
assertTrue(trcf.getThresholder().getCount() > 10);

Random random = new Random(0);
for (int i = 0; i < 100; i++) {
@@ -979,8 +982,6 @@ public void testFromEntityModelCheckpointNoThreshold() throws FileNotFoundExcept
assertEquals(1, forest.getDimensions());
assertEquals(10, forest.getNumberOfTrees());
assertEquals(256, forest.getSampleSize());
// there are no scores in the checkpoint
assertEquals(0, trcf.getThresholder().getCount());
}

public void testFromEntityModelCheckpointWithEntity() throws Exception {
@@ -1082,11 +1083,12 @@ public void testDeserializeTRCFModel() throws Exception {
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
// rcf 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(descriptor.getAnomalyGrade(), grade.get(i), 1e-9);
}
}

@@ -1098,4 +1100,138 @@ public void testShouldSave() {
// 1658863778000L + 6 hrs < Instant.now
assertTrue(checkpointDao.shouldSave(Instant.ofEpochMilli(1658863778000L), false, Duration.ofHours(6), clock));
}

// This test checks whether, given a checkpoint created by RCF-3.0-rc3 ("rcf_3_0_rc3_single_stream.json")
// and the same sample data, rc3 and the current RCF version produce the same anomaly scores
// (the test was originally created when 3.0-rc3 was in use).
// The scores below were produced by AD running with the RCF 3.0-rc3 dependency,
// and this test runs against the most recent RCF dependency pulled in by this project.
public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception {
// The model in file rcf_3_0_rc3_single_stream.json is a checkpoint created by RCF-3.0-rc3
URI uri = ClassLoader.getSystemResource("org/opensearch/ad/ml/rcf_3_0_rc3_single_stream.json").toURI();
String filePath = Paths.get(uri).toString();
String json = Files.readString(Paths.get(filePath), Charset.defaultCharset());
// For the parsing of .toTrcf to work, I had to manually change "\u003d" in the checkpoint back to "=".
// In the byte array this doesn't seem to be an issue, but whenever the byte-array response is read into a file,
// "=" is converted to "\u003d": https://groups.google.com/g/google-gson/c/JDHUo9DWyyM?pli=1
// I also needed to bypass the trcf key, as it wasn't being read as a key/value pair but as part of the string.
Map map = gson.fromJson(json, Map.class);
String model = (String) ((Map) ((Map) ((ArrayList) ((Map) map.get("hits")).get("hits")).get(0)).get("_source")).get("modelV2");
// model = model.split(":")[1].substring(1);
@amitgalitz (Member, Aug 15, 2023): nit: I think I previously added this to deal with "trcf". Is it not needed any more in this case? Also, the commented-out code here and on line 1173 can be deleted if not needed.

Collaborator (author): The single-stream model does not need this line; only the HC detector checkpoint needs it, as you can see in my HC test. Will remove the commented-out code.

ThresholdedRandomCutForest forest = checkpointDao.toTrcf(model);

// single-stream model uses external shingling
List<double[]> coldStartData = new ArrayList<>();
double[] sample1 = new double[] { 64, 58, 59, 60, 61, 62, 63, 57.0 };
double[] sample2 = new double[] { 58, 59, 60, 61, 62, 63, 57.0, 1.0 };
double[] sample3 = new double[] { 59, 60, 61, 62, 63, 57.0, 1.0, -19.0 };
double[] sample4 = new double[] { 60, 61, 62, 63, 57.0, 1.0, -19.0, 13.0 };
double[] sample5 = new double[] { 61, 62, 63, 57.0, 1.0, -19.0, 13.0, 41.0 };

coldStartData.add(sample1);
coldStartData.add(sample2);
coldStartData.add(sample3);
coldStartData.add(sample4);
coldStartData.add(sample5);
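The five samples above are overlapping windows over one underlying series: external shingling shifts the window by one point per sample. A small helper makes that relationship explicit. The raw series and shingle size of 8 below are inferred from the samples; the helper is an illustration, not code from the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Build externally shingled inputs: each shingle is a sliding window of
// `shingleSize` consecutive points, advancing by one point at a time.
public class ShingleSketch {
    public static List<double[]> buildShingles(double[] series, int shingleSize) {
        List<double[]> shingles = new ArrayList<>();
        for (int start = 0; start + shingleSize <= series.length; start++) {
            shingles.add(Arrays.copyOfRange(series, start, start + shingleSize));
        }
        return shingles;
    }
}
```

Applied to the inferred raw series { 64, 58, 59, 60, 61, 62, 63, 57, 1, -19, 13, 41 } with shingle size 8, this reproduces sample1 through sample5.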

// These scores were generated with the same sample data on RCF 3.0-rc3, and we compare them
// to the scores generated by the RCF version imported by this project.
List<Double> scores = new ArrayList<>();
scores.add(3.3830441158587066);
scores.add(2.825961659490065);
scores.add(2.4685871670647384);
scores.add(2.3123460886413647);
scores.add(2.1401987653477135);

// rcf 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
}
}

// This test checks whether, given a checkpoint created by RCF-3.0-rc3 ("rcf_3_0_rc3_hc.json")
// and the same sample data, rc3 and the current RCF version produce the same anomaly scores
// (the test was originally created when 3.0-rc3 was in use).
// The scores below were produced by AD running with the RCF 3.0-rc3 dependency,
// and this test runs against the most recent RCF dependency pulled in by this project.
public void testDeserialize_rcf3_rc3_hc_model() throws Exception {
// The model in file rcf_3_0_rc3_hc.json is a checkpoint created by RCF-3.0-rc3
URI uri = ClassLoader.getSystemResource("org/opensearch/ad/ml/rcf_3_0_rc3_hc.json").toURI();
String filePath = Paths.get(uri).toString();
String json = Files.readString(Paths.get(filePath), Charset.defaultCharset());
// For the parsing of .toTrcf to work, I had to manually change "\u003d" in the checkpoint back to "=".
// In the byte array this doesn't seem to be an issue, but whenever the byte-array response is read into a file,
// "=" is converted to "\u003d": https://groups.google.com/g/google-gson/c/JDHUo9DWyyM?pli=1
// I also needed to bypass the trcf key, as it wasn't being read as a key/value pair but as part of the string.
Map map = gson.fromJson(json, Map.class);
String model = (String) ((Map) ((Map) ((ArrayList) ((Map) map.get("hits")).get("hits")).get(0)).get("_source")).get("modelV2");
model = model.split(":")[1];
model = model.substring(1, model.length() - 2);
// model = Base64.getEncoder().encodeToString(org.apache.commons.codec.binary.Base64.decodeBase64(model));
// Simulate JSON parsing by replacing Unicode escape sequences with the actual characters
model = unescapeJavaString(model);
Member: Why is this needed now, after we already use .fromJson? I see it wasn't used in the previous BWC test.

@kaituo (Collaborator, author, Aug 15, 2023): Maybe it's related to how I recorded those JSON response files. I downloaded the responses using curl. Without unescaping the Java string, we hit a model-corruption exception. I remember you asked me about fixing invalid characters when you bumped the RCF version; how did you end up doing it? I recall suggesting a library that can do this automatically.

Member: The difference in recording the JSON response makes sense. I'm not 100% sure we're talking about the same thing, but I used the lines above to remove the last "=" character in the response.

Collaborator (author): Yeah, different checkpoints might need different conversions. For my single-stream checkpoint, I didn't need to do anything.


ThresholdedRandomCutForest forest = checkpointDao.toTrcf(model);

// hc model uses internal shingling
List<double[]> coldStartData = new ArrayList<>();
double[] sample1 = new double[] { 53, 54, 55, 56, 57.0 };
double[] sample2 = new double[] { 54, 55, 56, 57.0, 1.0 };
double[] sample3 = new double[] { 55, 56, 57.0, 1.0, -19.0 };
double[] sample4 = new double[] { 56, 57.0, 1.0, -19.0, 13.0 };
double[] sample5 = new double[] { 57.0, 1.0, -19.0, 13.0, 41.0 };

coldStartData.add(sample1);
coldStartData.add(sample2);
coldStartData.add(sample3);
coldStartData.add(sample4);
coldStartData.add(sample5);

// These scores were generated with the same sample data on RCF 3.0-rc3, and we compare them
// to the scores generated by the RCF version imported by this project.
List<Double> scores = new ArrayList<>();
scores.add(1.86645896573027);
scores.add(1.8760247712797833);
scores.add(1.6809181763279901);
scores.add(1.7126716645678555);
scores.add(1.323776514074674);

// rcf 3.8 has a number of improvements to the thresholder and predictor corrector.
// We don't expect the results to have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
}
}

public static String unescapeJavaString(String st) {
StringBuilder sb = new StringBuilder(st.length());

for (int i = 0; i < st.length(); i++) {
char ch = st.charAt(i);
if (ch == '\\') {
char nextChar = (i == st.length() - 1) ? '\\' : st.charAt(i + 1);
switch (nextChar) {
case 'u':
sb.append((char) Integer.parseInt(st.substring(i + 2, i + 6), 16));
i += 5;
break;
case '\\':
sb.append('\\');
i++;
break;
default:
sb.append(ch);
break;
}
} else {
sb.append(ch);
}
}
return sb.toString();
}
}
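A usage sketch for the unescaping logic above: gson serializes "=" (the base64 padding character in the checkpoint) as the escape sequence "\u003d", and unescaping restores the original character before the model string is handed to toTrcf. The class below is a self-contained copy of the same logic, for illustration only (the name is not from the PR):

```java
// Self-contained copy of the unescaping logic, for demonstration. Handles the
// two cases the test helper handles: a backslash-u hex escape and an escaped
// backslash; any other backslash sequence is passed through unchanged.
public class UnescapeDemo {
    public static String unescape(String st) {
        StringBuilder sb = new StringBuilder(st.length());
        for (int i = 0; i < st.length(); i++) {
            char ch = st.charAt(i);
            if (ch == '\\') {
                char nextChar = (i == st.length() - 1) ? '\\' : st.charAt(i + 1);
                if (nextChar == 'u') {
                    // four hex digits follow: decode them into a single character
                    sb.append((char) Integer.parseInt(st.substring(i + 2, i + 6), 16));
                    i += 5;
                } else if (nextChar == '\\') {
                    sb.append('\\');
                    i++;
                } else {
                    sb.append(ch);
                }
            } else {
                sb.append(ch);
            }
        }
        return sb.toString();
    }
}
```

For example, a checkpoint fragment recorded as "modelV2\u003dabc" (with the escape stored literally) unescapes to "modelV2=abc".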