Skip to content

Commit

Permalink
fix no permission bug when update index mapping (#545) (#546)
Browse files Browse the repository at this point in the history
Signed-off-by: Yaliang Wu <[email protected]>

Signed-off-by: Yaliang Wu <[email protected]>
(cherry picked from commit 5b42aec)

Co-authored-by: Yaliang Wu <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and ylwu-amzn authored Nov 8, 2022
1 parent 3281e67 commit 580f392
Showing 1 changed file with 45 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,61 +57,62 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
String indexName = index.getIndexName();
String mapping = index.getMapping();

if (!clusterService.state().metadata().hasIndex(indexName)) {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
listener.onResponse(true);
internalListener.onResponse(true);
} else {
listener.onResponse(false);
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
listener.onFailure(e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping);
client.admin().indices().create(request, ActionListener.runBefore(actionListener, () -> threadContext.restore()));
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
listener.onFailure(e);
}
} else {
log.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if should update index
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
listener.onResponse(true);
} else {
listener.onFailure(new MLException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
listener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
listener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
listener.onFailure(e);
}));
client.admin().indices().create(request, actionListener);
} else {
// No need to update index if it's not ML system index or it's already updated.
listener.onResponse(true);
log.info("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if should update index
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
internalListener.onFailure(e);
}));
} else {
// No need to update index if it's not ML system index or it's already updated.
internalListener.onResponse(true);
}
}
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
listener.onFailure(e);
}
}

Expand Down

0 comments on commit 580f392

Please sign in to comment.