ServiceBundle
Some details about `serviceBundle`s are also covered in the QBit microservices overview.
A `serviceBundle` is a collection of services sitting behind `serviceQueue`s. You use a `serviceBundle` when you want to share a response queue and a response queue thread. The `serviceBundle` can also share the same thread for the request queue, but that is not the default. The `ServiceEndpointServer`, which is used to expose service actors as remote microservices via REST and WebSocket, uses the `serviceBundle`.
The `serviceBundle` is also used to add other forms of services, like service pools and sharded services.
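To make the shared response queue idea concrete, here is a minimal plain-JDK sketch. It is not QBit code and does not use the QBit API: the `Worker` class, the `STOP` sentinel, and the handler lambdas are all hypothetical stand-ins. Each worker has its own request queue and thread (like a `serviceQueue`), and every worker replies on one shared response queue (as the text describes for a `serviceBundle`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Plain-JDK sketch, not QBit code: per-service request queues, one shared response queue. */
final class SharedResponseQueueSketch {

    /** Hypothetical sentinel that tells a worker to stop. */
    private static final String STOP = "__stop__";

    /** Hypothetical worker that stands in for a service behind its own queue. */
    static final class Worker {
        private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        private final Thread thread;

        Worker(String name, Function<String, String> handler,
               BlockingQueue<String> sharedResponses) {
            thread = new Thread(() -> {
                try {
                    // Handle requests until the stop sentinel arrives.
                    for (String request = requests.take();
                         !STOP.equals(request);
                         request = requests.take()) {
                        sharedResponses.put(name + ":" + handler.apply(request));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, name);
            thread.start();
        }

        void send(String request) {
            requests.add(request);
        }

        void stop() throws InterruptedException {
            requests.add(STOP);
            thread.join();
        }
    }

    /** Sends one request to each of two workers and returns the sorted replies. */
    static List<String> run() {
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        Worker todo = new Worker("todo", String::toUpperCase, responses);
        Worker auditor = new Worker("auditor", s -> "logged " + s, responses);
        try {
            todo.send("add");
            auditor.send("add");
            todo.stop();
            auditor.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Both workers have finished, so every reply is in the shared queue.
        List<String> replies = new ArrayList<>();
        responses.drainTo(replies);
        Collections.sort(replies);
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A single consumer can drain `responses` for every worker, which is the point of sharing it: one response thread instead of one per service.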
Let's walk through an example. We will use the `Todo` example that we used for `serviceQueue`s. Since we are covering `ServiceBundle`, we will add another service called `Auditor` and its implementation, `AuditorImpl`. We will change the `TodoManagerImpl` to use the `Auditor`.
Let's review our Todo example. The Todo example has a `TodoManagerClient` interface.
package com.mammatustech.todo;

import io.advantageous.reakt.promise.Promise;
import java.util.List;

public interface TodoManagerClient {
    Promise<Boolean> add(Todo todo);
    Promise<Boolean> remove(String id);
    Promise<List<Todo>> list();
}
This is the interface we will use to invoke async methods.
To this we will add a new service called `Auditor`.
package com.mammatustech.todo;

interface Auditor {
    void audit(final String operation, final String log);
}
We will keep the implementation simple so we can focus on QBit and the `serviceBundle`.
package com.mammatustech.todo;

public class AuditorImpl implements Auditor {
    public void audit(final String operation, final String log) {
        System.out.printf("operations %s, message %s log\n",
                operation, log);
    }
}
Now, to mix things up a bit, and since we are talking about a `serviceBundle`, we will pass an `Auditor` instance to the constructor of the `TodoManagerImpl`.
package com.mammatustech.todo;

import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;

import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;

import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

public class TodoManagerImpl {

    private final Map<String, Todo> todoMap = new TreeMap<>();
    private final Auditor auditor;

    public TodoManagerImpl(final Auditor auditor) {
        this.auditor = auditor;
    }

    public void add(final Callback<Boolean> callback,
                    final Todo todo) {
        todoMap.put(todo.getId(), todo);
        auditor.audit("add", "added new todo");
        callback.resolve(true);
    }

    public void remove(final Callback<Boolean> callback,
                       final String id) {
        final Todo removed = todoMap.remove(id);
        // Audit under the "remove" operation (the original logged it as "add").
        auditor.audit("remove", "removed todo");
        callback.resolve(removed != null);
    }

    public void list(final Callback<ArrayList<Todo>> callback) {
        auditor.audit("list", "auditor added");
        callback.accept(new ArrayList<>(todoMap.values()));
    }

    /* Flush pending auditor calls when the queue hits its limit, empties, or idles. */
    @QueueCallback({QueueCallbackType.LIMIT,
            QueueCallbackType.EMPTY,
            QueueCallbackType.IDLE})
    public void process() {
        flushServiceProxy(auditor);
    }
    ...
}
Note that `add`, `remove`, and `list` all use the `auditor` instance. Unlike the `serviceQueue`, the `serviceBundle` has no auto-flush feature. This is typically because `serviceBundle`s contain many `serviceQueue`s. If you want auto-flush with a `serviceQueue` in a bundle, you either add the `serviceQueue` to the bundle or look up the `serviceQueue` from the bundle, and then use the `serviceQueue` to create the auto-flush client proxy. This is usually not needed, as manually flushing at the right time is better for thread hand-off performance and IO performance. QBit uses micro-batching to optimize sending operations to other local and remote service actors.
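The effect of micro-batching and manual flushing can be sketched in plain Java. This is an illustration of the idea only, not QBit's implementation: `BatchingSender`, `handOffsFor`, and the hand-off callback are hypothetical names. Items are buffered and handed to the receiver as one batch, either when the batch fills up or when `flush()` is called explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-JDK sketch of micro-batching, not QBit's implementation. */
final class MicroBatchSketch {

    /** Hypothetical sender: buffers items and hands them off in batches. */
    static final class BatchingSender<T> {
        private final int batchSize;
        private final Consumer<List<T>> handOff;
        private List<T> buffer = new ArrayList<>();
        private int handOffCount;

        BatchingSender(int batchSize, Consumer<List<T>> handOff) {
            this.batchSize = batchSize;
            this.handOff = handOff;
        }

        /** Buffers the item; hands off automatically once the batch is full. */
        void send(T item) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Hands off whatever is buffered; does nothing when the buffer is empty. */
        void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            List<T> batch = buffer;
            buffer = new ArrayList<>();
            handOffCount++;
            handOff.accept(batch);
        }

        int handOffCount() {
            return handOffCount;
        }
    }

    /** Counts the hand-offs needed to send the given number of messages. */
    static int handOffsFor(int messages, int batchSize) {
        BatchingSender<Integer> sender = new BatchingSender<>(batchSize, batch -> { });
        for (int i = 0; i < messages; i++) {
            sender.send(i);
        }
        sender.flush(); // deliver any partial batch
        return sender.handOffCount();
    }

    public static void main(String[] args) {
        System.out.println(handOffsFor(1000, 1));  // one hand-off per message
        System.out.println(handOffsFor(1000, 10)); // one hand-off per ten messages
    }
}
```

With a batch size of 1 every `send` is a hand-off, which is what turning micro-batching off amounts to. With a larger batch size, a call that never fills the batch is not delivered until something calls `flush()`, which is why the tutorial flushes the `auditor` proxy from a queue callback.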
Since the `TodoManagerImpl` is using another service actor, we will flush operations to that actor when the processing queue for the `TodoManagerImpl` is idle, empty, or has reached its limit.
package com.mammatustech.todo;
...
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

public class TodoManagerImpl {
    ...
    @QueueCallback({QueueCallbackType.LIMIT,
            QueueCallbackType.EMPTY,
            QueueCallbackType.IDLE})
    public void process() {
        flushServiceProxy(auditor);
    }
    ...
You can do this with annotations. (You can also do this without annotations, which we will show later.) The above `@QueueCallback` annotation says to call `process` if the processing queue is empty (`QueueCallbackType.EMPTY`, no more requests or events in the queue), if the request processing queue is idle (`QueueCallbackType.IDLE`, not busy at all), or if we have hit the queue limit (`QueueCallbackType.LIMIT`, which can only happen under heavy load or if you set the limit very low). Under heavy load, a queue limit of ten would spend roughly a tenth of the thread hand-off time of a queue limit of 1. If the auditor were a remote service, having a batch size larger than 1 would save on the cost of the IO operations.
You can turn off micro-batching by setting the processing queue batch size to 1.
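As a rough illustration of when these three callbacks could fire, here is a single-threaded plain-JDK sketch. The semantics are an assumption inferred from the description above, not taken from QBit's source, and `Signal` and `drain` are hypothetical names: `LIMIT` is raised after every `limit` processed items, `EMPTY` once the queue has been drained, and `IDLE` when there was nothing to process at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-JDK sketch of when queue callbacks might fire; the semantics are assumed. */
final class QueueSignalSketch {

    /** Hypothetical stand-ins for the LIMIT, EMPTY, and IDLE callback types. */
    enum Signal { LIMIT, EMPTY, IDLE }

    /**
     * Drains the queue once and records the signals a processing loop could raise:
     * LIMIT after every `limit` items, EMPTY once the queue has been drained,
     * IDLE if the queue had nothing in it to begin with.
     */
    static List<Signal> drain(Deque<String> queue, int limit) {
        List<Signal> signals = new ArrayList<>();
        if (queue.isEmpty()) {
            signals.add(Signal.IDLE);
            return signals;
        }
        int processedSinceLimit = 0;
        while (!queue.isEmpty()) {
            queue.poll(); // a real loop would invoke the service method here
            if (++processedSinceLimit == limit) {
                signals.add(Signal.LIMIT);
                processedSinceLimit = 0;
            }
        }
        signals.add(Signal.EMPTY);
        return signals;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c", "d", "e"));
        System.out.println(drain(queue, 2)); // five items, limit of two
        System.out.println(drain(queue, 2)); // queue is now empty
    }
}
```

Each recorded signal is a point where a method like `process()` would get a chance to flush collaborating proxies, so a lower limit means more frequent flushes and more hand-offs.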
Later, when we introduce the `Reactor`, you can set up a recurring job that fires every 10ms or 100ms to flush collaborating services like the `auditor`.
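The `Reactor` itself is covered later, so the following only sketches the cadence of a recurring flush job using the JDK's `ScheduledExecutorService` as a swapped-in technique. It is not the QBit `Reactor` API, and `flushesObserved` is a hypothetical helper. One difference to keep in mind: this sketch runs the job on a separate timer thread purely for illustration, whereas a job tied to a service would be expected to run on that service's own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-JDK sketch of a recurring flush job; not the QBit Reactor. */
final class PeriodicFlushSketch {

    /**
     * Runs a stand-in flush job every periodMillis until it has fired `wanted`
     * times or timeoutMillis has elapsed, then returns how often it fired.
     */
    static int flushesObserved(long periodMillis, int wanted, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(wanted);
        AtomicInteger flushes = new AtomicInteger();
        // Stand-in for a call such as flushServiceProxy(auditor) in the tutorial.
        Runnable flush = () -> {
            flushes.incrementAndGet();
            latch.countDown();
        };
        try {
            timer.scheduleAtFixedRate(flush, periodMillis, periodMillis,
                    TimeUnit.MILLISECONDS);
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
        return flushes.get();
    }

    public static void main(String[] args) {
        System.out.println(flushesObserved(10, 3, 5000) >= 3);
    }
}
```

A 10ms period bounds how long a buffered call can wait before it is sent, at the cost of more frequent hand-offs than a 100ms period.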
You can use `QueueCallback`s with any `serviceQueue` and with any `serviceBundle`.
There are other `QueueCallback`s that notify you when the service has started and when it has shut down.
public class TodoManagerImpl {
    ...
    @QueueCallback({QueueCallbackType.INIT})
    public void init() {
        auditor.audit("init", "init service");
    }

    @QueueCallback({QueueCallbackType.SHUTDOWN})
    public void shutdown() {
        System.out.println("operation shutdown, shutdown service");
        flushServiceProxy(auditor);
    }
The `init` operation is called once when the `serviceQueue` for the microservice actor starts up. The `shutdown` operation is called once when the microservice actor shuts down.
Let's create a `serviceBundle`, add the `auditor` and `todoManager` services to it, and run them.
/** Object address to the todoManagerImpl service actor. */
private final String todoAddress = "todoService";

/** Object address to the auditorService service actor. */
private final String auditorAddress = "auditorService";

/** Service Bundle */
private ServiceBundle serviceBundle;

/** Client service proxy to the todoManager */
private TodoManagerClient client;

/** Client service proxy to the auditor. */
private Auditor auditor;

/* Create the serviceBundleBuilder. */
final ServiceBundleBuilder serviceBundleBuilder = serviceBundleBuilder();

/* Create the service bundle. */
serviceBundle = serviceBundleBuilder.build();

/* Add the AuditorImpl instance to the serviceBundle. */
serviceBundle.addServiceObject(auditorAddress, new AuditorImpl());

/* Create a service client proxy for the auditor. */
auditor = serviceBundle.createLocalProxy(Auditor.class, auditorAddress);

/* Create a todo manager and pass the
   client proxy of the auditor to it. */
final TodoManagerImpl todoManager = new TodoManagerImpl(auditor);

// Add the todoManager to the serviceBundle.
serviceBundle
        .addServiceObject(todoAddress, todoManager);

/* Create a client proxy to communicate
   with the service actor. */
client = serviceBundle
        .createLocalProxy(TodoManagerClient.class,
                todoAddress);

// Start the service bundle.
serviceBundle.start();
Above we create the `serviceBundleBuilder`, which can be used to set the response and request queue size, types, batch size, and more. Then we create the `serviceBundle`. Next we add the `auditor` microservice actor to the `serviceBundle` under the address specified by `auditorAddress`. Next we create a service client proxy for the `auditor` microservice actor that we can pass to the `TodoManagerImpl`. We then add the `TodoManagerImpl` to form the microservice actor for the `TodoManager` service. Next we create a `client` of the `TodoManager` service to test with. Then we start the `serviceBundle`.
To use the `todoManager` service proxy client aka `client`, the code is much like it was before with the `serviceQueue` example except now we will flush (since by default the queue batch size is greater than 1).
final Promise<Boolean> promise = Promises.blockingPromiseBoolean();

// Add the todo item.
client.add(new Todo("write", "Write tutorial", timer.time()))
        .invokeWithPromise(promise);
flushServiceProxy(client);

assertTrue("The call was successful", promise.success());
assertTrue("The return from the add call", promise.get());

final Promise<List<Todo>> promiseList = Promises.blockingPromiseList(Todo.class);

// Get a list of todo items.
client.list().invokeWithPromise(promiseList);

// Call flush since this is not an auto-flush proxy.
flushServiceProxy(client);

// See if the Todo item we created is in the listing.
final List<Todo> todoList = promiseList.get().stream()
        .filter(todo -> todo.getName().equals("write")
                && todo.getDescription().equals("Write tutorial"))
        .collect(Collectors.toList());

// Make sure we found it.
assertEquals("Make sure there is one", 1, todoList.size());

// Remove promise
final Promise<Boolean> removePromise =
        Promises.blockingPromiseBoolean();
client.remove(todoList.get(0).getId())
        .invokeWithPromise(removePromise);
flushServiceProxy(client);

final Promise<List<Todo>> promiseList2 =
        Promises.blockingPromiseList(Todo.class);

// Make sure it is removed.
client.list().invokeWithPromise(promiseList2);
flushServiceProxy(client);

// See if the Todo item we created is removed.
final List<Todo> todoList2 = promiseList2.get().stream()
        .filter(todo -> todo.getName().equals("write")
                && todo.getDescription()
                .equals("Write tutorial"))
        .collect(Collectors.toList());

// Make sure we don't find it.
assertEquals("Make sure there are none",
        0, todoList2.size());
flushServiceProxy(client);
We can also repeat the async example where we executed more than one operation at a time.
/* A list of promises for things we
   want to do all at once. */
final List<Promise<Boolean>> promises =
        new ArrayList<>(3);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean();

/** Add a todoItem to the client add method */
final Todo todo = new Todo("write", "Write tutorial",
        timer.time());
final Promise<Boolean> promise
        = client.add(todo);
promises.add(promise);

/** Add two more. */
promises.add(client.add(new Todo("callMom",
        "Call Mom", timer.time())));
promises.add(client.add(new Todo("callSis",
        "Call Sister", timer.time())));

/** Now async wait for them all to come back. */
Promises.all(promises).then(done -> {
    success.set(true);
    latch.countDown();
}).catchError(e -> {
    success.set(false);
    latch.countDown();
});

/** Invoke the promises. */
promises.forEach(Promise::invoke);
flushServiceProxy(client);

/** They are all going to come back async. */
latch.await();
assertTrue(success.get());
Please note that you can explicitly flush a client microservice proxy. It will also flush if you go over the limit for the request queue, or you can set the batch size to 1.