diff --git a/bindings/mqtt/core/pom.xml b/bindings/mqtt/core/pom.xml new file mode 100644 index 000000000..0fd493ac6 --- /dev/null +++ b/bindings/mqtt/core/pom.xml @@ -0,0 +1,31 @@ + + + + cloudevents-parent + io.cloudevents + 2.5.0-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + cloudevents-mqtt-core + CloudEvents - MQTT Common + jar + + + io.cloudevents.mqtt.core + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + + diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java new file mode 100644 index 000000000..86f3c590f --- /dev/null +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/BaseMqttBinaryMessageReader.java @@ -0,0 +1,106 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.core; + +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; +import io.cloudevents.core.v1.CloudEventV1; + +import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Enable the hydration of a CloudEvent in binary mode from an MQTT message. + *

+ * This abstract class provides common behavior across different MQTT + * client implementations. + */ +public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl { + + /** + * CloudEvent attribute names must match this pattern. + */ + private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$"); + private final String contentType; + + /** + * Initialise the binary message reader. + * + * @param version The CloudEvent message version. + * @param contentType The assigned media content type. + * @param payload The raw data payload from the MQTT message. + */ + protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) { + super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null); + this.contentType = contentType; + } + + // --- Overrides + + @Override + protected boolean isContentTypeHeader(String key) { + return false; // The content type is not defined in a user-property + } + + @Override + protected boolean isCloudEventsHeader(String key) { + + // The binding specification does not require name prefixing, + // as such any user-property is a potential CE Context Attribute. + // + // If the name complies with CE convention then we'll assume + // it's a context attribute. + // + Matcher m = CE_ATTR_NAME_REGEX.matcher(key); + return m.matches(); + } + + @Override + protected String toCloudEventsKey(String key) { + return key; // No special prefixing occurs in the MQTT binding spec. + } + + + @Override + protected void forEachHeader(BiConsumer fn) { + + // If there is a content-type then we need set it. + // Inspired by AMQP/Proton code :-) + + if (contentType != null) { + fn.accept(CloudEventV1.DATACONTENTTYPE, contentType); + } + + // Now process each MQTT User Property. + forEachUserProperty(fn); + + } + + @Override + protected String toCloudEventsValue(Object value) { + return value.toString(); + } + + /** + * Visit each MQTT user-property and invoke the supplied function. + * + * @param fn The function to invoke for each MQTT User property. + */ + protected abstract void forEachUserProperty(BiConsumer fn); +} diff --git a/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java new file mode 100644 index 000000000..5953c0c4d --- /dev/null +++ b/bindings/mqtt/core/src/main/java/io/cloudevents/mqtt/core/MqttUtils.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.core; + +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.provider.EventFormatProvider; + +/** + * General MQTT Utilities and Helpers + */ +public class MqttUtils { + + private static final String DEFAULT_FORMAT = "application/cloudevents+json"; + + private MqttUtils() { + } + + /** + * Obtain the {@link EventFormat} to use when working with MQTT V3 + * messages. + * + * @return An event format. + */ + public static EventFormat getDefaultEventFormat() { + + return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT); + + } + + /** + * Get the default content type to assume for MQTT messages. + * + * @return A Content-Type + */ + public static final String getDefaultContentType() { + return DEFAULT_FORMAT; + } + +} diff --git a/bindings/mqtt/hivemq/pom.xml b/bindings/mqtt/hivemq/pom.xml new file mode 100644 index 000000000..e3106fca3 --- /dev/null +++ b/bindings/mqtt/hivemq/pom.xml @@ -0,0 +1,80 @@ + + + + cloudevents-parent + io.cloudevents + 2.5.0-SNAPSHOT + ../../../pom.xml + + + 4.0.0 + + cloudevents-mqtt-hivemq + CloudEvents - MQTT HiveMQ Binding + jar + + + io.cloudevents.mqtt.hivemq + 1.3.0 + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + io.cloudevents + cloudevents-mqtt-core + ${project.version} + + + + com.hivemq + hivemq-mqtt-client + ${hivemq.version} + provided + + + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + + + io.cloudevents + cloudevents-json-jackson + ${project.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + + diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java new file mode 100644 index 000000000..4b689dc0e --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/BinaryMessageReader.java @@ -0,0 +1,49 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.cloudevents.SpecVersion; +import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader; + +import java.util.function.BiConsumer; + +final class BinaryMessageReader extends BaseMqttBinaryMessageReader { + + Mqtt5Publish message; + + BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) { + super(version, contentType, message.getPayloadAsBytes()); + + this.message = message; + } + + @Override + protected void forEachUserProperty(BiConsumer fn) { + + message.getUserProperties().asList().forEach(up -> { + + final String key = up.getName().toString(); + final String val = up.getValue().toString(); + + if (key != null && val != null) { + fn.accept(key, val); + } + }); + + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java new file mode 100644 index 000000000..7a850d26f --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/MqttMessageFactory.java @@ -0,0 +1,121 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.datatypes.MqttUtf8String; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.mqtt.core.MqttUtils; + +import java.util.List; +import java.util.Optional; + +/** + * A factory to obtain: + * - {@link MessageReader} instances to read CloudEvents from MQTT messages. + * - {@link MessageWriter} instances to write CloudEvents into MQTT messages. + */ +public class MqttMessageFactory { + + // Prevent Instantiation. + private MqttMessageFactory() { + } + + /** + * Create a {@link MessageReader} for an MQTT V3 message. + *

+ * As-Per MQTT Binding specification this only supports + * a structured JSON Format message. + * + * @param message An MQTT V3 message. + * @return MessageReader. + */ + public static MessageReader createReader(Mqtt3Publish message) { + return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), message.getPayloadAsBytes()); + } + + /** + * Create a {@link MessageReader} for an MQTT V5 message + * + * @param message An MQTT V5 message. + * @return A message reader. + */ + public static MessageReader createReader(Mqtt5Publish message) { + + Optional cType = message.getContentType(); + + String contentType = cType.isPresent() ? cType.get().toString() : null; + + return MessageUtils.parseStructuredOrBinaryMessage( + () -> contentType, + format -> new GenericStructuredMessageReader(format, message.getPayloadAsBytes()), + () -> getSpecVersion(message), + sv -> new BinaryMessageReader(sv, contentType, message) + ); + } + + + /** + * Create a {@link MessageWriter} for an MQTT V5 Message. + * + * @param builder {@link Mqtt5PublishBuilder.Complete} + * @return A message writer. + */ + public static MessageWriter createWriter(Mqtt5PublishBuilder.Complete builder) { + return new V5MessageWriter(builder); + } + + /** + * Create a {@link MessageWriter} for an MQTT V3 Message. + *

+ * Only supports structured messages. + * + * @param builder {@link Mqtt3PublishBuilder.Complete} + * @return A message writer. + */ + public static MessageWriter createWriter(Mqtt3PublishBuilder.Complete builder) { + return new V3MessageWriter(builder); + } + + + // -- Private functions + + /** + * Find the value of the CloudEvent 'specversion' in the MQTT V5 User Properties. + * + * @param message An MQTT message. + * @return spec version attribute content. + */ + private static String getSpecVersion(Mqtt5Publish message) { + + List props = (List) message.getUserProperties().asList(); + + Optional up = props.stream().filter(p -> p.getName().toString().equals(CloudEventV1.SPECVERSION)).findFirst(); + + return (up.isPresent()) ? up.get().getValue().toString() : null; + + } + +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java new file mode 100644 index 000000000..62a5ecf49 --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V3MessageWriter.java @@ -0,0 +1,72 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +class V3MessageWriter implements MessageWriter, Mqtt3PublishBuilder> { + + Mqtt3PublishBuilder.Complete builder; + + V3MessageWriter(Mqtt3PublishBuilder.Complete builder) { + this.builder = builder; + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + // No-Op + throw CloudEventRWException.newOther("Internal Error"); + } + + @Override + public Mqtt3PublishBuilder setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + // No-Op + throw CloudEventRWException.newOther("Internal Error"); + } + + @Override + public Mqtt3PublishBuilder writeStructured(CloudEvent event, String format) { + final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); + + if (eventFormat != null) { + return writeStructured(event, eventFormat); + } else { + throw CloudEventRWException.newOther("Unsupported Format: " + format); + } + } + + @Override + public Mqtt3PublishBuilder writeStructured(CloudEvent event, EventFormat format) { + final byte[] data = format.serialize(event); + builder.payload(data); + return builder; + } + + @Override + public Mqtt3PublishBuilder writeBinary(CloudEvent event) { + + throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode"); + + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java new file mode 100644 index 000000000..0bb70369b --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/V5MessageWriter.java @@ -0,0 +1,66 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; + +class V5MessageWriter implements MessageWriter, Mqtt5PublishBuilder.Complete>, CloudEventWriter { + + private final Mqtt5PublishBuilder.Complete builder; + + V5MessageWriter(Mqtt5PublishBuilder.Complete builder) { + this.builder = builder; + } + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + builder.userProperties().add(name, value).applyUserProperties(); + return this; + } + + @Override + public Mqtt5PublishBuilder.Complete end(CloudEventData data) throws CloudEventRWException { + builder.payload(data.toBytes()); + return end(); + } + + @Override + public Mqtt5PublishBuilder.Complete end() throws CloudEventRWException { + return builder; + } + + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + withContextAttribute("specversion", version.toString()); + return this; + } + + @Override + public Mqtt5PublishBuilder.Complete setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + builder.contentType(format.serializedContentType()); + builder.payload(value); + return end(); + } +} diff --git a/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java new file mode 100644 index 000000000..f30422548 --- /dev/null +++ b/bindings/mqtt/hivemq/src/main/java/io/cloudevents/mqtt/hivemq/package-info.java @@ -0,0 +1,13 @@ +/** + * This module implements the MQTT binding specification using the + * HiveMQ MQTT client library. + *

+ * Use the {@link io.cloudevents.mqtt.hivemq.MqttMessageFactory} to obtain + * CloudEvent reader and writer instances. + *

+ * Both V3 and V5 versions of MQTT are supported. + * + * @since 2.5.0 + */ + +package io.cloudevents.mqtt.hivemq; diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java new file mode 100644 index 000000000..377a24fe4 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/MqttMessageFactoryTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MqttMessageFactoryTest { + + @Test + public void createV3Writer() { + } + + @Test + public void createV5Writer() { + Assertions.assertNotNull(MqttMessageFactory.createWriter((Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder())); + } + + @Test + public void create3Reader() { + + Mqtt3Publish msg = Mqtt3Publish.builder().topic("test").build(); + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + } + + @Test + public void createV5ReaderFromStructured() { + + // If the content-type is present then hopefully it's a + // cloudvent one. + + EventFormat ef = CSVFormat.INSTANCE; + + EventFormatProvider.getInstance().registerFormat(ef); + + Mqtt5Publish msg = Mqtt5Publish.builder() + .topic("test") + .contentType(ef.serializedContentType()) + .build(); + + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + + } + + @Test + public void createV5ReaderFromBinary() { + + Mqtt5Publish msg = Mqtt5Publish.builder() + .topic("test") + .userProperties().add("specversion", "1.0").applyUserProperties() + .build(); + Assertions.assertNotNull(MqttMessageFactory.createReader(msg)); + + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java new file mode 100644 index 000000000..79897805b --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3MessageWriterTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.core.test.Data; +import io.cloudevents.rw.CloudEventRWException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class V3MessageWriterTest { + + Mqtt3PublishBuilder.Complete builder; + V3MessageWriter writer; + EventFormat csvFormat = CSVFormat.INSTANCE; + + + V3MessageWriterTest() { + + builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder(); + writer = new V3MessageWriter(builder); + EventFormatProvider.getInstance().registerFormat(csvFormat); + } + + @Test + void create() { + } + + @Test + void setEvent() { + } + + @Test + void writeStructuredA() { + assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat.serializedContentType())); + } + + @Test + void testWriteStructuredB() { + assertNotNull(writer.writeStructured(Data.V1_MIN, csvFormat)); + } + + @Test + void writeBinary() { + + // This should fail + Assertions.assertThrows(CloudEventRWException.class, () -> { + writer.writeBinary(Data.V1_MIN); + }); + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java new file mode 100644 index 000000000..0227d4734 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V3RoundTripTests.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.test.Data; +import io.cloudevents.jackson.JsonFormat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +/** + * Round-Trip Tests + *

+ * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V3RoundTripTests { + + + /** + * This test set is limited owing to the fact that: + * (a) We only support JSON Format + * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. + * + * @return + */ + static Stream simpleEvents() { + return Stream.of( + Data.V03_MIN, + Data.V03_WITH_TEXT_DATA, + Data.V1_MIN, + Data.V1_WITH_TEXT_DATA, + Data.V1_WITH_XML_DATA + ); + } + + @ParameterizedTest + @MethodSource("simpleEvents") + public void roundTrip(CloudEvent ce) { + + EventFormat format = new JsonFormat(); + Assertions.assertNotNull(format); + + Mqtt3Publish message = null; + Mqtt3PublishBuilder.Complete builder = (Mqtt3PublishBuilder.Complete) Mqtt3Publish.builder(); + builder.topic("test.test.test"); + + // Write the event out as a message. + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeStructured(ce, format); + message = builder.build(); + + Assertions.assertNotNull(message); + + // Read it back and verify + + // Read the message back into an event + MessageReader reader = MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + + } + +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java new file mode 100644 index 000000000..0e9cdc49c --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5MessageWriterTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.data.BytesCloudEventData; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; + +public class V5MessageWriterTest { + + private final Mqtt5PublishBuilder builder; + private final V5MessageWriter writer; + + V5MessageWriterTest() { + builder = Mqtt5Publish.builder(); + writer = new V5MessageWriter((Mqtt5PublishBuilder.Complete) builder); + builder.topic("tester"); + } + + @Test + public void testWithContextAttribute() { + + Assertions.assertNotNull(writer.withContextAttribute("test", "testing")); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + ensureProperty(msg, "test", "testing"); + } + + @Test + public void testWithContextAttributes() { + + Assertions.assertNotNull(writer.withContextAttribute("test1", "testing1")); + Assertions.assertNotNull(writer.withContextAttribute("test2", "testing2")); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + ensureProperty(msg, "test1", "testing1"); + ensureProperty(msg, "test2", "testing2"); + } + + @Test + public void testEnd() { + Assertions.assertNotNull(writer.end()); + } + + @Test + public void testEndWithData() { + final byte[] tData = {0x00, 0x02, 0x42}; + + Assertions.assertNotNull(writer.end(BytesCloudEventData.wrap(tData))); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + + Assertions.assertNotNull(msg.getPayloadAsBytes()); + Assertions.assertEquals(msg.getPayloadAsBytes().length, tData.length); + + } + + @Test + public void testCreate() { + Assertions.assertNotNull(writer.create(SpecVersion.V1)); + + Mqtt5Publish msg = ((Mqtt5PublishBuilder.Complete) builder).build(); + ensureProperty(msg, "specversion", SpecVersion.V1.toString()); + + } + + private void ensureProperty(Mqtt5Publish msg, String name, String val) { + + List props = (List) msg.getUserProperties().asList(); + + Mqtt5UserProperty prop = null; + + for (Mqtt5UserProperty up : props) { + + if (up.getName().toString().equals(name)) { + prop = up; + break; + } + } + + Assertions.assertNotNull(prop); + Assertions.assertEquals(prop.getValue().toString(), val); + + } +} diff --git a/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java new file mode 100644 index 000000000..dd78177f5 --- /dev/null +++ b/bindings/mqtt/hivemq/src/test/java/io/cloudevents/mqtt/hivemq/V5RoundTripTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.hivemq; + +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Round-Trip Tests + *

+ * For both Binary and Structured modes: + * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V5RoundTripTests { + + private static void readAndVerify(CloudEvent ce, Mqtt5Publish message) { + + Assertions.assertNotNull(message); + + // Read the message back into an event + MessageReader reader = MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripBinary(CloudEvent ce) { + + // Write the event out as a message. + Mqtt5Publish message = null; + Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder(); + builder.topic("test.test.test"); + + + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeBinary(ce); + + message = builder.build(); + + // Read it back and verify + readAndVerify(ce, message); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripStructured(CloudEvent ce) { + + EventFormat format = CSVFormat.INSTANCE; + + Mqtt5Publish message = null; + Mqtt5PublishBuilder.Complete builder = (Mqtt5PublishBuilder.Complete) Mqtt5Publish.builder(); + builder.topic("test.test.test"); + + // Write the event out as a message. + MessageWriter writer = MqttMessageFactory.createWriter(builder); + Assertions.assertNotNull(writer); + + writer.writeStructured(ce, format); + + message = builder.build(); + + // Read it back and verify + readAndVerify(ce, message); + + } + + +} diff --git a/bindings/mqtt/paho/pom.xml b/bindings/mqtt/paho/pom.xml new file mode 100644 index 000000000..7324163f8 --- /dev/null +++ b/bindings/mqtt/paho/pom.xml @@ -0,0 +1,106 @@ + + + + + 4.0.0 + + + io.cloudevents + cloudevents-parent + 2.5.0-SNAPSHOT + ../../../pom.xml + + + cloudevents-mqtt-paho + CloudEvents - MQTT Paho Binding + jar + + + io.cloudevents.mqtt.paho + 1.2.5 + 3.12.0 + + + + + + io.cloudevents + cloudevents-core + ${project.version} + + + + io.cloudevents + cloudevents-mqtt-core + ${project.version} + + + + + + org.eclipse.paho + org.eclipse.paho.mqttv5.client + ${paho.version} + provided + + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${paho.version} + provided + + + + + io.cloudevents + cloudevents-core + tests + test-jar + ${project.version} + test + + + + org.assertj + assertj-core + ${assertj-core.version} + test + + + + org.junit.jupiter + junit-jupiter + ${junit-jupiter.version} + test + + + + + + io.cloudevents + cloudevents-json-jackson + 2.5.0-SNAPSHOT + test + + + + + diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java new file mode 100644 index 000000000..fcbf631b0 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/BinaryMessageReader.java @@ -0,0 +1,57 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.SpecVersion; +import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; + +final class BinaryMessageReader extends BaseMqttBinaryMessageReader { + + private final List userProperties; + + BinaryMessageReader(final SpecVersion version, final String contentType, MqttMessage message) { + super(version, contentType, message.getPayload()); + + // Sanity Check + if (message.getProperties().getUserProperties() != null) { + userProperties = message.getProperties().getUserProperties(); + } else { + userProperties = Collections.emptyList(); + } + } + + @Override + protected void forEachUserProperty(BiConsumer fn) { + + userProperties.forEach(up -> { + + final String key = up.getKey(); + final String val = up.getValue(); + + if (key != null && val != null) { + fn.accept(key, val); + } + }); + + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java new file mode 100644 index 000000000..098bab2d1 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/PahoMessageUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.List; +import java.util.Optional; + +/** + * General Utility functions + */ +final class PahoMessageUtils { + + /** + * Prevent Instantiation + */ + private PahoMessageUtils() { + } + + /** + * Get the value of a specific user property from a message. + * + * @param msg The MQTT Message + * @param name The property to retrieve. + * @return property value or NULL if not set. + */ + static String getUserProperty(final MqttMessage msg, final String name) { + + final MqttProperties mProps = msg.getProperties(); + + return (mProps == null) ? null : getUserProperty(mProps.getUserProperties(), name); + + } + + /** + * Get the value of a specific user property from a message. + * + * @param props The List of MQTT Message properties + * @param name The property to retrieve. + * @return property value or NULL if not set. + */ + public static String getUserProperty(final List props, final String name) { + + if (props == null) { + return null; + } else { + + Optional up = props.stream().filter(p -> p.getKey().equals(name)).findFirst(); + + return up.map(UserProperty::getValue).orElse(null); + } + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java new file mode 100644 index 000000000..844e7ca68 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MessageWriter.java @@ -0,0 +1,101 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.provider.EventFormatProvider; +import io.cloudevents.mqtt.core.MqttUtils; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * A {@link MessageWriter} that writes an CloudEvent to a V3 MQTT Message. + *

+ * Note: This only supports Structured messages in JSON format as defined + * by the MQTT CloudEvent binding specification. + */ +class V3MessageWriter implements MessageWriter, MqttMessage> { + + private final MqttMessage message; + + V3MessageWriter() { + message = new MqttMessage(); + } + + /** + * Ensure the supplied content type is appropriate for V3 messages + * as-per binding specification. + *

+ * Raises exception if not valid. + * + * @param contentType + */ + private void ensureValidContent(String contentType) { + + if (!MqttUtils.getDefaultContentType().equals(contentType)) { + + throw CloudEventRWException.newOther("MQTT V3 Does not support contentType: " + contentType); + + } + } + + @Override + public MqttMessage writeStructured(CloudEvent event, String format) { + + final EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(format); + + // Sanity Check + if (eventFormat == null) { + + } + + return writeStructured(event, eventFormat); + } + + @Override + public MqttMessage writeStructured(CloudEvent event, EventFormat format) { + // Ensure format is valid + ensureValidContent(format.serializedContentType()); + // Populate the structured format. + message.setPayload(format.serialize(event)); + // Done. + return message; + } + + @Override + public MqttMessage writeBinary(CloudEvent event) { + // This operation is not allowed. + // This should fail + throw CloudEventRWException.newOther("MQTT V3 Does not support CloudEvent Binary mode"); + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + return null; + } + + @Override + public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + ensureValidContent(format.serializedContentType()); + message.setPayload(value); + return message; + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java new file mode 100644 index 000000000..b4474b95b --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V3MqttMessageFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.mqtt.core.MqttUtils; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * MQTT V3 factory to : + * - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages. + * - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message. + *

+ * NOTE: The V3 binding only supports structured messages using a JSON Format. + */ + +public final class V3MqttMessageFactory { + + /** + * Prevent instantiation. + */ + private V3MqttMessageFactory() { + + } + + /** + * Create a {@link MessageReader} to read a V3 MQTT Messages as a CloudEVents + * + * @param mqttMessage An MQTT Message. + * @return {@link MessageReader} + */ + public static MessageReader createReader(MqttMessage mqttMessage) { + return new GenericStructuredMessageReader(MqttUtils.getDefaultEventFormat(), mqttMessage.getPayload()); + } + + /** + * Creates a {@link MessageWriter} to write a CloudEvent to an MQTT {@link MqttMessage}. + *

+ * NOTE: This implementation *only* supports JSON structured format as-per the MQTT binding specification. + * + * @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT. + */ + public static MessageWriter, MqttMessage> createWriter() { + return new V3MessageWriter(); + } + +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java new file mode 100644 index 000000000..ec10170b6 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MessageWriter.java @@ -0,0 +1,80 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.rw.CloudEventContextWriter; +import io.cloudevents.rw.CloudEventRWException; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; + +import java.util.ArrayList; +import java.util.List; + +class V5MessageWriter implements MessageWriter, MqttMessage>, CloudEventWriter { + + private final List userProperties; + private final MqttMessage message; + + V5MessageWriter() { + userProperties = new ArrayList<>(10); + message = new MqttMessage(); + message.setProperties(new MqttProperties()); + } + + // -- Implementation Overrides + + @Override + public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { + final UserProperty up = new UserProperty(name, value); + userProperties.add(up); + + return this; + } + + @Override + public MqttMessage end(CloudEventData data) throws CloudEventRWException { + message.setPayload(data.toBytes()); + return end(); + } + + @Override + public MqttMessage end() throws CloudEventRWException { + if (userProperties.size() != 0) { + message.getProperties().setUserProperties(userProperties); + } + return message; + } + + @Override + public CloudEventWriter create(SpecVersion version) throws CloudEventRWException { + userProperties.add(new UserProperty("specversion", version.toString())); + return this; + } + + @Override + public MqttMessage setEvent(EventFormat format, byte[] value) throws CloudEventRWException { + message.getProperties().setContentType(format.serializedContentType()); + message.setPayload(value); + return end(); + } +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java new file mode 100644 index 000000000..20d7141f1 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/V5MqttMessageFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.message.impl.GenericStructuredMessageReader; +import io.cloudevents.core.message.impl.MessageUtils; +import io.cloudevents.core.v1.CloudEventV1; +import io.cloudevents.rw.CloudEventWriter; +import org.eclipse.paho.mqttv5.common.MqttMessage; + +/** + * MQTT V5 factory to : + * - Obtain a {@link MessageReader} to read CloudEvents from MQTT messages. + * - Create a {@link MessageWriter} enabling CloudEVents to be written to an MQTT message. + */ + +public final class V5MqttMessageFactory { + + /** + * Prevent instantiation. + */ + private V5MqttMessageFactory() { + + } + + /** + * Create a {@link MessageReader} to read MQTT Messages as CloudEVents + * + * @param mqttMessage An MQTT Message. + * @return {@link MessageReader} + */ + public static MessageReader createReader(MqttMessage mqttMessage) { + + final String contentType = mqttMessage.getProperties().getContentType(); + + return MessageUtils.parseStructuredOrBinaryMessage( + () -> contentType, + format -> new GenericStructuredMessageReader(format, mqttMessage.getPayload()), + () -> PahoMessageUtils.getUserProperty(mqttMessage, CloudEventV1.SPECVERSION), + sv -> new BinaryMessageReader(sv, contentType, mqttMessage) + ); + } + + /** + * Creates a {@link MessageWriter} capable of translating both a structured and binary CloudEvent + * to an MQTT {@link MqttMessage} + * + * @return A {@link MessageWriter} to write a {@link io.cloudevents.CloudEvent} to MQTT using structured or binary encoding. + */ + public static MessageWriter, MqttMessage> createWriter() { + return new V5MessageWriter<>(); + } + +} diff --git a/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java new file mode 100644 index 000000000..13cfd8764 --- /dev/null +++ b/bindings/mqtt/paho/src/main/java/io/cloudevents/mqtt/paho/package-info.java @@ -0,0 +1,10 @@ +/** + * This module implements the MQTT binding specification using the + * Paho MQTT client library. + *

+ * Separate factories are provided for MQTT V3 and V5. + * + * @since 2.5.0 + */ + +package io.cloudevents.mqtt.paho; diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java new file mode 100644 index 000000000..a7661c306 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/PahoMessageUtilsTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +public class PahoMessageUtilsTest { + + @Test + void verifyPropertyList() { + + List props = new ArrayList<>(5); + + // Ensure Works with null List + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id")); + + // Ensure works with empty list. + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "id")); + + // Create some props + props = new ArrayList<>(5); + props.add(new UserProperty("id", "aaa-bbb-ccc")); + props.add(new UserProperty("specversion", "v1.0")); + + // Ensure Presence + Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(props, "id")); + + // Ensure Absence + Assertions.assertNull(PahoMessageUtils.getUserProperty(props, "scoobydoo")); + + } + + @Test + void verifyMessageProperties() { + + MqttMessage msg = new MqttMessage(); + + // Verify message with no props + Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "id")); + + // Create some props + List props = null; + props = new ArrayList<>(5); + props.add(new UserProperty("id", "aaa-bbb-ccc")); + props.add(new UserProperty("specversion", "v1.0")); + + msg.setProperties(new MqttProperties()); + msg.getProperties().setUserProperties(props); + + // Ensure Presence + Assertions.assertEquals("aaa-bbb-ccc", PahoMessageUtils.getUserProperty(msg, "id")); + + // Ensure Absence + Assertions.assertNull(PahoMessageUtils.getUserProperty(msg, "scoobydoo")); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java new file mode 100644 index 000000000..885296dd7 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3MessageFactoryTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventDeserializationException; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.test.Data; +import io.cloudevents.rw.CloudEventRWException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +public class V3MessageFactoryTest { + + @Test + public void ensureSerializationFormat() { + + MqttMessage message = null; + + // This should fail as we don't support CSV Format + + MessageWriter writer = V3MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + // Expect an exception + + Assertions.assertThrows(CloudEventRWException.class, () -> { + writer.writeStructured(Data.V1_MIN, CSVFormat.INSTANCE); + }); + } + + + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void ensureDeserialization(CloudEvent ce) { + + + final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"; + final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce); + + // Build the MQTT Message + + MqttMessage m = new MqttMessage(); + m.setPayload(contentPayload); + + // Get a reader + MessageReader reader = V3MqttMessageFactory.createReader(m); + Assertions.assertNotNull(reader); + + // This should fail + // Expect an exception + + Assertions.assertThrows(EventDeserializationException.class, () -> { + reader.toEvent(); + }); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java new file mode 100644 index 000000000..6be040f61 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V3RoundTripTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.test.Data; +import io.cloudevents.jackson.JsonFormat; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +/** + * Round-Trip Tests + *

+ * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V3RoundTripTests { + + + /** + * This test set is limited owing to the the fact that: + * (a) We only support JSON Format + * (b) Round-tripping of events with JSON 'data' doesn't reliably work owing to the way the equality tests work on the event. + * + * @return + */ + static Stream simpleEvents() { + return Stream.of( + Data.V03_MIN, + Data.V03_WITH_TEXT_DATA, + Data.V1_MIN, + Data.V1_WITH_TEXT_DATA, + Data.V1_WITH_XML_DATA + ); + } + + @ParameterizedTest + @MethodSource("simpleEvents") + public void roundTrip(CloudEvent ce) { + + EventFormat format = new JsonFormat(); + Assertions.assertNotNull(format); + + // Write the event out as a message. + MessageWriter writer = V3MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeStructured(ce, format); + Assertions.assertNotNull(message); + + // Read it back and verify + + // Read the message back into an event + MessageReader reader = V3MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + + } + +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java new file mode 100644 index 000000000..dc3402c04 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5MessageFactoryTest.java @@ -0,0 +1,162 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.message.Encoding; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.mock.CSVFormat; +import io.cloudevents.core.test.Data; +import io.cloudevents.core.v03.CloudEventV03; +import io.cloudevents.types.Time; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.eclipse.paho.mqttv5.common.packet.UserProperty; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +public class V5MessageFactoryTest { + + private static final String DATACONTENTTYPE_NULL = null; + private static final byte[] DATAPAYLOAD_NULL = null; + + private static Stream binaryTestArguments() { + + return Stream.of( + // V03 + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()) + ), + DATACONTENTTYPE_NULL, + DATAPAYLOAD_NULL, + Data.V03_MIN + ), + Arguments.of( + properties( + property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()), + property(CloudEventV03.ID, Data.ID), + property(CloudEventV03.TYPE, Data.TYPE), + property(CloudEventV03.SOURCE, Data.SOURCE.toString()), + property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()), + property(CloudEventV03.SUBJECT, Data.SUBJECT), + property(CloudEventV03.TIME, Time.writeTime(Data.TIME)) + ), + Data.DATACONTENTTYPE_JSON, + Data.DATA_JSON_SERIALIZED, + Data.V03_WITH_JSON_DATA + ) + ); + } + + private static UserProperty property(String key, String val) { + return new UserProperty(key, val); + } + + private static List properties(final UserProperty... props) { + return Stream.of(props).collect(Collectors.toList()); + } + + @Test + public void testWriteBinary() { + + final MqttMessage message = V5MqttMessageFactory.createWriter().writeBinary(Data.V1_MIN); + Assertions.assertNotNull(message); + } + + // Test Data + + @Test + public void testWriteStructured() { + final MqttMessage message = V5MqttMessageFactory.createWriter().writeStructured(Data.V1_MIN, CSVFormat.INSTANCE); + Assertions.assertNotNull(message); + } + + @ParameterizedTest() + @MethodSource("binaryTestArguments") + public void testReadBinary(List userProps, String contentType, byte[] data, CloudEvent ce) { + MqttMessage msg = new MqttMessage(); + + // Populate Properties + MqttProperties props = new MqttProperties(); + props.setUserProperties(userProps); + msg.setProperties(props); + + // Populate payload & contentType + if (data != null) { + msg.setPayload(data); + } + + if (contentType != null) { + msg.getProperties().setContentType(contentType); + } + + MessageReader reader = V5MqttMessageFactory.createReader(msg); + + Assertions.assertNotNull(reader); + assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY); + + CloudEvent newCe = reader.toEvent(); + + assertThat(newCe).isEqualTo(ce); + + } + + @ParameterizedTest() + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void testReadStructured(CloudEvent ce) { + + + final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8"; + final byte[] contentPayload = CSVFormat.INSTANCE.serialize(ce); + + // Build the MQTT Message + + MqttMessage m = new MqttMessage(); + + MqttProperties props = new MqttProperties(); + props.setContentType(contentType); + m.setProperties(props); + m.setPayload(contentPayload); + + // Get a reader + MessageReader reader = V5MqttMessageFactory.createReader(m); + Assertions.assertNotNull(reader); + assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED); + + // Re-Hydrate the CloudEvent + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And hopefully they match + assertThat(newCE).isEqualTo(ce); + + } +} diff --git a/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java new file mode 100644 index 000000000..f692283a1 --- /dev/null +++ b/bindings/mqtt/paho/src/test/java/io/cloudevents/mqtt/paho/V5RoundTripTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2018-Present The CloudEvents Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.cloudevents.mqtt.paho; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.format.EventFormat; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.core.message.MessageWriter; +import io.cloudevents.core.mock.CSVFormat; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Round-Trip Tests + *

+ * For both Binary and Structured modes: + * - serialize a CloudEvent into an MQTT Message. + * - de-serialize the message into a new CloudEvent + * - verify that the new CE matches the original CE + */ +public class V5RoundTripTests { + + private static void readAndVerify(CloudEvent ce, MqttMessage message) { + + Assertions.assertNotNull(message); + + // Read the message back into an event + MessageReader reader = V5MqttMessageFactory.createReader(message); + Assertions.assertNotNull(reader); + + CloudEvent newCE = reader.toEvent(); + Assertions.assertNotNull(newCE); + + // And now ensure we got back what we wrote + Assertions.assertEquals(ce, newCE); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripBinary(CloudEvent ce) { + + // Write the event out as a message. + MessageWriter writer = V5MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeBinary(ce); + + // Read it back and verify + readAndVerify(ce, message); + } + + @ParameterizedTest + @MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions") + public void roundTripStructured(CloudEvent ce) { + + EventFormat format = CSVFormat.INSTANCE; + + // Write the event out as a message. + MessageWriter writer = V5MqttMessageFactory.createWriter(); + Assertions.assertNotNull(writer); + + MqttMessage message = (MqttMessage) writer.writeStructured(ce, format); + + // Read it back and verify + readAndVerify(ce, message); + + } + + +} diff --git a/docs/index.md b/docs/index.md index d8ec2ebe9..8078c4e3d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,23 +27,26 @@ Using the Java SDK you can: ## Supported features | | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) | -| :------------------------------------------------: | :---------------------------------------------------: | :---------------------------------------------------: | +|:--------------------------------------------------:|:-----------------------------------------------------:|:-----------------------------------------------------:| | CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: | | AMQP Protocol Binding | :x: | :x: | | - [Proton](amqp-proton.md) | :heavy_check_mark: | :heavy_check_mark: | + | MQTT Protocol Binding | :x: | :x: | + | - [Paho](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: | + | - [HiveMQ](mqtt.md) | :heavy_check_mark: | :heavy_check_mark: | | AVRO Event Format | :x: | :x: | | HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: | | - [Vert.x](http-vertx.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Jakarta Restful WS](http-jakarta-restful-ws.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Basic](http-basic.md) | :heavy_check_mark: | :heavy_check_mark: | | - [Spring](spring.md) | :heavy_check_mark: | :heavy_check_mark: | -| - [http4k][http4k] | :heavy_check_mark: | :heavy_check_mark: | +| - [http4k][http4k] | :heavy_check_mark: | :heavy_check_mark: | | JSON Event Format | :heavy_check_mark: | :heavy_check_mark: | | - [Jackson](json-jackson.md) | :heavy_check_mark: | :heavy_check_mark: | -| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: | -| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: | -| XML Event Format | :heavy_check_mark: | :heavy_check_mark: | -| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: | +| Protobuf Event Format | :heavy_check_mark: | :heavy_check_mark: | +| - [Proto](protobuf.md) | :heavy_check_mark: | :heavy_check_mark: | +| XML Event Format | :heavy_check_mark: | :heavy_check_mark: | +| - [XML](xml.md) | :heavy_check_mark: | :heavy_check_mark: | | [Kafka Protocol Binding](kafka.md) | :heavy_check_mark: | :heavy_check_mark: | | MQTT Protocol Binding | :x: | :x: | | NATS Protocol Binding | :x: | :x: | diff --git a/docs/mqtt.md b/docs/mqtt.md new file mode 100644 index 000000000..4d1aacee0 --- /dev/null +++ b/docs/mqtt.md @@ -0,0 +1,60 @@ +--- +title: CloudEvents MQTT +nav_order: 5 +--- + +# MQTT Support + +The SDK supports both V3 and V5 MQTT binding specifications via these Java client libraries: + + * [Paho]() + * [HiveMQ]() + +NOTE: MQTT V3 *only* supports structured mode transfer of CloudEVents. Operations related to binary mode transmission +are either not available or will throw runtime exceptions if an attempt is made to use them. + +Both client library implementations rely on a *provided* maven dependency. + +# General Usage + +There is a slight variance in usage between the two supported client libraries owing to the way those clients +have implemented support for the two versions of MQTT but the general pattern is the same as every other protocol +binding. + +## Creating a message from a CloudEvent + + 1. Obtain a `MessageWriter` from a factory. + 2. Use the writer to populate the MQTT message using structured or binary mode. + * `mqttMessage = messageWriter.writeBinary(cloudEvent);` or, + * `mqttMessage = messageWriter.writeStructured(cloudEvent, eventFormat);` + +## Creating a CloudEvent from a message. + + 1. Obtain a 'MessageReader' from a message factory for an MQTT message. + 2. Obtain a CloudEvent from the reader. + * _CloudEvent cloudEvent = reader.toEvent();_ + + +# PAHO Client Usage + +## Maven + +```xml + + io.cloudevents + cloudevents-mqtt-paho + 2.x.y + +``` + +# HiveMQ Client Usage + +## Maven + +```xml + + io.cloudevents + cloudevents-mqtt-hivemq + 2.x.y + +``` diff --git a/pom.xml b/pom.xml index b0f706009..9bfdd34a9 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,9 @@ formats/json-jackson formats/protobuf formats/xml + bindings/mqtt/core + bindings/mqtt/paho + bindings/mqtt/hivemq amqp http/basic http/vertx