Skip to content

Commit

Permalink
Merge branch 'release/0.21.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Pröschel committed May 11, 2021
2 parents 68e8f05 + 5e5a376 commit b0afa64
Show file tree
Hide file tree
Showing 96 changed files with 1,915 additions and 1,555 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.20.0
0.21.0
41 changes: 20 additions & 21 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
# Airy Bazel tools
git_repository(
name = "com_github_airyhq_bazel_tools",
commit = "d18b5a4418a8b69c0a7177f2831f8aa62da038c1",
commit = "bbfbc0844c30b52e146690412030cfe9c6b475e3",
remote = "https://github.com/airyhq/bazel-tools.git",
shallow_since = "1618558833 +0200",
shallow_since = "1620236403 +0200",
)

load("@com_github_airyhq_bazel_tools//:repositories.bzl", "airy_bazel_tools_dependencies", "airy_jvm_deps")
Expand All @@ -35,10 +35,10 @@ maven_install(
"com.jayway.jsonpath:json-path:2.4.0",
"com.dinstone:beanstalkc:2.3.0",
"com.twilio.sdk:twilio:7.51.0",
"io.confluent:kafka-avro-serializer:5.5.1",
"io.confluent:kafka-schema-registry-client:5.5.1",
"io.confluent:kafka-schema-registry:5.5.1",
"io.confluent:kafka-streams-avro-serde:5.5.1",
"io.confluent:kafka-avro-serializer:6.1.1",
"io.confluent:kafka-schema-registry-client:6.1.1",
"io.confluent:kafka-schema-registry:6.1.1",
"io.confluent:kafka-streams-avro-serde:6.1.1",
"io.jsonwebtoken:jjwt-api:0.10.5",
"io.jsonwebtoken:jjwt-impl:0.10.5",
"io.jsonwebtoken:jjwt-jackson:0.10.5",
Expand All @@ -52,12 +52,12 @@ maven_install(
"org.apache.avro:avro-tools:1.10.0",
"org.apache.avro:avro:1.10.0",
"org.apache.curator:curator-test:4.2.0",
"org.apache.kafka:connect-api:2.5.1",
"org.apache.kafka:connect-transforms:2.5.1",
"org.apache.kafka:kafka-clients:2.5.1",
"org.apache.kafka:kafka-clients:jar:test:2.5.1",
"org.apache.kafka:kafka-streams:2.5.1",
"org.apache.kafka:kafka_2.12:2.5.1",
"org.apache.kafka:connect-api:2.7.0",
"org.apache.kafka:connect-transforms:2.7.0",
"org.apache.kafka:kafka-clients:2.7.0",
"org.apache.kafka:kafka-clients:jar:test:2.7.0",
"org.apache.kafka:kafka-streams:2.7.0",
"org.apache.kafka:kafka_2.13:2.7.0",
"org.apache.lucene:lucene-queryparser:8.7.0",
"org.apache.lucene:lucene-analyzers-common:8.7.0",
"org.apache.lucene:lucene-core:8.7.0",
Expand Down Expand Up @@ -149,9 +149,9 @@ protobuf_deps()

http_archive(
name = "io_bazel_rules_docker",
sha256 = "4521794f0fba2e20f3bf15846ab5e01d5332e587e9ce81629c7f96c793bb7036",
strip_prefix = "rules_docker-0.14.4",
urls = ["https://github.com/bazelbuild/rules_docker/releases/download/v0.14.4/rules_docker-v0.14.4.tar.gz"],
sha256 = "59d5b42ac315e7eadffa944e86e90c2990110a1c8075f1cd145f487e999d22b3",
strip_prefix = "rules_docker-0.17.0",
urls = ["https://github.com/bazelbuild/rules_docker/releases/download/v0.17.0/rules_docker-v0.17.0.tar.gz"],
)

load(
Expand All @@ -165,10 +165,6 @@ load("@io_bazel_rules_docker//repositories:deps.bzl", container_deps = "deps")

container_deps()

load("@io_bazel_rules_docker//repositories:pip_repositories.bzl", "pip_deps")

pip_deps()

load(
"@io_bazel_rules_docker//container:container.bzl",
"container_pull",
Expand Down Expand Up @@ -237,8 +233,11 @@ go_repositories()

http_archive(
name = "rules_pkg",
sha256 = "352c090cc3d3f9a6b4e676cf42a6047c16824959b438895a76c2989c6d7c246a",
url = "https://github.com/bazelbuild/rules_pkg/releases/download/0.2.5/rules_pkg-0.2.5.tar.gz",
sha256 = "038f1caa773a7e35b3663865ffb003169c6a71dc995e39bf4815792f385d837d",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_pkg/releases/download/0.4.0/rules_pkg-0.4.0.tar.gz",
"https://github.com/bazelbuild/rules_pkg/releases/download/0.4.0/rules_pkg-0.4.0.tar.gz",
],
)

load("@rules_pkg//:deps.bzl", "rules_pkg_dependencies")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = AirySpringBootApplication.class)
@TestPropertySource(value = "classpath:test.properties", properties = {
"allowedOrigins=origin1,origin2",
})
@TestPropertySource(value = "classpath:test.properties")
@AutoConfigureMockMvc
@ExtendWith(SpringExtension.class)
public class WebhooksControllerTest {
Expand Down
3 changes: 3 additions & 0 deletions backend/avro/http-log.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
"fields": [
{"name": "uri", "type": "string"},
{"name": "body", "type": ["null", "string"], "default": null},
{"name": "userId", "type": ["null", "string"], "default": null},
{"name": "userName", "type": ["null", "string"], "default": null},
{"name": "userAvatar", "type": ["null", "string"], "default": null},
{
"name": "headers",
"type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);
mediaUpload = new MediaUpload(amazonS3, bucket, path);
retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws Exception {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);
mediaUpload = new MediaUpload(amazonS3, bucket, path);
retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state.");
}
Expand Down
2 changes: 1 addition & 1 deletion backend/sources/chat-plugin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ app_deps = [
"//backend/model/message",
"//lib/java/uuid",
"//lib/java/date",
"//lib/java/spring/web:spring-web",
# "//lib/java/spring/web:spring-web",
"//lib/java/spring/kafka/core:spring-kafka-core",
"//lib/java/spring/kafka/streams:spring-kafka-streams",
"@maven//:javax_xml_bind_jaxb_api",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import co.airy.core.chat_plugin.config.Jwt;
import co.airy.core.chat_plugin.payload.AuthenticationRequestPayload;
import co.airy.core.chat_plugin.payload.AuthenticationResponsePayload;
import co.airy.core.chat_plugin.payload.RequestErrorResponsePayload;
import co.airy.core.chat_plugin.payload.ResumeTokenRequestPayload;
import co.airy.core.chat_plugin.payload.ResumeTokenResponsePayload;
import co.airy.core.chat_plugin.payload.SendMessageRequestPayload;
import co.airy.model.message.dto.MessageResponsePayload;
import co.airy.spring.web.payload.RequestErrorResponsePayload;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -30,7 +30,7 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static co.airy.spring.web.Headers.getAuthToken;
import static co.airy.core.chat_plugin.Headers.getAuthToken;
import static org.springframework.http.HttpStatus.NOT_FOUND;

@RestController
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package co.airy.spring.web;
package co.airy.core.chat_plugin;

import org.springframework.http.HttpHeaders;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;

import java.util.Arrays;
import java.util.List;

@Configuration
Expand Down Expand Up @@ -49,7 +50,7 @@ CorsConfigurationSource corsConfigurationSource(final Environment environment) {
final String allowed = environment.getProperty("allowedOrigins", "");
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOriginPattern(allowed);
config.setAllowedOriginPatterns(Arrays.asList(allowed.split(",")));
config.addAllowedHeader("*");
config.setAllowedMethods(List.of("GET", "POST"));
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.io.IOException;
import java.util.List;

import static co.airy.spring.web.Headers.getAuthToken;
import static co.airy.core.chat_plugin.Headers.getAuthToken;

public class JwtAuthenticationFilter extends BasicAuthenticationFilter {
private final Jwt jwt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package co.airy.core.chat_plugin.payload;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

/**
 * Payload returned to chat plugin clients when a request fails.
 * <p>
 * Lombok {@code @Data} generates the getter/setter, {@code equals}, {@code hashCode}
 * and {@code toString}; {@code @AllArgsConstructor} generates the single-argument
 * constructor used when building error responses.
 * Implements {@link Serializable} so the payload can be serialized by the web layer.
 */
@Data
@AllArgsConstructor
public class RequestErrorResponsePayload implements Serializable {
// Human-readable error description sent back to the client.
private String message;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static co.airy.model.message.MessageRepository.updateDeliveryState;
import static co.airy.model.metadata.MetadataKeys.ConversationKeys;
Expand Down Expand Up @@ -72,7 +73,7 @@ public boolean needsMetadataFetched(Conversation conversation) {
}

public List<KeyValue<String, Metadata>> fetchMetadata(String conversationId, Conversation conversation) {
final UserProfile profile = getProfile(conversation);
final UserProfile profile = Optional.ofNullable(getProfile(conversation)).orElse(new UserProfile());

final List<KeyValue<String, Metadata>> recordList = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws InterruptedException {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);
retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state.");
}

Expand Down Expand Up @@ -141,7 +141,7 @@ void canSendMessageViaTheFacebookApi() throws Exception {
.setIsFromContact(false)
.build())
);

retryOnException(() -> {
final SendMessagePayload sendMessagePayload = payloadCaptor.getValue();
assertThat(sendMessagePayload.getRecipient().getId(), equalTo(sourceConversationId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws InterruptedException {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);

retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static void afterAll() throws Exception {

@BeforeEach
void beforeEach() throws InterruptedException {
MockitoAnnotations.initMocks(this);
MockitoAnnotations.openMocks(this);
retryOnException(() -> assertEquals(stores.getStreamState(), RUNNING), "Failed to reach RUNNING state.");
}

Expand Down
3 changes: 2 additions & 1 deletion backend/webhook/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func main() {
Brokers: os.Getenv("KAFKA_BROKERS"),
SchemaRegistryURL: os.Getenv("KAFKA_SCHEMA_REGISTRY_URL"),
Group: "WebhookConsumer",
Topics: "application.communication.webhooks",
Topic: "application.communication.webhooks",
Partitions: 10,
}

wg.Add(1)
Expand Down
12 changes: 9 additions & 3 deletions backend/webhook/consumer/pkg/worker/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ type Consumer struct {
ready chan bool
webhookConfigStream chan string
schemaRegistryClient *srclient.SchemaRegistryClient
kafkaConsumerConfig KafkaConsumerConfig
}

type KafkaConsumerConfig struct {
Brokers, SchemaRegistryURL, Topics, Group string
Brokers, SchemaRegistryURL, Topic, Group string
Partitions int
}

func StartKafkaConsumer(
Expand All @@ -38,6 +40,7 @@ func StartKafkaConsumer(
ready: make(chan bool),
webhookConfigStream: webhookConfigStream,
schemaRegistryClient: schemaRegistryClient,
kafkaConsumerConfig: kafkaConsumerConfig,
}

client, err := sarama.NewConsumerGroup(strings.Split(kafkaConsumerConfig.Brokers, ","), kafkaConsumerConfig.Group, config)
Expand All @@ -47,7 +50,7 @@ func StartKafkaConsumer(

go func() {
for {
if err := client.Consume(ctx, strings.Split(kafkaConsumerConfig.Topics, ","), &consumer); err != nil {
if err := client.Consume(ctx, strings.Split(kafkaConsumerConfig.Topic, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
Expand All @@ -67,7 +70,10 @@ func StartKafkaConsumer(
}
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
for p := 0; p < consumer.kafkaConsumerConfig.Partitions; p++ {
session.ResetOffset(consumer.kafkaConsumerConfig.Topic, int32(p), 0, "")
}
close(consumer.ready)
return nil
}
Expand Down
63 changes: 35 additions & 28 deletions backend/webhook/consumer/pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,44 +79,51 @@ func (w *Worker) Run(ctx context.Context, wg *sync.WaitGroup) {
}
continue
}
w.processJob(ctx, id, event)
}
}
}

for {
select {
case <-ctx.Done():
log.Println("terminating worker: context cancelled")
return
default:
err = w.HandleEvent(event)
if err != nil {
w.logError(err)
time.Sleep(w.backoff.Duration())
continue
}
err = w.beanstalk.Delete(id)
if err != nil {
w.logError(err)
}
w.backoff.Reset()
break
}
// processJob delivers a single queued event to the webhook endpoint,
// retrying with backoff until delivery succeeds or ctx is cancelled.
// On success the beanstalk job is deleted and the backoff is reset;
// a failed delete is logged but does not re-queue the event.
func (w *Worker) processJob(ctx context.Context, jobId uint64, event []byte) {
	for {
		// Non-blocking cancellation check before each delivery attempt.
		select {
		case <-ctx.Done():
			log.Println("terminating worker: context cancelled")
			return
		default:
		}

		if err := w.HandleEvent(event); err != nil {
			// Delivery failed: log, back off, and retry the same event.
			w.logError(err)
			time.Sleep(w.backoff.Duration())
			continue
		}

		// Delivered: remove the job from the queue. A delete failure is
		// only logged — the event itself was already handled.
		if err := w.beanstalk.Delete(jobId); err != nil {
			w.logError(err)
		}
		w.backoff.Reset()
		return
	}
}

func (w *Worker) updateWebhookConfig(ctx context.Context, wg *sync.WaitGroup, webhookConfigStream chan string) {
defer wg.Done()
log.Println("Started updateWebhookConfig routine")
select {
case <-ctx.Done():
log.Println("terminating updateWebhookConfig: context cancelled")
case config := <-webhookConfigStream:
var webhookConfig = webhookConfig{}
if err := json.Unmarshal([]byte(config), &webhookConfig); err != nil {
log.Fatal(err)
for {
select {
case <-ctx.Done():
log.Println("terminating updateWebhookConfig: context cancelled")
return
case config := <-webhookConfigStream:
var webhookConfig = webhookConfig{}
if err := json.Unmarshal([]byte(config), &webhookConfig); err != nil {
log.Fatal(err)
}
w.endpoint = webhookConfig.Endpoint
w.customHeader = webhookConfig.Headers["map"]
}
w.endpoint = webhookConfig.Endpoint
w.customHeader = webhookConfig.Headers["map"]
}
}

Expand Down
Loading

0 comments on commit b0afa64

Please sign in to comment.