Sunday, October 20, 2013

Cache Replication and Partition -- How to Scale linearly using Cache Listeners?

1. The System in Question

I am working on a trade (credit default swap and interest rates swap) processing system that calculates the client's limit exposures in real time.
The system consists of a 4-node WebLogic cluster that retrieves trades from an IBM clustered queue using MDB and puts trade data into a distributed cache / IMDG.
Suppose each node in the cluster has the following capacity:

Here is the cluster that doesn't scale linearly:


2. Pros and Cons of the System

  • Even distribution of trade processing, because any trade can be processed at any node and we use a master-master / update-anywhere replicated cache;
  • Reads scale linearly (the metric in green) because they take place locally;
  • Easy application design because there is no customized trade routing algorithm;
  • Writes can't be scaled linearly (the metric in red) because of the cluster-wide replication and synchronization between nodes;
  • The cluster-wide total memory is limited to that of a single node (the metric in red), because every node holds a full copy of the data;
  • Prone to timeouts and deadlocks, because distributed locking / synchronization is needed across nodes whenever they process the same client's trades simultaneously;

3. How to Scale linearly?

  • Linear scalability can be achieved only when each node can process trades (nearly) independently.
    For reads, you can achieve linear scalability with replication;
    For writes, you must keep the data replication effort constant and eliminate distributed locking / synchronization, so partitioning is the only choice.
  • An application usually has both read-mostly data (such as reference data) and write-heavy data (such as limit exposures in my case).
    So you should replicate read-mostly data unless it is huge (in which case you have to partition it and still use a near / local cache for the frequently used subset).
    You should partition write-heavy data in order to keep the number of replicas per primary constant and to avoid any distributed locking.
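
To put rough numbers on this, here is a back-of-the-envelope model. It is a sketch under simplifying assumptions that are not measurements from the system above: each node can apply a fixed number of writes per second (counting replica updates), and locking overhead is ignored. The class and method names are made up for illustration.

```java
// Toy capacity model; the class and method names are illustrative only.
final class WriteScalingModel {

    // Full replication: every write is applied on every node, so the
    // aggregate write rate is capped at what a single node can apply.
    static double replicatedWrites(int nodes, double perNodeWrites) {
        return perNodeWrites;
    }

    // Partitioning: each write is applied on one primary plus `backups`
    // replicas, so the per-write cost stays constant as nodes are added.
    static double partitionedWrites(int nodes, int backups, double perNodeWrites) {
        return nodes * perNodeWrites / (1 + backups);
    }

    public static void main(String[] args) {
        double w = 1000.0; // assumed writes/second that one node can apply
        for (int n = 2; n <= 8; n *= 2) {
            System.out.printf("nodes=%d replicated=%.0f partitioned(1 backup)=%.0f%n",
                    n, replicatedWrites(n, w), partitionedWrites(n, 1, w));
        }
    }
}
```

In this model the replicated figure stays flat while the partitioned figure grows in proportion to the node count, which is the sense of "linear" used in this post.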

4. Scaling the System linearly using Cache Listener-based Routing

So for my trading application, I just need to replace replication with partitioning for writes in order to scale linearly.
One naive implementation is to put a separate routing node between MQ and the WebLogic cluster and route all trades for the same client to the same node using a simple hash algorithm.
There are 2 big problems here. One is that a trade may be routed to a node that doesn't hold that client's cache partition, because the distributed cache and the routing node most likely use different partition algorithms. If so, remote network calls are needed.
The other is that a simple hash algorithm can't deal with a dynamic cluster: when a node joins or leaves, most keys map to a different node. This is why most IMDGs employ consistent hashing. If you try to implement consistent hashing yourself, the amount of work and complexity will overwhelm most application-level developers.
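
To make the second problem concrete, the sketch below counts how many client IDs change owner when a cluster grows from 4 to 5 nodes, first with plain modulo hashing and then with a small consistent-hash ring. It is only an illustration: the ring with 100 virtual nodes per member, the MurmurHash3-style mixing function, and all names are assumptions for this example, not the algorithm of any particular IMDG.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative comparison; not the algorithm of any particular IMDG.
final class HashRoutingDemo {

    // Bit mixer (the 64-bit finalizer from MurmurHash3) so that similar
    // strings land far apart on the ring.
    static long mix(String s) {
        long h = s.hashCode();
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    // Naive routing: owner = hash mod node count.
    static int moduloOwner(String key, int nodeCount) {
        return (int) Math.floorMod(mix(key), (long) nodeCount);
    }

    // Consistent hashing: every node owns several points on a sorted ring.
    static TreeMap<Long, Integer> buildRing(int nodeCount, int virtualNodes) {
        TreeMap<Long, Integer> ring = new TreeMap<>();
        for (int node = 0; node < nodeCount; node++) {
            for (int v = 0; v < virtualNodes; v++) {
                ring.put(mix("node-" + node + "#" + v), node);
            }
        }
        return ring;
    }

    // A key belongs to the first ring point at or after its hash (wrapping around).
    static int ringOwner(TreeMap<Long, Integer> ring, String key) {
        Map.Entry<Long, Integer> e = ring.ceilingEntry(mix(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // Returns {keys moved with modulo hashing, keys moved with the ring}
    // when the cluster grows from 4 to 5 nodes.
    static int[] movedKeys(int keyCount) {
        TreeMap<Long, Integer> ring4 = buildRing(4, 100);
        TreeMap<Long, Integer> ring5 = buildRing(5, 100);
        int movedModulo = 0;
        int movedRing = 0;
        for (int i = 0; i < keyCount; i++) {
            String clientId = "client-" + i;
            if (moduloOwner(clientId, 4) != moduloOwner(clientId, 5)) movedModulo++;
            if (ringOwner(ring4, clientId) != ringOwner(ring5, clientId)) movedRing++;
        }
        return new int[] {movedModulo, movedRing};
    }

    public static void main(String[] args) {
        int[] moved = movedKeys(10_000);
        System.out.println("moved with modulo hashing:     " + moved[0]);
        System.out.println("moved with consistent hashing: " + moved[1]);
    }
}
```

With modulo hashing a key stays put only if its hash gives the same remainder for 4 and for 5, so most keys are reassigned. With the ring, the only keys that move are the ones taken over by the new node.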

All distributed caches / IMDGs (Infinispan, GemFire, GigaSpaces etc.) support distributed cache listeners. When you put data into a partitioned cache, the IMDG automatically routes the data to a primary partition member based on whatever hashing algorithm it employs internally. Only the listeners registered on that primary are invoked, and each listener gets only the trades on its local primary (not those from remote primaries).
We can use the distributed cache listener to do our routing. Here is the partitioned version:

4.1 Notes

  • We only show one replica per primary for space reasons. You should decide the number of replicas based on the balance between HA and write performance. For reasonable HA and durability, at least 2 backups across different racks is common practice;
  • For space reasons, the diagram just says "primary cache" generically. Usually more than one cache needs partitioning. If so, related entries should be co-located on the same node in order to eliminate remote network calls. In my case, we co-locate data based on client ID;
  • The “Cache Message Bus” works not only as a parallel routing bus (put and take), but also as a cache bus for replication. The figure doesn’t connect the replication to the bus for space reasons;
  • There is no deadlock or lock timeout issue across nodes because all updates for a client are coordinated / locally synchronized by the primary node;

4.2 Workflow

  1.  Any trade listener (an MDB hosted by the WebLogic cluster) can receive any post trade. It parses the trade only minimally, just enough to extract the client ID, and then calls the partition strategy to route the trade to some partition member. Thanks to the WebLogic cluster, this solution has load balancing and HA;
  2. We use the client ID as the partition key. As soon as the listener extracts the client ID, it just calls the cache PUT operation, and the cache routes the trade automatically based on its internal partition algorithm (consistent hashing is commonly used);
  3. Only the cache listener on the primary takes the trades that were routed there (no remote takes from other primaries). Because the listener runs in the same thread as the PUT operation, it submits the retrieved trades to a thread pool for asynchronous execution so that it doesn't hold up the PUT call;
  4. The executor processes the trades, which should incur only local cache operations (GET from the primary and PUT to backups). If processing incurs remote operations (against other partitions), it means you didn’t co-locate data properly or didn’t replicate the needed reference data;
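
The four steps can be mimicked in a toy, single-JVM sketch. It does not use any real IMDG API: the `Node`, `Trade`, `ownerOf` and `onTradePut` names are hypothetical, the modulo-based `ownerOf` merely stands in for the cache's internal partitioning, and backups are left out. A real product supplies the partitioning, the listener callbacks and the replication for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of listener-based routing; all names are hypothetical.
final class ListenerRoutingSketch {

    static final class Trade {
        final String clientId;
        final double notional;
        Trade(String clientId, double notional) {
            this.clientId = clientId;
            this.notional = notional;
        }
    }

    // One partition member: holds the primary data for the clients it owns.
    static final class Node {
        final int id;
        final Map<String, Double> exposureByClient = new ConcurrentHashMap<>();
        // One worker thread per node, so a client's updates are serialized locally.
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        Node(int id) { this.id = id; }

        // Step 3: runs in the caller's PUT thread, so it only hands the trade off.
        void onTradePut(Trade t) {
            executor.execute(() -> process(t));
        }

        // Step 4: touches only data held on this node.
        void process(Trade t) {
            exposureByClient.merge(t.clientId, t.notional, Double::sum);
        }
    }

    final List<Node> nodes = new ArrayList<>();

    ListenerRoutingSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) nodes.add(new Node(i));
    }

    // Stand-in for the cache's internal partition algorithm.
    Node ownerOf(String clientId) {
        return nodes.get(Math.floorMod(clientId.hashCode(), nodes.size()));
    }

    // Steps 1-2: any trade listener calls put(); the "cache" routes by client ID
    // and fires the listener on the primary owner only.
    void put(Trade t) {
        ownerOf(t.clientId).onTradePut(t);
    }

    // Drains the worker threads so results can be read safely.
    void shutdownAndWait() {
        for (Node n : nodes) {
            n.executor.shutdown();
            try {
                n.executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        ListenerRoutingSketch cache = new ListenerRoutingSketch(4);
        cache.put(new Trade("client-A", 100.0));
        cache.put(new Trade("client-B", 50.0));
        cache.put(new Trade("client-A", 25.0));
        cache.shutdownAndWait();
        for (Node n : cache.nodes) {
            System.out.println("node " + n.id + " -> " + n.exposureByClient);
        }
    }
}
```

Because every trade for a given client lands on the same node and that node processes them on a single worker thread, the exposure update needs no cross-node lock in this model.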

Most developers know how to use an IMDG for data caching, but are not familiar with its listener capability (or event processing at an even higher level). On one hand, IMDG listeners are powerful and as easy to understand as DB triggers. On the other hand, they are not as robust as JMS / MDB.
For example, MDBs provide thread pooling and different levels of delivery guarantee, such as once-and-only-once delivery.
However, in my case we only use the IMDG listener lightly: as soon as the listener gets the trades, it submits them to an executor pool. If something goes wrong between the PUT and the listener taking the trade, the PUT will fail and you can employ retry or redelivery logic there.
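
As a minimal sketch of what retry logic around the PUT could look like: the `PutRetry` helper below is hypothetical, and it assumes a failed PUT surfaces as a runtime exception, which depends on the IMDG and how it is configured.

```java
// Hypothetical helper: bounded retry around a cache PUT.
final class PutRetry {

    // Runs `put` up to maxAttempts times. If every attempt fails, the last
    // failure is rethrown so the caller can fall back to redelivery.
    static void putWithRetry(Runnable put, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                put.run();
                return; // PUT succeeded, listener was invoked
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated PUT that fails twice before succeeding.
        putWithRetry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient failure");
        }, 5);
        System.out.println("PUT succeeded after " + calls[0] + " attempts");
    }
}
```

If the retries are exhausted, letting the exception propagate out of the MDB is one way to hand the trade back to the queue for redelivery, subject to how the queue and transactions are set up.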

Lastly, I must mention GigaSpaces XAP, which provides listener-based routing natively. You get many functions, such as an MDB-like thread pool, out of the box. If you are starting a new project, you should seriously consider GigaSpaces XAP.
