Tuesday, February 18, 2014

Re-entrant Threads in a Distributed Cache (Coherence)

There's a fundamental nastiness in making a call to a distributed cache that then accesses the cache again from within that call.

In Oracle Coherence, if you have something like this:

public class MyEntryProcessor implements Serializable, EntryProcessor { 
.
.
        public Object process(Entry entry) { 
            // this doesn't work...
            NamedCache myCache = CacheFactory.getCache(CACHE_NAME);
.
.

(where CACHE_NAME is the name of the cache against which you run this entry processor) you'll get:

2014-02-18 17:15:17.909/10.736 Oracle Coherence GE 3.7.1.0 (thread=DistributedCache, member=2): Assertion failed: poll() is a blocking call and cannot be called on the Service thread

(Note: even if CACHE_NAME refers to a different cache from the one against which we run this entry processor, we still get this problem.)

This is because we "are making a re-entrant call back into a cache service from the service thread or worker thread of a cache service. This is a bad thing to do as you risk deadlocking your cluster by consuming all of the threads in the service." [1]

One way around it is to enforce an access order. "As with traditional locking, using ordered access (for example, service A can call service B, but not vice versa) can help." [2]

Another is this slightly hacky approach:

        public Object process(Entry entry) { 
            // myKey and myValue are whatever key/value this processor wants to
            // write (defined elsewhere in the class)
            BackingMapManagerContext    context     = getContext(entry); 
            Map<Binary, Binary>         myCache     = getBackingMap(context); 
            
            Binary                      binaryKey   = (Binary) context.getKeyToInternalConverter().convert(myKey); 
            Binary                      binaryValue = (Binary) context.getValueToInternalConverter().convert(myValue); 
            boolean                     wasEmpty    = myCache.get(binaryKey) == null; 
            myCache.put(binaryKey, binaryValue); 
            
            return wasEmpty; 
        } 

        protected BackingMapManagerContext getContext(Entry entry) {
            BinaryEntry binaryEntry = (BinaryEntry) entry;
            BackingMapManagerContext context = binaryEntry.getContext();
            return context;
        }

        private Map getBackingMap(BackingMapManagerContext context) {
            Map backingMap = context.getBackingMap(CACHE_NAME);
            return backingMap;
        } 

This uses the deprecated BackingMapManagerContext.getBackingMap method but other people seem to have used this approach as well.
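For context, here's a minimal sketch (mine, not from the Coherence docs; the cache name and key are placeholders) of how such an entry processor is typically invoked. The process() method then runs on the cache service's worker thread on the member that owns the key, which is why the re-entrant CacheFactory.getCache() call above blows up.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class InvokeExample {
    public static void main(String[] args) {
        // "my-cache" and "some-key" are hypothetical placeholder values
        NamedCache cache = CacheFactory.getCache("my-cache");
        // process() executes server-side on the owning member's service/worker thread;
        // the cast assumes the processor returns wasEmpty as in the workaround above
        Boolean wasEmpty = (Boolean) cache.invoke("some-key", new MyEntryProcessor());
        System.out.println("wasEmpty = " + wasEmpty);
        CacheFactory.shutdown();
    }
}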

[1] Assertion failed: poll() is a blocking call - Oracle Community
[2] Constraints on Re-entrant calls.

Further Tibco Notes


Sending messages from the command line

In one console, run this:

C:\Users\henphi> tibrvlisten -daemon tcp:HOST:TCP_PORT -service SERVICE_ID "testmsg"

where HOST, TCP_PORT and SERVICE_ID are values suitable for your environment.

In another console, run this:

C:\Users\henphi> tibrvsend -daemon tcp:HOST:TCP_PORT -service SERVICE_ID "testmsg" "message content"
Publishing: subject=testmsg "message content"

and you'll see the first console output:

2014-02-14 13:46:57 (2014-02-07 13:46:57.889000000Z): subject=testmsg, message={DATA="message content"}
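The same send can be done programmatically with the Rendezvous Java API (com.tibco.tibrv). This is just a minimal sketch under the same assumptions as above (HOST, TCP_PORT and SERVICE_ID are placeholders for your environment); the "DATA" field name matches what tibrvsend uses for its payload.

import com.tibco.tibrv.Tibrv;
import com.tibco.tibrv.TibrvMsg;
import com.tibco.tibrv.TibrvRvdTransport;

public class SendTestMsg {
    public static void main(String[] args) throws Exception {
        Tibrv.open(Tibrv.IMPL_NATIVE);                    // initialise the Rendezvous environment
        TibrvRvdTransport transport =
                new TibrvRvdTransport("SERVICE_ID", null, "tcp:HOST:TCP_PORT");
        TibrvMsg msg = new TibrvMsg();
        msg.setSendSubject("testmsg");                    // the subject the listener is bound to
        msg.update("DATA", "message content");            // payload field, as seen in the listener output
        transport.send(msg);
        transport.destroy();
        Tibrv.close();
    }
}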

Wildcard subjects

If you want to listen to all messages, not just those with a given subject, run:

C:\Users\henphi> tibrvlisten -daemon tcp:HOST:TCP_PORT "*.>"

Automatic clustering

Instances of the Tib daemon will automatically cluster if they're on the same subnet unless you start the daemon with something like:

> rvrd -store STORE_FILE -listen TCP_PORT -logfile LOG_FILE -log-max-size 1024 -log-max-rotations 5 -no-multicast

Otherwise, you can have two listeners attached to daemons on different HOSTs and both will receive the same messages.

Thursday, February 6, 2014

Miscellaneous Tibco Notes

[I am not a Tibco guru. These notes are just me playing around in our UAT environment]

If you listen to a service on a given box, you won't be able to listen to the same service on another daemon on that same box.

For example, if I run:

C:\Users\henphi>tibrvlisten -daemon tcp:HOST:18223 -service 7546 "TESTMSG"
tibrvlisten: Listening to subject TESTMSG
.
.
.

then connecting to the same service on a different daemon on the same box gives:

C:\Users\henphi>tibrvlisten -daemon tcp:HOST:18200 -service 7546 "TESTMSG"
tibrvlisten: Failed to initialize transport: Initialization failed

for reasons outlined below. The same is true for tibrvsend.


Some miscellaneous Tib notes (a.k.a. brain dump) for the interested

Subscription failover demonstration 

If I start another RVRD process, the store file needs to be different from the one used by the already running Tib daemon, for reasons I don't understand (the store file contains the configuration, I am told).

Anyway, here goes:

mds@HOST[dev]:~> /opt/mds/tibco_8_1/tibrv/8.1/bin/rvrd -store /tmp/rvrd.store -listen 18223 -logfile /tmp/rvrd.log -log-max-size 1024 -log-max-rotations 5
mds@HOST[dev]:~> ps aux | grep rvrd
mds        669  0.3  0.1 104612 57668 ?        Ssl  Jan13   7:10 /opt/mds/tibco_8_1/tibrv/8.1/bin/rvrd -store /opt/mds/tibco_8_1/tibrv/8.1/rvrd.store -listen 18200 -logfile /var/log/mds/rvrd/rvrd.log -log-max-size 1024 -log-max-rotations 5
mds      15235  0.0  0.0  43992 15860 ?        Ssl  15:24   0:00 /opt/mds/tibco_8_1/tibrv/8.1/bin/rvrd -store /tmp/rvrd.store -listen 18223 -logfile /tmp/rvrd.log -log-max-size 1024 -log-max-rotations 5
mds      16952  0.0  0.0   3936   824 pts/3    S+   15:26   0:00 grep rvrd
mds@HOST[dev]:~>

So, our new process has PID 15235. Let's see what ports it has open:

mds@HOST[dev]:~> netstat -nap 2>/dev/null | grep 15235 
tcp        0      0 0.0.0.0:18223           0.0.0.0:*               LISTEN      15235/rvrd 
tcp        0      0 0.0.0.0:62737           0.0.0.0:*               LISTEN      15235/rvrd 
tcp        0      0 0.0.0.0:57401           0.0.0.0:*               LISTEN      15235/rvrd 
udp        0      0 0.0.0.0:54339           0.0.0.0:*                           15235/rvrd 
udp        0      0 0.0.0.0:7543            0.0.0.0:*                           15235/rvrd 
udp        0      0 0.0.0.0:7544            0.0.0.0:*                           15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115673 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115672 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115662 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115661 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115655 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115654 15235/rvrd 
mds@HOST[dev]:~> 

Starting a listener on my desktop:

C:\Users\henphi>tibrvlisten -daemon tcp:HOST:18223 -service 7546 "TESTMSG" 

causes:

mds@HOST[dev]:~> netstat -nap 2>/dev/null | grep 15235 
tcp        0      0 0.0.0.0:18223           0.0.0.0:*               LISTEN      15235/rvrd 
tcp        0      0 0.0.0.0:62737           0.0.0.0:*               LISTEN      15235/rvrd 
tcp        0      0 0.0.0.0:57401           0.0.0.0:*               LISTEN      15235/rvrd 
tcp        0      0 128.162.27.135:18223    128.30.71.147:61239     ESTABLISHED 15235/rvrd 
udp        0      0 0.0.0.0:54339           0.0.0.0:*                           15235/rvrd 
udp        0      0 0.0.0.0:7543            0.0.0.0:*                           15235/rvrd 
udp        0      0 0.0.0.0:7544            0.0.0.0:*                           15235/rvrd 
udp        0      0 0.0.0.0:7546            0.0.0.0:*                           15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115673 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115672 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115662 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115661 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115655 15235/rvrd 
unix  3      [ ]         STREAM     CONNECTED     2016115654 15235/rvrd 

Note how the service ID maps to the UDP multicast port Tib uses.

Now, back on my desktop in another window:

C:\Users\henphi>tibrvsend -daemon tcp:HOST2:18200 -service 7546 "TESTMSG" "BLAAAHHHH!!!" 
Publishing: subject=TESTMSG "BLAAAHHHH!!!" 

causes

2014-01-14 16:06:52 (2014-01-14 16:06:52.102000000Z): subject=TESTMSG, message={DATA="BLAAAHHHH!!!"} 

to be output by the listener. Since we're sending the message via a daemon on a different box, this demonstrates that the Tib subscriptions are automatically clustered across daemons.

Killing our second Tib process:

mds@HOST[dev]:~> netstat -nap 2>/dev/null | grep 7546 
udp        0      0 0.0.0.0:7546            0.0.0.0:*                           15235/rvrd 
mds@HOST[dev]:~> kill 15235 
mds@HOST[dev]:~> netstat -nap 2>/dev/null | grep 7546 
mds@HOST[dev]:~> 

causes the listening Tib to output

2014-01-14 16:12:17 RV: TIB/Rendezvous Error Not Handled by Process: 
{ADV_CLASS="WARN" ADV_SOURCE="SYSTEM" ADV_NAME="RVD.DISCONNECTED"} 

Subsequent tibrvsends are not received and I can't see any UDP ports being opened on the box it's sending to.

Restarting the listener produces:

C:\Users\henphi>tibrvlisten -daemon tcp:HOST:18223 -service 7546 "TESTMSG" 
tibrvlisten: Failed to initialize transport: Not connected to daemon 

If I restart the daemon listening on port 18223, I can resume tibrvsend and tibrvlisten as normal. 

Monday, February 3, 2014

In Place

I came across some code I wrote that was prefixed with the term 'in-place'. I had forgotten what it meant, so to refresh my memory: an in-place algorithm transforms its input using a "constant amount of extra storage space" [1]. Lafore defines it as "meaning beside the initial array, very little extra memory is required" [2].

Sedgewick [3] uses the example of a mergesort where "it would be much more desirable to have an in-place method so that we could sort the first half of the array in place, then sort the second half of the array in place, then do the merge of the two halves by moving the items around within the array, without using a significant amount of extra space."

Cuckoo hashing is also an in-place algorithm.
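As a trivial illustration (my own example, not from any of the references), reversing an array in place needs only a couple of indices and one temporary, however big the array is:

public class InPlaceReverse {
    // Reverses the array in place: O(1) extra storage regardless of a.length
    public static void reverse(int[] a) {
        int i = 0, j = a.length - 1;
        while (i < j) {
            int tmp = a[i];
            a[i] = a[j];
            a[j] = tmp;
            i++;
            j--;
        }
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3, 4, 5};
        reverse(a);
        System.out.println(java.util.Arrays.toString(a));   // prints [5, 4, 3, 2, 1]
    }
}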

[1] Wikipedia.
[2] Data Structures and Algorithms in Java, Robert Lafore.
[3] Algorithms, Sedgewick.

Sunday, February 2, 2014

TCP Chatter

The netstat command lies. If you run it, you'll see packets larger than your MTU (maximum transmission unit - you can see this if you run the ifconfig command). This is because netstat is reporting what's happening at the OS level and "the operating system is passing packets larger than MTU to the network adapter, and the network adapter driver is breaking them up so that they fit within the MTU." [1]

This is called TCP large segment offload (TSO or LSO). It is an optimization, but it makes seeing what is really going on harder. You can turn it off using the ethtool command [2].
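For example (assuming the interface is eth0; check with ifconfig):

> ethtool -k eth0           # show the current offload settings
> ethtool -K eth0 tso off   # disable TCP segmentation offload

After that, a packet capture should show segments no larger than the MTU.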

Capturing the conversations between clients that are uploading 100MB of data to a server running JStringServer (a work in progress), focussing on a particular conversation (by looking at the ports chosen in the exchange) and cleaning the data (with sed) so that it looks like:

34.565659  SERVER > CLIENT
34.606133  SERVER > CLIENT
34.606143  SERVER > CLIENT
.
.

(where the number is the time in seconds) for the server-to-client and

34.562933  CLIENT > SERVER
34.600927  CLIENT > SERVER
34.603579  CLIENT > SERVER
.
.

for the client-to-server allows me to generate a histogram of packets per time interval.

Using a little bit of the R-language:

server2Client <- read.table("~/Documents/Temp/43645_server2Client.txt", header=FALSE, sep=" ")
client2Server <- read.table("~/Documents/Temp/43645_client2Server.txt", header=FALSE, sep=" ")

numBreaks = 20
hist(c(client2Server$V1), breaks= numBreaks, col=rgb(1,0,0,0.5), xlab="time", main="Packets")

hist(c(server2Client$V1), breaks= numBreaks, col=rgb(0,0,1,0.5), xlab="time", main="Packets", add=TRUE)

yields this:

Packet Histogram. Red is client-to-server. Blue is server-to-client.
Red are the packets from the client uploading its data to the server. Blue are the server's packets back to the client (note: these are almost entirely ACK packets). Purple is where the two are superimposed on each other. Note that there is only one time interval in which the number of server-to-client packets exceeds the client-to-server count.

The upshot of all this is that TCP is terribly chatty. The server returns nothing but the string "OK" when the exchange is complete. But this doesn't stop it from sending lots of packets to the client ACKnowledging receipt of a chunk of data. That's a lot of redundant bandwidth.

Further Reading

Interesting paper on TCP burstiness here.

[1] Wireshark docs.
[2] Segmentation and Checksum Offloading - Steven Gordon.

Saturday, February 1, 2014

TLABs

I sometimes peruse the hotspot-gc-dev mailing list for grins and saw this post on G1 garbage collection:

"Unlike the other collectors G1 did not use dynamic sizes for the TLABs that the Java threads use for allocation. That means that the TLABs when using G1 were all fixed size and very small. This patch adds dynamic sizing of TLABs to G1 too..."

What are these mysterious TLABs?

"A Thread Local Allocation Buffer (TLAB) is a region of Eden that is used for allocation by a single thread.  It enables a thread to do object allocation using thread local top and limit pointers, which is faster than doing an atomic operation on a top pointer that is shared across threads." [1]

Or, to put it another way:

"In HotSpot, almost all application allocation is performed in the eden space of the young generation, using contiguous (aka "pointer-bumping") allocation. That is, there's a current pointer and an end address, and as long the next allocation fits we return the current pointer value and increment it by the allocation size.

"But on a multiprocessor, this isn't safe: two threads could both read the current alloc pointer value, and both update, and think they were using the same memory for different objects. So some form of synchronization is necessary. We could just take a lock, but we can do better by using an atomic hardware instruction, a compare-and-swap, or CAS (in the SPARC architecture; the same thing is called compare-and-exchange on x86). So in our race scenario, one thread's CAS succeeds and the other fails; the latter retries.

"This still has two problems: atomic hardware operations are expensive on most architectures, and on machines with many processors this could be a source of cache contention, making them even more expensive.

"So we avoid this with TLABs: each thread allocates a medium-sized chunk and saves it, allocating within it with no synchronization required. Only when its TLAB is used does it go back to allocating from the shared space." [2]

Detlefs points out that the bigger the TLAB, the less often you must contend for the shared space, but also the more of the young generation you will use [2].
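To make that concrete, here's a minimal sketch (my own, nothing like real HotSpot code) of CAS-based pointer-bump allocation over a simulated region. A TLAB amounts to a thread claiming a whole chunk through this contended path once, then bumping a private pointer within it with no CAS at all.

import java.util.concurrent.atomic.AtomicLong;

public class BumpAllocator {
    private final AtomicLong top;   // shared allocation pointer ("top")
    private final long end;         // end of the region

    public BumpAllocator(long start, long end) {
        this.top = new AtomicLong(start);
        this.end = end;
    }

    // Allocates 'size' units by bumping 'top' with a compare-and-swap; returns
    // the old value of 'top', or -1 if the region is exhausted.
    public long allocate(long size) {
        while (true) {
            long current = top.get();
            long next = current + size;
            if (next > end) {
                return -1;
            }
            if (top.compareAndSet(current, next)) {
                return current;
            }
            // another thread won the race; loop and retry with the new 'top'
        }
    }
}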

There are JVM flags to manipulate them (-XX:TLABSize, -XX:-ResizeTLAB) and even to print summaries (-XX:+PrintTLAB) [1].
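For example (the size here is arbitrary, and flag availability depends on your JVM version):

> java -XX:+PrintTLAB -XX:TLABSize=256k -XX:-ResizeTLAB MyApp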

[1] Jon Masamitsu's Weblog.
[2] David Detlefs' Weblog.