Skip to content

Commit

Permalink
connector transport actions, requests and responses (#1053)
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Zhang <[email protected]>
  • Loading branch information
Zhangxunmt authored Jul 9, 2023
1 parent 4e1fb9b commit 79f1806
Show file tree
Hide file tree
Showing 10 changed files with 642 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import org.opensearch.action.ActionType;
import org.opensearch.action.delete.DeleteResponse;

public class MLConnectorDeleteAction extends ActionType<DeleteResponse> {
public static final MLConnectorDeleteAction INSTANCE = new MLConnectorDeleteAction();
public static final String NAME = "cluster:admin/opensearch/ml/connectors/delete";

private MLConnectorDeleteAction() { super(NAME, DeleteResponse::new);}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import lombok.Builder;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import static org.opensearch.action.ValidateActions.addValidationError;

public class MLConnectorDeleteRequest extends ActionRequest {
@Getter
String connectorId;

@Builder
public MLConnectorDeleteRequest(String connectorId) {
this.connectorId = connectorId;
}

public MLConnectorDeleteRequest(StreamInput input) throws IOException {
super(input);
this.connectorId = input.readString();
}

@Override
public void writeTo(StreamOutput output) throws IOException {
super.writeTo(output);
output.writeString(connectorId);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;

if (this.connectorId == null) {
exception = addValidationError("ML connector id can't be null", exception);
}

return exception;
}

public static MLConnectorDeleteRequest fromActionRequest(ActionRequest actionRequest) {
if (actionRequest instanceof MLConnectorDeleteRequest) {
return (MLConnectorDeleteRequest)actionRequest;
}

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionRequest.writeTo(osso);
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
return new MLConnectorDeleteRequest(input);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionRequest into MLConnectorDeleteRequest", e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import org.opensearch.action.ActionType;

public class MLConnectorGetAction extends ActionType<MLConnectorGetResponse> {
public static final MLConnectorGetAction INSTANCE = new MLConnectorGetAction();
public static final String NAME = "cluster:admin/opensearch/ml/connectors/get";

private MLConnectorGetAction() { super(NAME, MLConnectorGetResponse::new);}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import lombok.Builder;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import static org.opensearch.action.ValidateActions.addValidationError;

@Getter
public class MLConnectorGetRequest extends ActionRequest {

String connectorId;
boolean returnContent;

@Builder
public MLConnectorGetRequest(String connectorId, boolean returnContent) {
this.connectorId = connectorId;
this.returnContent = returnContent;
}

public MLConnectorGetRequest(StreamInput in) throws IOException {
super(in);
this.connectorId = in.readString();
this.returnContent = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(this.connectorId);
out.writeBoolean(returnContent);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;

if (this.connectorId == null) {
exception = addValidationError("ML connector id can't be null", exception);
}

return exception;
}

public static MLConnectorGetRequest fromActionRequest(ActionRequest actionRequest) {
if (actionRequest instanceof MLConnectorGetRequest) {
return (MLConnectorGetRequest) actionRequest;
}

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionRequest.writeTo(osso);
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
return new MLConnectorGetRequest(input);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionRequest into MLConnectorGetRequest", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import lombok.Builder;
import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ml.common.connector.Connector;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class MLConnectorGetResponse extends ActionResponse implements ToXContentObject {
Connector mlConnector;

@Builder
public MLConnectorGetResponse(Connector mlConnector) {
this.mlConnector = mlConnector;
}

public MLConnectorGetResponse(StreamInput in) throws IOException {
super(in);
mlConnector = Connector.fromStream(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException{
mlConnector.writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, ToXContent.Params params) throws IOException {
return mlConnector.toXContent(xContentBuilder, params);
}

public static MLConnectorGetResponse fromActionResponse(ActionResponse actionResponse) {
if (actionResponse instanceof MLConnectorGetResponse) {
return (MLConnectorGetResponse) actionResponse;
}

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) {
actionResponse.writeTo(osso);
try (StreamInput input = new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) {
return new MLConnectorGetResponse(input);
}
} catch (IOException e) {
throw new UncheckedIOException("failed to parse ActionResponse into MLConnectorGetResponse", e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchResponse;

public class MLConnectorSearchAction extends ActionType<SearchResponse> {
// External Action which used for public facing RestAPIs.
public static final String NAME = "cluster:admin/opensearch/ml/connectors/search";
public static final MLConnectorSearchAction INSTANCE = new MLConnectorSearchAction();

private MLConnectorSearchAction() {
super(NAME, SearchResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.common.transport.connector;

import org.opensearch.action.ActionType;

public class MLCreateConnectorAction extends ActionType<MLCreateConnectorResponse> {
public static MLCreateConnectorAction INSTANCE = new MLCreateConnectorAction();
public static final String NAME = "cluster:admin/opensearch/ml/create_connector";

private MLCreateConnectorAction() {
super(NAME, MLCreateConnectorResponse::new);
}
}
Loading

0 comments on commit 79f1806

Please sign in to comment.