Skip to content

Commit

Permalink
Adds basic MQTT support with remote events
Browse files Browse the repository at this point in the history
- adds short cut to signal restart on rc 42
- adds heartbeat over mqtt

Signed-off-by: Patrick Reinhart <[email protected]>
  • Loading branch information
reinhapa committed Apr 7, 2023
1 parent 2a020ee commit 9c5b5ae
Show file tree
Hide file tree
Showing 17 changed files with 621 additions and 33 deletions.
14 changes: 14 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ updates:
# Allow up to N open pull requests (independant of source)
open-pull-requests-limit: 50

- package-ecosystem: "gradle"
# Any gradle based dependencies
directory: "/mqtt"
# Raise pull requests for version updates on `master` branch
target-branch: "master"
schedule:
interval: "daily"
# Check for npm updates at 0815hrs UTC
time: "08:15"
reviewers:
- "TweetWallFX/tweetwallfx-admins"
# Allow up to N open pull requests (independant of source)
open-pull-requests-limit: 50

- package-ecosystem: "gradle"
# Any gradle based dependencies
directory: "/stepengine-dataproviders"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2017-2022 TweetWallFX
* Copyright (c) 2017-2023 TweetWallFX
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -104,7 +104,7 @@ public Class<ScheduleDataProvider> getDataProviderClass() {
* Param {@code scheduleDuration} Fixed rate of / delay between consecutive
* executions in seconds. Defaults to {@code 300L}.
*/
private static record Config(
public record Config(
ScheduleType scheduleType,
Long initialDelay,
Long scheduleDuration) implements ScheduledConfig {
Expand Down
3 changes: 2 additions & 1 deletion generic2d/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015-2022 TweetWallFX
* Copyright (c) 2015-2023 TweetWallFX
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -25,6 +25,7 @@
dependencies {
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation project(':tweetwallfx-2d')
implementation project(':tweetwallfx-mqtt')
implementation project(':tweetwallfx-tweet-api')
runtimeOnly 'org.apache.logging.log4j:log4j-slf4j2-impl:2.20.0'
}
30 changes: 28 additions & 2 deletions generic2d/src/main/java/org/tweetwallfx/generic/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.tweetwallfx.generic;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.scene.Scene;
Expand All @@ -39,16 +40,32 @@
import org.slf4j.LoggerFactory;
import org.tweetwallfx.config.Configuration;
import org.tweetwallfx.config.TweetwallSettings;
import org.tweetwallfx.mqtt.MqttProcess;
import org.tweetwallfx.tweet.api.Tweeter;
import org.tweetwallfx.twod.TagTweets;

import static org.tweetwallfx.mqtt.MqttEvent.RESTART;
import static org.tweetwallfx.mqtt.MqttEvent.STOP;

public class Main extends Application {
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
private static final AtomicInteger RC = new AtomicInteger();

final MqttProcess mqttProcess = new MqttProcess();

@Override
public void start(Stage primaryStage) {
new Thread(mqttProcess).start();
mqttProcess.addMqttEventHandler(e -> {
if (STOP.equals(e.getEventType())) {
exitApplication(0); // normal exit
} else if (RESTART.equals(e.getEventType())) {
exitApplication(42); // restart
}
});

BorderPane borderPane = new BorderPane();
Scene scene = new Scene(borderPane, 1920, 1280);
Scene scene = new Scene(borderPane, 1920, 1080);
borderPane.getStyleClass().add("splash");

final TweetwallSettings tweetwallSettings
Expand Down Expand Up @@ -79,7 +96,8 @@ public void start(Stage primaryStage) {
switch (character) {
case "D" -> toggleStatusLine(borderPane, spa, statusLineHost);
case "F" -> primaryStage.setFullScreen(!primaryStage.isFullScreen());
case "X", "Q" -> Platform.exit();
case "R" -> exitApplication(42); // restart
case "X", "Q" -> exitApplication(0); // normal exit
default -> LOG.warn("Unknown character: '{}'", character);
};
}
Expand All @@ -92,6 +110,12 @@ public void start(Stage primaryStage) {
primaryStage.setFullScreen(!Boolean.getBoolean("org.tweetwallfx.disable-full-screen"));
}

private void exitApplication(int exitCode) {
LOG.info("Exit application with rc={}", exitCode);
RC.set(exitCode);
Platform.exit();
}

private static void toggleStatusLine(BorderPane borderPane, StringPropertyAppender spa, HBox statusLineHost) {
final LoggerConfig rootLogger = LoggerContext.getContext(false).getConfiguration().getRootLogger();
if (null == statusLineHost.getParent()) {
Expand All @@ -107,6 +131,7 @@ private static void toggleStatusLine(BorderPane borderPane, StringPropertyAppend
public void stop() {
LOG.info("closing...");
Tweeter.getInstance().shutdown();
mqttProcess.stop();
}

/**
Expand All @@ -116,5 +141,6 @@ public void stop() {
*/
public static void main(String[] args) {
launch(args);
System.exit(RC.get());
}
}
29 changes: 29 additions & 0 deletions mqtt/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* The MIT License
*
* Copyright 2023-2023 TweetWallFX
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

dependencies {
implementation project(':tweetwallfx-configuration')
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.slf4j:slf4j-api:2.0.6'
}
37 changes: 37 additions & 0 deletions mqtt/src/main/java/org/tweetwallfx/mqtt/MqttEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2023 TweetWallFX
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package org.tweetwallfx.mqtt;

import javafx.event.Event;
import javafx.event.EventType;

public class MqttEvent extends Event {
public static final EventType<MqttEvent> ANY = new EventType<>(Event.ANY, "ANY");
public static final EventType<MqttEvent> RESTART = new EventType<>(Event.ANY, "RESTART");
public static final EventType<MqttEvent> STOP = new EventType<>(Event.ANY, "STOP");

public MqttEvent(Object source, EventType<? extends Event> eventType) {
super(source, null, eventType);
}
}
181 changes: 181 additions & 0 deletions mqtt/src/main/java/org/tweetwallfx/mqtt/MqttProcess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2023 TweetWallFX
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package org.tweetwallfx.mqtt;

import javafx.beans.property.BooleanProperty;
import javafx.beans.property.ReadOnlyBooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.event.EventHandler;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tweetwallfx.config.Configuration;
import org.tweetwallfx.mqtt.config.MqttSettings;

import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.tweetwallfx.util.JsonDataConverter.convertToBytes;

public class MqttProcess implements Runnable {
private static final String TWEETWALL_STATE = "tweetwall/state/";
private static final Logger LOG = LoggerFactory.getLogger(MqttProcess.class);

private final BooleanProperty stopProperty = new SimpleBooleanProperty();
private final BooleanProperty runningProperty = new SimpleBooleanProperty();
private final List<EventHandler<MqttEvent>> handlers = new ArrayList<>();
private final AtomicReference<MqttClient> clientRef = new AtomicReference<>();

private void fire(MqttEvent mqttEvent) {
handlers.forEach(h -> h.handle(mqttEvent));
}

public void addMqttEventHandler(EventHandler<MqttEvent> handler) {
handlers.add(Objects.requireNonNull(handler));
}

public ReadOnlyBooleanProperty runningProperty() {
return ReadOnlyBooleanProperty.readOnlyBooleanProperty(runningProperty);
}

public void stop() {
stopProperty.set(true);
while (runningProperty.get()) {
waitFor(MILLISECONDS, 500);
}
}

@Override
public void run() {
try {
Thread.currentThread().setName("MQTT-Command-Dispatcher");
final MqttSettings mqttSettings = Configuration.getInstance()
.getConfigTyped(MqttSettings.CONFIG_KEY, MqttSettings.class);
if (!mqttSettings.enabled()) {
LOG.info("MQTT disabled");
return;
}
long lastHeartbeat = 0;
while (!stopProperty.get()) {
final String broker = mqttSettings.brokerUrl();
final String clientId = mqttSettings.clientId();
try (MqttClientPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient(broker, clientId, persistence)) {
clientRef.set(mqttClient);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setConnectionTimeout(0);
connOpts.setKeepAliveInterval(30);
connOpts.setAutomaticReconnect(true);
final MqttSettings.Auth auth = mqttSettings.auth();
Optional.ofNullable(auth.userName()).ifPresent(connOpts::setUserName);
Optional.ofNullable(auth.secret()).ifPresent(pw -> connOpts.setPassword(pw.toCharArray()));
LOG.info("Connect to {}", broker);
mqttClient.connect(connOpts);
stopProperty.addListener((observableValue, oldValue, newValue) -> {
if (newValue) {
sendMessage(TWEETWALL_STATE, State.stopping());
try {
LOG.info("Disconnecting");
mqttClient.disconnect();
} catch (MqttException e) {
LOG.error("Failed to send stop notification", e);
}
}
});
runningProperty.set(true);
sendMessage(TWEETWALL_STATE, State.starting(SystemInfo.info()));
LOG.info("Connection established");
mqttClient.subscribe("tweetwall/action/#", (t, m) -> handleActionMessage(clientId, t, m));
while (!stopProperty.get()) {
// heart beat task
long currentTime = System.currentTimeMillis();
long duration = MILLISECONDS.toSeconds(currentTime - lastHeartbeat);
if (duration >= mqttSettings.heartbeatSeconds()) {
lastHeartbeat = currentTime;
LOG.debug("Sending heart beat message");
sendMessage(TWEETWALL_STATE, State.alive());
} else {
waitFor(MILLISECONDS, 500);
}
}
} catch (MqttException e) {
LOG.error("Failure while handling MQTT", e);
} finally {
clientRef.set(null);
}
}
} finally {
runningProperty.set(false);
}
}

private static void waitFor(TimeUnit unit, long amount) {
try {
unit.sleep(amount);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while waiting", e);
}
}

private void handleActionMessage(String clientId, String topic, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
if (topic.equals("tweetwall/action/" + clientId)) {
switch (payload) {
case "stop" -> fire(new MqttEvent(this, MqttEvent.STOP));
case "restart" -> fire(new MqttEvent(this, MqttEvent.RESTART));
case "info" -> sendMessage(TWEETWALL_STATE, State.info(SystemInfo.info()));
default -> LOG.warn("Unknown action payload: {}", payload);
}
} else {
LOG.warn("Unknown payload '{}' for topic {}", payload, topic);
}
}

private void sendMessage(String topic, Object messageObject) {
final MqttClient mqttClient = clientRef.get();
if (mqttClient == null) {
LOG.error("Failed to send '{}' for topic {} as no client available", messageObject, topic);
} else {
try {
mqttClient.publish(topic + mqttClient.getClientId(), convertToBytes(messageObject), 2, false);
} catch (MqttException | UncheckedIOException e) {
LOG.error("Failed to send '{}' for topic {}", messageObject, topic, e);
}
}
}
}
Loading

0 comments on commit 9c5b5ae

Please sign in to comment.