33
33
package org .opensearch .index .fielddata ;
34
34
35
35
import org .opensearch .action .admin .cluster .stats .ClusterStatsResponse ;
36
+ import org .opensearch .action .admin .indices .cache .clear .ClearIndicesCacheRequest ;
37
+ import org .opensearch .core .xcontent .XContentBuilder ;
38
+ import org .opensearch .index .query .MatchAllQueryBuilder ;
39
+ import org .opensearch .search .sort .SortOrder ;
36
40
import org .opensearch .test .OpenSearchIntegTestCase ;
37
41
42
+ import java .util .HashMap ;
43
+ import java .util .Map ;
44
+ import java .util .concurrent .CountDownLatch ;
45
+ import java .util .concurrent .Phaser ;
46
+
38
47
import static org .opensearch .common .xcontent .XContentFactory .jsonBuilder ;
39
48
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
40
49
import static org .hamcrest .Matchers .greaterThan ;
41
50
42
51
public class FieldDataLoadingIT extends OpenSearchIntegTestCase {
43
-
44
52
public void testEagerGlobalOrdinalsFieldDataLoading () throws Exception {
45
53
assertAcked (
46
54
prepareCreate ("test" ).setMapping (
@@ -62,6 +70,117 @@ public void testEagerGlobalOrdinalsFieldDataLoading() throws Exception {
62
70
63
71
ClusterStatsResponse response = client ().admin ().cluster ().prepareClusterStats ().get ();
64
72
assertThat (response .getIndicesStats ().getFieldData ().getMemorySizeInBytes (), greaterThan (0L ));
73
+
74
+ // Ensure cache cleared before other tests in the suite begin
75
+ client ().admin ().indices ().clearCache (new ClearIndicesCacheRequest ().fieldDataCache (true )).actionGet ();
76
+ assertBusy (() -> {
77
+ ClusterStatsResponse clearedResponse = client ().admin ().cluster ().prepareClusterStats ().get ();
78
+ assertEquals (0 , clearedResponse .getIndicesStats ().getFieldData ().getMemorySizeInBytes ());
79
+ });
80
+ }
81
+
82
+ public void testFieldDataCacheClearConcurrentIndices () throws Exception {
83
+ // Check concurrently clearing multiple indices from the FD cache correctly removes all expected keys.
84
+ int numIndices = 10 ;
85
+ String indexPrefix = "test" ;
86
+ createAndSearchIndices (numIndices , 1 , indexPrefix , "field" );
87
+ // TODO: Should be 1 entry per field per index in cache, but cannot check this directly until we add the items count stat in a
88
+ // future PR
89
+
90
+ // Concurrently clear multiple indices from FD cache
91
+ Thread [] threads = new Thread [numIndices ];
92
+ Phaser phaser = new Phaser (numIndices + 1 );
93
+ CountDownLatch countDownLatch = new CountDownLatch (numIndices );
94
+
95
+ for (int i = 0 ; i < numIndices ; i ++) {
96
+ int finalI = i ;
97
+ threads [i ] = new Thread (() -> {
98
+ try {
99
+ ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest ().fieldDataCache (true )
100
+ .indices (indexPrefix + finalI );
101
+ client ().admin ().indices ().clearCache (clearCacheRequest ).actionGet ();
102
+ phaser .arriveAndAwaitAdvance ();
103
+ } catch (Exception e ) {
104
+ throw new RuntimeException (e );
105
+ }
106
+ countDownLatch .countDown ();
107
+ });
108
+ threads [i ].start ();
109
+ }
110
+ phaser .arriveAndAwaitAdvance ();
111
+ countDownLatch .await ();
112
+
113
+ // Cache size should be 0
114
+ assertBusy (() -> {
115
+ ClusterStatsResponse response = client ().admin ().cluster ().prepareClusterStats ().get ();
116
+ assertEquals (0 , response .getIndicesStats ().getFieldData ().getMemorySizeInBytes ());
117
+ });
65
118
}
66
119
120
+ public void testFieldDataCacheClearConcurrentFields () throws Exception {
121
+ // Check concurrently clearing multiple indices + fields from the FD cache correctly removes all expected keys.
122
+ int numIndices = 10 ;
123
+ int numFieldsPerIndex = 5 ;
124
+ String indexPrefix = "test" ;
125
+ String fieldPrefix = "field" ;
126
+ createAndSearchIndices (numIndices , numFieldsPerIndex , indexPrefix , fieldPrefix );
127
+
128
+ // Concurrently clear multiple indices+fields from FD cache
129
+ Thread [] threads = new Thread [numIndices * numFieldsPerIndex ];
130
+ Phaser phaser = new Phaser (numIndices * numFieldsPerIndex + 1 );
131
+ CountDownLatch countDownLatch = new CountDownLatch (numIndices * numFieldsPerIndex );
132
+
133
+ for (int i = 0 ; i < numIndices ; i ++) {
134
+ int finalI = i ;
135
+ for (int j = 0 ; j < numFieldsPerIndex ; j ++) {
136
+ int finalJ = j ;
137
+ threads [i * numFieldsPerIndex + j ] = new Thread (() -> {
138
+ try {
139
+ ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest ().fieldDataCache (true )
140
+ .indices (indexPrefix + finalI )
141
+ .fields (fieldPrefix + finalJ );
142
+ client ().admin ().indices ().clearCache (clearCacheRequest ).actionGet ();
143
+ phaser .arriveAndAwaitAdvance ();
144
+ } catch (Exception e ) {
145
+ throw new RuntimeException (e );
146
+ }
147
+ countDownLatch .countDown ();
148
+ });
149
+ threads [i * numFieldsPerIndex + j ].start ();
150
+ }
151
+ }
152
+ phaser .arriveAndAwaitAdvance ();
153
+ countDownLatch .await ();
154
+
155
+ // Cache size should be 0
156
+ assertBusy (() -> {
157
+ ClusterStatsResponse response = client ().admin ().cluster ().prepareClusterStats ().get ();
158
+ assertEquals (0 , response .getIndicesStats ().getFieldData ().getMemorySizeInBytes ());
159
+ });
160
+ }
161
+
162
+ private void createAndSearchIndices (int numIndices , int numFieldsPerIndex , String indexPrefix , String fieldPrefix ) throws Exception {
163
+ for (int i = 0 ; i < numIndices ; i ++) {
164
+ String index = indexPrefix + i ;
165
+ XContentBuilder req = jsonBuilder ().startObject ().startObject ("properties" );
166
+ for (int j = 0 ; j < numFieldsPerIndex ; j ++) {
167
+ req .startObject (fieldPrefix + j ).field ("type" , "text" ).field ("fielddata" , true ).endObject ();
168
+ }
169
+ req .endObject ().endObject ();
170
+ assertAcked (prepareCreate (index ).setMapping (req ));
171
+ Map <String , String > source = new HashMap <>();
172
+ for (int j = 0 ; j < numFieldsPerIndex ; j ++) {
173
+ source .put (fieldPrefix + j , "value" );
174
+ }
175
+ client ().prepareIndex (index ).setId ("1" ).setSource (source ).get ();
176
+ client ().admin ().indices ().prepareRefresh (index ).get ();
177
+ // Search on each index to fill the cache
178
+ for (int j = 0 ; j < numFieldsPerIndex ; j ++) {
179
+ client ().prepareSearch (index ).setQuery (new MatchAllQueryBuilder ()).addSort (fieldPrefix + j , SortOrder .ASC ).get ();
180
+ }
181
+ }
182
+ ensureGreen ();
183
+ ClusterStatsResponse response = client ().admin ().cluster ().prepareClusterStats ().get ();
184
+ assertTrue (response .getIndicesStats ().getFieldData ().getMemorySizeInBytes () > 0L );
185
+ }
67
186
}
0 commit comments