
[#526] Introduce namespacing for topics (#566)
lucapette authored Dec 29, 2020
1 parent 1f3baf3 commit b892781
Showing 11 changed files with 74 additions and 108 deletions.
1 change: 0 additions & 1 deletion .github/workflows/main.yml
@@ -39,7 +39,6 @@ jobs:
- name: Lint
run: |
./scripts/lint.sh
yarn eslint
- name: Test
run: |
8 changes: 6 additions & 2 deletions docs/docs/guides/airy-core-in-production.md
@@ -29,7 +29,8 @@ Schema registry servers. We recommend using at least five Kafka brokers and setting
the replication factor of _all_ topics to `3`, so that even if two brokers
become unavailable at the same time, the system still functions.

We also recommend running at least three Zookeepers and two instances of the Confluent Schema registry.
We also recommend running at least three Zookeepers and two instances of the
Confluent Schema registry.

If you decide to run the Kafka cluster on Kubernetes, we recommend running Kafka
and Zookeeper as StatefulSet workloads, for persistent storage. The Confluent
@@ -51,7 +52,8 @@ The Kafka cluster is usually started in the following order:
parameter `kafkastore.bootstrap.server` of the configuration file
`/etc/schema-registry/schema-registry.properties`.

The location of the configuration files can vary depending on the particular installation.
The location of the configuration files can vary depending on the particular
installation.

Once the Kafka cluster is up and running, the required topics must be created.
You can find them in the Bash script
@@ -61,6 +63,8 @@ following environment variables to run:
- `ZOOKEEPER` (default: airy-cp-zookeeper:2181)
- `PARTITIONS` (default: 10)
- `REPLICAS` (default: 1)
- `AIRY_CORE_NAMESPACE` (default: ''). Helpful for namespacing your topics when
  you are installing the Airy Core Platform in an existing Kafka cluster (see
  the sketch below).
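
A minimal sketch of running the provisioning script against an existing
cluster (the values below are illustrative, not defaults):

```sh
# PARTITIONS, REPLICAS and AIRY_CORE_NAMESPACE are read from the environment.
# With AIRY_CORE_NAMESPACE=staging, topics are created as, for example,
# "staging.application.communication.messages".
PARTITIONS=10 REPLICAS=3 AIRY_CORE_NAMESPACE=staging \
  ./infrastructure/scripts/provision/create-topics.sh
```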

We do not recommend running Kafka on Docker for production environments.
However, we provide a way to deploy the whole Kafka cluster on top of Kubernetes
17 changes: 6 additions & 11 deletions docs/docs/overview/kafka.md
@@ -9,7 +9,7 @@ the Airy Core Platform.
## Topic naming conventions

Inspired by [this
article](https://medium.com/@criccomini/how-to-paint-a-bike-shed-kafka-topic-naming-conventions-1b7259790073),
article](https://riccomini.name/how-paint-bike-shed-kafka-topic-naming-conventions),
our naming conventions follow these rules:

- A topic has a three-part name: `<kind>.<domain>.<dataset>`
@@ -21,19 +21,14 @@ Each part defines a more granular scope:
- `kind` is the type of data the topic contains at the highest level possible.
Valid examples are: `etl`, `logging`, `tracking`.
- `domain` is what you would call a database name in a traditional
rdms.
- `dataset` is what you would call a database table in a traditional rdms.
RDMS.
- `dataset` is what you would call a database table in a traditional RDMS.

Given these rules, here are a few examples:

```
tracking.user.clicks
tracking.page.views
etl.billing.invalid-cc-cards
etl.billing.frauds
application.entity.organizations
application.communication.conversations
application.communication.messages
application.communication.metadata
ops.application.health-checks
```
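
As of this commit, every topic name can additionally carry a namespace prefix
taken from `AIRY_CORE_NAMESPACE`; under a hypothetical `staging` namespace the
first examples above would become:

```
staging.tracking.user.clicks
staging.tracking.page.views
```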
42 changes: 0 additions & 42 deletions infrastructure/README.md
@@ -1,43 +1 @@
# Infrastructure

- [Infrastructure](#infrastructure)
- [Components](#components)
- [Networking](#networking)

## Components

The Airy Core Platform components require the following systems to run:

- A [Kafka](https://kafka.apache.org) cluster (which in turn requires [zookeeper](https://zookeeper.apache.org))
- The [Confluent Schema registry](https://github.com/confluentinc/schema-registry)
- A [PostgreSQL](https://www.postgresql.org/) database

We also provide Kubernetes manifests for the Airy Core Platform applications;
they are located in [this folder](/infrastructure/deployments). If you need to
customize any aspect of the deployment process, the `../scripts/bootstrap.sh`
and `provisioning.sh` scripts are good entry points.

## Networking

The Airy Core Platform runs in Kubernetes, using the [K3OS](https://k3os.io/)
distribution. All of the necessary services are exposed with the
[Traefik](https://traefik.io/) ingress controller, through which the internal
services can be accessed from outside of the Vagrant box.

Since K3OS Kubernetes clusters are usually not exposed to the public internet,
we included an ngrok client to facilitate the integration of sources (via webhooks).

For the Airy Core Platform to be accessible from the outside (for
example from Facebook, in order to send events to the Facebook webhook), the
system must have public access. To facilitate the process, we included an
[ngrok](https://ngrok.com/) client deployment inside the cluster. The ngrok
client connects to our hosted ngrok server at `tunnel.airy.co`, creates a unique
public endpoint (e.g. https://some-random-string.tunnel.airy.co/facebook), and redirects
the traffic to the local Facebook webhook pod. When starting, the Airy Core
Platform prints the public URL for the Facebook webhook. You can also check it
by running the `/vagrant/scripts/status.sh` script from inside the Airy Core
Platform box or directly:

```sh
vagrant ssh -c /vagrant/scripts/status.sh
```
36 changes: 25 additions & 11 deletions infrastructure/scripts/provision/create-topics.sh
@@ -1,29 +1,43 @@
#!/bin/bash

##########################################################################
# THIS FILE WAS GENERATED. DO NOT EDIT. See /infrastructure/tools/topics #
##########################################################################

set -euo pipefail
IFS=$'\n\t'

ZOOKEEPER=airy-cp-zookeeper:2181
PARTITIONS=${PARTITIONS:-10}
REPLICAS=${REPLICAS:-1}
AIRY_CORE_NAMESPACE=${AIRY_CORE_NAMESPACE:-}

echo "Creating Kafka topics"

if [ -n "${AIRY_CORE_NAMESPACE}" ]
then
AIRY_CORE_NAMESPACE="${AIRY_CORE_NAMESPACE}."
echo "Using ${AIRY_CORE_NAMESPACE} to namespace topics"
fi


echo "Creating Kafka topics..."
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.channels" 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.channels 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.messages" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.messages --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.metadata" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.metadata --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.read-receipt" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.read-receipt 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.tags" --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.tags --config cleanup.policy=compact min.compaction.lag.ms=86400000 segment.bytes=10485760 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}application.communication.webhooks" 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic application.communication.webhooks 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}ops.application.health" --config retention.ms=3600000 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic ops.application.health --config retention.ms=3600000 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.facebook.events" 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.facebook.events 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.google.events" 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.google.events 1>/dev/null
kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic "${AIRY_CORE_NAMESPACE}source.twilio.events" 1>/dev/null

kafka-topics --create --if-not-exists --zookeeper $ZOOKEEPER --replication-factor $REPLICAS --partitions $PARTITIONS --topic source.twilio.events 1>/dev/null
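
To check what the new script produced, one can list the topics it created; a
sketch, assuming the hypothetical `staging` namespace and a reachable
Zookeeper:

```sh
# List every topic carrying the "staging." prefix.
kafka-topics --list --zookeeper airy-cp-zookeeper:2181 | grep '^staging\.'
```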
@@ -1,9 +1,5 @@
package co.airy.tools.topics;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
@@ -16,15 +12,38 @@
public class Application {

public static void main(String[] args) {
String createTopicTemplate = "kafka-topics --create --if-not-exists --zookeeper ${ZOOKEEPER} --replication-factor ${REPLICAS} --partitions ${PARTITIONS} --topic \"${AIRY_CORE_NAMESPACE}%s\" %s 1>/dev/null";
String headerTemplate = "#!/bin/bash\n" +
"\n" +
"##########################################################################\n" +
"# THIS FILE WAS GENERATED. DO NOT EDIT. See /infrastructure/tools/topics #\n" +
"##########################################################################\n" +
"\n" +
"set -euo pipefail\n" +
"IFS=$'\\n\\t'\n" +
"\n" +
"ZOOKEEPER=airy-cp-zookeeper:2181\n" +
"PARTITIONS=${PARTITIONS:-10}\n" +
"REPLICAS=${REPLICAS:-1}\n" +
"AIRY_CORE_NAMESPACE=${AIRY_CORE_NAMESPACE:-}\n" +
"\n" +
"echo \"Creating Kafka topics\"\n" +
"\n" +
"if [ -n \"${AIRY_CORE_NAMESPACE}\" ]\n" +
"then\n" +
" AIRY_CORE_NAMESPACE=\"${AIRY_CORE_NAMESPACE}.\"\n" +
" echo \"Using ${AIRY_CORE_NAMESPACE} to namespace topics\"\n" +
"fi";

TopicsFinder finder = new TopicsFinder();

String template = getTemplate();

List<String> topics = finder.findTopics();

Method name;
Method config;

System.out.println(headerTemplate + "\n\n");
try {
for (String result : topics) {
Class<?> topicClass = Class.forName(result);
@@ -51,31 +70,10 @@ public static void main(String[] args) {
topicConfigFormatted = "--config " + topicConfigFormatted;
}

System.out.println(String.format(template, topicName, topicConfigFormatted));
System.out.println(String.format(createTopicTemplate, topicName, topicConfigFormatted) + "\n");
}
} catch (Exception e) {
e.printStackTrace();
}
}

private static String getTemplate() {
String line = "";
StringBuilder buffer = new StringBuilder();

try {
try (InputStream is = (Application.class.getResourceAsStream("/topic.sh.tpl"))) {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
while ((line = reader.readLine()) != null) {
buffer.append(line).append("\n");
}

reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}

return buffer.toString();
}

}

This file was deleted.

@@ -4,21 +4,15 @@

public abstract class AbstractTopic implements Topic {
@Override
public String name() {
return testNamespace() + String.format("%s.%s.%s", kind(), domain(), dataset());
}
public String name() { return namespace() + String.format("%s.%s.%s", kind(), domain(), dataset()); }

@Override
public Map<String, String> config() {
return Map.of();
}

private String testNamespace() {
String testTarget = System.getenv("TEST_TARGET");
if (testTarget == null || "".equals(testTarget)) {
return "";
}

return testTarget.split(":")[1]; // take the name of the test as a namespace
private String namespace() {
String namespace = System.getenv("AIRY_CORE_NAMESPACE");
return namespace == null ? "" : namespace + ".";
}
}
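
The application side reads the same variable via
`System.getenv("AIRY_CORE_NAMESPACE")`, so namespacing a deployment is a matter
of setting it in each service's environment. A sketch, with a hypothetical
service jar:

```sh
# With the variable set, AbstractTopic.name() resolves to
# "<namespace>.<kind>.<domain>.<dataset>", matching the provisioned topics.
export AIRY_CORE_NAMESPACE=staging
java -jar api-communication.jar   # hypothetical jar name
```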
@@ -12,6 +12,7 @@ public String dataset() {

@Override
public Map<String, String> config() {
return Map.of();
return Map.of("cleanup.policy", "compact", "segment.bytes", "10485760", "min.compaction.lag.ms", "86400000");
}

}
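
One way to verify that the compaction settings reached the broker is to
describe the topic; a sketch, with the topic name shown without a namespace
prefix:

```sh
# Check cleanup.policy, min.compaction.lag.ms and segment.bytes in the output;
# prepend "<namespace>." to the topic name if AIRY_CORE_NAMESPACE was set.
kafka-topics --describe --zookeeper airy-cp-zookeeper:2181 \
  --topic application.communication.messages
```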
2 changes: 0 additions & 2 deletions package.json
@@ -34,8 +34,6 @@
"react-window-infinite-loader": "1.0.5",
"reselect": "4.0.0"
},


"devDependencies": {
"@babel/core": "7.8.4",
"@babel/preset-env": "^7.8.4",
6 changes: 6 additions & 0 deletions scripts/lint.sh
@@ -2,8 +2,14 @@
set -eo pipefail
IFS=$'\n\t'

echo "Check create-topics.sh is in sync"
cmp <(bazel run //infrastructure/tools/topics:app) infrastructure/scripts/provision/create-topics.sh
echo
echo "Running Bazel lint"
bazel run @com_github_airyhq_bazel_tools//code-format:check_buildifier
echo
echo "Running Prettier and Java tests"
bazel test --test_tag_filters=lint //...
echo
echo "Running eslint"
yarn eslint
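
If the `cmp` check fails because the committed script has drifted from the
generator's output, one way to bring it back in sync (the generator writes the
script to stdout):

```sh
# Regenerate the provisioning script from the topic definitions, then re-lint.
bazel run //infrastructure/tools/topics:app > infrastructure/scripts/provision/create-topics.sh
./scripts/lint.sh
```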
