Skip to content

Commit

Permalink
feat(connection): Integrating new telestion-api
Browse files Browse the repository at this point in the history
  • Loading branch information
cb0s committed Dec 6, 2021
2 parents 3a8f021 + 3a8f021 commit 3970db1
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,36 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public final class Broadcaster extends AbstractVerticle {
public final class Broadcaster extends TelestionVerticle<Broadcaster.Configuration> implements WithEventBus {

public final static int NO_BROADCASTING = -1;
public final static int DEFAULT_ID = 0;

public final record Configuration(@JsonProperty
public record Configuration(@JsonProperty
String inAddress,
@JsonProperty
int id) implements JsonMessage {
int id) implements TelestionConfiguration {
public Configuration() {
this(null, DEFAULT_ID);
}
}

@Override
public void start(Promise<Void> startPromise) {
this.config = Config.get(this.config, new Configuration(), this.config(), Configuration.class);
public void onStart(Promise<Void> startPromise) throws Exception {
this.addressList = new HashSet<>();

setDefaultConfig(new Configuration());
this.config = getConfig();

if (broadcasterMap.containsKey(this.config.id())) {
startPromise.fail("The broadcasters id #%d is already taken!".formatted(this.config.id()));
Expand All @@ -38,11 +44,6 @@ public void start(Promise<Void> startPromise) {
startPromise.complete();
}

@Override
public void stop(Promise<Void> stopPromise) {
stopPromise.complete();
}

/**
* Registers a new address for the broadcaster with the given id if it was previously specified in the config.
*
Expand Down Expand Up @@ -82,22 +83,12 @@ public String[] getAddresses() {
return addressList.toArray(String[]::new);
}

public Broadcaster() {
this(null);
}

public Broadcaster(Configuration config) {
this.config = config;
this.addressList = new HashSet<>();
}

private void send(RawMessage msg) {
addressList.forEach(addr -> vertx.eventBus().publish(addr, msg.json()));
addressList.forEach(addr -> publish(addr, msg));
}

private Set<String> addressList;
private Configuration config;
private final Set<String> addressList;

private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
private static final Map<Integer, Broadcaster> broadcasterMap = new HashMap<>();
private static Logger logger = LoggerFactory.getLogger(Broadcaster.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@
*/
public record ConnectionData(@JsonProperty byte[] rawData,
@JsonProperty ConnectionDetails conDetails) implements JsonMessage {

private ConnectionData() {
this(null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@

@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, property="className")
public interface ConnectionDetails extends JsonMessage {

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ private StaticHandler staticHandler() {
* @param outboundPermitted permitted eventbus addresses for outbound connections
*/
@SuppressWarnings({"unused"})
private static record Configuration(@JsonProperty String host, @JsonProperty int port,
@JsonProperty List<String> inboundPermitted,
@JsonProperty List<String> outboundPermitted) {
private record Configuration(@JsonProperty String host, @JsonProperty int port,
@JsonProperty List<String> inboundPermitted,
@JsonProperty List<String> outboundPermitted) {
private Configuration() {
this("127.0.0.1", 9870, Collections.emptyList(), Collections.emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@
import de.wuespace.telestion.api.message.JsonMessage;

public record RawMessage(@JsonProperty byte[] data) implements JsonMessage {
private RawMessage() {
this(null);
}
}
Original file line number Diff line number Diff line change
@@ -1,68 +1,35 @@
package de.wuespace.telestion.services.connection;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;

public final class Receiver extends AbstractVerticle {
public final class Receiver extends TelestionVerticle<Receiver.Configuration> implements WithEventBus {

@Override
public void start(Promise<Void> startPromise) {
config = Config.get(config, config(), Configuration.class);
public void onStart() throws Exception {
var config = getConfig();

for (var con : config.connectionAddresses()) {
vertx.eventBus().consumer(con,
raw -> JsonMessage.on(ConnectionData.class, raw,
msg -> {
logger.debug("Connection-Message received on {}", con);
vertx.eventBus().publish(config.outputAddr(), msg.json());
}));
register(con, raw -> {
JsonMessage.on(ConnectionData.class, raw,
msg -> {
logger.debug("Connection-Message received on {}", con);
publish(config.outAddress(), msg);
});
});
}
startPromise.complete();
}

@Override
public void stop(Promise<Void> stopPromise) {
stopPromise.complete();
}

/**
* @param outputAddr
* @param outAddress
* @param connectionAddresses
*/
public record Configuration(
@JsonProperty String outputAddr,
@JsonProperty String... connectionAddresses) {

private Configuration() {
this(null);
}
@JsonProperty String outAddress,
@JsonProperty String... connectionAddresses) implements TelestionConfiguration {
}

public Receiver() {
this(null);
}

/**
*
* @param config {@link Configuration} for the creation
*/
public Receiver(Configuration config) {
this.config = config;
}

/**
*
*/
private Configuration config;

/**
*
*/
private static final Logger logger = LoggerFactory.getLogger(Receiver.class);

}
Original file line number Diff line number Diff line change
@@ -1,76 +1,38 @@
package de.wuespace.telestion.services.connection;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;

public final class Sender extends AbstractVerticle {
public final class Sender extends TelestionVerticle<Sender.Configuration> implements WithEventBus {

@Override
public void start(Promise<Void> startPromise) {
config = Config.get(config, config(), Configuration.class);

vertx.eventBus().consumer(config.inputAddress,
raw -> {
JsonMessage.on(SenderData.class, raw, this::handleMessage);
JsonMessage.on(ConnectionData.class, raw, msg -> handleMessage(SenderData.fromConnectionData(msg)));
});
startPromise.complete();
}

@Override
public void stop(Promise<Void> stopPromise) {
stopPromise.complete();
public void onStart() throws Exception {
register(getConfig().inAddress(), raw -> {
JsonMessage.on(SenderData.class, raw, this::handleMessage);
JsonMessage.on(ConnectionData.class, raw, msg -> handleMessage(SenderData.fromConnectionData(msg)));
});
}

/**
* @param inputAddress
* @param inAddress
* @param connectionAddresses
*/
public record Configuration(
@JsonProperty String inputAddress,
@JsonProperty String... connectionAddresses) implements JsonMessage {

@SuppressWarnings("unused")
private Configuration() {
this(null);
}
}

public Sender() {
this(null);
}

/**
*
* @param config {@link Configuration} for the creation
*/
public Sender(Configuration config) {
this.config = config;
@JsonProperty String inAddress,
@JsonProperty String... connectionAddresses) implements TelestionConfiguration {
}

/**
*
* @param msg to send
*/
private void handleMessage(SenderData msg) {
for (var s : config.connectionAddresses()) {
for (var s : getConfig().connectionAddresses()) {
logger.debug("Sending Message to {}", s);
vertx.eventBus().publish(s, msg.json());
publish(s, msg.json());
}
}

/**
*
*/
private Configuration config;

/**
*
*/
private static final Logger logger = LoggerFactory.getLogger(Sender.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
*/
public record SenderData(@JsonProperty byte[] rawData,
@JsonProperty ConnectionDetails... conDetails) implements JsonMessage {

@SuppressWarnings("unused")
private SenderData() {
this(null);
}

public static SenderData fromConnectionData(ConnectionData data) {
return new SenderData(data.rawData(), data.conDetails());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,23 @@
package de.wuespace.telestion.services.connection;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.wuespace.telestion.api.verticle.TelestionConfiguration;
import de.wuespace.telestion.api.verticle.TelestionVerticle;
import de.wuespace.telestion.api.verticle.trait.WithEventBus;

public final class StaticSender extends AbstractVerticle {
public final class StaticSender extends TelestionVerticle<StaticSender.Configuration> implements WithEventBus {

public record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
@JsonProperty ConnectionDetails staticDetails) implements JsonMessage {
private Configuration() {
this(null, null, null);
}
@JsonProperty ConnectionDetails staticDetails) implements TelestionConfiguration {
}

@Override
public void start(Promise<Void> startPromise) {
config = Config.get(config, config(), Configuration.class);

vertx.eventBus().consumer(config.inAddress(), raw -> JsonMessage.on(RawMessage.class, raw, msg -> {
public void onStart() throws Exception {
register(getConfig().inAddress, body -> JsonMessage.on(RawMessage.class, body, msg -> {
logger.debug("Sending static message");
vertx.eventBus().publish(config.outAddress(), new ConnectionData(msg.data(),
config.staticDetails()).json());
publish(getConfig().outAddress(), new ConnectionData(msg.data(), getConfig().staticDetails()));
}));

startPromise.complete();
}

@Override
public void stop(Promise<Void> stopPromise) {
stopPromise.complete();
}

public StaticSender() {
this(null);
}

public StaticSender(Configuration config) {
this.config = config;
}

private Configuration config;
private final static Logger logger = LoggerFactory.getLogger(StaticSender.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final class TcpServer extends BaseTcpVerticle {
* @param clientTimeout time until timeout
* @param broadcasterId id of the broadcaster you want to use
*/
public final record Configuration(@JsonProperty String inAddress,
public record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
@JsonProperty String host,
@JsonProperty int port,
Expand Down
Loading

0 comments on commit 3970db1

Please sign in to comment.