From b8927814c17e8d928b8cfc12de057d9606ce22f2 Mon Sep 17 00:00:00 2001 From: lucapette Date: Tue, 29 Dec 2020 12:12:31 +0100 Subject: [PATCH] [#526] Introduce namespacing for topics (#566) --- .github/workflows/main.yml | 1 - docs/docs/guides/airy-core-in-production.md | 8 ++- docs/docs/overview/kafka.md | 17 +++--- infrastructure/README.md | 42 --------------- .../scripts/provision/create-topics.sh | 36 +++++++++---- .../co/airy/tools/topics/Application.java | 52 +++++++++---------- .../topics/src/main/resources/topic.sh.tpl | 1 - .../co/airy/kafka/schema/AbstractTopic.java | 14 ++--- .../ApplicationCommunicationReadReceipts.java | 3 +- package.json | 2 - scripts/lint.sh | 6 +++ 11 files changed, 74 insertions(+), 108 deletions(-) delete mode 100644 infrastructure/tools/topics/src/main/resources/topic.sh.tpl diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9edb288fc3..b7c87c2b3a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -39,7 +39,6 @@ jobs: - name: Lint run: | ./scripts/lint.sh - yarn eslint - name: Test run: | diff --git a/docs/docs/guides/airy-core-in-production.md b/docs/docs/guides/airy-core-in-production.md index a51a1a4461..283e556df0 100644 --- a/docs/docs/guides/airy-core-in-production.md +++ b/docs/docs/guides/airy-core-in-production.md @@ -29,7 +29,8 @@ Schema registry servers. We recommend using at least five Kafka brokers and set the replication factor of _all_ topics to `3`. So that even if two brokers would become unavailable at the same time, the system would still function. -We also recommend running at least three Zookeepers and two instances of the Confluent Schema registry. +We also recommend running at least three Zookeepers and two instances of the +Confluent Schema registry. If you decide to run the Kafka cluster on Kubernetes, we recommend running Kafka and Zookeeper as StatefulSet workloads, for persistent storage. 
The Confluent @@ -51,7 +52,8 @@ The Kafka cluster is usually started in the following order: parameter `kafkastore.bootstrap.server` of the configuration file `/etc/schema-registry/schema-registry.properties`. -The location of the configuration files can vary depending on the particular installation. +The location of the configuration files can vary depending on the particular +installation. Once the Kafka cluster is up and running, the required topics must be created. You can find them in the Bash script @@ -61,6 +63,8 @@ following environment variables to run: - `ZOOKEEPER` (default: airy-cp-zookeeper:2181) - `PARTITIONS` (default: 10) - `REPLICAS` (default: 1) +- `AIRY_CORE_NAMESPACE` (default: ''). Helpful to namespace your topics in case + you are installing the Airy Core Platform in an existing Kafka cluster We do not recommend running Kafka on docker for production environments. However, we provide a way to deploy the whole Kafka cluster on top of Kubernetes diff --git a/docs/docs/overview/kafka.md b/docs/docs/overview/kafka.md index f2fa2661d6..e81b9fd1a6 100644 --- a/docs/docs/overview/kafka.md +++ b/docs/docs/overview/kafka.md @@ -9,7 +9,7 @@ the Airy Core Platform. ## Topic naming conventions Inspired by [this -article](https://medium.com/@criccomini/how-to-paint-a-bike-shed-kafka-topic-naming-conventions-1b7259790073), +article](https://riccomini.name/how-paint-bike-shed-kafka-topic-naming-conventions), our naming conventions follow these rules: - A topic has a three-part name: `..` @@ -21,19 +21,14 @@ Each part defines a more granular scope: - `kind` is the type of data the topic contains at the highest level possible. Valid examples are: `etl`, `logging`, `tracking`. - `domain` is what you would call a database name in a traditional - rdms. -- `dataset` is what you would call a database table in a traditional rdms. + RDBMS. +- `dataset` is what you would call a database table in a traditional RDBMS. 
Given these rules, here are a few examples: ``` -tracking.user.clicks -tracking.page.views - -etl.billing.invalid-cc-cards -etl.billing.frauds - -application.entity.organizations -application.communication.conversations application.communication.messages +application.communication.metadata + +ops.application.health-checks ``` diff --git a/infrastructure/README.md b/infrastructure/README.md index 772cc2aa80..2b4e5f6a6e 100644 --- a/infrastructure/README.md +++ b/infrastructure/README.md @@ -1,43 +1 @@ # Infrastructure - -- [Infrastructure](#infrastructure) - - [Components](#components) - - [Networking](#networking) - -## Components - -The Airy Core Platform components require the following systems to run: - -- A [Kafka](https://kafka.apache.org) cluster (which in turn requires [zookeeper](https://zookeeper.apache.org)) -- The [Confluent Schema registry](https://github.com/confluentinc/schema-registry) -- A [PostgreSQL](https://www.postgresql.org/) database - -We also provide Kubernetes manifests for the Airy Core Platform applications, -they are located in [this folder](/infrastructure/deployments). If you need to -customize any aspect of the deployment process, the `../scripts/bootstrap.sh` -and `provisioning.sh` scripts are good entry-points. - -## Networking - -As the Airy Core Platform runs in Kubernetes, using the [K3OS](https://k3os.io/) distribution. -All of the necessary services exposed with [Traefik](https://traefik.io/) ingress controller. -Through that ingress controller, the internal services are exposed and can be -accessed from outside of the Vagrant box. - -Since k3os kubernetes clusters are usually not exposed to the public internet, -we included an ngrok client to facilitate the integration of sources (via webhooks). - -For the Airy Core Platform to be accessible from the outside (for -example from Facebook, in order to send events to the Facebook webhook), the -system must have public access. 
To facilitate the process, we included a -[ngrok](https://ngrok.com/) client deployment inside the cluster. The ngrok -client connects to our hosted ngrok server at `tunnel.airy.co`, creates a unique -public endpoint (ex. https://some-random-string.tunnel.airy.co/facebook) and redirects -the traffic to the local Facebook webhook pod. When starting, the Airy Core -Platform prints the public URL for the Facebook webhook. You can also check it -by running the `/vagrant/scripts/status.sh` script from inside the Airy Core -Platform box or directly: - -```sh -vagrant ssh -c /vagrant/scripts/status.sh -``` diff --git a/infrastructure/scripts/provision/create-topics.sh b/infrastructure/scripts/provision/create-topics.sh index 7c9dd395f4..dc8a925d8d 100755 --- a/infrastructure/scripts/provision/create-topics.sh +++ b/infrastructure/scripts/provision/create-topics.sh @@ -1,29 +1,43 @@ #!/bin/bash + +########################################################################## +# THIS FILE WAS GENERATED. DO NOT EDIT. See /infrastructure/tools/topics # +########################################################################## + set -euo pipefail IFS=$'\n\t' ZOOKEEPER=airy-cp-zookeeper:2181 PARTITIONS=${PARTITIONS:-10} REPLICAS=${REPLICAS:-1} +AIRY_CORE_NAMESPACE=${AIRY_CORE_NAMESPACE:-} + +echo "Creating Kafka topics" + +if [ -n "${AIRY_CORE_NAMESPACE}" ] +then + AIRY_CORE_NAMESPACE="${AIRY_CORE_NAMESPACE}." + echo "Using ${AIRY_CORE_NAMESPACE} to namespace topics" +fi + -echo "Creating Kafka topics..." 
+kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.channels" 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.channels 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.messages" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.messages --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.metadata" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.metadata --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.read-receipt" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.read-receipt 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper 
${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.tags" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.tags --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.webhooks" 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.webhooks 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}ops.application.health" --config retention.ms=3600000 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic ops.application.health --config retention.ms=3600000 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.facebook.events" 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.facebook.events 1>/dev/null +kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.google.events" 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.google.events 1>/dev/null +kafka-topics --create 
--if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.twilio.events" 1>/dev/null -kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.twilio.events 1>/dev/null diff --git a/infrastructure/tools/topics/src/main/java/co/airy/tools/topics/Application.java b/infrastructure/tools/topics/src/main/java/co/airy/tools/topics/Application.java index 36ee904da4..058e09d3fe 100644 --- a/infrastructure/tools/topics/src/main/java/co/airy/tools/topics/Application.java +++ b/infrastructure/tools/topics/src/main/java/co/airy/tools/topics/Application.java @@ -1,9 +1,5 @@ package co.airy.tools.topics; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; @@ -16,15 +12,38 @@ public class Application { public static void main(String[] args) { + String createTopicTemplate = "kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic \"${AIRY_CORE_NAMESPACE}%s\" %s 1>/dev/null"; + String headerTemplate = "#!/bin/bash\n" + + "\n" + + "##########################################################################\n" + + "# THIS FILE WAS GENERATED. DO NOT EDIT. 
See /infrastructure/tools/topics #\n" + + "##########################################################################\n" + + "\n" + + "set -euo pipefail\n" + + "IFS=$'\\n\\t'\n" + + "\n" + + "ZOOKEEPER=airy-cp-zookeeper:2181\n" + + "PARTITIONS=${PARTITIONS:-10}\n" + + "REPLICAS=${REPLICAS:-1}\n" + + "AIRY_CORE_NAMESPACE=${AIRY_CORE_NAMESPACE:-}\n" + + "\n" + + "echo \"Creating Kafka topics\"\n" + + "\n" + + "if [ -n \"${AIRY_CORE_NAMESPACE}\" ]\n" + + "then\n" + + " AIRY_CORE_NAMESPACE=\"${AIRY_CORE_NAMESPACE}.\"\n" + + " echo \"Using ${AIRY_CORE_NAMESPACE} to namespace topics\"\n" + + "fi"; + TopicsFinder finder = new TopicsFinder(); - String template = getTemplate(); List topics = finder.findTopics(); Method name; Method config; + System.out.println(headerTemplate + "\n\n"); try { for (String result : topics) { Class topicClass = Class.forName(result); @@ -51,31 +70,10 @@ public static void main(String[] args) { topicConfigFormatted = "--config " + topicConfigFormatted; } - System.out.println(String.format(template, topicName, topicConfigFormatted)); + System.out.println(String.format(createTopicTemplate, topicName, topicConfigFormatted) + "\n"); } } catch (Exception e) { e.printStackTrace(); } } - - private static String getTemplate() { - String line = ""; - StringBuilder buffer = new StringBuilder(); - - try { - try (InputStream is = (Application.class.getResourceAsStream("/topic.sh.tpl"))) { - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - while ((line = reader.readLine()) != null) { - buffer.append(line).append("\n"); - } - - reader.close(); - } - } catch (IOException e) { - e.printStackTrace(); - } - - return buffer.toString(); - } - } \ No newline at end of file diff --git a/infrastructure/tools/topics/src/main/resources/topic.sh.tpl b/infrastructure/tools/topics/src/main/resources/topic.sh.tpl deleted file mode 100644 index 9a5650c480..0000000000 --- a/infrastructure/tools/topics/src/main/resources/topic.sh.tpl +++ /dev/null @@ -1 
+0,0 @@ -kafka-topics --create --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic %s %s diff --git a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java index e4958d247f..1477743f9f 100644 --- a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java +++ b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/AbstractTopic.java @@ -4,21 +4,15 @@ public abstract class AbstractTopic implements Topic { @Override - public String name() { - return testNamespace() + String.format("%s.%s.%s", kind(), domain(), dataset()); - } + public String name() { return namespace() + String.format("%s.%s.%s", kind(), domain(), dataset()); } @Override public Map config() { return Map.of(); } - private String testNamespace() { - String testTarget = System.getenv("TEST_TARGET"); - if (testTarget == null || "".equals(testTarget)) { - return ""; - } - - return testTarget.split(":")[1]; // take the name of the test as a namespace + private String namespace() { + String namespace = System.getenv("AIRY_CORE_NAMESPACE"); + return namespace == null ? 
"" : namespace + "."; } } diff --git a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/application/ApplicationCommunicationReadReceipts.java b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/application/ApplicationCommunicationReadReceipts.java index 2cd97d50eb..d9f32f1383 100644 --- a/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/application/ApplicationCommunicationReadReceipts.java +++ b/lib/java/kafka/schema/src/main/java/co/airy/kafka/schema/application/ApplicationCommunicationReadReceipts.java @@ -12,6 +12,7 @@ public String dataset() { @Override public Map config() { - return Map.of(); + return Map.of("cleanup.policy", "compact", "segment.bytes", "10485760", "min.compaction.lag.ms", "86400000"); } + } diff --git a/package.json b/package.json index d8945cdc19..9bf313abec 100644 --- a/package.json +++ b/package.json @@ -34,8 +34,6 @@ "react-window-infinite-loader": "1.0.5", "reselect": "4.0.0" }, - - "devDependencies": { "@babel/core": "7.8.4", "@babel/preset-env": "^7.8.4", diff --git a/scripts/lint.sh b/scripts/lint.sh index 63eed4e6c2..d0bdc44cfd 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -2,8 +2,14 @@ set -eo pipefail IFS=$'\n\t' +echo "Check create-topics.sh is in sync" +cmp <(bazel run //infrastructure/tools/topics:app) infrastructure/scripts/provision/create-topics.sh +echo echo "Running Bazel lint" bazel run @com_github_airyhq_bazel_tools//code-format:check_buildifier echo echo "Running Prettier and Java tests" bazel test --test_tag_filters=lint //... +echo +echo "Running eslint" +yarn eslint