D3log Design

D3log is an effort to distribute differential-datalog computation among a set of compute nodes. This document explores the design space for that effort.

A few words on terminology that is used throughout this document:

  • node: A node is a placeholder for an entity participating in the overall system that has some compute capabilities. In this first version all nodes are considered equal. In the future the computation may impose further constraints on some of them, and only members that satisfy those constraints could take over that part.
  • member: A member is a concrete entity participating in the system. It is suitably addressable (in this first instance that merely means that it has an IP address that all other members can reach).
  • DKVS (distributed key-value store): some off-the-shelf distributed and highly available key-value store that can be reached from all members of the system and which is used to "persist" (not necessarily on disk) our configuration and system state.

Architecture

We identified two possible architectures. In each, nodes are connected through TCP.

Central Controller

The first architecture has a global control plane that is assumed to be highly available and that orchestrates changes to the system. This entity is assumed to have a full view of the system. High availability is ensured through replication or similar means.

                               +----------------------+
                               | Global Control Plane |
                               | (highly available)   |
                               +-/------/----\--------+
                           /-----     /-      -\
                    /------         /-          -\
              /-----               /              -\
  +---------------+              /-                 -\
  | Node 1        |             /                     \
  |---------------|           /-                       -\
  | Local Control ---       /-                           -\
  | Plane         |  \---  /                               -\
  |               |      /----                               -\
  +---------------+    /-     \----                            \
                      /            \---                         -\
                    /-                 \----                      -\
                   /                        \---                    -\
                 /-                             \----                 -\
               /-                                    \----      +---------------+
      +---------------+                                   \---  | Node 3        |
      | Node 2        |                                       \-----------------|
      |---------------|                                         | Local Control |
      | Local Control ------------------------------------------| Plane         |
      | Plane         |                                         |               |
      |               |                                         +---------------+
      +---------------+

(https://textik.com/#89fae6998c0d415f)

It is imaginable that the global control plane is implemented using a replicated state machine that runs on actual compute nodes, i.e., that resources are shared between control plane and compute nodes.

Distributed Controller

An alternative approach uses a distributed key-value store (DKVS) as the source of truth pertaining to the system state. There would be no global controller in this model. Rather, all nodes subscribe to updates on this store and react to them locally. A selective subscription mechanism, i.e., one that allows subscription to a certain namespace or set of keys, would reduce the amount of chatter and unnecessary computation. E.g., certain nodes may only be interested in changes to certain keys; with a selective subscription they would receive updates only for those, as opposed to also receiving updates for unrelated changes that they would have to identify as irrelevant.
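To make the idea concrete, here is a minimal sketch of what such a selective subscription interface could look like. The `Dkvs` trait, the key-prefix convention, and the callback type are all assumptions made for illustration; they do not reflect an existing API.

```rust
// Hypothetical sketch of a selective subscription interface to the DKVS.
// None of these types exist in d3log today; they only illustrate the idea
// that a node subscribes to a key prefix instead of the whole store.

/// An update to a single key in the store.
pub struct Update {
    pub key: String,
    pub value: Option<Vec<u8>>, // `None` signals a deletion.
}

/// The subset of DKVS functionality a node needs.
pub trait Dkvs {
    /// Invoke `callback` for every change to a key under `prefix`.
    /// Changes outside the prefix never reach this node.
    fn subscribe(&mut self, prefix: &str, callback: Box<dyn FnMut(Update)>);
}

fn watch_config(store: &mut dyn Dkvs) {
    // Only configuration changes are delivered; membership updates,
    // say under "/d3log/members/", would not cause any work here.
    store.subscribe("/d3log/config/", Box::new(|update| {
        println!("configuration key {} changed", update.key);
    }));
}
```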

The main difference from the central controller approach is that there is no central "active" entity taking care of the orchestration. Instead, that logic is split between the individual compute nodes.

Similar to the global control plane approach, the distributed key-value store itself could share resources with the actual computation (i.e., run on the same nodes).

Design

For the sake of simplicity, we decided to go with the distributed controller approach. Perhaps most importantly, not having a global controller eliminates cases where the controller is partitioned off from the rest of the system. With a central controller approach, it is not clear how we would be able to keep the computation running in such a case, because the central controller would interpret such a partition as a failure of those nodes and recreate the relevant part of the program.

It also eliminates the need to implement a "control channel", as we would really only need a connection to the DKVS. On the other hand, now we have to deal with creating a proper schema for how to represent our state in the DKVS. We probably would need something similar anyway at some point, but for an initial version that presumably could be more ad hoc.

State

This section lays out the state that our system will require and maintain.

Program

The program defines what our computation looks like. It is not clear how we could easily represent the computation we want to perform and put it into our DKVS. The complication arises at least in part from the compilation step (producing Rust source code) that is involved.

If we were to hand over the vanilla Datalog program (which also does not yet support annotating parts of the computation for distribution; something that clearly requires much more thought), we would need to compile it. That is a lot of plumbing that we should be able to punt to a later point in time.

So instead, in the first version we assume that each node has the produced Rust artifact for the program to compute present locally. That obviously limits us in that we can now only compute a single program (easily), but that seems to be a reasonable compromise to cut down on features for the initial version.

With that we gain the ability to directly reference relations (with their numerical ID) and can use that to describe input & output relationships between nodes (i.e., which relation is connected to which other).

Configuration

Given the assumption that the program is already present on each node, most of what we need to instantiate the computation is information on how to connect relations on the individual nodes. That is, we need to describe:

  1. How input relations receive their data (those that are not fed with deltas from other output relations).
  2. Which output relations are connected to which input relations.
  3. How data is produced (or fed into the computation) and later consumed (i.e., made available).

All this configuration pertaining to the computation is available in our "schema", which defines the format as well as how to serialize and deserialize it. Right now this definition is merely implicit in the Rust type definitions.

The schema will not be replicated here, but can be looked at directly in code.
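For orientation only, a heavily simplified and hypothetical sketch of what such Rust type definitions could look like is shown below; the names and shapes are illustrative, not the actual schema:

```rust
// Hypothetical, heavily simplified sketch of the kind of Rust types that
// could make up the schema; the real definitions live in the d3log code.

/// Numerical ID of a relation in the compiled program.
pub type RelId = u64;

/// Logical compute node in the overall computation.
pub type NodeId = u64;

/// How an input relation that is not fed by another node gets its data.
pub enum Source {
    /// Read deltas from a file at the given path.
    File { path: String },
}

/// Where the deltas of an output relation are made available.
pub enum Sink {
    /// Append deltas to a file at the given path.
    File { path: String },
}

/// Configuration of a single logical node.
pub struct NodeCfg {
    pub node: NodeId,
    /// External data sources feeding input relations (item 1 above).
    pub inputs: Vec<(RelId, Source)>,
    /// Output relations wired to input relations on other nodes (item 2).
    pub outputs: Vec<(RelId, NodeId, RelId)>,
    /// Output relations whose deltas are consumed externally (item 3).
    pub sinks: Vec<(RelId, Sink)>,
}

/// The full computation is just the set of node configurations.
pub type SysCfg = Vec<NodeCfg>;
```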

Membership

The aforementioned configuration describes the distributed computation, but it does not detail which exact member will instantiate which part.

The information about which members participate in the computation is also contained in the DKVS. There will be a dedicated membership service which takes care of updating this data as members join and leave the system (e.g., due to failure or changes to the topology).

This membership information is part of the input to the overall system configuration computation which each node performs whenever a relevant change is detected in the DKVS. As part of this computation an assignment mapping nodes to members is created. This assignment is then the authoritative information about which part of the computation is taken over by which member.
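Because every member must arrive at the same assignment, the computation has to be a deterministic function of its inputs. A minimal sketch of such a function follows; the round-robin strategy and the types are assumptions for illustration, not the actual algorithm:

```rust
// Hypothetical sketch of a deterministic node-to-member assignment. Every
// member runs the same function over the same inputs and hence arrives at
// the same mapping; the types are illustrative only.

use std::collections::BTreeMap;

/// Logical compute node in the overall computation.
pub type NodeId = u64;

/// A member is identified by its address here; the real identifier may differ.
pub type MemberAddr = String;

/// Map each logical node onto a live member, round-robin over the sorted
/// member list. Sorting first makes the result independent of the order in
/// which the membership service reported the members.
pub fn assign(nodes: &[NodeId], members: &[MemberAddr]) -> BTreeMap<NodeId, MemberAddr> {
    assert!(!members.is_empty(), "cannot assign without live members");

    let mut members = members.to_vec();
    members.sort();

    nodes
        .iter()
        .enumerate()
        .map(|(i, node)| (*node, members[i % members.len()].clone()))
        .collect()
}
```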

Member State

In order to make the system work, the individual members need to know how to access the DKVS. In a first version this knowledge (which probably merely includes the IP addresses and ports of the constituent DKVS nodes) is embedded in the program itself. Alternatively, it can be persisted on the node in some configuration file located at a known location.
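The configuration-file variant could be as simple as the following sketch; the path and the one-address-per-line format are hypothetical:

```rust
// Minimal sketch of reading the DKVS endpoints from a well-known file; the
// path and one-address-per-line format are assumptions for illustration.

use std::fs;
use std::io;

/// Hypothetical well-known location of the member's DKVS configuration.
const DKVS_CONFIG: &str = "/etc/d3log/dkvs.conf";

/// Return the `ip:port` pairs of the DKVS nodes, one per line in the file.
fn dkvs_endpoints() -> io::Result<Vec<String>> {
    let content = fs::read_to_string(DKVS_CONFIG)?;
    Ok(content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .map(str::to_string)
        .collect())
}
```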

System Instantiation

We want to avoid having different "phases" as much as we can. Phases typically require some form of coordination between members before a transition can happen. Such coordination is a source of complexity in a distributed system where components can fail at any point in time.

We assume that the DKVS is set up and configured before any of the configuration of the distributed computation mentioned in this document is happening.

If a member boots up in the system, we further assume that an agent is started along with it. That can either happen automatically (e.g., as part of the boot process) or through human interaction.

Said agent contains the core d3log logic. Once started, it extracts the information on how to reach the DKVS and opens a connection to it.

From the DKVS, the agent would read the system configuration and current membership. It would then compute which part of the computation would be handled by this member, if anything. Lastly, this configuration would be instantiated.

This instantiation includes the creation of the various entities that ultimately power the computation. How this instantiation is performed can be seen in instantiate.rs.

The agent also subscribes to changes in the DKVS on both the membership and the configuration. When either of those changes, it will re-compute and re-instantiate its part.

Every member participating in the system performs the above logic. The computation of the configuration is fully deterministic and the same for each member; the only difference lies in its input, which includes an identifier representing the member itself.
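Put together, the agent's control flow could look roughly like the following sketch. All types and functions are placeholders for the steps described above (the real instantiation logic lives in instantiate.rs); the change-notification channel is an assumption:

```rust
// Hypothetical sketch of the agent's control flow; all types and functions
// are placeholders illustrating the steps described above, not d3log APIs.

/// Opaque system configuration and membership as read from the DKVS.
struct SysCfg;
struct Members;

/// Identifier of the member this agent runs on.
struct MemberId(u64);

fn read_config() -> SysCfg { /* read from the DKVS */ SysCfg }
fn read_members() -> Members { /* read from the DKVS */ Members }

/// Deterministically compute the part of the computation (if any) that
/// this member is responsible for.
fn compute_part(_cfg: &SysCfg, _members: &Members, _me: &MemberId) -> Option<SysCfg> {
    None
}

fn instantiate(_part: &SysCfg) { /* see instantiate.rs */ }

fn run_agent(me: MemberId, changes: std::sync::mpsc::Receiver<()>) {
    loop {
        let cfg = read_config();
        let members = read_members();
        if let Some(part) = compute_part(&cfg, &members, &me) {
            instantiate(&part);
        }
        // Block until the DKVS reports a change to either the
        // configuration or the membership, then re-compute.
        if changes.recv().is_err() {
            break;
        }
    }
}
```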

Failures

Note that we generally assume a non-byzantine failure model.

We have identified three types of failures:

  • Process failure:
    • the local control plane monitors all the individual processes spun up as part of program instantiation
    • a process maps directly to a logical compute node in the overall computation
    • if a process dies we essentially need to restart the computation that is downstream of this compute node (i.e., that has data from it as input)
    • for that to work we need to:
      • remove data previously produced by this compute node from downstream nodes
      • restart the feeding of data from the data source if the data source is still available (if it isn't we can't do anything)
  • Node failure:
    • detected via heartbeat timeout on the control channel (see the sketch after this list)
    • conceptually (and from a computational perspective), a node is nothing but the sum of the individual processes running on it
    • as such, remediation is the same as it is for single process failure but for all the processes that ran on the node
  • Network partition:
    • could be between the global control plane and local ones, just between local ones, or anything in between
    • unclear how we deal with that
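
For the heartbeat-based detection mentioned above, a minimal sketch could look as follows; the timeout value and the types are assumptions, not part of the design:

```rust
// Minimal sketch of heartbeat-based node failure detection under the
// stated non-byzantine model; timeout value and types are assumptions.

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Members silent for longer than this are considered failed; the
/// concrete value is a tuning knob.
const TIMEOUT: Duration = Duration::from_secs(5);

struct HeartbeatMonitor {
    /// Time of the last heartbeat received from each member.
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatMonitor {
    /// Record a heartbeat received over the control channel.
    fn heartbeat(&mut self, member: &str) {
        self.last_seen.insert(member.to_string(), Instant::now());
    }

    /// Members whose heartbeats timed out and that need remediation,
    /// i.e., restarting the computation downstream of their processes.
    fn failed(&self) -> Vec<String> {
        let now = Instant::now();
        self.last_seen
            .iter()
            .filter(|(_, seen)| now.duration_since(**seen) > TIMEOUT)
            .map(|(member, _)| member.clone())
            .collect()
    }
}
```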