Skip to content

Request response workspace #843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package software.amazon.awssdk.crt.iot;

import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;

/**
* A helper class for AWS service clients that use MQTT as the transport protocol.
*
* The class supports orchestrating request-response operations and creating streaming operations. Used by the
* IoT SDKs to implement higher-level service clients that provide a good user experience.
*
* Not intended to be constructed or used directly; the service client will create one during its construction.
*/
public class MqttRequestResponseClient extends CrtResource {

public MqttRequestResponseClient(Mqtt5Client client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) throws CrtRuntimeException {
acquireNativeHandle(mqttRequestResponseClientNewFrom5(
this,
client.getNativeHandle(),
options.getMaxRequestResponseSubscriptions(),
options.getMaxStreamingSubscriptions(),
options.getOperationTimeoutSeconds()
));
}

public MqttRequestResponseClient(MqttClientConnection client, MqttRequestResponseClientBuilder.MqttRequestResponseClientOptions options) throws CrtRuntimeException {
acquireNativeHandle(mqttRequestResponseClientNewFrom311(
this,
client.getNativeHandle(),
options.getMaxRequestResponseSubscriptions(),
options.getMaxStreamingSubscriptions(),
options.getOperationTimeoutSeconds()
));
}

/**
* Cleans up the native resources associated with this client. The client is unusable after this call
*/
@Override
protected void releaseNativeHandle() {
if (!isNull()) {
mqttRequestResponseClientDestroy(getNativeHandle());
}
}

/**
* Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
* Resources that wait are responsible for calling releaseReferences() manually.
*/
@Override
protected boolean canReleaseReferencesImmediately() { return true; }

/*******************************************************************************
* native methods
******************************************************************************/

private static native long mqttRequestResponseClientNewFrom5(
MqttRequestResponseClient client,
long protocolClientHandle,
int maxRequestResponseSubscriptions,
int maxStreamingSubscriptions,
int operationTimeoutSeconds
) throws CrtRuntimeException;

private static native long mqttRequestResponseClientNewFrom311(
MqttRequestResponseClient client,
long protocolClientHandle,
int maxRequestResponseSubscriptions,
int maxStreamingSubscriptions,
int operationTimeoutSeconds
) throws CrtRuntimeException;

private static native void mqttRequestResponseClientDestroy(long client);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package software.amazon.awssdk.crt.iot;

import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;

/**
* Class to configure an MQTT-based request response client.
*/
public class MqttRequestResponseClientBuilder {

public static class MqttRequestResponseClientOptions {
int maxRequestResponseSubscriptions = 0;
int maxStreamingSubscriptions = 0;
int operationTimeoutSeconds = 0;

private MqttRequestResponseClientOptions() {}

private void setMaxRequestResponseSubscriptions(int value) {
maxRequestResponseSubscriptions = value;
}

public int getMaxRequestResponseSubscriptions() { return maxRequestResponseSubscriptions; }

private void setMaxStreamingSubscriptions(int value) {
maxStreamingSubscriptions = value;
}

public int getMaxStreamingSubscriptions() { return maxStreamingSubscriptions; }

private void setOperationTimeoutSeconds(int value) {
operationTimeoutSeconds = value;
}

public int getOperationTimeoutSeconds() { return operationTimeoutSeconds; }
}

private MqttRequestResponseClientOptions options = new MqttRequestResponseClientOptions();

public MqttRequestResponseClientBuilder() {
}

/**
* Sets the maximum number of subscriptions that the client will concurrently use for request-response operations
*
* @param maxRequestResponseSubscriptions maximum number of subscriptions that the client will concurrently use for request-response operations
* @return the builder instance
*/
public MqttRequestResponseClientBuilder withMaxRequestResponseSubscriptions(int maxRequestResponseSubscriptions) {
options.setMaxRequestResponseSubscriptions(maxRequestResponseSubscriptions);

return this;
}

/**
* Sets the maximum number of subscriptions that the client will concurrently use for streaming operations
*
* @param maxStreamingSubscriptions aximum number of subscriptions that the client will concurrently use for streaming operations
* @return the builder instance
*/
public MqttRequestResponseClientBuilder withMaxStreamingSubscriptions(int maxStreamingSubscriptions) {
options.setMaxStreamingSubscriptions(maxStreamingSubscriptions);

return this;
}

/**
* Sets the Duration, in seconds, that a request-response operation will wait for completion before giving up
*
* @param operationTimeoutSeconds curation, in seconds, that a request-response operation will wait for completion before giving up
* @return the builder instance
*/
public MqttRequestResponseClientBuilder withOperationTimeoutSeconds(int operationTimeoutSeconds) {
options.setOperationTimeoutSeconds(operationTimeoutSeconds);

return this;
}

/**
* Creates a new MQTT request-response client that uses an MQTT5 client as transport
*
* @param client MQTT5 client to use
* @return a new MQTT request-response client
*/
public MqttRequestResponseClient build(Mqtt5Client client) {
return new MqttRequestResponseClient(client, options);
}

/**
* Creates a new MQTT request-response client that uses an MQTT311 client as transport
*
* @param client MQTT311 client to use
* @return a new MQTT request-response client
*/
public MqttRequestResponseClient build(MqttClientConnection client) {
return new MqttRequestResponseClient(client, options);
}
}
6 changes: 6 additions & 0 deletions src/native/mqtt5_client_jni.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
#ifndef AWS_JNI_CLIENT_H
#define AWS_JNI_CLIENT_H

#include <jni.h>

#include <aws/io/tls_channel_handler.h>

struct aws_mqtt5_client;

struct aws_mqtt5_client_java_jni {
struct aws_mqtt5_client *client;
jobject jni_client;
Expand Down
28 changes: 1 addition & 27 deletions src/native/mqtt_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,7 @@
#include "http_request_utils.h"
#include "java_class_ids.h"
#include "mqtt5_client_jni.h"

/*******************************************************************************
* mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
* async ops, and is used to deliver callbacks. Also hangs on to JNI references
* to buffers and strings that need to outlive the request
******************************************************************************/
struct mqtt_jni_async_callback {
struct mqtt_jni_connection *connection;
jobject async_callback;
struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
};

/*******************************************************************************
* mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
******************************************************************************/
struct mqtt_jni_connection {
struct aws_mqtt_client *client; /* Provided to mqtt_connect */
struct aws_mqtt_client_connection *client_connection;
struct aws_socket_options socket_options;
struct aws_tls_connection_options tls_options;

JavaVM *jvm;
jweak java_mqtt_connection; /* MqttClientConnection instance */
struct mqtt_jni_async_callback *on_message;

struct aws_atomic_var ref_count;
};
#include "mqtt_connection.h"

/*******************************************************************************
* mqtt_jni_ws_handshake - Data needed to perform the async websocket handshake
Expand Down
46 changes: 46 additions & 0 deletions src/native/mqtt_connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#ifndef AWS_JAVA_CRT_MQTT_CONNECTION_H
#define AWS_JAVA_CRT_MQTT_CONNECTION_H

#include <jni.h>

#include <aws/common/atomics.h>
#include <aws/common/byte_buf.h>
#include <aws/io/socket.h>
#include <aws/io/tls_channel_handler.h>

struct aws_mqtt_client;
struct aws_mqtt_client_connection;
struct mqtt_jni_connection;

/*******************************************************************************
* mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
* async ops, and is used to deliver callbacks. Also hangs on to JNI references
* to buffers and strings that need to outlive the request
******************************************************************************/
struct mqtt_jni_async_callback {
struct mqtt_jni_connection *connection;
jobject async_callback;
struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
};

/*******************************************************************************
* mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
******************************************************************************/
struct mqtt_jni_connection {
struct aws_mqtt_client *client; /* Provided to mqtt_connect */
struct aws_mqtt_client_connection *client_connection;
struct aws_socket_options socket_options;
struct aws_tls_connection_options tls_options;

JavaVM *jvm;
jweak java_mqtt_connection; /* MqttClientConnection instance */
struct mqtt_jni_async_callback *on_message;

struct aws_atomic_var ref_count;
};

#endif /* AWS_JAVA_CRT_MQTT_CONNECTION_H */
Loading
Loading