Sunday, July 15, 2012

ThreadPoolExecutor and RTFM

Take a look at the constructors for ThreadPoolExecutor. All of them take a maximumPoolSize argument. If you thought the pool would necessarily grow to that size whenever it got busy, you'd be wrong.

To be fair, the documentation does say as much. We should have RTFM, but we were in a hurry.

Basically, the pool size is partly determined by the type of BlockingQueue you pass to the constructor. The rule is this: once the core pool size has been reached, if the BlockingQueue can accept another task, the executor queues it for later rather than starting a new thread.

So, if you pass the constructor a LinkedBlockingQueue that was instantiated without a capacity limit, no more threads than the core pool size are ever created, no matter what the maximum pool size is. A conservative programmer might create the thread pool with a core size of 1 and a much larger maximum pool size. But if he passes in an unbounded queue, it is all to no avail: the executor will never run more than one task at a time.
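You can see this for yourself with the minimal sketch below. It assumes Java 8 or later for the lambda syntax, and the class and method names are made up for illustration. The sketch fills a pool that has a core size of 1 and a maximum size of 10 with tasks that block on a latch. It then asks getLargestPoolSize() how many threads were ever created.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    // Hypothetical helper: submits `tasks` blocking tasks to a pool with core size 1
    // and maximum size 10, backed by a LinkedBlockingQueue created without a capacity.
    static int largestPoolSize(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());   // unbounded queue
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();                    // keep the worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // The queue accepted every task after the first, so no extra thread was started.
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return largest;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("largest pool size: " + largestPoolSize(100)); // prints "largest pool size: 1"
    }
}
```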

Furthermore, suppose you create a ThreadPoolExecutor with a core pool size of 1, a much larger maximum pool size, and an ArrayBlockingQueue (which is bounded by its nature). The executor won't become multi-threaded until the queue is full, that is, until the number of waiting tasks exceeds the ArrayBlockingQueue's capacity.
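The next sketch covers the bounded case. It again uses hypothetical names and assumes Java 8 or later. The pool has a core size of 1, a maximum of 4, and an ArrayBlockingQueue of capacity 2. The sketch records getPoolSize() after each submission. Every task blocks on a latch, so no worker ever frees up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Hypothetical helper: records the pool size after each submission.
    // Call with tasks <= 6; a 7th task would be rejected (queue full, pool at max).
    static List<Integer> poolSizesAfterEachSubmit(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2));   // bounded: holds at most 2 tasks
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();                    // block so no worker becomes idle
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            sizes.add(pool.getPoolSize());
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return sizes;
    }

    public static void main(String[] args) throws InterruptedException {
        // Task 1 starts the core thread and tasks 2-3 fill the queue.
        // Only tasks 4-6 cause extra threads to be created.
        System.out.println(poolSizesAfterEachSubmit(6)); // prints [1, 1, 1, 2, 3, 4]
    }
}
```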

And if you use a SynchronousQueue (where "each insert operation must wait for a corresponding remove operation by another thread"), don't be surprised when execute throws a RejectedExecutionException. This happens as soon as the pool has reached its maximum size and there is no idle thread to hand the task to.
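This sketch shows the SynchronousQueue case under the same assumptions (hypothetical names, Java 8 or later). The queue has no capacity, so offer only succeeds if an idle worker is already waiting for a task. Each blocked task therefore forces a new thread until maximumPoolSize is reached. After that, the next call to execute is rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {

    // Hypothetical helper: returns how many blocking tasks were accepted
    // before the first RejectedExecutionException.
    static int acceptedBeforeRejection(int maxPoolSize) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxPoolSize, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());      // zero capacity: direct hand-off only
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < 100; i++) {
                pool.execute(() -> {
                    try {
                        release.await();                // every worker stays busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // No idle thread took the task and the pool is already at maxPoolSize.
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(acceptedBeforeRejection(2)); // prints 2
    }
}
```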

The choice between queuing a task and starting a new thread is made in ThreadPoolExecutor.execute:


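    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            // A pool that is shutting down accepts no new tasks
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            // 1. Below the core size: start a new thread for the task
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            // 2. Otherwise prefer the queue; this is where the queue type matters
            if (workQueue.offer(command))
                return;
            // 3. Only if the queue refused the task: try a thread beyond the core size
            Runnable r = addIfUnderMaximumPoolSize(command);
            if (r == command)
                return;
            // 4. No thread could be started (typically the pool is at maximumPoolSize)
            if (r == null) {
                reject(command);
                return;
            }
            // else retry
        }
    }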


Queue.offer returns true if the task could be added (the happy path of this method). As described above, whether it can be added depends on the type of queue and how it was instantiated. Only if the task could not be queued does the ThreadPoolExecutor try to start a thread beyond the core pool size.
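The order of decisions is easier to see in a deliberately simplified model of the logic in execute. This is not JDK code. The enum and the decide method are hypothetical, and the model ignores shutdown, concurrency and the retry loop.

```java
public class ExecuteDecisionModel {

    // Hypothetical outcomes of a single call to execute.
    enum Decision { START_CORE_THREAD, QUEUE, START_EXTRA_THREAD, REJECT }

    // Simplified, single-threaded model of the order in which execute tries its options.
    static Decision decide(int poolSize, int corePoolSize, int maximumPoolSize,
                           boolean queueAcceptsTask) {
        if (poolSize < corePoolSize) return Decision.START_CORE_THREAD;
        if (queueAcceptsTask) return Decision.QUEUE;           // workQueue.offer succeeded
        if (poolSize < maximumPoolSize) return Decision.START_EXTRA_THREAD;
        return Decision.REJECT;
    }

    public static void main(String[] args) {
        // Core 1, max 10, unbounded queue: once one thread exists, everything is queued.
        System.out.println(decide(1, 1, 10, true));    // prints QUEUE
        // Same pool with a full bounded queue: now a second thread is started.
        System.out.println(decide(1, 1, 10, false));   // prints START_EXTRA_THREAD
        // SynchronousQueue, no idle thread, pool already at its maximum.
        System.out.println(decide(10, 1, 10, false));  // prints REJECT
    }
}
```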

There were two other things that confused us. The first was the timeout in ThreadPoolExecutor's invokeAll method (inherited from AbstractExecutorService):

    invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)

The timeout is the total time invokeAll will wait for the whole batch, not the time allowed for each Callable. During that time it waits on each future in turn, calling get() with whatever is left of the timeout, to see whether it has a result. If the timeout expires while it is waiting, it calls cancel() on all the futures.
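The next sketch gives a rough way to observe the batch-wide timeout. It assumes Java 8 or later, and the names are illustrative. Three tasks that each sleep for one second run on a single thread, and invokeAll is given 1.5 seconds. If the timeout applied per task, all three would succeed. Instead, invokeAll returns after about 1.5 seconds with the second and third tasks cancelled. Timings depend on the machine, so treat the expected result as approximate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class InvokeAllTimeoutDemo {

    // Hypothetical helper: returns, for each future, whether invokeAll cancelled it.
    static List<Boolean> cancelledFlags() throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int id = i;
            tasks.add(() -> {
                Thread.sleep(1000);                     // each task alone fits in the timeout
                return "task " + id;
            });
        }
        long start = System.nanoTime();
        // 1500 ms is the budget for the whole batch, not for each task.
        List<Future<String>> futures = pool.invokeAll(tasks, 1500, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("invokeAll returned after ~" + elapsedMs + " ms");
        List<Boolean> flags = new ArrayList<>();
        for (Future<String> f : futures) {
            flags.add(f.isCancelled());
        }
        pool.shutdownNow();
        return flags;
    }

    public static void main(String[] args) throws InterruptedException {
        // On an otherwise idle machine this is expected to print [false, true, true]
        System.out.println(cancelledFlags());
    }
}
```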

The final gotcha we experienced was that, after such a timeout, isDone() returns true on every future returned by invokeAll. However, calling get() on any of the cancelled ones throws a CancellationException. Not what we expected.
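One defensive way to consume the results is sketched below, under the same assumptions (hypothetical names, Java 8 or later). The idea is to check for cancellation, or catch CancellationException, rather than relying on isDone(). In this sketch, one task returns immediately and the other sleeps far past the 500 ms timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelledFutureDemo {

    // Hypothetical helper: describes each future returned by a timed-out invokeAll.
    // isDone() is true for all of them, but get() only succeeds on the uncancelled ones.
    static List<String> describe() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<String> quick = () -> "quick";
        Callable<String> slow = () -> {
            Thread.sleep(10_000);                       // far longer than the timeout below
            return "slow";
        };
        List<Future<String>> futures =
                pool.invokeAll(Arrays.asList(quick, slow), 500, TimeUnit.MILLISECONDS);
        List<String> out = new ArrayList<>();
        for (Future<String> f : futures) {
            String state = "isDone=" + f.isDone() + ", ";
            try {
                out.add(state + "get=" + f.get());
            } catch (CancellationException e) {         // cancelled by invokeAll's timeout
                out.add(state + "get threw CancellationException");
            } catch (ExecutionException e) {            // the task itself threw
                out.add(state + "task failed: " + e.getCause());
            }
        }
        pool.shutdownNow();
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        // Typically prints "isDone=true, get=quick" then
        // "isDone=true, get threw CancellationException"
        describe().forEach(System.out::println);
    }
}
```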

The long and short of it is that although these Java classes are beautifully crafted, they are not without their somewhat counterintuitive peculiarities.
