diff --git a/server/src/main/java/org/opensearch/common/io/stream/ProtobufWriteable.java b/server/src/main/java/org/opensearch/common/io/stream/ProtobufWriteable.java index a5d5201c52022..73d3d45fd9f2f 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/ProtobufWriteable.java +++ b/server/src/main/java/org/opensearch/common/io/stream/ProtobufWriteable.java @@ -27,7 +27,7 @@ public interface ProtobufWriteable extends BaseWriteable headers) { + this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); + } + + public ProtobufCancellableTask( + long id, + String type, + String action, + String description, + ProtobufTaskId parentTaskId, + Map headers, + TimeValue cancelAfterTimeInterval + ) { + super(id, type, action, description, parentTaskId, headers); + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + /** + * This method is called by the task manager when this task is cancelled. + */ + public void cancel(String reason) { + assert reason != null; + if (cancelled.compareAndSet(false, true)) { + this.reason = reason; + onCancelled(); + } + } + + /** + * Returns true if this task should be automatically cancelled if the coordinating node that + * requested this task left the cluster. + */ + public boolean cancelOnParentLeaving() { + return true; + } + + /** + * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. + */ + public abstract boolean shouldCancelChildrenOnCancellation(); + + public boolean isCancelled() { + return cancelled.get(); + } + + public TimeValue getCancellationTimeout() { + return cancelAfterTimeInterval; + } + + /** + * The reason the task was cancelled or null if it hasn't been cancelled. + */ + @Nullable + public final String getReasonCancelled() { + return reason; + } + + /** + * Called after the task is cancelled so that it can take any actions that it has to take. + */ + protected void onCancelled() {} +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTask.java b/server/src/main/java/org/opensearch/tasks/ProtobufTask.java new file mode 100644 index 0000000000000..6d67d1c48b315 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTask.java @@ -0,0 +1,446 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.NotifyOnceListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.NamedWriteable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Current task information +* +* @opensearch.internal +*/ +public class ProtobufTask { + + private static final Logger logger = LogManager.getLogger(ProtobufTask.class); + + /** + * The request header to mark tasks with specific ids + */ + public static final String X_OPAQUE_ID = "X-Opaque-Id"; + + private static final String TOTAL = "total"; + + private final long id; + + private final String type; + + private final String action; + + private final String description; + + private final ProtobufTaskId parentTask; + + private final Map headers; + + private final Map> resourceStats; + + private final List> resourceTrackingCompletionListeners; + + /** + * Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track + * the task's own/self thread. When this value becomes 0, all threads have been marked inactive and the resource + * tracking can be stopped for this task. + */ + private final AtomicInteger numActiveResourceTrackingThreads = new AtomicInteger(1); + + /** + * The task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + private final long startTime; + + /** + * The task's start time as a relative time ({@link System#nanoTime()} style). + */ + private final long startTimeNanos; + + public ProtobufTask(long id, String type, String action, String description, ProtobufTaskId parentTask, Map headers) { + this( + id, + type, + action, + description, + parentTask, + System.currentTimeMillis(), + System.nanoTime(), + headers, + new ConcurrentHashMap<>(), + new ArrayList<>() + ); + } + + public ProtobufTask( + long id, + String type, + String action, + String description, + ProtobufTaskId parentTask, + long startTime, + long startTimeNanos, + Map headers, + ConcurrentHashMap> resourceStats, + List> resourceTrackingCompletionListeners + ) { + this.id = id; + this.type = type; + this.action = action; + this.description = description; + this.parentTask = parentTask; + this.startTime = startTime; + this.startTimeNanos = startTimeNanos; + this.headers = headers; + this.resourceStats = resourceStats; + this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners; + } + + /** + * Build a version of the task status you can throw over the wire and back + * to the user. + * + * @param localNodeId + * the id of the node this task is running on + * @param detailed + * should the information include detailed, potentially slow to + * generate data? + */ + public final ProtobufTaskInfo taskInfo(String localNodeId, boolean detailed) { + return taskInfo(localNodeId, detailed, detailed == false); + } + + /** + * Build a version of the task status you can throw over the wire and back + * with the option to include resource stats or not. + * This method is only used during creating TaskResult to avoid storing resource information into the task index. + * + * @param excludeStats should information exclude resource stats. + * By default, detailed flag is used to control including resource information. + * But inorder to avoid storing resource stats into task index as strict mapping is enforced and breaks when adding this field. + * In the future, task-index-mapping.json can be modified to add resource stats. + */ + private ProtobufTaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeStats) { + String description = null; + ProtobufTask.Status status = null; + TaskResourceStats resourceStats = null; + if (detailed) { + description = getDescription(); + status = getStatus(); + } + if (excludeStats == false) { + resourceStats = new TaskResourceStats(new HashMap<>() { + { + put(TOTAL, getTotalResourceStats()); + } + }); + } + return taskInfo(localNodeId, description, status, resourceStats); + } + + /** + * Build a {@link ProtobufTaskInfo} for this task without resource stats. + */ + protected final ProtobufTaskInfo taskInfo(String localNodeId, String description, Status status) { + return taskInfo(localNodeId, description, status, null); + } + + /** + * Build a proper {@link ProtobufTaskInfo} for this task. + */ + protected final ProtobufTaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { + return new ProtobufTaskInfo( + new ProtobufTaskId(localNodeId, getId()), + getType(), + getAction(), + description, + status, + startTime, + System.nanoTime() - startTimeNanos, + this instanceof ProtobufCancellableTask, + this instanceof ProtobufCancellableTask && ((ProtobufCancellableTask) this).isCancelled(), + parentTask, + headers, + resourceStats + ); + } + + /** + * Returns task id + */ + public long getId() { + return id; + } + + /** + * Returns task channel type (netty, transport, direct) + */ + public String getType() { + return type; + } + + /** + * Returns task action + */ + public String getAction() { + return action; + } + + /** + * Generates task description + */ + public String getDescription() { + return description; + } + + /** + * Returns the task's start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the task's start time in nanoseconds ({@link System#nanoTime()} style). + */ + public long getStartTimeNanos() { + return startTimeNanos; + } + + /** + * Returns id of the parent task or NO_PARENT_ID if the task doesn't have any parent tasks + */ + public ProtobufTaskId getParentTaskId() { + return parentTask; + } + + /** + * Build a status for this task or null if this task doesn't have status. + * Since most tasks don't have status this defaults to returning null. While + * this can never perform IO it might be a costly operation, requiring + * collating lists of results, etc. So only use it if you need the value. + */ + public Status getStatus() { + return null; + } + + /** + * Returns thread level resource consumption of the task + */ + public Map> getResourceStats() { + return Collections.unmodifiableMap(resourceStats); + } + + /** + * Returns current total resource usage of the task. + * Currently, this method is only called on demand, during get and listing of tasks. + * In the future, these values can be cached as an optimization. + */ + public TaskResourceUsage getTotalResourceStats() { + return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY)); + } + + /** + * Returns total resource consumption for a specific task stat. + */ + public long getTotalResourceUtilization(ResourceStats stats) { + long totalResourceConsumption = 0L; + for (List threadResourceInfosList : resourceStats.values()) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) { + final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats); + if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) { + totalResourceConsumption += statsInfo.getTotalValue(); + } + } + } + return totalResourceConsumption; + } + + /** + * Adds thread's starting resource consumption information + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException matching active thread entry was found which is not expected. + */ + public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>()); + // active thread entry should not be present in the list + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + throw new IllegalStateException( + "unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]" + ); + } + } + threadResourceInfoList.add(new ThreadResourceInfo(threadId, statsType, resourceUsageMetrics)); + incrementResourceTrackingThreads(); + } + + /** + * This method is used to update the resource consumption stats so that the data isn't too stale for long-running task. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + // the active entry present in the list is updated + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + return; + } + } + } + throw new IllegalStateException("cannot update if active thread resource entry is not present"); + } + + /** + * Record the thread's final resource consumption values. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. + * @param threadId ID of the thread + * @param statsType stats type + * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. + */ + public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.setActive(false); + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + decrementResourceTrackingThreads(); + return; + } + } + } + throw new IllegalStateException("cannot update final values if active thread resource entry is not present"); + } + + /** + * Individual tasks can override this if they want to support task resource tracking. We just need to make sure that + * the ThreadPool on which the task runs on have runnable wrapper similar to + * {@link org.opensearch.common.util.concurrent.OpenSearchExecutors#newResizable} + * + * @return true if resource tracking is supported by the task + */ + public boolean supportsResourceTracking() { + return false; + } + + /** + * Report of the internal status of a task. These can vary wildly from task + * to task because each task is implemented differently but we should try + * to keep each task consistent from version to version where possible. + * That means each implementation of {@linkplain ProtobufTask.Status#toXContent} + * should avoid making backwards incompatible changes to the rendered + * result. But if we change the way a request is implemented it might not + * be possible to preserve backwards compatibility. In that case, we + * can change this on version upgrade but we should be careful + * because some statuses (reindex) have become defacto standardized because + * they are used by systems like Kibana. + */ + public interface Status extends ToXContentObject, NamedWriteable {} + + /** + * Returns stored task header associated with the task + */ + public String getHeader(String header) { + return headers.get(header); + } + + public ProtobufTaskResult result(DiscoveryNode node, Exception error) throws IOException { + return new ProtobufTaskResult(taskInfo(node.getId(), true, true), error); + } + + public ProtobufTaskResult result(DiscoveryNode node, ActionResponse response) throws IOException { + if (response instanceof ToXContent) { + return new ProtobufTaskResult(taskInfo(node.getId(), true, true), (ToXContent) response); + } else { + throw new IllegalStateException("response has to implement ToXContent to be able to store the results"); + } + } + + /** + * Registers a task resource tracking completion listener on this task if resource tracking is still active. + * Returns true on successful subscription, false otherwise. + */ + public boolean addResourceTrackingCompletionListener(NotifyOnceListener listener) { + if (numActiveResourceTrackingThreads.get() > 0) { + resourceTrackingCompletionListeners.add(listener); + return true; + } + + return false; + } + + /** + * Increments the number of active resource tracking threads. + * + * @return the number of active resource tracking threads. + */ + public int incrementResourceTrackingThreads() { + return numActiveResourceTrackingThreads.incrementAndGet(); + } + + /** + * Decrements the number of active resource tracking threads. + * This method is called when threads finish execution, and also when the task is unregistered (to mark the task's + * own thread as complete). When the active thread count becomes zero, the onTaskResourceTrackingCompleted method + * is called exactly once on all registered listeners. + * + * Since a task is unregistered after the message is processed, it implies that the threads responsible to produce + * the response must have started prior to it (i.e. startThreadResourceTracking called before unregister). + * This ensures that the number of active threads doesn't drop to zero pre-maturely. + * + * Rarely, some threads may even start execution after the task is unregistered. As resource stats are piggy-backed + * with the response, any thread usage info captured after the task is unregistered may be irrelevant. + * + * @return the number of active resource tracking threads. + */ + public int decrementResourceTrackingThreads() { + int count = numActiveResourceTrackingThreads.decrementAndGet(); + + if (count == 0) { + List listenerExceptions = new ArrayList<>(); + resourceTrackingCompletionListeners.forEach(listener -> { + try { + listener.onResponse(this); + } catch (Exception e1) { + try { + listener.onFailure(e1); + } catch (Exception e2) { + listenerExceptions.add(e2); + } + } + }); + ExceptionsHelper.maybeThrowRuntimeAndSuppress(listenerExceptions); + } + + return count; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java new file mode 100644 index 0000000000000..beebdea5beebb --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskAwareRequest.java @@ -0,0 +1,50 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import java.util.Map; + +/** + * An interface for a request that can be used to register a task manager task +* +* @opensearch.internal +*/ +public interface ProtobufTaskAwareRequest { + /** + * Set a reference to task that caused this task to be run. + */ + default void setParentTask(String parentTaskNode, long parentTaskId) { + setParentTask(new ProtobufTaskId(parentTaskNode, parentTaskId)); + } + + /** + * Set a reference to task that created this request. + */ + void setParentTask(ProtobufTaskId taskId); + + /** + * Get a reference to the task that created this request. Implementers should default to + * {@link ProtobufTaskId#EMPTY_TASK_ID}, meaning "there is no parent". + */ + ProtobufTaskId getParentTask(); + + /** + * Returns the task object that should be used to keep track of the processing of the request. + */ + default ProtobufTask createTask(long id, String type, String action, ProtobufTaskId parentTaskId, Map headers) { + return new ProtobufTask(id, type, action, getDescription(), parentTaskId, headers); + } + + /** + * Returns optional description of the request to be displayed by the task manager + */ + default String getDescription() { + return ""; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java new file mode 100644 index 0000000000000..c7bc874d9a74f --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskId.java @@ -0,0 +1,143 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.opensearch.OpenSearchParseException; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.core.xcontent.ContextParser; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; + +/** + * Task id that consists of node id and id of the task on the node +* +* @opensearch.internal +*/ +public final class ProtobufTaskId implements ProtobufWriteable { + + public static final ProtobufTaskId EMPTY_TASK_ID = new ProtobufTaskId(); + + private final String nodeId; + private final long id; + + public ProtobufTaskId(String nodeId, long id) { + if (nodeId.isEmpty()) { + throw new IllegalArgumentException("0 length nodeIds are reserved for EMPTY_TASK_ID and are otherwise invalid."); + } + this.nodeId = nodeId; + this.id = id; + } + + /** + * Builds {@link #EMPTY_TASK_ID}. + */ + private ProtobufTaskId() { + nodeId = ""; + id = -1; + } + + public ProtobufTaskId(String taskId) { + if (Strings.hasLength(taskId) && "unset".equals(taskId) == false) { + String[] s = Strings.split(taskId, ":"); + if (s == null || s.length != 2) { + throw new IllegalArgumentException("malformed task id " + taskId); + } + this.nodeId = s[0]; + try { + this.id = Long.parseLong(s[1]); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("malformed task id " + taskId, ex); + } + } else { + nodeId = ""; + id = -1L; + } + } + + /** + * Read a {@linkplain ProtobufTaskId} from a stream. {@linkplain ProtobufTaskId} has this rather than the usual constructor that takes a + * {@linkplain CodedInputStream} so we can return the {@link #EMPTY_TASK_ID} without allocating. + */ + public static ProtobufTaskId readFromStream(com.google.protobuf.CodedInputStream in) throws IOException { + String nodeId = in.readString(); + if (nodeId.isEmpty()) { + /* + * The only TaskId allowed to have the empty string as its nodeId is the EMPTY_TASK_ID and there is only ever one of it and it + * never writes its taskId to save bytes on the wire because it is by far the most common TaskId. + */ + return EMPTY_TASK_ID; + } + return new ProtobufTaskId(nodeId, in.readInt64()); + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + out.writeString(0, nodeId); + if (nodeId.isEmpty()) { + // Shortcut the EMPTY_TASK_ID, the only TaskId allowed to have the empty string as its nodeId. + return; + } + out.writeInt64(1, id); + } + + public static ContextParser parser() { + return (p, c) -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return new ProtobufTaskId(p.text()); + } + throw new OpenSearchParseException("Expected a string but found [{}] instead", p.currentToken()); + }; + } + + public String getNodeId() { + return nodeId; + } + + public long getId() { + return id; + } + + public boolean isSet() { + return id != -1L; + } + + @Override + public String toString() { + if (isSet()) { + return nodeId + ":" + id; + } else { + return "unset"; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ProtobufTaskId taskId = (ProtobufTaskId) o; + + if (id != taskId.id) return false; + return nodeId.equals(taskId.nodeId); + + } + + @Override + public int hashCode() { + int result = nodeId.hashCode(); + result = 31 * result + (int) (id ^ (id >>> 32)); + return result; + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java new file mode 100644 index 0000000000000..ddd900b493be8 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskInfo.java @@ -0,0 +1,355 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.opensearch.Version; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ObjectParserHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Information about a currently running task. +*

+* Tasks are used for communication with transport actions. As a result, they can contain callback +* references as well as mutable state. That makes it impractical to send tasks over transport channels +* and use in APIs. Instead, immutable and writeable ProtobufTaskInfo objects are used to represent +* snapshot information about currently running tasks. +* +* @opensearch.internal +*/ +public final class ProtobufTaskInfo implements ProtobufWriteable, ToXContentFragment { + private final ProtobufTaskId taskId; + + private final String type; + + private final String action; + + private final String description; + + private final long startTime; + + private final long runningTimeNanos; + + private final ProtobufTask.Status status; + + private final boolean cancellable; + + private final boolean cancelled; + + private final ProtobufTaskId parentTaskId; + + private final Map headers; + + private final TaskResourceStats resourceStats; + + public ProtobufTaskInfo( + ProtobufTaskId taskId, + String type, + String action, + String description, + ProtobufTask.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + ProtobufTaskId parentTaskId, + Map headers, + TaskResourceStats resourceStats + ) { + if (cancellable == false && cancelled == true) { + throw new IllegalArgumentException("task cannot be cancelled"); + } + this.taskId = taskId; + this.type = type; + this.action = action; + this.description = description; + this.status = status; + this.startTime = startTime; + this.runningTimeNanos = runningTimeNanos; + this.cancellable = cancellable; + this.cancelled = cancelled; + this.parentTaskId = parentTaskId; + this.headers = headers; + this.resourceStats = resourceStats; + } + + /** + * Read from a stream. + */ + @SuppressWarnings("unchecked") + public ProtobufTaskInfo(com.google.protobuf.CodedInputStream in) throws IOException { + taskId = ProtobufTaskId.readFromStream(in); + type = in.readString(); + action = in.readString(); + description = in.readString(); + status = null; + startTime = in.readInt64(); + runningTimeNanos = in.readInt64(); + cancellable = in.readBool(); + cancelled = false; + if (cancellable == false && cancelled == true) { + throw new IllegalArgumentException("task cannot be cancelled"); + } + parentTaskId = ProtobufTaskId.readFromStream(in); + headers = in.readMap(StreamInput::readString, StreamInput::readString); + resourceStats = null; + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + taskId.writeTo(out); + out.writeString(0, type); + out.writeString(1, action); + out.writeString(2, description); + // out.writeOptionalNamedWriteable(status); + out.writeInt64(4, startTime); + out.writeInt64(5, runningTimeNanos); + out.writeBool(6, cancellable); + out.writeBool(7, cancelled); + parentTaskId.writeTo(out); + out.writeMap(8, headers, StreamOutput::writeString, StreamOutput::writeString); + } + + public ProtobufTaskId getTaskId() { + return taskId; + } + + public long getId() { + return taskId.getId(); + } + + public String getType() { + return type; + } + + public String getAction() { + return action; + } + + public String getDescription() { + return description; + } + + /** + * The status of the running task. Only available if TaskInfos were build + * with the detailed flag. + */ + public ProtobufTask.Status getStatus() { + return status; + } + + /** + * Returns the task start time + */ + public long getStartTime() { + return startTime; + } + + /** + * Returns the task running time + */ + public long getRunningTimeNanos() { + return runningTimeNanos; + } + + /** + * Returns true if the task supports cancellation + */ + public boolean isCancellable() { + return cancellable; + } + + /** + * Returns true if the task has been cancelled + */ + public boolean isCancelled() { + return cancelled; + } + + /** + * Returns the parent task id + */ + public ProtobufTaskId getParentTaskId() { + return parentTaskId; + } + + /** + * Returns the task headers + */ + public Map getHeaders() { + return headers; + } + + /** + * Returns the task resource information + */ + public TaskResourceStats getResourceStats() { + return resourceStats; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("node", taskId.getNodeId()); + builder.field("id", taskId.getId()); + builder.field("type", type); + builder.field("action", action); + if (status != null) { + builder.field("status", status, params); + } + if (description != null) { + builder.field("description", description); + } + builder.timeField("start_time_in_millis", "start_time", startTime); + if (builder.humanReadable()) { + builder.field("running_time", new TimeValue(runningTimeNanos, TimeUnit.NANOSECONDS).toString()); + } + builder.field("running_time_in_nanos", runningTimeNanos); + builder.field("cancellable", cancellable); + builder.field("cancelled", cancelled); + if (parentTaskId.isSet()) { + builder.field("parent_task_id", parentTaskId.toString()); + } + builder.startObject("headers"); + for (Map.Entry attribute : headers.entrySet()) { + builder.field(attribute.getKey(), attribute.getValue()); + } + builder.endObject(); + if (resourceStats != null) { + builder.startObject("resource_stats"); + resourceStats.toXContent(builder, params); + builder.endObject(); + } + return builder; + } + + public static ProtobufTaskInfo fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("task_info", true, a -> { + int i = 0; + ProtobufTaskId id = new ProtobufTaskId((String) a[i++], (Long) a[i++]); + String type = (String) a[i++]; + String action = (String) a[i++]; + String description = (String) a[i++]; + BytesReference statusBytes = (BytesReference) a[i++]; + long startTime = (Long) a[i++]; + long runningTimeNanos = (Long) a[i++]; + boolean cancellable = (Boolean) a[i++]; + boolean cancelled = a[i++] == Boolean.TRUE; + String parentTaskIdString = (String) a[i++]; + @SuppressWarnings("unchecked") + Map headers = (Map) a[i++]; + if (headers == null) { + // This might happen if we are reading an old version of task info + headers = Collections.emptyMap(); + } + @SuppressWarnings("unchecked") + TaskResourceStats resourceStats = (TaskResourceStats) a[i++]; + RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); + ProtobufTaskId parentTaskId = parentTaskIdString == null ? ProtobufTaskId.EMPTY_TASK_ID : new ProtobufTaskId(parentTaskIdString); + return new ProtobufTaskInfo( + id, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers, + resourceStats + ); + }); + static { + // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format + PARSER.declareString(constructorArg(), new ParseField("node")); + PARSER.declareLong(constructorArg(), new ParseField("id")); + PARSER.declareString(constructorArg(), new ParseField("type")); + PARSER.declareString(constructorArg(), new ParseField("action")); + PARSER.declareString(optionalConstructorArg(), new ParseField("description")); + ObjectParserHelper parserHelper = new ObjectParserHelper<>(); + parserHelper.declareRawObject(PARSER, optionalConstructorArg(), new ParseField("status")); + PARSER.declareLong(constructorArg(), new ParseField("start_time_in_millis")); + PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos")); + PARSER.declareBoolean(constructorArg(), new ParseField("cancellable")); + PARSER.declareBoolean(optionalConstructorArg(), new ParseField("cancelled")); + PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats")); + } + + @Override + public String toString() { + return Strings.toString(XContentType.JSON, this, true, true); + } + + // Implements equals and hashCode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != ProtobufTaskInfo.class) { + return false; + } + ProtobufTaskInfo other = (ProtobufTaskInfo) obj; + return Objects.equals(taskId, other.taskId) + && Objects.equals(type, other.type) + && Objects.equals(action, other.action) + && Objects.equals(description, other.description) + && Objects.equals(startTime, other.startTime) + && Objects.equals(runningTimeNanos, other.runningTimeNanos) + && Objects.equals(parentTaskId, other.parentTaskId) + && Objects.equals(cancellable, other.cancellable) + && Objects.equals(cancelled, other.cancelled) + && Objects.equals(status, other.status) + && Objects.equals(headers, other.headers) + && Objects.equals(resourceStats, other.resourceStats); + } + + @Override + public int hashCode() { + return Objects.hash( + taskId, + type, + action, + description, + startTime, + runningTimeNanos, + parentTaskId, + cancellable, + cancelled, + status, + headers, + resourceStats + ); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java new file mode 100644 index 0000000000000..c3314ae40455f --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ProtobufTaskResult.java @@ -0,0 +1,226 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.tasks; + +import org.opensearch.OpenSearchException; +import org.opensearch.client.Requests; +import org.opensearch.common.Nullable; +import org.opensearch.core.ParseField; +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.InstantiatingObjectParser; +import org.opensearch.common.xcontent.ObjectParserHelper; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static java.util.Collections.emptyMap; +import static java.util.Objects.requireNonNull; +import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.opensearch.common.xcontent.XContentHelper.convertToMap; + +/** + * Information about a running task or a task that stored its result. Running tasks just have a {@link #getTask()} while +* tasks with stored result will have either a {@link #getError()} or {@link #getResponse()}. +* +* @opensearch.internal +*/ +public final class ProtobufTaskResult implements ProtobufWriteable, ToXContentObject { + private final boolean completed; + private final ProtobufTaskInfo task; + @Nullable + private final BytesReference error; + @Nullable + private final BytesReference response; + + /** + * Construct a {@linkplain TaskResult} for a task for which we don't have a result or error. That usually means that the task + * is incomplete, but it could also mean that we waited for the task to complete but it didn't save any error information. + */ + public ProtobufTaskResult(boolean completed, ProtobufTaskInfo task) { + this(completed, task, null, null); + } + + /** + * Construct a {@linkplain TaskResult} for a task that completed with an error. + */ + public ProtobufTaskResult(ProtobufTaskInfo task, Exception error) throws IOException { + this(true, task, toXContent(error), null); + } + + /** + * Construct a {@linkplain ProtobufTaskResult} for a task that completed successfully. + */ + public ProtobufTaskResult(ProtobufTaskInfo task, ToXContent response) throws IOException { + this(true, task, null, XContentHelper.toXContent(response, Requests.INDEX_CONTENT_TYPE, true)); + } + + public ProtobufTaskResult(boolean completed, ProtobufTaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) { + this.completed = completed; + this.task = requireNonNull(task, "task is required"); + this.error = error; + this.response = result; + } + + /** + * Read from a stream. + */ + public ProtobufTaskResult(com.google.protobuf.CodedInputStream in) throws IOException { + completed = in.readBool(); + task = new ProtobufTaskInfo(in); + error = in.readOptionalBytesReference(); + response = in.readOptionalBytesReference(); + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + out.writeBool(0, completed); + task.writeTo(out); + out.writeByteArray(1, error); + out.writeOptionalBytesReference(response); + } + + /** + * Get the task that this wraps. + */ + public ProtobufTaskInfo getTask() { + return task; + } + + /** + * Get the error that finished this task. Will return null if the task didn't finish with an error, it hasn't yet finished, or didn't + * store its result. + */ + public BytesReference getError() { + return error; + } + + /** + * Convert {@link #getError()} from XContent to a Map for easy processing. Will return an empty map if the task didn't finish with an + * error, hasn't yet finished, or didn't store its result. + */ + public Map getErrorAsMap() { + if (error == null) { + return emptyMap(); + } + return convertToMap(error, false).v2(); + } + + /** + * Get the response that this task finished with. Will return null if the task was finished by an error, it hasn't yet finished, or + * didn't store its result. + */ + public BytesReference getResponse() { + return response; + } + + /** + * Convert {@link #getResponse()} from XContent to a Map for easy processing. Will return an empty map if the task was finished with an + * error, hasn't yet finished, or didn't store its result. + */ + public Map getResponseAsMap() { + if (response == null) { + return emptyMap(); + } + return convertToMap(response, false).v2(); + } + + public boolean isCompleted() { + return completed; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder, params); + return builder.endObject(); + } + + public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("completed", completed); + builder.startObject("task"); + task.toXContent(builder, params); + builder.endObject(); + if (error != null) { + XContentHelper.writeRawField("error", error, builder, params); + } + if (response != null) { + XContentHelper.writeRawField("response", response, builder, params); + } + return builder; + } + + public static final InstantiatingObjectParser PARSER; + + static { + InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( + "stored_task_result", + true, + ProtobufTaskResult.class + ); + parser.declareBoolean(constructorArg(), new ParseField("completed")); + parser.declareObject(constructorArg(), ProtobufTaskInfo.PARSER, new ParseField("task")); + ObjectParserHelper parserHelper = new ObjectParserHelper<>(); + parserHelper.declareRawObject(parser, optionalConstructorArg(), new ParseField("error")); + parserHelper.declareRawObject(parser, optionalConstructorArg(), new ParseField("response")); + PARSER = parser.build(); + } + + @Override + public String toString() { + return Strings.toString(XContentType.JSON, this); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != ProtobufTaskResult.class) { + return false; + } + ProtobufTaskResult other = (ProtobufTaskResult) obj; + /* + * Equality of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing + * differences so perfect for testing. + */ + return Objects.equals(completed, other.completed) + && Objects.equals(task, other.task) + && Objects.equals(getErrorAsMap(), other.getErrorAsMap()) + && Objects.equals(getResponseAsMap(), other.getResponseAsMap()); + } + + @Override + public int hashCode() { + /* + * Hashing of error and result is done by converting them to a map first. Not efficient but ignores field order and spacing + * differences so perfect for testing. + */ + return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap()); + } + + private static BytesReference toXContent(Exception error) throws IOException { + try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { + builder.startObject(); + OpenSearchException.generateThrowableXContent(builder, ToXContent.EMPTY_PARAMS, error); + builder.endObject(); + return BytesReference.bytes(builder); + } + } +} diff --git a/server/src/main/java/org/opensearch/transport/ProtobufTransportMessage.java b/server/src/main/java/org/opensearch/transport/ProtobufTransportMessage.java new file mode 100644 index 0000000000000..0bca490774ec4 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ProtobufTransportMessage.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import com.google.protobuf.CodedInputStream; +import org.opensearch.common.io.stream.ProtobufWriteable; +import org.opensearch.common.transport.TransportAddress; + +/** + * Message over the transport interface with Protobuf serialization. +* +* @opensearch.internal +*/ +public abstract class ProtobufTransportMessage implements ProtobufWriteable { + + private TransportAddress remoteAddress; + + public void remoteAddress(TransportAddress remoteAddress) { + this.remoteAddress = remoteAddress; + } + + public TransportAddress remoteAddress() { + return remoteAddress; + } + + /** + * Constructs a new empty transport message + */ + public ProtobufTransportMessage() {} + + /** + * Constructs a new transport message with the data from the {@link CodedInputStream}. This is + * currently a no-op + */ + public ProtobufTransportMessage(com.google.protobuf.CodedInputStream in) {} +} + \ No newline at end of file diff --git a/server/src/main/java/org/opensearch/transport/ProtobufTransportRequest.java b/server/src/main/java/org/opensearch/transport/ProtobufTransportRequest.java new file mode 100644 index 0000000000000..ec78e3cdb2bf1 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ProtobufTransportRequest.java @@ -0,0 +1,59 @@ +/* +* SPDX-License-Identifier: Apache-2.0 +* +* The OpenSearch Contributors require contributions made to +* this file be licensed under the Apache-2.0 license or a +* compatible open source license. +*/ + +package org.opensearch.transport; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.tasks.ProtobufTaskId; +import org.opensearch.tasks.ProtobufTaskAwareRequest; +import org.opensearch.tasks.TaskId; + +import java.io.IOException; + +/** + * A transport request with Protobuf serialization. +* +* @opensearch.internal +*/ +public abstract class ProtobufTransportRequest extends ProtobufTransportMessage implements ProtobufTaskAwareRequest { + + /** + * Parent of this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "no parent". + */ + private ProtobufTaskId parentTaskId = ProtobufTaskId.EMPTY_TASK_ID; + + public ProtobufTransportRequest() {} + + public ProtobufTransportRequest(com.google.protobuf.CodedInputStream in) throws IOException { + parentTaskId = ProtobufTaskId.readFromStream(in); + } + + /** + * Set a reference to task that created this request. + */ + @Override + public void setParentTask(ProtobufTaskId taskId) { + this.parentTaskId = taskId; + } + + /** + * Get a reference to the task that created this request. Defaults to {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent". + */ + @Override + public ProtobufTaskId getParentTask() { + return parentTaskId; + } + + @Override + public void writeTo(com.google.protobuf.CodedOutputStream out) throws IOException { + parentTaskId.writeTo(out); + } +}