Skip to content

Commit

Permalink
Merge branch 'release/0.8.1' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal Holy committed Feb 11, 2021
2 parents 6392daf + b58c1ff commit 1dd80c7
Show file tree
Hide file tree
Showing 120 changed files with 2,200 additions and 532 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.0
0.8.1
1 change: 1 addition & 0 deletions backend/api/admin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ app_deps = [
"//backend/model/channel",
"//backend:tag",
"//backend:webhook",
"//backend/model/metadata",
"//lib/java/uuid",
"//lib/java/spring/auth:spring-auth",
"//lib/java/spring/web:spring-web",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import co.airy.avro.communication.Channel;
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.core.api.admin.payload.ChannelsResponsePayload;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.model.channel.ChannelPayload;
import co.airy.model.channel.dto.ChannelContainer;
import co.airy.model.metadata.MetadataKeys;
import co.airy.model.metadata.dto.MetadataMap;
import co.airy.spring.web.payload.EmptyResponsePayload;
import co.airy.uuid.UUIDv5;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
Expand All @@ -22,63 +22,103 @@
import java.util.List;
import java.util.UUID;

import static co.airy.model.channel.ChannelPayload.fromChannel;
import static co.airy.model.channel.ChannelPayload.fromChannelContainer;
import static co.airy.model.metadata.MetadataRepository.newChannelMetadata;
import static java.util.stream.Collectors.toList;

@RestController
public class ChannelsController {
private final Stores stores;
private final KafkaProducer<String, Channel> producer;
private final String applicationCommunicationChannels = new ApplicationCommunicationChannels().name();

public ChannelsController(Stores stores, KafkaProducer<String, Channel> producer) {
public ChannelsController(Stores stores) {
this.stores = stores;
this.producer = producer;
}

@PostMapping("/channels.list")
ResponseEntity<ChannelsResponsePayload> listChannels() {
final List<Channel> channels = stores.getChannels();

ResponseEntity<ChannelsResponsePayload> listChannels(@RequestBody @Valid ListChannelRequestPayload requestPayload) {
final List<ChannelContainer> channels = stores.getChannels();
final String sourceToFilter = requestPayload.getSource();
return ResponseEntity.ok(new ChannelsResponsePayload(channels.stream()
.map(ChannelPayload::fromChannel)
.filter((container) -> sourceToFilter == null || sourceToFilter.equals(container.getChannel().getSource()))
.map(ChannelPayload::fromChannelContainer)
.collect(toList())));
}

@PostMapping("/channels.info")
ResponseEntity<?> getChannel(@RequestBody @Valid GetChannelRequestPayload requestPayload) {
final ChannelContainer container = stores.getChannel(requestPayload.getChannelId().toString());
if (container == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(new EmptyResponsePayload());
}

return ResponseEntity.ok(fromChannelContainer(container));
}

@PostMapping("/channels.update")
ResponseEntity<?> updateChannel(@RequestBody @Valid UpdateChannelRequestPayload requestPayload) {
final String channelId = requestPayload.getChannelId().toString();
final ChannelContainer container = stores.getChannel(channelId);
if (container == null) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(new EmptyResponsePayload());
}

container.getMetadataMap();
if (requestPayload.getName() != null) {
container.getMetadataMap().put(MetadataKeys.ChannelKeys.NAME, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, requestPayload.getName()));
}
if (requestPayload.getImageUrl() != null) {
container.getMetadataMap().put(MetadataKeys.ChannelKeys.IMAGE_URL, newChannelMetadata(channelId, MetadataKeys.ChannelKeys.IMAGE_URL, requestPayload.getName()));
}

try {
stores.storeMetadataMap(container.getMetadataMap());
return ResponseEntity.ok(fromChannelContainer(container));
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}

}

@PostMapping("/channels.chatplugin.connect")
ResponseEntity<?> connect(@RequestBody @Valid ConnectChannelRequestPayload requestPayload) {
final String sourceChannelId = requestPayload.getName();
final String sourceIdentifier = "chat_plugin";

final String channelId = UUIDv5.fromNamespaceAndName(sourceIdentifier, sourceChannelId).toString();

final Channel channel = Channel.newBuilder()
.setId(channelId)
.setConnectionState(ChannelConnectionState.CONNECTED)
.setSource(sourceIdentifier)
.setSourceChannelId(sourceChannelId)
.setName(requestPayload.getName())
.build();
final ChannelContainer container = ChannelContainer.builder()
.channel(
Channel.newBuilder()
.setId(channelId)
.setConnectionState(ChannelConnectionState.CONNECTED)
.setSource(sourceIdentifier)
.setSourceChannelId(sourceChannelId)
.build()
)
.metadataMap(MetadataMap.from(List.of(
newChannelMetadata(channelId, MetadataKeys.ChannelKeys.NAME, requestPayload.getName())
))).build();

try {
producer.send(new ProducerRecord<>(applicationCommunicationChannels, channel.getId(), channel)).get();
stores.storeChannelContainer(container);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}

return ResponseEntity.ok(fromChannel(channel));
return ResponseEntity.ok(fromChannelContainer(container));
}

@PostMapping("/channels.chatplugin.disconnect")
ResponseEntity<?> disconnect(@RequestBody @Valid ChannelDisconnectRequestPayload requestPayload) {
final String channelId = requestPayload.getChannelId().toString();

final Channel channel = stores.getConnectedChannelsStore().get(channelId);
final ChannelContainer container = stores.getConnectedChannelsStore().get(channelId);

if (channel == null) {
if (container == null) {
return ResponseEntity.notFound().build();
}

final Channel channel = container.getChannel();
if (channel.getConnectionState().equals(ChannelConnectionState.DISCONNECTED)) {
return ResponseEntity.accepted().body(new EmptyResponsePayload());
}
Expand All @@ -87,7 +127,7 @@ ResponseEntity<?> disconnect(@RequestBody @Valid ChannelDisconnectRequestPayload
channel.setToken(null);

try {
producer.send(new ProducerRecord<>(applicationCommunicationChannels, channel.getId(), channel)).get();
stores.storeChannel(channel);
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
}
Expand All @@ -97,6 +137,28 @@ ResponseEntity<?> disconnect(@RequestBody @Valid ChannelDisconnectRequestPayload

}

@Data
@NoArgsConstructor
class ListChannelRequestPayload {
private String source;
}

@Data
@NoArgsConstructor
class GetChannelRequestPayload {
@NotNull
private UUID channelId;
}

@Data
@NoArgsConstructor
class UpdateChannelRequestPayload {
@NotNull
private UUID channelId;
private String name;
private String imageUrl;
}

@Data
@NoArgsConstructor
class ConnectChannelRequestPayload {
Expand Down
50 changes: 44 additions & 6 deletions backend/api/admin/src/main/java/co/airy/core/api/admin/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

import co.airy.avro.communication.Channel;
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.avro.communication.Metadata;
import co.airy.avro.communication.Tag;
import co.airy.avro.communication.Webhook;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationTags;
import co.airy.kafka.schema.application.ApplicationCommunicationWebhooks;
import co.airy.kafka.streams.KafkaStreamsWrapper;
import co.airy.model.channel.dto.ChannelContainer;
import co.airy.model.metadata.dto.MetadataMap;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
Expand All @@ -26,6 +32,10 @@
import java.util.List;
import java.util.concurrent.ExecutionException;

import static co.airy.model.metadata.MetadataRepository.getId;
import static co.airy.model.metadata.MetadataRepository.getSubject;
import static co.airy.model.metadata.MetadataRepository.isChannelMetadata;

@Component
public class Stores implements HealthIndicator, ApplicationListener<ApplicationStartedEvent>, DisposableBean {
private static final String appId = "api.AdminStores";
Expand All @@ -44,6 +54,7 @@ public class Stores implements HealthIndicator, ApplicationListener<ApplicationS
private final String applicationCommunicationChannels = new ApplicationCommunicationChannels().name();
private final String applicationCommunicationWebhooks = new ApplicationCommunicationWebhooks().name();
private final String applicationCommunicationTags = new ApplicationCommunicationTags().name();
private final String applicationCommunicationMetadata = new ApplicationCommunicationMetadata().name();

public Stores(KafkaStreamsWrapper streams, KafkaProducer<String, SpecificRecordBase> producer) {
this.streams = streams;
Expand All @@ -54,8 +65,15 @@ public Stores(KafkaStreamsWrapper streams, KafkaProducer<String, SpecificRecordB
public void onApplicationEvent(ApplicationStartedEvent event) {
final StreamsBuilder builder = new StreamsBuilder();

// metadata table keyed by channel id
final KTable<String, MetadataMap> metadataTable = builder.<String, Metadata>table(applicationCommunicationMetadata)
.filter((metadataId, metadata) -> isChannelMetadata(metadata))
.groupBy((metadataId, metadata) -> KeyValue.pair(getSubject(metadata).getIdentifier(), metadata))
.aggregate(MetadataMap::new, MetadataMap::adder, MetadataMap::subtractor);

builder.<String, Channel>table(applicationCommunicationChannels)
.filter((k, v) -> v.getConnectionState().equals(ChannelConnectionState.CONNECTED), Materialized.as(connectedChannelsStore));
.filter((k, v) -> v.getConnectionState().equals(ChannelConnectionState.CONNECTED))
.leftJoin(metadataTable, ChannelContainer::new, Materialized.as(connectedChannelsStore));

builder.<String, Webhook>stream(applicationCommunicationWebhooks)
.groupBy((webhookId, webhook) -> allWebhooksKey)
Expand All @@ -79,6 +97,21 @@ public void storeWebhook(Webhook webhook) throws ExecutionException, Interrupted
producer.send(new ProducerRecord<>(applicationCommunicationWebhooks, allWebhooksKey, webhook)).get();
}

public void storeChannelContainer(ChannelContainer container) throws ExecutionException, InterruptedException {
storeChannel(container.getChannel());
storeMetadataMap(container.getMetadataMap());
}

public void storeMetadataMap(MetadataMap metadataMap) throws ExecutionException, InterruptedException {
for (Metadata metadata : metadataMap.values()) {
producer.send(new ProducerRecord<>(applicationCommunicationMetadata, getId(metadata).toString(), metadata)).get();
}
}

public void storeChannel(Channel channel) throws ExecutionException, InterruptedException {
producer.send(new ProducerRecord<>(applicationCommunicationChannels, channel.getId(), channel)).get();
}

public void storeTag(Tag tag) throws ExecutionException, InterruptedException {
producer.send(new ProducerRecord<>(applicationCommunicationTags, tag.getId(), tag)).get();
}
Expand All @@ -87,16 +120,21 @@ public void deleteTag(Tag tag) {
producer.send(new ProducerRecord<>(applicationCommunicationTags, tag.getId(), null));
}

public ReadOnlyKeyValueStore<String, Channel> getConnectedChannelsStore() {
public ReadOnlyKeyValueStore<String, ChannelContainer> getConnectedChannelsStore() {
return streams.acquireLocalStore(connectedChannelsStore);
}

public List<Channel> getChannels() {
final ReadOnlyKeyValueStore<String, Channel> store = getConnectedChannelsStore();
public ChannelContainer getChannel(String channelId) {
final ReadOnlyKeyValueStore<String, ChannelContainer> store = getConnectedChannelsStore();
return store.get(channelId);
}

public List<ChannelContainer> getChannels() {
final ReadOnlyKeyValueStore<String, ChannelContainer> store = getConnectedChannelsStore();

final KeyValueIterator<String, Channel> iterator = store.all();
final KeyValueIterator<String, ChannelContainer> iterator = store.all();

List<Channel> channels = new ArrayList<>();
List<ChannelContainer> channels = new ArrayList<>();
iterator.forEachRemaining(kv -> channels.add(kv.value));

return channels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import co.airy.avro.communication.Channel;
import co.airy.avro.communication.ChannelConnectionState;
import co.airy.kafka.schema.application.ApplicationCommunicationChannels;
import co.airy.kafka.schema.application.ApplicationCommunicationMetadata;
import co.airy.kafka.schema.application.ApplicationCommunicationTags;
import co.airy.kafka.schema.application.ApplicationCommunicationWebhooks;
import co.airy.kafka.test.KafkaTestHelper;
Expand All @@ -27,6 +28,7 @@

import static co.airy.test.Timing.retryOnException;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
Expand All @@ -43,6 +45,7 @@ public class ChannelsControllerTest {
private static KafkaTestHelper kafkaTestHelper;
private static final ApplicationCommunicationChannels applicationCommunicationChannels = new ApplicationCommunicationChannels();
private static final ApplicationCommunicationWebhooks applicationCommunicationWebhooks = new ApplicationCommunicationWebhooks();
private static final ApplicationCommunicationMetadata applicationCommunicationMetadata = new ApplicationCommunicationMetadata();
private static final ApplicationCommunicationTags applicationCommunicationTags = new ApplicationCommunicationTags();

@Autowired
Expand All @@ -53,6 +56,7 @@ static void beforeAll() throws Exception {
kafkaTestHelper = new KafkaTestHelper(sharedKafkaTestResource,
applicationCommunicationChannels,
applicationCommunicationWebhooks,
applicationCommunicationMetadata,
applicationCommunicationTags
);
kafkaTestHelper.beforeAll();
Expand All @@ -65,13 +69,10 @@ static void afterAll() throws Exception {

private static boolean testDataInitialized = false;

static final String facebookToken = "token";
static final Channel connectedChannel = Channel.newBuilder()
.setConnectionState(ChannelConnectionState.CONNECTED)
.setId(UUID.randomUUID().toString())
.setName("connected channel name")
.setSource("facebook")
.setToken(facebookToken)
.setSourceChannelId("source-channel-id")
.build();

Expand All @@ -98,11 +99,10 @@ void canListChannels() throws Exception {
Channel.newBuilder()
.setConnectionState(ChannelConnectionState.DISCONNECTED)
.setId(disconnectedChannel)
.setName("channel-name-2")
.setSource("facebook")
.setSourceChannelId("ps-id-2")
.build()))
);
.build())
));

retryOnException(() -> webTestHelper.post("/channels.list", "{}", "user-id")
.andExpect(status().isOk())
Expand All @@ -111,4 +111,28 @@ void canListChannels() throws Exception {
"/channels.list did not return the right number of channels");
}

@Test
void canUpdateChannel() throws Exception {
final String expectedChannelName = "channel name";

retryOnException(() -> webTestHelper.post("/channels.info", String.format("{\"channel_id\":\"%s\"}", connectedChannel.getId()), "user-id")
.andExpect(status().isOk())
.andExpect(jsonPath("$.id", equalTo(connectedChannel.getId())))
.andExpect(jsonPath("$.metadata.name", not(equalTo(expectedChannelName)))),
"/channels.info did not return the right channel");

webTestHelper.post("/channels.update", String.format("{\"channel_id\":\"%s\",\"name\":\"%s\"}",
connectedChannel.getId(), expectedChannelName), "user-id")
.andExpect(status().isOk())
.andExpect(jsonPath("$.id", equalTo(connectedChannel.getId())))
.andExpect(jsonPath("$.metadata.name", not(equalTo(connectedChannel.getId()))))
.andExpect(jsonPath("$.source", equalTo(connectedChannel.getSource())));

retryOnException(() -> webTestHelper.post("/channels.info", String.format("{\"channel_id\":\"%s\"}", connectedChannel.getId()), "user-id")
.andExpect(status().isOk())
.andExpect(jsonPath("$.id", equalTo(connectedChannel.getId())))
.andExpect(jsonPath("$.metadata.name", equalTo(expectedChannelName))),
"/channels.update did not update");
}

}
Loading

0 comments on commit 1dd80c7

Please sign in to comment.