-
Notifications
You must be signed in to change notification settings - Fork 273
Commit
Signed-off-by: tmanninger <[email protected]> (cherry picked from commit 9c5e32a) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.security.auditlog.sink; | ||
|
||
import java.io.IOException; | ||
|
||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.index.IndexRequestBuilder; | ||
import org.opensearch.action.support.WriteRequest.RefreshPolicy; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext; | ||
import org.opensearch.security.auditlog.impl.AuditMessage; | ||
import org.opensearch.security.support.ConfigConstants; | ||
import org.opensearch.security.support.HeaderHelper; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
public abstract class AbstractInternalOpenSearchSink extends AuditLogSink { | ||
|
||
protected final Client clientProvider; | ||
private final ThreadPool threadPool; | ||
private final DocWriteRequest.OpType storeOpType; | ||
|
||
public AbstractInternalOpenSearchSink( | ||
final String name, | ||
final Settings settings, | ||
final String settingsPrefix, | ||
final Client clientProvider, | ||
ThreadPool threadPool, | ||
AuditLogSink fallbackSink, | ||
DocWriteRequest.OpType storeOpType | ||
) { | ||
super(name, settings, settingsPrefix, fallbackSink); | ||
this.clientProvider = clientProvider; | ||
this.threadPool = threadPool; | ||
this.storeOpType = storeOpType; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
|
||
} | ||
Check warning on line 52 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L52
|
||
|
||
public boolean doStore(final AuditMessage msg, String indexName) { | ||
|
||
if (Boolean.parseBoolean( | ||
HeaderHelper.getSafeFromHeader(threadPool.getThreadContext(), ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER) | ||
)) { | ||
if (log.isTraceEnabled()) { | ||
log.trace("audit log of audit log will not be executed"); | ||
Check warning on line 60 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L60
|
||
} | ||
return true; | ||
Check warning on line 62 in src/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/AbstractInternalOpenSearchSink.java#L62
|
||
} | ||
|
||
try (StoredContext ctx = threadPool.getThreadContext().stashContext()) { | ||
try { | ||
final IndexRequestBuilder irb = clientProvider.prepareIndex(indexName) | ||
.setRefreshPolicy(RefreshPolicy.IMMEDIATE) | ||
.setSource(msg.getAsMap()); | ||
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true"); | ||
irb.setTimeout(TimeValue.timeValueMinutes(1)); | ||
if (this.storeOpType != null) { | ||
irb.setOpType(this.storeOpType); | ||
} | ||
irb.execute().actionGet(); | ||
return true; | ||
} catch (final Exception e) { | ||
log.error("Unable to index audit log {} due to", msg, e); | ||
return false; | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.security.auditlog.sink; | ||
|
||
// CS-SUPPRESS-SINGLE: RegexpSingleline https://github.com/opensearch-project/OpenSearch/issues/3663 | ||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
|
||
import org.opensearch.ResourceAlreadyExistsException; | ||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.admin.indices.datastream.CreateDataStreamAction; | ||
import org.opensearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.ComposableIndexTemplate; | ||
import org.opensearch.cluster.metadata.DataStream; | ||
import org.opensearch.cluster.metadata.Template; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.security.auditlog.impl.AuditMessage; | ||
import org.opensearch.security.support.ConfigConstants; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.transport.RemoteTransportException; | ||
|
||
public final class InternalOpenSearchDataStreamSink extends AbstractInternalOpenSearchSink { | ||
|
||
String dataStreamName; | ||
private boolean dataStreamInitialized = false; | ||
|
||
public InternalOpenSearchDataStreamSink( | ||
final String name, | ||
final Settings settings, | ||
final String settingsPrefix, | ||
final Path configPath, | ||
final Client clientProvider, | ||
ThreadPool threadPool, | ||
AuditLogSink fallbackSink | ||
) { | ||
super(name, settings, settingsPrefix, clientProvider, threadPool, fallbackSink, DocWriteRequest.OpType.CREATE); | ||
Settings sinkSettings = getSinkSettings(settingsPrefix); | ||
|
||
this.dataStreamName = sinkSettings.get(ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_NAME, "opensearch-security-auditlog"); | ||
|
||
// Node is no ready yet... this.initDataStream() must be called later (in method doStore()) | ||
} | ||
|
||
private boolean initDataStream() { | ||
|
||
if (this.dataStreamInitialized) { | ||
return true; | ||
} | ||
|
||
Settings sinkSettings = getSinkSettings(settingsPrefix); | ||
|
||
final boolean templateManage = sinkSettings.getAsBoolean( | ||
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_MANAGE, | ||
true | ||
); | ||
|
||
// Create datastream template | ||
if (templateManage) { | ||
|
||
final String templateName = sinkSettings.get( | ||
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NAME, | ||
"opensearch-security-auditlog" | ||
); | ||
final Integer numberOfReplicas = sinkSettings.getAsInt( | ||
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_REPLICAS, | ||
0 | ||
); | ||
final Integer numberOfShards = sinkSettings.getAsInt( | ||
ConfigConstants.SECURITY_AUDIT_OPENSEARCH_DATASTREAM_TEMPLATE_NUMBER_OF_SHARDS, | ||
1 | ||
); | ||
|
||
ComposableIndexTemplate template = new ComposableIndexTemplate( | ||
List.of(dataStreamName), | ||
new Template( | ||
Settings.builder().put("number_of_shards", numberOfShards).put("number_of_replicas", numberOfReplicas).build(), | ||
null, | ||
null | ||
), | ||
null, | ||
null, | ||
null, | ||
null, | ||
new ComposableIndexTemplate.DataStreamTemplate(new DataStream.TimestampField("@timestamp")) | ||
); | ||
|
||
try { | ||
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(templateName); | ||
request.indexTemplate(template); | ||
AcknowledgedResponse response = clientProvider.execute(PutComposableIndexTemplateAction.INSTANCE, request).get(); | ||
if (!response.isAcknowledged()) { | ||
log.error("Failed to create index template {}", templateName); | ||
return false; | ||
Check warning on line 105 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L104-L105
|
||
} | ||
} catch (final Exception e) { | ||
log.error("Cannot create index template {} due to", templateName, e); | ||
return false; | ||
Check warning on line 109 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L107-L109
|
||
} | ||
} | ||
|
||
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); | ||
try { | ||
AcknowledgedResponse response = clientProvider.admin().indices().createDataStream(createDataStreamRequest).get(); | ||
if (!response.isAcknowledged()) { | ||
log.error("Failed to create datastream {}", dataStreamName); | ||
Check warning on line 117 in src/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java Codecov / codecov/patchsrc/main/java/org/opensearch/security/auditlog/sink/InternalOpenSearchDataStreamSink.java#L117
|
||
} | ||
this.dataStreamInitialized = true; | ||
} catch (final Exception e) { | ||
if (e.getCause() instanceof ResourceAlreadyExistsException | ||
|| (e.getCause() instanceof RemoteTransportException | ||
&& e.getCause().getCause() instanceof ResourceAlreadyExistsException)) { | ||
log.trace("Datastream {} already exists", dataStreamName); | ||
this.dataStreamInitialized = true; | ||
} else { | ||
log.error("Cannot create datastream {} due to", dataStreamName, e); | ||
return false; | ||
} | ||
} | ||
|
||
return this.dataStreamInitialized; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
|
||
} | ||
|
||
public boolean doStore(final AuditMessage msg) { | ||
|
||
if (!this.initDataStream()) { | ||
log.error("Datastream initializaten failed. Cannot write to auditlog"); | ||
return false; | ||
} | ||
|
||
return super.doStore(msg, this.dataStreamName); | ||
} | ||
} |