diff --git a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java
index 9d24d3653397b..f349baeef3cd7 100644
--- a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java
+++ b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java
@@ -45,6 +45,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;

@@ -54,7 +55,7 @@
  * @opensearch.api
  */
 @PublicApi(since = "1.0.0")
-public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment {
+public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment, Serializable {

     /**
      * Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder,
diff --git a/plugins/crypto-kms/licenses/slf4j-api-LICENSE.txt b/plugins/crypto-kms/licenses/slf4j-api-LICENSE.txt
deleted file mode 100644
index 2be7689435062..0000000000000
--- a/plugins/crypto-kms/licenses/slf4j-api-LICENSE.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-Copyright (c) 2004-2022 QOS.ch
-All rights reserved.
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-"Software"), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt b/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt
deleted file mode 100644
index 2be7689435062..0000000000000
--- a/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt
+++ /dev/null
@@ -1,21 +0,0 @@
-Copyright (c) 2004-2022 QOS.ch
-All rights reserved.
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-"Software"), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/slf4j-api-NOTICE.txt b/plugins/discovery-ec2/licenses/slf4j-api-NOTICE.txt
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/plugins/identity-shiro/licenses/slf4j-api-NOTICE.txt b/plugins/identity-shiro/licenses/slf4j-api-NOTICE.txt
deleted file mode 100644
index e69de29bb2d1d..0000000000000
diff --git a/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1
deleted file mode 100644
index 77b9917528382..0000000000000
--- a/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-6c62681a2f655b49963a5983b8b0950a6120ae14
\ No newline at end of file
diff --git a/server/build.gradle b/server/build.gradle
index f6db3d53a0dcc..dc3fa97ea4ca6 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -158,6 +158,10 @@ dependencies {
   api "com.google.protobuf:protobuf-java:${versions.protobuf}"
   api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}"

+  // ehcache, used by the experimental disk caching tier
+  api "org.ehcache:ehcache:3.10.8"
+  api "org.slf4j:slf4j-api:1.7.36"
+
   testImplementation(project(":test:framework")) {
     // tests use the locally compiled version of server
     exclude group: 'org.opensearch', module: 'server'
diff --git a/server/licenses/ehcache-3.10.8.jar.sha1 b/server/licenses/ehcache-3.10.8.jar.sha1
new file mode 100644
index 0000000000000..dee07e9238ebf
--- /dev/null
+++ b/server/licenses/ehcache-3.10.8.jar.sha1
@@ -0,0 +1 @@
+f0d50ede46609db78413ca7f4250d348a597b101
\ No newline at end of file diff --git a/server/licenses/ehcache-LICENSE.txt b/server/licenses/ehcache-LICENSE.txt new file mode 100644 index 0000000000000..8dada3edaf50d --- /dev/null +++ b/server/licenses/ehcache-LICENSE.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. 
Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/server/licenses/ehcache-NOTICE.txt b/server/licenses/ehcache-NOTICE.txt new file mode 100644 index 0000000000000..1dbd38242cc98 --- /dev/null +++ b/server/licenses/ehcache-NOTICE.txt @@ -0,0 +1,5 @@ +Ehcache V3 +Copyright 2014-2023 Terracotta, Inc. + +The product includes software from the Apache Commons Lang project, +under the Apache License 2.0 (see: org.ehcache.impl.internal.classes.commonslang) diff --git a/plugins/crypto-kms/licenses/slf4j-api-1.7.36.jar.sha1 b/server/licenses/slf4j-api-1.7.36.jar.sha1 similarity index 100% rename from plugins/crypto-kms/licenses/slf4j-api-1.7.36.jar.sha1 rename to server/licenses/slf4j-api-1.7.36.jar.sha1 diff --git a/plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt b/server/licenses/slf4j-api-LICENSE.txt similarity index 95% rename from plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt rename to server/licenses/slf4j-api-LICENSE.txt index 8fda22f4d72f6..a51675a21c10f 100644 --- a/plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt +++ b/server/licenses/slf4j-api-LICENSE.txt @@ -1,4 +1,4 @@ -Copyright (c) 2004-2014 QOS.ch +Copyright (c) 2004-2022 QOS.ch Sarl (Switzerland) All rights reserved. Permission is hereby granted, free of charge, to any person obtaining @@ -19,3 +19,5 @@ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ + diff --git a/plugins/crypto-kms/licenses/slf4j-api-NOTICE.txt b/server/licenses/slf4j-api-NOTICE.txt similarity index 100% rename from plugins/crypto-kms/licenses/slf4j-api-NOTICE.txt rename to server/licenses/slf4j-api-NOTICE.txt diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..b2e21d2076cbb 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper 
wrapper;

+    private DelegatingCacheHelper delegatingCacheHelper;
+
     private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException {
         super(in, wrapper);
         this.wrapper = wrapper;
         this.shardId = shardId;
+        this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
     }

     /**
@@ -68,7 +72,61 @@ public ShardId shardId() {
     @Override
     public CacheHelper getReaderCacheHelper() {
         // safe to delegate since this reader does not alter the index
-        return in.getReaderCacheHelper();
+        return this.delegatingCacheHelper;
+    }
+
+    public DelegatingCacheHelper getDelegatingCacheHelper() {
+        return this.delegatingCacheHelper;
+    }
+
+    /**
+     * Wraps the reader's existing cache helper and exposes its CacheKey through a serializable
+     * DelegatingCacheKey wrapper.
+     * @opensearch.internal
+     */
+    public class DelegatingCacheHelper implements CacheHelper {
+        private final CacheHelper cacheHelper;
+        private final DelegatingCacheKey serializableCacheKey;
+
+        DelegatingCacheHelper(CacheHelper cacheHelper) {
+            this.cacheHelper = cacheHelper;
+            this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey());
+        }
+
+        @Override
+        public CacheKey getKey() {
+            return this.cacheHelper.getKey();
+        }
+
+        public DelegatingCacheKey getDelegatingCacheKey() {
+            return this.serializableCacheKey;
+        }
+
+        @Override
+        public void addClosedListener(ClosedListener listener) {
+            this.cacheHelper.addClosedListener(listener);
+        }
+    }
+
+    /**
+     * Wraps the internal IndexReader.CacheKey and attaches a uniqueId to it, which can eventually
+     * be used in place of the object itself for serialization purposes.
+     */
+    public class DelegatingCacheKey {
+        private final CacheKey cacheKey;
+        private final UUID uniqueId;
+
+        DelegatingCacheKey(CacheKey cacheKey) {
+            this.cacheKey = cacheKey;
+            this.uniqueId = UUID.randomUUID();
+        }
+
+        public CacheKey getCacheKey() {
+            return this.cacheKey;
+        }
+
+        public UUID getId() {
+            return uniqueId;
+        }
     }

     @Override
diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java
index cb181840406a5..2e5eae5ceebe0 100644
--- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java
+++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java
@@ -32,6 +32,7 @@
 package org.opensearch.common.metrics;

+import java.io.Serializable;
 import java.util.concurrent.atomic.LongAdder;

 /**
@@ -62,5 +63,4 @@ public void dec(long n) {
     public long count() {
         return counter.sum();
     }
-
 }
diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java
index 1beef5217355f..3194aee757fc4 100644
--- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java
+++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java
@@ -37,6 +37,7 @@
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.indices.TierType;

+import java.io.Serializable;
 import java.util.EnumMap;

 /**
@@ -56,11 +57,34 @@ public ShardRequestCache() {
     public RequestCacheStats stats() {
         // TODO: Change RequestCacheStats to support disk tier stats.
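+        // The no-argument stats() keeps its historical single-tier meaning and reports only the
+        // on-heap tier; overallStats() below aggregates across every tier.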
+        return stats(TierType.ON_HEAP);
+    }
+
+    public RequestCacheStats stats(TierType tierType) {
+        return new RequestCacheStats(
+            statsHolder.get(tierType).totalMetric.count(),
+            statsHolder.get(tierType).evictionsMetric.count(),
+            statsHolder.get(tierType).hitCount.count(),
+            statsHolder.get(tierType).missCount.count()
+        );
+    }
+
+    public RequestCacheStats overallStats() {
+        long totalSize = 0;
+        long totalEvictions = 0;
+        long totalHits = 0;
+        long totalMisses = 0;
+        for (TierType tierType : TierType.values()) {
+            totalSize += statsHolder.get(tierType).totalMetric.count();
+            totalEvictions += statsHolder.get(tierType).evictionsMetric.count();
+            totalHits += statsHolder.get(tierType).hitCount.count();
+            totalMisses += statsHolder.get(tierType).missCount.count();
+        }
         return new RequestCacheStats(
-            statsHolder.get(TierType.ON_HEAP).totalMetric.count(),
-            statsHolder.get(TierType.ON_HEAP).evictionsMetric.count(),
-            statsHolder.get(TierType.ON_HEAP).hitCount.count(),
-            statsHolder.get(TierType.ON_HEAP).missCount.count()
+            totalSize,
+            totalEvictions,
+            totalHits,
+            totalMisses
         );
     }

@@ -90,7 +114,7 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) {
         statsHolder.get(tierType).totalMetric.dec(dec);
     }

-    static class StatsHolder {
+    static class StatsHolder implements Serializable {

         final CounterMetric evictionsMetric = new CounterMetric();
         final CounterMetric totalMetric = new CounterMetric();
diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java
index 85596929cfd6b..6726167fe469d 100644
--- a/server/src/main/java/org/opensearch/indices/CachingTier.java
+++ b/server/src/main/java/org/opensearch/indices/CachingTier.java
@@ -10,6 +10,8 @@
 import org.opensearch.common.cache.RemovalListener;

+import java.io.IOException;
+
 /**
  * A single tier of the tiered request cache, such as the on-heap or disk tier.
  * @param <K> the key type
diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java
index efd9a459cd338..b1b5b03ed13cd 100644
--- a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java
+++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java
@@ -9,5 +9,14 @@
 package org.opensearch.indices;

 public interface DiskCachingTier<K, V> extends CachingTier<K, V> {

+    /**
+     * Closes the disk tier.
+     */
+    void close();

+    /**
+     * Get the exponentially weighted moving average time for a get().
+     * @return the EWMA get() latency, in milliseconds
+     */
+    double getTimeMillisEWMA();
 }
diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java
new file mode 100644
index 0000000000000..6ccbb68515b50
--- /dev/null
+++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java
@@ -0,0 +1,279 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices;
+
+import org.ehcache.PersistentCacheManager;
+import org.ehcache.config.builders.CacheConfigurationBuilder;
+import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
+import org.ehcache.config.builders.ResourcePoolsBuilder;
+import org.ehcache.config.units.MemoryUnit;
+import org.ehcache.event.EventType;
+import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration;
+import org.opensearch.common.ExponentiallyWeightedMovingAverage;
+import org.opensearch.common.cache.RemovalListener;
+import org.ehcache.Cache;
+import org.opensearch.common.cache.RemovalNotification;
+import org.opensearch.common.io.PathUtils;
+import org.opensearch.common.metrics.CounterMetric;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.common.io.stream.BytesStreamInput;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+
+public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCache.Key, BytesReference>,
+    RemovalListener<IndicesRequestCache.Key, BytesReference> {
+
+    public static HashMap<String, PersistentCacheManager> cacheManagers = new HashMap<>();
+    // Because of the way test cases are set up, each node may try to instantiate several disk caching tiers.
+    // Only one of them will be used, but there will be initialization errors when multiple cache managers try to
+    // use the same file path and create/get caches with the same alias. We resolve this with a static map of
+    // cache managers keyed by node ID: an entry is created on first use and reused afterwards.
+    // (See https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo)
+    // Integration tests put several nodes in one JVM, and without the per-node key those nodes would all try to
+    // reuse the same manager. None of this should happen in production, where nodes don't share a JVM and
+    // instantiate their services only once, but it's best to resolve it anyway.
+
+    private PersistentCacheManager cacheManager; // the manager this tier will actually use
+    private Cache<EhcacheKey, BytesReference> cache;
+
+    // Placeholder; this should probably be defined somewhere else, since security.policy must grant
+    // file permissions based on its value.
+    public final static String BASE_DISK_CACHE_FP = "disk_cache_tier";
+
+    // To accommodate test setups, where multiple nodes may exist on the same filesystem, each node writes
+    // to a subfolder of BASE_DISK_CACHE_FP named after its node ID.
+    private final String diskCacheFP; // the path this node uses
+    private RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener;
+    private ExponentiallyWeightedMovingAverage getTimeMillisEWMA;
+    private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch
+    private static final int MIN_WRITE_THREADS = 0;
+    private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes
+    private static final String cacheAlias = "diskTier";
+    private CounterMetric count; // number of entries in the cache
+    private final EhcacheEventListener listener;
+    private final IndicesRequestCache indicesRequestCache; // only used to create new Keys
+    private final String nodeId;
+    public int numGets; // debug counter for the concurrency test
+    // private RBMIntKeyLookupStore keystore;
+    // private CacheTierPolicy[] policies;
+    // private IndicesRequestCacheDiskTierPolicy policy;
+
+    public EhcacheDiskCachingTier(
+        long maxWeightInBytes,
+        long maxKeystoreWeightInBytes,
+        IndicesRequestCache indicesRequestCache,
+        String nodeId
+    ) {
+        this.count = new CounterMetric();
+        this.listener = new EhcacheEventListener(this, this);
+        this.indicesRequestCache = indicesRequestCache;
+        this.nodeId = nodeId;
+        this.diskCacheFP = PathUtils.get(BASE_DISK_CACHE_FP, nodeId).toString();
+        this.numGets = 0; // debug
+        // PathUtils.get warns it shouldn't often be used; we can switch to the filesystem roots once we pick a final path
+
+        getManager();
+        try {
+            cacheManager.destroyCache(cacheAlias);
+        } catch (Exception e) {
+            System.out.println("Unable to destroy cache!!");
+            e.printStackTrace();
+            // TODO: replace with actual logging
+        }
+        createCache(maxWeightInBytes);
+        this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10);
+
+        // this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes);
+        // this.policies = new CacheTierPolicy[]{ new IndicesRequestCacheTookTimePolicy(settings, clusterSettings) };
+        // this.policy = new IndicesRequestCacheDiskTierPolicy(this.policies, true);
+    }
+
+    public void getManager() {
+        // Destroys any cache manager already registered for this node, then creates and registers a fresh one.
+        // Based on https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo
+        // to resolve the double-initialization issue when using OpenSearchSingleNodeTestCase
+        PersistentCacheManager oldCacheManager = cacheManagers.get(nodeId);
+        if (oldCacheManager != null) {
+            try {
+                try {
+                    oldCacheManager.close();
+                } catch (IllegalStateException e) {
+                    System.out.println("Cache was uninitialized, skipping close() and moving to destroy()");
+                }
+                oldCacheManager.destroy();
+            } catch (Exception e) {
+                System.out.println("Was unable to destroy existing cache manager");
+                e.printStackTrace();
+                // TODO: replace with actual logging
+            }
+        }
+        PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
+            .defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS)
+            .build();
+
+        cacheManagers.put(nodeId,
+            CacheManagerBuilder.newCacheManagerBuilder()
+                .using(threadConfig)
+                .with(CacheManagerBuilder.persistence(diskCacheFP)
+                ).build(true)
+        );
+        this.cacheManager = cacheManagers.get(nodeId);
+    }
+
+    private void
createCache(long maxWeightInBytes) { + // our EhcacheEventListener should receive events every time an entry is changed + CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder + .newEventListenerConfiguration(listener, + EventType.EVICTED, + EventType.EXPIRED, + EventType.REMOVED, + EventType.UPDATED, + EventType.CREATED) + .ordered().asynchronous(); + // ordered() has some performance penalty as compared to unordered(), we can also use synchronous() + + cache = cacheManager.createCache(cacheAlias, + CacheConfigurationBuilder.newCacheConfigurationBuilder( + EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, false)) + .withService(listenerConfig)); + } + + @Override + public BytesReference get(IndicesRequestCache.Key key) { + // I don't think we need to do the future stuff as the cache is threadsafe + + // if (keystore.contains(key.hashCode()) { + long now = System.nanoTime(); + numGets++; + BytesReference value = null; + try { + value = cache.get(new EhcacheKey(key)); + } catch (IOException e) { // do smth with this later + System.out.println("Error in get!"); + e.printStackTrace(); + } + double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000; + getTimeMillisEWMA.addValue(tookTimeMillis); + + return value; + // } + // return null; + } + + @Override + public void put(IndicesRequestCache.Key key, BytesReference value) { + // No need to get old value, this is handled by EhcacheEventListener. + + // CheckDataResult policyResult = policy.checkData(value) + // if (policyResult.isAccepted()) { + try { + cache.put(new EhcacheKey(key), value); + } catch (IOException ignored) { // do smth with this later + } + // keystore.add(key.hashCode()); + // else { do something with policyResult.deniedReason()? } + // } + } + + @Override + public BytesReference computeIfAbsent(IndicesRequestCache.Key key, TieredCacheLoader loader) throws Exception { + return null; // should not need to fill out, Cache.computeIfAbsent is always used + } + + @Override + public void invalidate(IndicesRequestCache.Key key) { + // keep keystore check to avoid unneeded disk seek + // RemovalNotification is handled by EhcacheEventListener + + // if (keystore.contains(key.hashCode()) { + try { + cache.remove(new EhcacheKey(key)); + } catch (IOException ignored) { // do smth with this later + } + // keystore.remove(key.hashCode()); + // } + } + + @Override + public BytesReference compute(IndicesRequestCache.Key key, TieredCacheLoader loader) throws Exception { + return null; // should not need to fill out, Cache.compute is always used + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = removalListener; // this is passed the spillover strategy, same as on-heap + } + + @Override + public void invalidateAll() { + // can we just wipe the cache and start over? Or do we need to create a bunch of RemovalNotifications? + } + + @Override + public Iterable keys() { + // ehcache doesn't provide a method like this, because it might be a huge number of keys that consume all + // the memory. Do we want this method for disk tier? 
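+        // Placeholder: rather than materializing every key, hand back an empty iterable, so code
+        // that walks cache keys effectively skips this tier.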
+        return Collections::emptyIterator;
+    }
+
+    @Override
+    public int count() {
+        return (int) count.count();
+    }
+
+    protected void countInc() {
+        count.inc();
+    }
+
+    protected void countDec() {
+        count.dec();
+    }
+
+    @Override
+    public TierType getTierType() {
+        return TierType.DISK;
+    }
+
+    @Override
+    public void onRemoval(RemovalNotification<IndicesRequestCache.Key, BytesReference> notification) {
+        removalListener.onRemoval(notification);
+    }
+
+    @Override
+    public double getTimeMillisEWMA() {
+        return getTimeMillisEWMA.getAverage();
+    }
+
+    public IndicesRequestCache.Key convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException {
+        // wrap the serialized bytes in a stream and let Key's StreamInput constructor rebuild the key
+        BytesStreamInput is = new BytesStreamInput(eKey.getBytes());
+        try {
+            return indicesRequestCache.new Key(is);
+        } catch (Exception e) {
+            System.out.println("Was unable to reconstruct EhcacheKey into Key");
+            e.printStackTrace();
+            // TODO: replace with actual logging
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+        // Should be called after each test
+        cacheManager.removeCache(cacheAlias);
+        cacheManager.close();
+    }
+
+    public void destroy() throws Exception {
+        // Closes the cache and deletes any persistent data associated with it.
+        // Might also be appropriate after standalone tests.
+        cacheManager.close();
+        cacheManager.destroy();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java
new file mode 100644
index 0000000000000..27872269ba77b
--- /dev/null
+++ b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java
@@ -0,0 +1,66 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices;
+
+import org.ehcache.event.CacheEvent;
+import org.ehcache.event.CacheEventListener;
+import org.ehcache.event.EventType;
+import org.opensearch.common.cache.RemovalListener;
+import org.opensearch.common.cache.RemovalNotification;
+import org.opensearch.common.cache.RemovalReason;
+import org.opensearch.core.common.bytes.BytesReference;
+
+public class EhcacheEventListener implements CacheEventListener<EhcacheKey, BytesReference> {
+    // Receives (EhcacheKey, BytesReference) pairs, but must transform them into
+    // (Key, BytesReference) pairs to send removal notifications
+    private final RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener;
+    private final EhcacheDiskCachingTier tier;
+
+    EhcacheEventListener(RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener, EhcacheDiskCachingTier tier) {
+        this.removalListener = removalListener;
+        this.tier = tier; // needed to handle count changes
+    }
+
+    @Override
+    public void onEvent(CacheEvent<? extends EhcacheKey, ? extends BytesReference> event) {
+        EhcacheKey ehcacheKey = event.getKey();
+        BytesReference oldValue = event.getOldValue();
+        BytesReference newValue = event.getNewValue();
+        EventType eventType = event.getType();
+
+        // handle changing count for the disk tier
+        if (oldValue == null && newValue != null) {
+            tier.countInc();
+        } else if (oldValue != null && newValue == null) {
+            tier.countDec();
+        }
+
+        // map the event type onto a RemovalReason, unless eventType is CREATED
+        RemovalReason reason;
+        switch (eventType) {
+            case CREATED:
+                return;
+            case EVICTED:
+                reason = RemovalReason.EVICTED; // why is there both RemovalReason.EVICTED and RemovalReason.CAPACITY?
+                break;
+            case EXPIRED:
+            case REMOVED:
+                reason = RemovalReason.INVALIDATED;
+                // this is probably fine for EXPIRED.
We use cache.remove() to invalidate keys, but this might overlap with RemovalReason.EXPLICIT? + break; + case UPDATED: + reason = RemovalReason.REPLACED; + break; + default: + reason = null; + } + try { + IndicesRequestCache.Key key = tier.convertEhcacheKeyToOriginal(ehcacheKey); + removalListener.onRemoval(new RemovalNotification<>(key, oldValue, reason)); + } catch (Exception ignored) {} + } +} diff --git a/server/src/main/java/org/opensearch/indices/EhcacheKey.java b/server/src/main/java/org/opensearch/indices/EhcacheKey.java new file mode 100644 index 0000000000000..f8fa87932d66f --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/EhcacheKey.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +public class EhcacheKey implements Serializable { + // the IndicesRequestCache.Key is not Serializable, but it is Writeable. + // We use the output stream's bytes in this wrapper class and implement the appropriate interfaces/methods. + private byte[] bytes; + + public EhcacheKey(Writeable key) throws IOException { + BytesStreamOutput os = new BytesStreamOutput(); // Should we pass in an expected size? If so, how big? + key.writeTo(os); + this.bytes = BytesReference.toBytes(os.bytes()); + } + + public byte[] getBytes() { + return this.bytes; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof EhcacheKey)) { + return false; + } + EhcacheKey other = (EhcacheKey) o; + return Arrays.equals(this.bytes, other.bytes); + } + + @Override + public int hashCode() { + return Arrays.hashCode(this.bytes); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index ab80e63a4abce..d3084c8c2c231 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -47,6 +47,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import java.io.Closeable; @@ -103,11 +106,12 @@ public final class IndicesRequestCache implements TieredCacheEventListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; + private final IndicesService indicesService; // private final Cache cache; - private final TieredCacheHandler tieredCacheHandler; - - IndicesRequestCache(Settings settings) { + //private final TieredCacheHandler tieredCacheHandler; + public final TieredCacheSpilloverStrategyHandler tieredCacheHandler; // Change this back after done debugging serialization issues + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = 
INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -124,9 +128,13 @@ public final class IndicesRequestCache implements TieredCacheEventListener k.ramBytesUsed() + v.ramBytesUsed() ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); + int diskTierWeight = 100 * 1048576; // 100 MB, for testing only + EhcacheDiskCachingTier diskCachingTier; + diskCachingTier = new EhcacheDiskCachingTier(diskTierWeight, 0, this, indicesService.getNodeId()); tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder().setOnHeapCachingTier( openSearchOnHeapCache - ).setOnDiskCachingTier(new DummyDiskCachingTier<>()).setTieredCacheEventListener(this).build(); + ).setOnDiskCachingTier(diskCachingTier).setTieredCacheEventListener(this).build(); + this.indicesService = indicesService; } @Override @@ -166,13 +174,19 @@ BytesReference getOrCompute( BytesReference cacheKey ) throws Exception { assert reader.getReaderCacheHelper() != null; - final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyUniqueId = delegatingCacheHelper.getDelegatingCacheKey().getId().toString(); + assert readerCacheKeyUniqueId != null; + final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = tieredCacheHandler.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { // key.entity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyUniqueId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -194,8 +208,14 @@ BytesReference getOrCompute( */ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - tieredCacheHandler.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); - // cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + String readerCacheKeyUniqueId = null; + if (reader instanceof OpenSearchDirectoryReader) { + IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); + readerCacheKeyUniqueId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey() + .getId() + .toString(); + } + tieredCacheHandler.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyUniqueId)); } /** @@ -230,7 +250,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable { + interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. 
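Making CacheEntity extend Writeable is what lets an entire cache Key (entity, reader id, request bytes) cross a serialization boundary: on the way into the disk tier a Key is flattened into the byte-based EhcacheKey, and on the way out EhcacheDiskCachingTier.convertEhcacheKeyToOriginal() rebuilds it. A minimal sketch of that round trip, where `someKey` (any Key produced by getOrCompute()) and `diskTier` (the node's EhcacheDiskCachingTier) are illustrative names rather than part of the change:

    // Key -> bytes: EhcacheKey's constructor drains Key.writeTo() into a byte[]
    EhcacheKey serialized = new EhcacheKey(someKey);
    // bytes -> Key: the disk tier replays the bytes through Key(StreamInput),
    // resolving the entity's shard back through IndicesService
    IndicesRequestCache.Key restored = diskTier.convertEhcacheKeyToOriginal(serialized);
    // while the entity's shard stays open, the restored key compares equal to the original
    assert someKey.equals(restored) && someKey.hashCode() == restored.hashCode();

Note that IndexShardCacheEntity's StreamInput constructor leaves indexShard null when the referenced index is no longer registered, so a deserialized entity is not guaranteed to be usable.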
@@ -263,6 +283,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -270,17 +291,23 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + class Key implements Accountable, Writeable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + public final String readerCacheKeyUniqueId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; + this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + } + + Key(StreamInput in) throws IOException { + this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.readerCacheKeyUniqueId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override @@ -299,7 +326,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -308,19 +335,26 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + result = 31 * result + readerCacheKeyUniqueId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(entity); + out.writeOptionalString(readerCacheKeyUniqueId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyUniqueId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; } @Override @@ -338,7 +372,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -346,7 +380,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); return result; } } @@ -359,7 +393,7 @@ synchronized void cleanCache() { for (Iterator iterator = 
keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -372,7 +406,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { iterator.remove(); } } @@ -392,4 +426,8 @@ long count() { int numRegisteredCloseListeners() { // for testing return registeredClosedListeners.size(); } + + public void closeDiskTier() { + tieredCacheHandler.closeDiskTier(); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..4be7061deabf3 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -391,7 +391,7 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, this); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,14 +1746,21 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } + public IndexShardCacheEntity(StreamInput in) throws IOException { + Index index = in.readOptionalWriteable(Index::new); + int shardId = in.readVInt(); + IndexService indexService = indices.get(index.getUUID()); + this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); + } + @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1775,6 +1782,12 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(indexShard.shardId().getIndex()); + out.writeVInt(indexShard.shardId().id()); + } } @FunctionalInterface @@ -1926,6 +1939,10 @@ public boolean allPendingDanglingIndicesWritten() { || (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0); } + public String getNodeId() { + return nodeEnv.nodeId(); + } + /** * Validates the cluster default index refresh interval. 
* diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java index 5fe41f5adce94..4816f94f7d619 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java @@ -21,4 +21,8 @@ public interface TieredCacheHandler { long count(); CachingTier getOnHeapCachingTier(); + + void closeDiskTier(); + + double diskGetTimeMillisEWMA(); } diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index 1047f8a6dc2cc..a5db62ba2fe07 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -11,10 +11,12 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.core.common.io.stream.Writeable; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; /** @@ -22,10 +24,10 @@ * @param * @param */ -public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { +public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { private final OnHeapCachingTier onHeapCachingTier; - private final CachingTier diskCachingTier; + private final DiskCachingTier diskCachingTier; private final TieredCacheEventListener tieredCacheEventListener; /** @@ -33,15 +35,23 @@ public class TieredCacheSpilloverStrategyHandler implements TieredCacheHan */ private final List> cachingTierList; + /*public AtomicInteger numGets; // debug only + public AtomicInteger numHeapGets; + public AtomicInteger numHeapHits; + public AtomicInteger numDiskHits;*/ private TieredCacheSpilloverStrategyHandler( OnHeapCachingTier onHeapCachingTier, - CachingTier diskCachingTier, + DiskCachingTier diskCachingTier, TieredCacheEventListener tieredCacheEventListener ) { this.onHeapCachingTier = Objects.requireNonNull(onHeapCachingTier); this.diskCachingTier = Objects.requireNonNull(diskCachingTier); this.tieredCacheEventListener = tieredCacheEventListener; this.cachingTierList = Arrays.asList(onHeapCachingTier, diskCachingTier); + /*this.numGets = new AtomicInteger(); + this.numHeapGets = new AtomicInteger(); + this.numHeapHits = new AtomicInteger(); + this.numDiskHits = new AtomicInteger();*/ setRemovalListeners(); } @@ -54,7 +64,7 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); return value; } else { - tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); + //tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); // this double counts, see line 122 } return cacheValue.value; } @@ -89,6 +99,15 @@ public long count() { return totalCount; } + public long count(TierType tierType) { + for (CachingTier cachingTier : cachingTierList) { + if (cachingTier.getTierType() == tierType) { + return cachingTier.count(); + } + } + return -1L; + } + @Override public void onRemoval(RemovalNotification notification) { if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) { @@ -108,6 +127,10 @@ public 
@@ -108,6 +127,10 @@ public CachingTier<K, V> getOnHeapCachingTier() {
         return this.onHeapCachingTier;
     }

+    public EhcacheDiskCachingTier<K, V> getDiskCachingTier() { // TODO: return CachingTier<K, V> once debugging is done
+        return (EhcacheDiskCachingTier<K, V>) this.diskCachingTier;
+    }
+
     private void setRemovalListeners() {
         for (CachingTier<K, V> cachingTier : cachingTierList) {
             cachingTier.setRemovalListener(this);
@@ -117,9 +140,22 @@ private void setRemovalListeners() {
     private Function<K, CacheValue<V>> getValueFromTierCache() {
         return key -> {
             for (CachingTier<K, V> cachingTier : cachingTierList) {
+                // counters are debug only
+                /*if (cachingTier.getTierType() == TierType.ON_HEAP) {
+                    numHeapGets.incrementAndGet();
+                } else if (cachingTier.getTierType() == TierType.DISK) {
+                    numGets.incrementAndGet();
+                }*/
+
                 V value = cachingTier.get(key);
                 if (value != null) {
                     tieredCacheEventListener.onHit(key, value, cachingTier.getTierType());
+                    /*if (cachingTier.getTierType() == TierType.ON_HEAP) {
+                        numHeapHits.incrementAndGet();
+                    }
+                    if (cachingTier.getTierType() == TierType.DISK) {
+                        numDiskHits.incrementAndGet();
+                    }*/
                     return new CacheValue<>(value, cachingTier.getTierType());
                 }
                 tieredCacheEventListener.onMiss(key, cachingTier.getTierType());
@@ -127,6 +163,15 @@ private Function<K, CacheValue<V>> getValueFromTierCache() {
             return null;
         };
     }
+
+    @Override
+    public void closeDiskTier() {
+        diskCachingTier.close();
+    }
+
+    @Override
+    public double diskGetTimeMillisEWMA() {
+        return diskCachingTier.getTimeMillisEWMA();
+    }

     public static class CacheValue<V> {
         V value;
@@ -138,9 +183,9 @@ public static class CacheValue {
         }
     }

-    public static class Builder<K, V> {
+    public static class Builder<K extends Writeable, V> {
         private OnHeapCachingTier<K, V> onHeapCachingTier;
-        private CachingTier<K, V> diskCachingTier;
+        private DiskCachingTier<K, V> diskCachingTier;
         private TieredCacheEventListener<K, V> tieredCacheEventListener;

         public Builder() {}

@@ -150,7 +195,7 @@ public Builder<K, V> setOnHeapCachingTier(OnHeapCachingTier<K, V> onHeapCachingT
             return this;
         }

-        public Builder<K, V> setOnDiskCachingTier(CachingTier<K, V> diskCachingTier) {
+        public Builder<K, V> setOnDiskCachingTier(DiskCachingTier<K, V> diskCachingTier) {
             this.diskCachingTier = diskCachingTier;
             return this;
         }
diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy
index 77cd0ab05278e..1fbfbb323e3af 100644
--- a/server/src/main/resources/org/opensearch/bootstrap/security.policy
+++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy
@@ -188,4 +188,14 @@ grant {
   permission java.io.FilePermission "/sys/fs/cgroup/memory", "read";
   permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read";

+  // ehcache
+  permission java.lang.RuntimePermission "createClassLoader";
+  permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
+  permission java.lang.RuntimePermission "getenv.*";
+  permission java.io.FilePermission "disk_cache_tier", "read"; // change this to wherever the disk tier folder will live
+  permission java.io.FilePermission "disk_cache_tier", "write";
+  permission java.io.FilePermission "disk_cache_tier", "delete";
+  permission java.io.FilePermission "disk_cache_tier/-", "read";
+  permission java.io.FilePermission "disk_cache_tier/-", "write";
+  permission java.io.FilePermission "disk_cache_tier/-", "delete";
 };
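The handler registers itself as the removal listener on every tier, so an entry evicted from the on-heap tier can be spilled to disk rather than lost. The diff shows onRemoval() only up to its EVICTED check, so the body below is a hedged sketch of the spillover path, not the actual implementation:

    // Sketch: spill heap evictions to the disk tier. Entries removed for any
    // other reason (invalidation, explicit removal) are simply dropped.
    @Override
    public void onRemoval(RemovalNotification<K, V> notification) {
        if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) {
            diskCachingTier.put(notification.getKey(), notification.getValue());
        }
    }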
diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
index 8494259c8fd8a..a68f0795c2e2c 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
@@ -46,29 +46,42 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.common.CheckedSupplier;
+import org.opensearch.common.Randomness;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.io.IOUtils;
 import org.opensearch.core.common.bytes.AbstractBytesReference;
+import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.core.xcontent.XContentHelper;
+import org.opensearch.index.IndexService;
 import org.opensearch.index.cache.request.ShardRequestCache;
 import org.opensearch.index.query.TermQueryBuilder;
-import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;

 import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;

-public class IndicesRequestCacheTests extends OpenSearchTestCase {
+public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase {

     public void testBasicOperationsCache() throws Exception {
         ShardRequestCache requestCacheStats = new ShardRequestCache();
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -119,10 +132,207 @@ public void testBasicOperationsCache() throws Exception {
         IOUtils.close(reader, writer, dir, cache);
         assertEquals(0, cache.numRegisteredCloseListeners());
+        cache.closeDiskTier();
+    }
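The next test writes straight into the ehcache-backed disk tier, bypassing the tiered handler, to surface serializer problems in isolation. For orientation, this is roughly how a persistent disk cache is assembled with the ehcache 3 builder API this PR pulls in; the directory, cache name, sizes, and key/value types here are illustrative only, not what EhcacheDiskCachingTier actually uses.

import org.ehcache.Cache;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;

import java.io.File;

public class EhcacheDiskExample {
    public static void main(String[] args) {
        // persistence() roots every disk-backed cache under this folder
        PersistentCacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
            .with(CacheManagerBuilder.persistence(new File("disk_cache_tier")))
            .build(true); // true = initialize immediately
        Cache<String, byte[]> cache = cacheManager.createCache(
            "request_cache",
            CacheConfigurationBuilder.newCacheConfigurationBuilder(
                String.class,
                byte[].class,
                ResourcePoolsBuilder.newResourcePoolsBuilder().disk(100, MemoryUnit.MB, true) // persistent across restarts
            )
        );
        cache.put("key", new byte[] { 0 });
        assert cache.get("key")[0] == 0;
        cacheManager.close(); // flush and release the file lock, as closeDiskTier() must
    }
}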
+
+    public void testAddDirectToEhcache() throws Exception {
+        // this test is for debugging serialization issues and can eventually be removed
+        // Put breakpoints at line 260 of AbstractOffHeapStore to catch serialization errors
+        // that would otherwise fail silently
+        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        Settings.Builder settingsBuilder = Settings.builder();
+        long heapSizeBytes = 1000;
+        settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
+        IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));
+
+        // set up a key
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        AtomicBoolean indexShard = new AtomicBoolean(true);
+        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
+        BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+        String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
+        IndicesRequestCache.Key key = cache.new Key(entity, termBytes, rKey);
+
+        BytesReference value = new BytesArray(new byte[] { 0 });
+        cache.tieredCacheHandler.getDiskCachingTier().put(key, value);
+
+        BytesReference res = cache.tieredCacheHandler.getDiskCachingTier().get(key);
+        assertEquals(value, res);
+        assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));
+
+        IOUtils.close(reader, writer, dir, cache);
+        cache.closeDiskTier();
+    }
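testSpillover below leans on simple capacity arithmetic: with a 1,000-byte heap budget and each cached entry weighing 131 bytes in this test setup, the heap tier holds floor(1000 / 131) = 7 entries, so the eighth distinct query evicts the oldest entry to disk. The snippet just restates that arithmetic; the 131-byte figure is what this test measures, not a general constant.

public class SpilloverMath {
    public static void main(String[] args) {
        long heapSizeBytes = 1000;  // indices.requests.cache.size in the test
        int entryWeightBytes = 131; // measured weight of one cached query in the test
        int maxNumInHeap = (int) (heapSizeBytes / entryWeightBytes);
        System.out.println(maxNumInHeap); // 7; the 8th entry spills over to the disk tier
    }
}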
+
+    public void testSpillover() throws Exception {
+        // fill the on-heap cache until we spill over
+        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        Settings.Builder settingsBuilder = Settings.builder();
+        long heapSizeBytes = 1000; // each of these queries is 131 bytes, so we can fit 7 in the heap cache
+        int heapKeySize = 131;
+        int maxNumInHeap = (int) (heapSizeBytes / heapKeySize);
+        settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
+        IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));
+
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        AtomicBoolean indexShard = new AtomicBoolean(true);
+
+        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        Loader loader = new Loader(reader, 0);
+        System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes());
+        BytesReference[] termBytesArr = new BytesReference[maxNumInHeap + 1];
+
+        for (int i = 0; i < maxNumInHeap + 1; i++) {
+            TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
+            BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+            String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
+            termBytesArr[i] = termBytes;
+            BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
+        }
+        // get value from disk cache, the first key should have been evicted
+        BytesReference firstValue = cache.getOrCompute(entity, loader, reader, termBytesArr[0]);
+
+        assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes());
+        // TODO: disk weight bytes
+        assertEquals(1, requestCacheStats.stats().getEvictions());
+        assertEquals(1, requestCacheStats.stats(TierType.DISK).getHitCount());
+        assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount());
+        assertEquals(0, requestCacheStats.stats().getHitCount());
+        assertEquals(maxNumInHeap + 2, requestCacheStats.stats().getMissCount());
+        assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP));
+        assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));
+
+        // get a value from heap cache, second key should still be there
+        BytesReference secondValue = cache.getOrCompute(entity, loader, reader, termBytesArr[1]);
+        // get the value on disk cache again
+        BytesReference firstValueAgain = cache.getOrCompute(entity, loader, reader, termBytesArr[0]);
+
+        assertEquals(1, requestCacheStats.stats().getEvictions());
+        assertEquals(2, requestCacheStats.stats(TierType.DISK).getHitCount());
+        assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount());
+        assertEquals(1, requestCacheStats.stats().getHitCount());
+        assertEquals(maxNumInHeap + 3, requestCacheStats.stats().getMissCount());
+        assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP));
+        assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));
+
+        IOUtils.close(reader, writer, dir, cache);
+        cache.closeDiskTier();
+    }
+
+    public void testDiskGetTimeEWMA() throws Exception {
+        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        Settings.Builder settingsBuilder = Settings.builder();
+        long heapSizeBytes = 0; // skip directly to disk cache
+        settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
+        IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));
+
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        AtomicBoolean indexShard = new AtomicBoolean(true);
+
+        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        Loader loader = new Loader(reader, 0);
+
+        for (int i = 0; i < 50; i++) {
+            TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
+            BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+            BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
+            // on one dev machine (SSD-backed) the get time EWMA converges to ~0.025 ms; we only assert it is positive
+            assertTrue(cache.tieredCacheHandler.diskGetTimeMillisEWMA() > 0);
+        }
+
+        IOUtils.close(reader, writer, dir, cache);
+        cache.closeDiskTier();
+    }
+
+    public void testEhcacheConcurrency() throws Exception {
+        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        Settings.Builder settingsBuilder = Settings.builder();
+        long heapSizeBytes = 0; // skip directly to disk cache
+        settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
+        IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));
+
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        AtomicBoolean indexShard = new AtomicBoolean(true);
+
+        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        Loader loader = new Loader(reader, 0);
+
+        Random rand = Randomness.get();
+        int minThreads = 6;
+        int maxThreads = 8;
+        int numThreads = rand.nextInt(maxThreads - minThreads) + minThreads;
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
+        int numRequests = 50;
+        int numRepeats = 10;
+        BytesReference[] termBytesArr = new BytesReference[numRequests];
+        ArrayList<Integer> permutation = new ArrayList<>();
+
+        // Have these threads make 50 requests, with 10 repeats, in random order, and keep track of the keys.
+        // At the end, make sure that all the keys are in the cache, there are 40 misses, and 10 hits.
+
+        for (int i = 0; i < numRequests; i++) {
+            int searchValue = i;
+            if (i > numRequests - numRepeats - 1) {
+                searchValue = i - (numRequests - numRepeats); // repeat values 0-9
+            }
+            // System.out.println("values: " + i + " " + searchValue);
+            permutation.add(searchValue);
+            if (i == searchValue) {
+                TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(searchValue));
+                BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+                termBytesArr[i] = termBytes;
+            }
+        }
+        java.util.Collections.shuffle(permutation);
+
+        ArrayList<Future<BytesReference>> futures = new ArrayList<>();
+
+        for (int j = 0; j < permutation.size(); j++) {
+            int keyNumber = permutation.get(j);
+            Future<BytesReference> fut = executor.submit(() -> cache.getOrCompute(entity, loader, reader, termBytesArr[keyNumber]));
+            futures.add(fut);
+        }
+
+        // now go through and get them all
+        for (Future<BytesReference> fut : futures) {
+            BytesReference value = fut.get();
+            assertNotNull(value);
+        }
+        executor.shutdown();
+
+        System.out.println("heap size " + cache.tieredCacheHandler.count(TierType.ON_HEAP));
+        System.out.println("disk size " + cache.tieredCacheHandler.count(TierType.DISK));
+        System.out.println("disk misses " + requestCacheStats.stats(TierType.DISK).getMissCount());
+        System.out.println("disk hits " + requestCacheStats.stats(TierType.DISK).getHitCount());
+        /*System.out.println("disk num gets " + cache.tieredCacheHandler.getDiskCachingTier().numGets);
+        System.out.println("handler num get " + cache.tieredCacheHandler.numGets.intValue());
+        System.out.println("handler num heap get " + cache.tieredCacheHandler.numHeapGets.intValue());
+        System.out.println("handler num heap hit " + cache.tieredCacheHandler.numHeapHits.intValue());
+        System.out.println("handler num disk hit " + cache.tieredCacheHandler.numDiskHits.intValue());*/
+
+        assertEquals(numRequests - numRepeats, cache.tieredCacheHandler.count(TierType.DISK)); // stable
+        assertEquals(numRequests - numRepeats, requestCacheStats.stats(TierType.DISK).getMissCount()); // expected 40; occasionally 41 due to a race between concurrent loads
+        assertEquals(numRepeats, requestCacheStats.stats(TierType.DISK).getHitCount()); // expected 10; occasionally 9 for the same reason
+
+        IOUtils.close(reader, writer, dir, cache);
+        cache.closeDiskTier();
+    }
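The off-by-one flakiness in the disk hit/miss assertions above points at a race: two threads can miss on the same key at nearly the same time and both count a miss before either finishes loading. A common remedy is to de-duplicate in-flight loads per key; the sketch below shows the general pattern with ConcurrentHashMap and CompletableFuture. It illustrates the technique only and is not the cache's actual locking strategy, which this diff does not show.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SingleFlightLoader<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    // The first thread to miss on a key computes the value; concurrent callers
    // for the same key block on the same future, so each key is loaded (and
    // counted as a miss) exactly once.
    V load(K key, Function<K, V> loader) {
        CompletableFuture<V> future = inFlight.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(() -> loader.apply(k)));
        try {
            return future.join();
        } finally {
            inFlight.remove(key, future); // allow a fresh load after completion or eviction
        }
    }
}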

     public void testCacheDifferentReaders() throws Exception {
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
         AtomicBoolean indexShard = new AtomicBoolean(true);
         ShardRequestCache requestCacheStats = new ShardRequestCache();
         Directory dir = newDirectory();
@@ -213,12 +423,13 @@ public void testCacheDifferentReaders() throws Exception {

         IOUtils.close(secondReader, writer, dir, cache);
         assertEquals(0, cache.numRegisteredCloseListeners());
+        cache.closeDiskTier();
     }

     public void testEviction() throws Exception {
         final ByteSizeValue size;
         {
-            IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+            IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
             AtomicBoolean indexShard = new AtomicBoolean(true);
             ShardRequestCache requestCacheStats = new ShardRequestCache();
             Directory dir = newDirectory();
@@ -242,9 +453,11 @@ public void testEviction() throws Exception {
             assertEquals("bar", value2.streamInput().readString());
             size = requestCacheStats.stats().getMemorySize();
             IOUtils.close(reader, secondReader, writer, dir, cache);
+            cache.closeDiskTier();
         }
         IndicesRequestCache cache = new IndicesRequestCache(
-            Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build()
+            Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(),
+            null
         );
         AtomicBoolean indexShard = new AtomicBoolean(true);
         ShardRequestCache requestCacheStats = new ShardRequestCache();
@@ -278,10 +491,11 @@ public void testEviction() throws Exception {
         assertEquals(2, cache.count());
         assertEquals(1, requestCacheStats.stats().getEvictions());
         IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
+        cache.closeDiskTier();
     }

     public void testClearAllEntityIdentity() throws Exception {
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
         AtomicBoolean indexShard = new AtomicBoolean(true);
         ShardRequestCache requestCacheStats = new ShardRequestCache();

@@ -325,6 +539,7 @@ public void testClearAllEntityIdentity() throws Exception {
         assertEquals("baz", value3.streamInput().readString());

         IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
+        cache.closeDiskTier();

     }

@@ -366,7 +581,7 @@ public BytesReference get() {

     public void testInvalidate() throws Exception {
         ShardRequestCache requestCacheStats = new ShardRequestCache();
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -430,25 +645,29 @@ public void testInvalidate() throws Exception {

         IOUtils.close(reader, writer, dir, cache);
         assertEquals(0, cache.numRegisteredCloseListeners());
+        cache.closeDiskTier();
     }

     public void testEqualsKey() throws IOException {
         AtomicBoolean trueBoolean = new AtomicBoolean(true);
         AtomicBoolean falseBoolean = new AtomicBoolean(false);
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache;
         Directory dir = newDirectory();
         IndexWriterConfig config = newIndexWriterConfig();
         IndexWriter writer = new IndexWriter(dir, config);
-        IndexReader reader1 = DirectoryReader.open(writer);
-        IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey();
+        ShardId shardId = new ShardId("foo", "bar", 1);
+        IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+        String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
         writer.addDocument(new Document());
-        IndexReader reader2 = DirectoryReader.open(writer);
-        IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey();
+        IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+        String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
         IOUtils.close(reader1, reader2, writer, dir);
-        IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1));
-        IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2));
+        IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2);
+        IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey1);
         String s = "Some other random object";
         assertEquals(key1, key1);
         assertEquals(key1, key2);
@@ -459,8 +678,33 @@ public void testEqualsKey() throws IOException {
         assertNotEquals(key1, key5);
     }

-    private class TestBytesReference extends AbstractBytesReference {
+    public void testSerializationDeserializationOfCacheKey() throws Exception {
+        TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
+        BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+        ShardRequestCache shardRequestCache = new ShardRequestCache();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache;
+        IndexService indexService = createIndex("test");
+        IndexShard indexShard = indexService.getShard(0);
+        IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard);
+        String readerCacheKeyId = UUID.randomUUID().toString();
+        IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId);
+        BytesReference bytesReference = null;
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            key1.writeTo(out);
+            bytesReference = out.bytes();
+        }
+        StreamInput in = bytesReference.streamInput();
+
+        IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in);
+
+        assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId);
+        assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity());
+        assertEquals(termBytes, key2.value);
+    }
+
+    private class TestBytesReference extends AbstractBytesReference {

         int dummyValue;

         TestBytesReference(int dummyValue) {
@@ -510,7 +754,7 @@ public boolean isFragment() {
         }
     }

-    private class TestEntity extends AbstractIndexShardCacheEntity {
+    private class TestEntity extends AbstractIndexShardCacheEntity implements Serializable {
         private final AtomicBoolean standInForIndexShard;
         private final ShardRequestCache shardRequestCache;

@@ -538,5 +782,8 @@ public Object getCacheIdentity() {
     public long ramBytesUsed() {
         return 42;
     }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {}
 }
 }
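testSerializationDeserializationOfCacheKey follows the standard OpenSearch Writeable round-trip pattern: serialize into a BytesStreamOutput, rehydrate from the resulting StreamInput, and compare field by field. The same pattern generalizes to any Writeable; a small helper along these lines (the class and method names here are illustrative) can keep such tests terse:

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

final class RoundTrip {
    // Serialize a Writeable, then read it back with the type's StreamInput reader.
    static <T extends Writeable> T roundTrip(T original, Writeable.Reader<T> reader) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                return reader.read(in);
            }
        }
    }
}

For example, RoundTrip.roundTrip(shardId, ShardId::new) should yield a ShardId equal to the original.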
diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
index 2fe1f87f5d828..766d80a81b097 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
@@ -41,6 +41,7 @@
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.index.IndexModule;
@@ -59,6 +60,7 @@
 import org.opensearch.test.hamcrest.OpenSearchAssertions;
 import org.opensearch.transport.nio.MockNioTransportPlugin;

+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
@@ -315,6 +317,11 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception {
         assertEquals(0L, cache.count());

         IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() {
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                // no-op: this test entity has nothing to serialize
+            }
+
             @Override
             public long ramBytesUsed() {
                 return 42;