Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Binding. #513

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions bindings/mqtt/core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-mqtt-core</artifactId>
<name>CloudEvents - MQTT Common</name>
<packaging>jar</packaging>

<properties>
<module-name>io.cloudevents.mqtt.core</module-name>
</properties>

<dependencies>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.core;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import io.cloudevents.core.v1.CloudEventV1;

import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Enable the hydration of a CloudEvent in binary mode from an MQTT message.
* <p>
* This abstract class provides common behavior across different MQTT
* client implementations.
*/
public abstract class BaseMqttBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {

/**
* CloudEvent attribute names must match this pattern.
*/
private static final Pattern CE_ATTR_NAME_REGEX = Pattern.compile("^[a-z\\d]+$");
private final String contentType;

/**
* Initialise the binary message reader.
*
* @param version The CloudEvent message version.
* @param contentType The assigned media content type.
* @param payload The raw data payload from the MQTT message.
*/
protected BaseMqttBinaryMessageReader(final SpecVersion version, final String contentType, final byte[] payload) {
super(version, payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);
this.contentType = contentType;
}

// --- Overrides

@Override
protected boolean isContentTypeHeader(String key) {
return false; // The content type is not defined in a user-property
}

@Override
protected boolean isCloudEventsHeader(String key) {

// The binding specification does not require name prefixing,
// as such any user-property is a potential CE Context Attribute.
//
// If the name complies with CE convention then we'll assume
// it's a context attribute.
//
Matcher m = CE_ATTR_NAME_REGEX.matcher(key);
return m.matches();
}

@Override
protected String toCloudEventsKey(String key) {
return key; // No special prefixing occurs in the MQTT binding spec.
}


@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {

// If there is a content-type then we need set it.
// Inspired by AMQP/Proton code :-)

if (contentType != null) {
fn.accept(CloudEventV1.DATACONTENTTYPE, contentType);
}

// Now process each MQTT User Property.
forEachUserProperty(fn);

}

@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}
JemDay marked this conversation as resolved.
Show resolved Hide resolved

/**
* Visit each MQTT user-property and invoke the supplied function.
*
* @param fn The function to invoke for each MQTT User property.
*/
protected abstract void forEachUserProperty(BiConsumer<String, Object> fn);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.core;

import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;

/**
* General MQTT Utilities and Helpers
*/
public class MqttUtils {

private static final String DEFAULT_FORMAT = "application/cloudevents+json";

private MqttUtils() {
}

/**
* Obtain the {@link EventFormat} to use when working with MQTT V3
* messages.
*
* @return An event format.
*/
public static EventFormat getDefaultEventFormat() {

return EventFormatProvider.getInstance().resolveFormat(DEFAULT_FORMAT);

}

/**
* Get the default content type to assume for MQTT messages.
*
* @return A Content-Type
*/
public static final String getDefaultContentType() {
return DEFAULT_FORMAT;
}

}
80 changes: 80 additions & 0 deletions bindings/mqtt/hivemq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-mqtt-hivemq</artifactId>
<name>CloudEvents - MQTT HiveMQ Binding</name>
<packaging>jar</packaging>

<properties>
<module-name>io.cloudevents.mqtt.hivemq</module-name>
<hivemq.version>1.3.0</hivemq.version>
</properties>

<dependencies>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-mqtt-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>${hivemq.version}</version>
<scope>provided</scope>
</dependency>

<!-- Testing Dependencies -->

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- We need a JSON Format for V3 compliance checking -->

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.mqtt.hivemq;

import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.cloudevents.SpecVersion;
import io.cloudevents.mqtt.core.BaseMqttBinaryMessageReader;

import java.util.function.BiConsumer;

final class BinaryMessageReader extends BaseMqttBinaryMessageReader {

Mqtt5Publish message;

BinaryMessageReader(final SpecVersion version, final String contentType, Mqtt5Publish message) {
super(version, contentType, message.getPayloadAsBytes());

this.message = message;
}

@Override
protected void forEachUserProperty(BiConsumer<String, Object> fn) {

message.getUserProperties().asList().forEach(up -> {

final String key = up.getName().toString();
final String val = up.getValue().toString();

if (key != null && val != null) {
fn.accept(key, val);
}
});

}
}
Loading