Consuming a service is straightforward. Two initialization steps are necessary: establishing a connection with the ZooKeeper ensemble and creating a ServicePool instance for you to interact with. After that, you can use the remote service as often as you need.
A host discovery API is used to discover which service instances are running and available. The standard host discovery API is backed by ZooKeeper, so a connection to the ZooKeeper ensemble is required. Ostrich comes with a ZooKeeperConfiguration class that encapsulates the connection details. ZooKeeperConfiguration uses Chameleon to provide a default connection string based on the AWS Region (e.g., us_east_1, us_west_2, eu_west_1) and Environment (e.g., dev, qa, prod).
// Connect to the default ZooKeeper ensemble as inferred by Chameleon
ZooKeeperConnection zookeeper = new ZooKeeperConfiguration()
    .withBoundedExponentialBackoffRetry(100, 3000, 3)
    .connect();
A connection string may be specified explicitly to override the default Chameleon value through the environment variable CHAMELEON_ZOOKEEPER_ENSEMBLE, the system property chameleon.zookeeper.ensemble, or in code through ZooKeeperConfiguration.withConnectString().
// Explicitly specify the ZooKeeper location through withConnectString(String)
String connectString = Joiner.on(",").join("zookeeper1:2181",
                                           "zookeeper2:2181",
                                           "zookeeper3:2181");
ZooKeeperConnection zookeeper = new ZooKeeperConfiguration()
    .withConnectString(connectString)
    .withBoundedExponentialBackoffRetry(100, 3000, 3)
    .connect();
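To picture how several override sources could be combined, here is a minimal, self-contained sketch. It does not use Chameleon or Ostrich. The ConnectStringResolver class, its resolve method, and above all the precedence order (value set in code, then system property, then environment variable, then a default) are assumptions made for illustration; the text above does not say which source wins when more than one is set.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of Ostrich or Chameleon. */
final class ConnectStringResolver {
    static final String ENV_NAME = "CHAMELEON_ZOOKEEPER_ENSEMBLE";
    static final String PROPERTY_NAME = "chameleon.zookeeper.ensemble";

    private ConnectStringResolver() {}

    /**
     * Returns the first non-blank value among: the explicit value, the system
     * property, the environment variable, and the fallback.
     * NOTE: this ordering is an assumption, not documented behavior.
     */
    static String resolve(String explicit, Properties props, Map<String, String> env, String fallback) {
        if (isSet(explicit)) {
            return explicit;
        }
        String fromProperty = props.getProperty(PROPERTY_NAME);
        if (isSet(fromProperty)) {
            return fromProperty;
        }
        String fromEnv = env.get(ENV_NAME);
        if (isSet(fromEnv)) {
            return fromEnv;
        }
        return fallback;
    }

    private static boolean isSet(String value) {
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        // "localhost:2181" is a placeholder default chosen for this demo.
        String resolved = resolve(null, System.getProperties(), System.getenv(), "localhost:2181");
        System.out.println("ZooKeeper connect string: " + resolved);
    }
}
```

Passing the property and environment maps in as parameters keeps the lookup easy to test without touching the real process environment.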
If you want the service pool to cache service instances, you'll need to define a caching policy. Choose settings that suit your application and the nature of the service. The cache configuration options are:
- maxTotalServiceInstances - The maximum total number of service instances to be cached.
- maxServiceInstancesPerEndPoint - The maximum number of cached service instances for a single end point.
- maxServiceInstanceIdleTime - The amount of time a cached service instance must be unused before it can be evicted.
Here's an example of creating a caching policy of size 100, 10 max per end point, and 5 minutes idle before potential eviction:
ServiceCachingPolicy cachingPolicy = new ServiceCachingPolicyBuilder()
    .withMaxNumServiceInstances(100)
    .withMaxNumServiceInstancesPerEndPoint(10)
    .withMaxServiceInstanceIdleTime(5, TimeUnit.MINUTES)
    .build();
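To make the three limits concrete, the following self-contained sketch models a cache that enforces a total cap, a per-end-point cap, and an idle timeout. It is not Ostrich's cache: the InstanceCacheSketch class and its checkIn/checkOut methods are hypothetical, time is passed in explicitly to keep the example deterministic, and idle entries are evicted eagerly here, whereas the option described above only says when an entry can be evicted.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a bounded instance cache; not Ostrich's implementation. */
final class InstanceCacheSketch<T> {
    private static final class Entry<T> {
        final T instance;
        final long idleSinceMillis;

        Entry(T instance, long idleSinceMillis) {
            this.instance = instance;
            this.idleSinceMillis = idleSinceMillis;
        }
    }

    private final int maxTotal;
    private final int maxPerEndPoint;
    private final long maxIdleMillis;
    private final Map<String, Deque<Entry<T>>> idle = new HashMap<>();
    private int total;

    InstanceCacheSketch(int maxTotal, int maxPerEndPoint, long maxIdleMillis) {
        this.maxTotal = maxTotal;
        this.maxPerEndPoint = maxPerEndPoint;
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Caches the instance and returns true, or returns false if a limit would be exceeded. */
    boolean checkIn(String endPoint, T instance, long nowMillis) {
        evictIdle(nowMillis);
        Deque<Entry<T>> queue = idle.computeIfAbsent(endPoint, key -> new ArrayDeque<>());
        if (total >= maxTotal || queue.size() >= maxPerEndPoint) {
            return false;
        }
        queue.push(new Entry<>(instance, nowMillis));
        total++;
        return true;
    }

    /** Returns the most recently cached instance for the end point, or null if none is cached. */
    T checkOut(String endPoint, long nowMillis) {
        evictIdle(nowMillis);
        Deque<Entry<T>> queue = idle.get(endPoint);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        total--;
        return queue.pop().instance;
    }

    /** Drops entries that have been idle for at least maxIdleMillis. */
    private void evictIdle(long nowMillis) {
        for (Deque<Entry<T>> queue : idle.values()) {
            // push() adds at the head, so the oldest entry is at the tail.
            while (!queue.isEmpty() && nowMillis - queue.peekLast().idleSinceMillis >= maxIdleMillis) {
                queue.removeLast();
                total--;
            }
        }
    }

    int size() {
        return total;
    }
}
```

A larger per-end-point cap favors reuse when a few servers handle most traffic, while the total cap bounds memory and open connections regardless of how many servers exist.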
A service pool is the heart of the consumer library that Ostrich provides. As a consumer, you receive instances of a particular service from it to work with. Internally, it uses a host discovery mechanism to determine which servers that provide the desired service interface are up and currently available. As instances disappear or reappear, the service pool manages them automatically. If an operation that uses a remote server fails, the service pool can (at your discretion) automatically retry the operation on a different server. Additionally, when an operation fails, the service pool remembers the end point behind the failed server and monitors that server's health in the background. Ostrich will stop sending requests to the server until it declares itself healthy again.
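The failover behavior described above can be sketched in a few lines of plain Java. This is a simplified model, not Ostrich's implementation: the TinyFailoverPool class and its methods are hypothetical, end points are plain strings, and the background health monitor is replaced by an explicit markHealthy call.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical, simplified failover pool; not Ostrich's implementation. */
final class TinyFailoverPool {
    private final List<String> endPoints;                   // all known end points
    private final Set<String> bad = new LinkedHashSet<>();  // end points that recently failed

    TinyFailoverPool(List<String> endPoints) {
        this.endPoints = new ArrayList<>(endPoints);
    }

    /** Runs the operation on healthy end points until one succeeds or attempts run out. */
    <R> R execute(int maxAttempts, Function<String, R> operation) {
        RuntimeException last = null;
        int attempts = 0;
        for (String endPoint : endPoints) {
            if (attempts >= maxAttempts) {
                break;
            }
            if (bad.contains(endPoint)) {
                continue;               // skip end points already marked bad
            }
            attempts++;
            try {
                return operation.apply(endPoint);
            } catch (RuntimeException e) {
                bad.add(endPoint);      // remember the failure and try the next end point
                last = e;
            }
        }
        throw new IllegalStateException("no healthy end point succeeded", last);
    }

    /** Stand-in for a background health check reporting that an end point recovered. */
    void markHealthy(String endPoint) {
        bad.remove(endPoint);
    }

    /** Returns a copy of the end points currently considered bad. */
    Set<String> badEndPoints() {
        return new LinkedHashSet<>(bad);
    }
}
```

The key design point is that a failure has two effects: the current operation moves on to another end point, and later operations avoid the failed end point until it is reported healthy again.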
Here's an example of creating a service pool for the hypothetical CalculatorService created in the service provider quick start guide.
ServicePool<CalculatorService> pool = ServicePoolBuilder.create(CalculatorService.class)
    .withZooKeeperHostDiscovery(zookeeper)
    .withServiceFactory(new CalculatorServiceFactory())
    .withCachingPolicy(cachingPolicy)
    .build();
Alternatively, the builder has a buildAsync() method that will build an AsyncServicePool whose execution returns an asynchronous future rather than an immediate result. The AsyncServicePool also provides executeOn and executeOnAll methods that allow for executing the same callback on a subset of the currently registered end points.
NOTE: The CalculatorServiceFactory class as well as the CalculatorService interface are provided to you by the team that builds the service. Each team that exposes a service using the Ostrich library should provide you with a jar containing their service interface, a client implementation, and a service factory implementation.
Using the service is easy. You invoke the execute method on the pool, providing two pieces of information. The first is a retry strategy, which tells the service pool what to do if the operation fails in a retryable way. For as long as the retry strategy permits it and there are service instances available, a failed operation will be retried with a different service instance. The second is a callback that receives a handle to the service instance for you to use. You are free to choose the callback's return type; if you don't need to return a value, Void is a good choice.
int result = pool.execute(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS),
                          new ServiceCallback<CalculatorService, Integer>() {
                              @Override
                              public Integer call(CalculatorService service) throws ServiceException {
                                  return service.add(1, 2);
                              }
                          });
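For intuition about what a retry strategy with exponential backoff does between attempts, here is a minimal stand-alone sketch. It is not Ostrich's ExponentialBackoffRetry: the BackoffRetrySketch class is hypothetical, and it assumes the arguments mean a maximum number of attempts, a base sleep time, and a maximum sleep time. It also assumes a plain doubling schedule, whereas a real implementation may add randomization, and it retries the same operation rather than picking a different service instance.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper for illustration; not Ostrich's ExponentialBackoffRetry. */
final class BackoffRetrySketch {
    private BackoffRetrySketch() {}

    /** Delay before retry number `retry` (1-based): the base doubles each time, capped at max. */
    static long delayMillis(int retry, long baseMillis, long maxMillis) {
        int exponent = Math.min(Math.max(retry - 1, 0), 20);  // bound the shift to avoid overflow
        return Math.min(baseMillis << exponent, maxMillis);
    }

    /** Calls the operation up to maxAttempts times, sleeping between failed attempts. */
    static <T> T callWithRetry(int maxAttempts, long baseMillis, long maxMillis,
                               Callable<T> operation) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last;  // every attempt failed; surface the most recent failure
    }
}
```

Under these assumptions, a base of 50 ms and a cap of 1000 ms give waits of 50, 100, 200, and 400 ms between five attempts.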
Some services may be partitioned across servers. In this case, there is a version of the execute method that accepts a PartitionContext object to help properly route the request.
NOTE: It's important that your callbacks are intelligent and recognize that failures can happen at any time. If they maintain state internally you need to handle the case where an operation needs to be retried after part of it has already been executed. Of course the simplest thing to do would be to make your callbacks completely stateless.
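The difference between a stateful and a stateless callback under retry can be shown with a small self-contained sketch. Nothing here comes from Ostrich: StatelessCallbackSketch, FlakyService, and runWithOneRetry are hypothetical stand-ins, with a simulated service that fails on its first call only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical demonstration of retry-safe callbacks; not Ostrich code. */
final class StatelessCallbackSketch {
    private StatelessCallbackSketch() {}

    /** Simulated remote call that fails on its first invocation only. */
    static final class FlakyService {
        private int calls = 0;

        int fetch(int value) {
            if (++calls == 1) {
                throw new RuntimeException("simulated failure");
            }
            return value;
        }
    }

    /** Runs the callback and retries it once if the first attempt throws. */
    static <T> T runWithOneRetry(Supplier<T> callback) {
        try {
            return callback.get();
        } catch (RuntimeException firstFailure) {
            return callback.get();
        }
    }

    /** Stateful callback: the list outlives a failed attempt, so the retry sees leftovers. */
    static int statefulRun() {
        FlakyService service = new FlakyService();
        List<Integer> shared = new ArrayList<>();
        return runWithOneRetry(() -> {
            shared.add(1);                  // executed on both attempts
            shared.add(service.fetch(2));   // throws on the first attempt
            return shared.size();
        });
    }

    /** Stateless callback: every attempt starts from a fresh list. */
    static int statelessRun() {
        FlakyService service = new FlakyService();
        return runWithOneRetry(() -> {
            List<Integer> local = new ArrayList<>();
            local.add(1);
            local.add(service.fetch(2));
            return local.size();
        });
    }

    public static void main(String[] args) {
        System.out.println("stateful result:  " + statefulRun());   // 3: one element is duplicated
        System.out.println("stateless result: " + statelessRun());  // 2: the intended result
    }
}
```

The stateful version returns 3 because the element added before the failure is added again on retry, while the stateless version returns the intended 2.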
In some cases it may be more convenient to have an object that actually implements the service interface instead of a ServicePool. For this, you can build a dynamic proxy that implements the requested interface but passes all calls through to service end points in the pool. With a proxy, the retry policy has to be specified at build time.
CalculatorService service = ServicePoolBuilder.create(CalculatorService.class)
    .withZooKeeperHostDiscovery(zookeeper)
    .withServiceFactory(new CalculatorServiceFactory())
    .withCachingPolicy(cachingPolicy)
    .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
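For readers unfamiliar with dynamic proxies, the sketch below shows the general technique using java.lang.reflect.Proxy from the JDK. It is only an illustration of how an interface-typed object can forward calls and retry on failure. How buildProxy is implemented internally is not stated here, and RetryingProxySketch, its Calculator interface, and proxyFor are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;

/** Hypothetical illustration of a retrying dynamic proxy; not Ostrich's implementation. */
final class RetryingProxySketch {
    private RetryingProxySketch() {}

    /** Stand-in for a service interface such as CalculatorService. */
    interface Calculator {
        int add(int a, int b);
    }

    /** Builds a proxy that tries each backend in order until one call succeeds. */
    @SuppressWarnings("unchecked")
    static <S> S proxyFor(Class<S> serviceType, List<? extends S> backends) {
        InvocationHandler handler = (proxy, method, args) -> {
            Throwable last = null;
            for (S backend : backends) {
                try {
                    return method.invoke(backend, args);
                } catch (InvocationTargetException e) {
                    last = e.getCause();  // the exception thrown by the backend itself
                }
            }
            throw last != null ? last : new IllegalStateException("no backends available");
        };
        return (S) Proxy.newProxyInstance(
                serviceType.getClassLoader(), new Class<?>[] {serviceType}, handler);
    }

    public static void main(String[] args) {
        Calculator broken = (a, b) -> { throw new IllegalStateException("simulated outage"); };
        Calculator working = (a, b) -> a + b;
        Calculator calculator = proxyFor(Calculator.class, List.of(broken, working));
        System.out.println("1 + 2 = " + calculator.add(1, 2));
    }
}
```

Callers see only the Calculator interface, which is why the retry behavior has to be fixed when the proxy is created rather than passed with each call.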