Skip to content

The Red Executor Module

Aviv C edited this page Jul 1, 2017 · 7 revisions

The Red Executor module is the core business of the JavaRed library. It provides various executors to support simple and intuitive way of defining flows of graph executions, without having to deal with callbacks, error handling or verbose timing of tasks.

The executors module currently provides a single executor called Red Synchronizer.

Red Synchronizer

The Red Synchronizer executor provides an interface for writing asynchronous-graph-flows in a super simple, synchronous-like syntax. Le't s dive directly to an example, suppose we want to implement this flow of tasks:

Red Synchronizer Flow Example

where each task produces a String result, and expects the result of the previous one, and we should return the result of task H either by String or by Future<String>. Lastly, the below methods are provided:

ListenableFuture<String> executeA();
ListenableFuture<String> executeB(String aResult);
ListenableFuture<String> executeC(String aResult);
ListenableFuture<String> executeD(String bResult, String cResult);
ListenableFuture<String> executeE(String aResult);
ListenableFuture<String> executeF(String eResult);
ListenableFuture<String> executeG(String fResult);
ListenableFuture<String> executeH(String dResult, String gResult);

Now let's try implementing using blocking idiom:

Blocking Implementation
String aResult = executeA().get();
ListenableFuture<String> bFuture = executeB(aResult);
ListenableFuture<String> cFuture = executeC(aResult);
ListenableFuture<String> eFuture = executeE(aResult);
ListenableFuture<String> dFuture = executeD(bFuture.get(), cFuture.get());
ListenableFuture<String> fFuture = executeF(eFuture.get());
ListenableFuture<String> gFuture = executeG(fFuture.get());
ListenableFuture<String> hFuture = executeH(dFuture.get(), gFuture.get());
return hFuture.get();

Comparing to the flow complexity, the code is quite simple an elegant. Now let's talk efficiency. First of all, this is not the only way to implement the flow. We could alternatively, for example, invoke F before invoking D. This is probably the main downside of synchronous execution. We have to take a guess which one would finish faster, to prevent it from slowing the whole execution. Suppose both B and F are very time consuming comparing to other tasks, then we have a big problem. We are going to block on bFuture.get() for a while and not invoke F although E has already finished. This is not real concurrency. We want to create an idiom where each edge of the graph is completely independent. Additionally, it should be noted that the whole process could take a while, and the executing thread is blocking the entire time, although most of the time it's probably just awaiting some IO response.

Let's switch to async implementation.

Naive Asynchronous Implementation
SettableFuture<String> result = SettableFuture.create();
AtomicReference<String> dResultReference = new AtomicReference<>(null);
AtomicReference<String> gResultReference = new AtomicReference<>(null);
CountDownLatch latch = new CountDownLatch(2);
Runnable finish = () -> {
    latch.countDown();
    if (latch.getCount() == 0) {
        Futures.addCallback(executeH(dResultReference.get(), gResultReference.get()), new FutureCallback<String>() {
            @Override
            public void onSuccess(String hResult) {
                result.set(hResult);
            }
            @Override
            public void onFailure(Throwable t) {
                result.setException(t);
            }
        });
    }
};
Futures.addCallback(executeA(), new FutureCallback<String>() {
    @Override
    public void onSuccess(String aResult) {
        ListenableFuture<String> bFuture = executeB(aResult);
        ListenableFuture<String> cFuture = executeC(aResult);
        Futures.addCallback(Futures.allAsList(bFuture, cFuture), new FutureCallback<List<String>>() {
            @Override
            public void onSuccess(List<String> results) {
                Futures.addCallback(executeD(results.get(0), results.get(1)), new FutureCallback<String>() {
                    @Override
                    public void onSuccess(String dResult) {
                        dResultReference.set(dResult);
                        finish.run();
                    }
                    @Override
                    public void onFailure(Throwable t) {
                        result.setException(t);
                    }
                });
            }
            @Override
            public void onFailure(Throwable t) {
                result.setException(t);
            }
        });
        Futures.addCallback(executeE(aResult), new FutureCallback<String>() {
            @Override
            public void onSuccess(String eResult) {
                Futures.addCallback(executeF(eResult), new FutureCallback<String>() {
                    @Override
                    public void onSuccess(String fResult) {
                        Futures.addCallback(executeG(fResult), new FutureCallback<String>() {
                            @Override
                            public void onSuccess(String gResult) {
                                gResultReference.set(gResult);
                                finish.run();
                            }
                            @Override
                            public void onFailure(Throwable t) {
                                result.setException(t);
                            }
                        });
                    }
                    @Override
                    public void onFailure(Throwable t) {
                        result.setException(t);
                    }
                });
            }
            @Override
            public void onFailure(Throwable t) {
                result.setException(t);
            }
        });
    }
    @Override
    public void onFailure(Throwable t) {
        result.setException(t);
    }
});
return result;

W O W that escalated quickly! Needless to say, no one wants to write, and surely not to maintain that piece of code. As far as efficiency goes, this is not bad. The practical down side is error handling, it's quite hard to determine which task failed, it's harder (although very possible) to make the execution stop if a parallel branch has already failed.

Not we're ready to see the magic of RedSynchronizer:

RedSynchronizer Asynchronous Implementation
Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA());
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a));
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a));
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a));
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c));
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e));
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f));
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));

Now, this code is completely asynchronous, like the previous one, It automatically catches and propagate failures at any point, and the idiom is neat! Altough this ia a matter of discussion. Note that each line of code exactly matches a line of code in the synchronous code, and this can be even further simplified to (notice the lambda expression):

Result<String> aResult = produceFutureOf(String.class).byExecuting(this::executeA);
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(this::executeB);
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(this::executeC);
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(this::executeE);
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting(this::executeD);
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(this::executeF);
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(this::executeG);
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting(this::executeH);

Now that you've seen the magic, let's dive into the world of RedSynchronizer and get to know the syntax.

The Basics

RedSynchronizer is an abstract class to be extended in order to write an implementation of a flow. The entire idiom is inherited and may be used inside. A RedSynchronizer instance receives an input of some type, and produces an output of some type. This is the most common use case. In some flows, no value is actually produced (for example if we want to write to a DB), In such cases we would want to extend RedVoidSynchronizer which has no output type. When extending either RedSynchronizer<INPUT> or RedVoidSynchronizer<INPUT, OUTPUT> we have to implement handle(INPUT input) method which defines the execution of the flow like so:

public class Example1 extends RedSynchronizer<String, Boolean> {
    @Override
    protected Result<Boolean> handle(String s) throws Throwable {
        // implementation of the flow
    }
}

public class Example2 extends RedVoidSynchronizer<String> {
    @Override
    protected Marker handle(String s) throws Throwable {
        // implementation of the flow
    }
}

Note the return types of Result<OUTPUT> and Marker - we will discuss them below.

Defining the Flow

The flow is defined inside the handle method by using the synchronizer idiom. The synchronizer idiom is divided into statements, each results in an execution of a single task. A task can either produce a value, or be void. A statement includes a few constant phases:

  • Declaring the preconditions.
  • Transforming the preconditions.
  • Adding markers.
  • Declaring the return method.
  • Declaring the actual function or command to be executed.

Tasks producing a value of T will return a Result<T>, while tasks of void (like writing to DB) will return a Marker. These Result and Marker objects may be:

  • Used as preconditions for following task declaration.
  • Be returned as the result of the flow.
Declaring the Preconditions

Each task may have preconditions. In our example from above:

  • B has one precondition - A.
  • D has two preconditions - C and B.
  • A has no preconditions.

To begin a new statement with preconditions we need the Result instance of those preconditions, then we can use ifResult or ifResults. To begin a new statement with no preconditions we can skip directly to Declaring the Return Method.

Examples:

ifResult(result)
ifResults(result1, result2)
Transforming the Preconditions

Once we declared our preconditions using Result objects, we have to declare what exactly we want to expect from them. This way, we can schedule a task to be executed only in a case where another tasks failed. Available transformations are:

  • Succeed - will be executed if and when all the given preconditions successfully completed.
  • Fail - will be executed if and when all the given preconditions failed.
  • Finish - will be executed when all the given preconditions completed whether by success or failure.

Examples:

ifResult(result).succeed()
ifResults(result1, result2).fail()
Adding Markers

Once we declared our transformations, we may or may not add additional void preconditions. Any task that does not produce a value may be added at this point by calling the method andMarkers(Marker... markers). If we choose to add markers, we go back to the transforming phase to transform the newly added markers. Each time we add markers, we then need to transform them, and the transformation will apply only on the lastly added markers. If we don't want to add markers, we can skip directly to Declaring the Return Method.

Examples:

// this means we schedule the task to be executed if and when result, marker1 and marker2 all succeed
ifResult(result).succeed().andMarkers(marker1, marker2).succeed()
// this means we schedule the task to be executed if and when result succeed, marker1 succeed and marker2 fails
ifResults(result1, result2).fail().andMarkers(marker1).succeed().andMarker(marker2).fail()
Declaring the Return Method

After setting all of our preconditions, we declare the way we want to produce and return a value. At this point we have to answer 3 things:

  1. Do we want to return a value?
  2. If we do, a value of what type?
  3. If we do, how do we want to return it:
    1. Directly returning the result.
    2. Returning a Future of the result
    3. Returning a ListenableFuture of the result.
    4. Returning a RedFuture of the result.

We do it by calling either:

  • execute(command) - which would expect us to return nothing (more on that later).
  • produce(Class<T> resultClass) - which would expect us to return the actual result.
  • produceFutureOf(Class<T> resultClass) - which would expect us to return a future of the result (this could be anything that extends Future<T>, i.e. ListenableFuture<T>, RedFutureOf<T>, etc...).

Examples:

// no preconditions, producing directly a value of String
produce(String.class)
// no preconditions, no value produced
execute(...)
// one result precondition, producing a future of a User class
ifResult(result).succeed().produceFutureOf(User.class)
// many different preconditions, producing a future of boolean
ifResults(result1, result2).fail().andMarkers(marker1).succeed().produceFutureOf(Boolean.class)
// one result precondition, no value produced
ifResult(result).succeed().execute(...)
Declaring the Actual Function or Command to be Executed

Now let's run some logic! If we reached this point we've actually finished! At this point all we need is to call byExecuting and let the IDE auto-complete everything. If we've declared 3 Result instances in as preconditions, we will receive their result at the expression, if we've declared we should return a Future<Boolean>, this is exactly what we should return.

Let's see some full examples of tasks producing values:

// no preconditions
Result<Boolean> booleanResult = produce(Boolean.class).byExecuting(() -> true);
Result<String> stringResult = produceFutureOf(String.class).byExecuting(() -> generateFutureOfString());
// one precondition
ifResult(booleanResult).succeed().produceFutureOf(String.class).byExecuting(bool -> {
    if (bool) {
        return generateFutureOfStringMethod1();
    } else {
        return generateFutureOfStringMethod2();
    }
});
// two preconditions and markers
ifResults(booleanResult, stringResult).finish().andMarkers(marker).fail().produceFutureOf(String.class).byExecuting((bool, string) -> RedFuture.resolvedOf(bool + string));

Now let's see some void tasks. Note that void tasks receive a PendingMarker instance. This instance will be used to mark the status of the execution. Once should use pendingMarker.complete() and pendingMarker.fail(Throwable cause) to indicate success or failure respectively. Examples:

// no preconditions
Marker marker = execute(pendingMarker -> schedule(() -> pendingMarker.complete()));
// preconditions and markers
ifResult(booleanResult).succeed().andMarkers(marker).fail().execute((pendingMarker, bool) -> {
    if (bool) {
        pendingMarker.complete();
    } else {
        pendingMarker.fail(new RuntimeException("the cause of failure"));
    }
});
Last Few Things to Note

If we transform preconditions with fail, it means that we expect all preconditions to fail. In such a case, non of them will produce results. This means that the executed function will not receive any parameters. For example:

ifResult(booleanResult).fail().execute((pendingMarker) -> {
    // note that we did not receive any boolean
});
ifResult(booleanResult).fail().produce(String.class).byExecuting(() -> {
    // note that we did not receive any boolean
});

If we transform preconditions with finish, it means that each precondition might either fail or succeed, This means that the executed function will receive parameters, but either one of them can be a result, or null in case the precondition failed. Also note that a successful result of null is indistinguishable from failure.

If we wish to have more than 10 Result preconditions, we would not receive them as parameters in the executing function or command, but we will receive an instance of Results. This object may be queried with an index and expected type with the method result(int index, Class<T> tClass).

Further reading is available at the API reference of: