-
Notifications
You must be signed in to change notification settings - Fork 159
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
30 changed files
with
1,804 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>cloudevents-parent</artifactId> | ||
<groupId>io.cloudevents</groupId> | ||
<version>2.5.0-SNAPSHOT</version> | ||
<relativePath>../../../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>cloudevents-mqtt-core</artifactId> | ||
<name>CloudEvents - MQTT Common</name> | ||
<packaging>jar</packaging> | ||
|
||
<properties> | ||
<module-name>io.cloudevents.mqtt.core</module-name> | ||
</properties> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
</project> |
88 changes: 88 additions & 0 deletions
88
bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package io.cloudevents.mqtt.core; | ||
|
||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.core.data.BytesCloudEventData; | ||
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; | ||
import io.cloudevents.core.v1.CloudEventV1; | ||
|
||
import java.util.function.BiConsumer; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Enable the hydration of a CloudEvent in binary mode from an MQTT message. | ||
* <p> | ||
* This abstract class provides common behavior across different MQTT | ||
* client implementations. | ||
*/ | ||
public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> { | ||
|
||
/** | ||
* CloudEvent attribute names must match this pattern. | ||
*/ | ||
private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$"); | ||
private final String contentType; | ||
|
||
/** | ||
* Initialise the binary message reader. | ||
* @param version The CloudEvent message version. | ||
* @param contentType The assigned media content type. | ||
* @param payload The raw data payload from the MQTT message. | ||
*/ | ||
protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) { | ||
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); | ||
this.contentType = contentType; | ||
} | ||
|
||
// --- Overrides | ||
|
||
@Override | ||
protected boolean isContentTypeHeader(String key) { | ||
return false; // The content type is not defined in a user-property | ||
} | ||
|
||
@Override | ||
protected boolean isCloudEventsHeader(String key) { | ||
|
||
// The binding specification does not require name prefixing, | ||
// as such any user-property is a potential CE Context Attribute. | ||
// | ||
// If the name complies with CE convention then we'll assume | ||
// it's a context attribute. | ||
// | ||
Matcher m = CE_ATTR_NAME_REGEX.matcher(key); | ||
return m.matches(); | ||
} | ||
|
||
@Override | ||
protected String toCloudEventsKey(String key) { | ||
return key; // No special prefixing occurs in the MQTT binding spec. | ||
} | ||
|
||
|
||
@Override | ||
protected void forEachHeader(BiConsumer<String, Object> fn) { | ||
|
||
// If there is a content-type then we need set it. | ||
// Inspired by AMQP/Proton code :-) | ||
|
||
if (contentType != null) { | ||
fn.accept(CloudEventV1.DATACONTENTTYPE, contentType); | ||
} | ||
|
||
// Now process each MQTT User Property. | ||
forEachUserProperty(fn); | ||
|
||
} | ||
|
||
@Override | ||
protected String toCloudEventsValue(Object value) { | ||
return value.toString(); | ||
} | ||
|
||
/** | ||
* Visit each MQTT user-property and invoke the supplied function. | ||
* @param fn The function to invoke for each MQTT User property. | ||
*/ | ||
protected abstract void forEachUserProperty(BiConsumer<String, Object> fn); | ||
} |
35 changes: 35 additions & 0 deletions
35
bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package io.cloudevents.mqtt.core; | ||
|
||
import io.cloudevents.core.format.EventFormat; | ||
import io.cloudevents.core.provider.EventFormatProvider; | ||
|
||
/** | ||
* General MQTT Utilities and Helpers | ||
*/ | ||
public class MqttUtils { | ||
|
||
private MqttUtils() {} | ||
|
||
private static final String DEFAULT_FORMAT = "application/cloudevents+json"; | ||
|
||
/** | ||
* Obtain the {@link EventFormat} to use when working with MQTT V3 | ||
* messages. | ||
* | ||
* @return An event format. | ||
*/ | ||
public static EventFormat getDefaultEventFormat () { | ||
|
||
return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT); | ||
|
||
} | ||
|
||
/** | ||
* Get the default content type to assume for MQTT messages. | ||
* @return A Content-Type | ||
*/ | ||
public static final String getDefaultContentType() { | ||
return DEFAULT_FORMAT; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>cloudevents-parent</artifactId> | ||
<groupId>io.cloudevents</groupId> | ||
<version>2.5.0-SNAPSHOT</version> | ||
<relativePath>../../../pom.xml</relativePath> | ||
</parent> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>cloudevents-mqtt-hivemq</artifactId> | ||
<name>CloudEvents - MQTT HiveMQ Binding</name> | ||
<packaging>jar</packaging> | ||
|
||
<properties> | ||
<module-name>io.cloudevents.mqtt.hivemq</module-name> | ||
<hivemq.version>1.3.0</hivemq.version> | ||
</properties> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-mqtt-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.hivemq</groupId> | ||
<artifactId>hivemq-mqtt-client</artifactId> | ||
<version>${hivemq.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- Testing Dependencies --> | ||
|
||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-core</artifactId> | ||
<classifier>tests</classifier> | ||
<type>test-jar</type> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<!-- We need a JSON Format for V3 compliance checking --> | ||
|
||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-json-jackson</artifactId> | ||
<version>${project.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.assertj</groupId> | ||
<artifactId>assertj-core</artifactId> | ||
<version>${assertj-core.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter</artifactId> | ||
<version>${junit-jupiter.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
</project> |
33 changes: 33 additions & 0 deletions
33
bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package io.cloudevents.mqtt.hivemq; | ||
|
||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; | ||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader; | ||
|
||
import java.util.function.BiConsumer; | ||
|
||
final class BinaryMessageReader extends BaseMqttBinaryMessageReader { | ||
|
||
Mqtt5Publish message; | ||
|
||
BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) { | ||
super(version, contentType, message.getPayloadAsBytes()); | ||
|
||
this.message = message; | ||
} | ||
|
||
@Override | ||
protected void forEachUserProperty(BiConsumer<String, Object> fn) { | ||
|
||
message.getUserProperties().asList().forEach(up -> { | ||
|
||
final String key = up.getName().toString(); | ||
final String val = up.getValue().toString(); | ||
|
||
if (key != null && val != null) { | ||
fn.accept(key, val); | ||
} | ||
}); | ||
|
||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package io.cloudevents.mqtt.hivemq; | ||
|
||
import com.hivemq.client.mqtt.datatypes.MqttUtf8String; | ||
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; | ||
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; | ||
import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; | ||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; | ||
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; | ||
import io.cloudevents.core.message.MessageReader; | ||
import io.cloudevents.core.message.MessageWriter; | ||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader; | ||
import io.cloudevents.core.message.impl.MessageUtils; | ||
import io.cloudevents.core.v1.CloudEventV1; | ||
import io.cloudevents.mqtt.core.MqttUtils; | ||
|
||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
/** | ||
* A factory to obtain: | ||
* - {@link MessageReader} instances to read CloudEvents from MQTT messages. | ||
* - {@link MessageWriter} instances to write CloudEvents into MQTT messages. | ||
* | ||
*/ | ||
public class MqttMessageFactory { | ||
|
||
// Prevent Instantiation. | ||
private MqttMessageFactory() { | ||
} | ||
|
||
/** | ||
* Create a {@link MessageReader} for an MQTT V3 message. | ||
* <p> | ||
* As-Per MQTT Binding specification this only supports | ||
* a structured JSON Format message. | ||
* | ||
* @param message An MQTT V3 message. | ||
* @return MessageReader. | ||
*/ | ||
public static MessageReader createReader(Mqtt3Publish message) { | ||
return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes()); | ||
} | ||
|
||
/** | ||
* Create a {@link MessageReader} for an MQTT V5 message | ||
* | ||
* @param message An MQTT V5 message. | ||
* @return A message reader. | ||
*/ | ||
public static MessageReader createReader(Mqtt5Publish message) { | ||
|
||
Optional<MqttUtf8String> cType = message.getContentType(); | ||
|
||
String contentType = cType.isPresent() ? cType.get().toString() : null; | ||
|
||
return MessageUtils.parseStructuredOrBinaryMessage( | ||
() -> contentType, | ||
format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()), | ||
() -> getSpecVersion(message), | ||
sv -> new BinaryMessageReader(sv, contentType, message) | ||
); | ||
} | ||
|
||
|
||
/** | ||
* Create a {@link MessageWriter} for an MQTT V5 Message. | ||
* | ||
* @param builder {@link Mqtt5PublishBuilder.Complete} | ||
* @return A message writer. | ||
*/ | ||
public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) { | ||
return new V5MessageWriter(builder); | ||
} | ||
|
||
/** | ||
* Create a {@link MessageWriter} for an MQTT V3 Message. | ||
* | ||
* Only supports structured messages. | ||
* | ||
* @param builder {@link Mqtt3PublishBuilder.Complete} | ||
* @return A message writer. | ||
*/ | ||
public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) { | ||
return new V3MessageWriter(builder); | ||
} | ||
|
||
|
||
// -- Private functions | ||
|
||
/** | ||
* Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties. | ||
* @param message An MQTT message. | ||
* @return spec version attribute content. | ||
*/ | ||
private static String getSpecVersion(Mqtt5Publish message) { | ||
|
||
List<Mqtt5UserProperty> props = (List<Mqtt5UserProperty>) message.getUserProperties().asList(); | ||
|
||
Optional<Mqtt5UserProperty> up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst(); | ||
|
||
return (up.isPresent()) ? up.get().getValue().toString() : null; | ||
|
||
} | ||
|
||
} |
Oops, something went wrong.