Friday, June 27, 2014

Simple Comparisons on some In-Memory Data Grids

 
Fetures Feature Description GemFire7 Coherence3.7 Infinispan5 Gigaspaces7 Notes
1. topologies peer-peer; client-server High High High High
2. cross-site /
  WAN replication
datacenter-datacenter; region-region High Low Med High Coherence support by incubator
"Push replication";
Infinispan basic support in V5.2
3. read-most scalability via replication High High High High
4. write-most scalability via partitioning and replication
between primary and backups
High High Med High
5. high availability / HA via replication and persistence
 into disk (DB)
High High High High
6. asychronous
    replication
useful for slow changing data and
WAN replication
High Med High High Coherence support by incubator
"Push replication";
7. partition rehashing consistent hashing to reduce
data relocation
High High High High all seem to use consistent
hashing-like algorithms
8. dynamic clustering adds or loses nodes High High High Med
9. updating among
    partition backups
master-backup: less deadlock prone
and IO master-master/update
anywhere: more dealock prone and IO
High High Low High Infinispan only has master-master
that incurs deadlock for slow
network and large caches
10. cache loader and
      writer
2 application interfaces: loades data
from DB and saves data into Cache.
With cache loader and writer,
applications only need to interace
with Cache!
High High Med High Infinispan writers only support JPA, not JDBC or Hibernate!
11. read-through and
     read-ahead
populates cache with DB
in batch mode
Med High Med High no prefetch/batch support from
Gemfire and infinispan
12. write-through and
     write-behind
persists cache into wherever they
were loaded.write-behind persists
data asynchronously
High High Med High
13. event notification Cache also works as a messaging and
parallel processing bus like JMS/MDB
Med Med Med High
14. continous querying register contents-based interests
with Cache and receive updates
continously.
High High Low High
15. cache querying SQL like query: not only searches
 based on key matching
High High Low High
16. locking and tx JTA (global tx) and ACID properties High High Med High
17. off-heap puts cache data off Java heap
so that GC pause time is reduce!
Low High Low Low
18. key affinity /
     colocation
colocates related cache objects on the
same partition to reduce IO
High High High High
19. customized
     partitioning
puts cache into specific cluster node
to bypass cache hashing algorithm
High Low Med Low
20. synchronous
     requests like RFQ
synchronou requests should go
through the same partition routing as
asynchrous messages.
Low Low Low High
21. API Map,Restful and appropriate interfaces Med Med Med High
22. language bindings Java, C++ etc Med Med Med High
23. map-reduce /
     scatter-gather
submits aggregation tasks to run
across multiple or all cluster nodes
High High High High
24. monitoring monitor the health of clusters High Med Med High
25. J2EE (JMS,Web,
     Remoting)
how much J2EE? to support? Low Low Low High
26. migration effort how to migrate to a different cache
production in a J2EE app. server
High High High Low GigaSpaces is an app. server

Monday, June 23, 2014

JMS Connection and Session on WebSphere MQ 7.1

The difference between JMS API's Connection and Session will probably confuse many naive developers. Their performance implications will only make things worse.
This is primarily because we are used to thinking connection and session are the same and can be used interchangeably to represent a unique single-threaded conversation between a server and a client. This is indeed the case for DB servers and clients.

The basic idea of JMS API's connection and session is to first create a top level heavyweight physical connection, then several lightweight sessions / logical connections under the only physical connection.
Because of the connection's heavyweight nature, we usually pool them just like DB connection pooling.  We hope the connection can create lightweight sessions very fast and service its session operations (sending, receiving, tx etc) efficient by multiplexing them through its only physical connection.
Add to this, the following Spring configures supports caching a single JMS connection and multiple sessions:

<bean id="cachedConnectionFactory"  class="org.springframework.jms.connection.CachingConnectionFactory"
        p:targetConnectionFactory-ref="myConnFactory" p:sessionCacheSize="10"/>


However the JMS specification doesn't say how the MOM vendors should implement connections and sessions. For earlier WebSphere MQ versions before 7, each JMS session actually also represents a brand new physical connection / channel instance just like a JMS connection.
MQ version 7 introduces the so called "sharing conversations" which allows a configurable number of JMS connections and sessions (both represent conversations in MQ terms) to share a single physical connection / channel instance.
The number of conversations that can be shared across a single physical connection / channel instance is determined by the WebSphere MQ channel property SHARECNV. The default value of this property for Server Connection Channels is 10. A value of 0 or 1 basically disables the sharing.
In order to use this feature, your client side JMS connection factory also need to enable property
SHARECONVALLOWED.
For example, assuming SHARECNV is 10, you created 5 connections and 15 sessions (it doesn't matter which connections created which sessions from sharing perspective), the total physical connections needed is (5+15)/10=2
For more information, you refer to this page.

If you are developing a low latency and high performance application, you must know how to tune JMS connections and sessions on your MQM vendor.
  • How to Create Sessions quickly?
    If you use the previous Spring config, you should estimate the maximum sessions you needs and then pre-populate some of them based on the SHARECNV value. Otherwise some session creations will be as slow as creating a new physical connection.
  • How Many Connections to Create?
    Due to the multiplexing nature, if most of the multiplexed sessions under a connection are busy most of the time, you should move those busy sessions to new connections. Of course, you should manage connections using pooling such as retrieving them from an application servers.

Thursday, May 29, 2014

Class Loading Issue when Running Gemfire 7.x OQL in Weblogic 10.3.x

I have a very simple OQL like "SELECT * FROM /FxLimitsRegion" that ran successfully in my unit/integration test. But I got the following exception when trying to run it inside my WebLogic EAR (our Gemfire cache and Weblogic application server run in the same JVM):
com.gemstone.gemfire.cache.query.QueryInvalidException: Syntax error in query:  Invalid class or can't make instance, com.gemstone.gemfire.cache.query.internal.parse.ASTSelect
    at com.gemstone.gemfire.cache.query.internal.QCompiler.compileQuery(QCompiler.java:80)
    at com.gemstone.gemfire.cache.query.internal.DefaultQuery.(DefaultQuery.java:156)
    at com.gemstone.gemfire.cache.query.internal.DefaultQueryService.newQuery(DefaultQueryService.java:107)
    at com.gemstone.gemfire.internal.cache.LocalDataSet.query(LocalDataSet.java:129)
    at com.jpmorgan.gcrm.sef.business.cache.impl.SEFCacheManagerGemfireImpl.queryPartitionedRegionLocally(SEFCacheManagerGemfireImpl.java:187)
    at com.jpmorgan.gcrm.fno.business.datamanager.impl.FxDataManager.selectActiveFxLimits(FxDataManager.java:57)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Invalid class or can't make instance, com.gemstone.gemfire.cache.query.internal.parse.ASTSelect
    at antlr.ASTFactory.createUsingCtor(ASTFactory.java:251)
    at antlr.ASTFactory.create(ASTFactory.java:210)
    at com.gemstone.gemfire.cache.query.internal.parse.OQLParser.selectExpr(OQLParser.java:1090)
    at com.gemstone.gemfire.cache.query.internal.parse.OQLParser.query(OQLParser.java:217)
    at com.gemstone.gemfire.cache.query.internal.parse.OQLParser.queryProgram(OQLParser.java:115)
    at com.gemstone.gemfire.cache.query.internal.QCompiler.compileQuery(QCompiler.java:76)
The class "com.gemstone.gemfire.cache.query.internal.parse.ASTSelect" to load was included in the gemfire.jar in my EAR. So the exception seems to be confusing in the beginning.
From the above stack trace, you can see it relates to the ANTLR jar. Both my EAR and Weblogic itself have ANTLR. But my EAR uses version 2.7.7 while Weblogic 10.3.2 uses a pretty old version.
So we need to know which ANTLR version was used. Here is the call workflow:
1. my EAR thread issued a OQL ->
2. Gemfire found  ANTLR to parse the OQL to AST ->
3. ANTLR tried to load Gemfire class "com.gemstone.gemfire.cache.query.internal.parse.ASTSelect"

By default, Step2 found the Weblogic ANTLR by the Weblogic system class loader. But the Gemfire class is in the child EAR classloader. So Step 3 threw the exception.
If we can tell Weblogic to use my EAR ANTLR, the exception should be gone. In order to do so, I just added the following excerpt to your weblogic-application.xml:
         <prefer-application-packages>
               <package-name>antlr.*</package-name>
         </prefer-application-packages>
New version of ANTLR changed to load classes from the thread context class loader by default which was the EAR classloader in my case. So if the Weblogic had new version of ANTLR, the exception should have not been there.


Sunday, October 20, 2013

Cache Replicatio and Partition -- How to Scale linearly using Cache Listeners?

1. The System in Question

I am working on a trade (credit default swap and interest rates swap) processing system that calculates the client's limit exposures in real time.
The system consists of a 4-node WebLogic cluster that retrieves trades from an IBM clustered queue using MDB and puts trade data into a distributed cache / IMDG.
Suppose each node in the cluster has the following capacity:













Here is the cluster that doesn't scale linearly:

 

2. Pros and Cons of the System

Pros
  • Even distribution of trade processing because any trade can be processed at any node and we use master-master/update-anywhere replication cache;
  • Scales reads linearly (the metric in green) because reads take place locally;
  • Easy application design because there is no customized trade routing algorithm;
Cons
  • Writes can't be scaled linearly (the metric in red) because of the cluster-wide replication and synchronization between nodes;
  • The cluster wide total memory is limited to a single node (the metric in red);
  • Be timeout and deadlock prone because a distributed locking / synchronization is needed across nodes if they process the same client's trades simultaneously;

3. How to Scale linearly?

  • Linear scalability can be achieved only when each node can process trades (nearly) independently.
    For reads, you can achieve linear scalability with replication;
    For writes, you must keep the data replication effort constant and eliminate distributed locking / synchronization. So partition is the only choice.
  • An application usually has both reads (such as reference data) and writes (such as limit exposures in my case).
    So you should replicate reads unless they are huge (if so, you have to partition them and still use near / local cache for the frequent used subset).
    You should partition writes in order to keep the replicas of primaries constant and to avoid using any distributed locking.

4. Scales the System linearly using Cache Listener-based Routing

So for my trading application, I just need to replace replication for partition for writes in order to scale linearly.
One naive implementation is to have a separate routing node between MQ and the WebLogic cluster and partition trades for the same client to the same nodes using a simple hash algorithm.
There are 2 big problems here. One is that the trade was routed to some node that may not have the partitioned cache because the distributed cache and the routing node may have different partition algorithms (most likely). If so, remote network calls are needed.
The other is that the simple hash algorithm can't deal with dynamic cluster. This is why most IMDG employ consistent hashing. If you want to use consistent hashing, the amount of work and complexity will overwhelm most application level developers.

All distributed cached / IMDG (Infinispan, Gemfire and GigaSpaces etc) support distributed cache listeners. When you put data into a partitioned cache, IMDB will automatically route the data to some primary partition member based on whatever hashing algorithms it internally employs, and only the listeners registered on the primary will be invoked and the listener will gets trades only on its local primary (not from other remote primaries).
We can use the distributed cache listener to do our routing. Here is the partitioned version:


4.1 Notes

  • We only have one replica per one primary for space reason. You should decide the number of replicas based on the balance between HA and write performance.  In order to have reasonable HA and durability, at least 2 backups across different racks are common practice;
  • For space reason, the diagram only generally says primary cache. Usually more than 1 different caches need partitioning. If so, they should col-locate on the same node in order to eliminate remote network calls. In my case, we col-locate data based on client ID;
  • The “Cache Message Bus” not only works as a parallel routing bus (put and take), but also as a cache bus for replication. The figure doesn’t connect the replication to the bus for space reason;
  • There is no deadlock or lock timeout issue across nodes because all updates for a client are coordinated / locally synchronized by the primary node;

4.2 Workflow

  1.  Any trade listener (MDB hosted by the WebLogic cluster) can get any post trade. It barely parses trades to extract client ID, and calls the partition strategy to route trades to some partition member. Thanks to the WebLogic cluster, this solution possesses load balancing and HA;
  2. We use client ID as the partition strategy. As soon as the listener extracts out the client ID, it just calls the cache PUT operation to automatically route trades based on its internal partition algorithm (Consistent Hash is commonly used);
  3. Only the cache listener on the primary takes the trades that were routed there (no remote takes from other primaries). Because the listener runs in the same thread of the PUT operation, it submits the retrieved trades to a thread pool for asynchronous execution in order not to delay invoking PUT;
  4. The executor processes trades that should only incur local cache operations (GET from the primary and PUT to backups).  If it incurs remote operations (from other partition), it means you didn’t col-locate data properly or didn’t replicate needed reference data;

Most developers knows how to use IMDG for data caching, but are not familiar with IMDB's listener capability (or event processing at an even higher level). On one hand, IMDB listeners are powerful and easy to understand just like DB triggers. On the other handle, they are not as robust as JMS / MDB.
For example, MDB provides threading pool and different level of delivery guarantee such as once and only once delivery.
However in my case,  we only use the IMDG listener lightly: as soon as the listener gets the trades, it submit them to an executor pool. If something goes wrong between PUT and listener TAKEN, the PUT will fail and you can employ retry logic or there delivery logic there.

Lastly, I must mention GigaSpaces XAP that provides the listener based routing naturally and natively. You can get many functions such as MDB-like threading pool out of the box. If you start a new project, you should seriously consider GigaSpaces XAP.