CoreLoggingExecutors has a number of thread pool factory methods; each is a static method that returns an ExecutorService. Several of them construct a new PooledExecutorWithDMStats:

  • newThreadPoolWithSynchronousFeed(int poolSize, String threadName, CommandWrapper commandWrapper)

    used by AcceptorImpl in org.apache.geode.internal.cache.tier.sockets


  • newThreadPoolWithSynchronousFeed(int poolSize, long keepAliveTime, TimeUnit unit, String threadName, CommandWrapper commandWrapper, PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring)

    used by ExpiryTask in org.apache.geode.internal.cache


  • newThreadPoolWithFixedFeed(int poolSize, long keepAliveTime, TimeUnit unit, int workQueueSize, String threadName, CommandWrapper commandWrapper, PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring)

    used by GemFireCacheImpl in org.apache.geode.internal.cache

  • newThreadPoolWithUnlimitedFeed(int poolSize, long keepAliveTime, TimeUnit unit, String threadName, ThreadInitializer threadInitializer, CommandWrapper commandWrapper, PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring)

    used by AcceptorImpl in org.apache.geode.internal.cache.tier.sockets


  • newThreadPool(int poolSize, BlockingQueue<Runnable> workQueue, String threadName, ThreadInitializer threadInitializer, CommandWrapper commandWrapper, PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring)

    used by ClusterOperationExecutors in org.apache.geode.distributed.internal


Each of those constructs a new PooledExecutorWithDMStats. Here's how that thing is structured…

PooledExecutorWithDMStats has (in general) 2 queues and a rejected execution handler:

  • ThreadPoolExecutor.workQueue: the input queue for the ThreadPoolExecutor. Always a SynchronousQueue. But why?
  • PooledExecutorWithDMStats.blockingWorkQueue: a queue with storage, used if client supplied one with storage, otherwise null
  • ThreadPoolExecutor.handler (rejected execution handler): a policy object invoked (with the task) when the workQueue cannot accept the task and the pool cannot add another thread (it is already at its maximum size and no thread is idle)

A perusal of the constructor indicates that if the caller supplies a synchronous queue (a queue with no storage) then:

  • workQueue: the synchronous queue provided by the caller
  • blockingWorkQueue: is null
  • rejected execution handler: policy is to run task immediately, directly in the thread calling ExecutorService.submit()

…otherwise (the caller supplied a non-synchronous queue, i.e. a queue with storage):

  • workQueue: a new SynchronousQueue
  • blockingWorkQueue: non-synchronous queue provided by caller
  • rejected execution handler: the policy (BufferHandler) is to queue the task in the blockingWorkQueue. The thread created in the PooledExecutorWithDMStats constructor then competes with task-submitting (client) threads to submit work from this queue to the ThreadPoolExecutor

Regarding the "competing": tasks that the extra thread adds directly to the ThreadPoolExecutor's workQueue are not susceptible to rejection. So that's a good thing. That thread (the one optionally spun up in the PooledExecutorWithDMStats constructor) blocks while adding an entry directly to the workQueue via put(). Only tasks submitted through the ExecutorService.submit() interface can be rejected.
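The arrangement described above can be sketched in a few lines of plain java.util.concurrent code. This is a minimal illustration, not Geode's implementation: the class name BufferedPoolSketch, the method runAndCountRejections, the pool sizes (core 1, maximum 2) and the variable names are all assumptions made for the example. It only shows the shape of the thing: a SynchronousQueue as the executor's workQueue, a rejection handler that buffers, and an extra thread that put()s buffered tasks into the hand-off queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BufferedPoolSketch {

  /** Runs taskCount tasks through the arrangement; returns how many were rejected and buffered. */
  public static int runAndCountRejections(int taskCount) throws InterruptedException {
    // Stand-in for blockingWorkQueue: the caller-supplied queue with storage.
    BlockingQueue<Runnable> buffer = new LinkedBlockingQueue<>();
    // Stand-in for ThreadPoolExecutor.workQueue: a hand-off queue with no storage.
    SynchronousQueue<Runnable> handOff = new SynchronousQueue<>();
    AtomicInteger rejections = new AtomicInteger();

    // Stand-in for BufferHandler: park the rejected task in the buffer.
    RejectedExecutionHandler bufferOnReject = (task, executor) -> {
      rejections.incrementAndGet();
      buffer.add(task);
    };
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, handOff, bufferOnReject);

    // Stand-in for the constructor-spawned thread: put() blocks until a worker
    // is ready to receive, so tasks taking this path are never rejected.
    Thread drainer = new Thread(() -> {
      try {
        while (true) {
          handOff.put(buffer.take());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "buffer-drainer");
    drainer.setDaemon(true);
    drainer.start();

    CountDownLatch release = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(taskCount);
    for (int i = 0; i < taskCount; i++) {
      pool.execute(() -> {
        try {
          release.await(); // hold the worker so later submissions must be rejected
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    release.countDown();
    boolean finished = done.await(10, TimeUnit.SECONDS);
    drainer.interrupt();
    pool.shutdown();
    if (!finished) {
      throw new IllegalStateException("tasks did not complete in time");
    }
    return rejections.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("rejected and buffered: " + runAndCountRejections(10));
  }
}
```

Because both worker threads are held until every task has been submitted, every submission after the second takes the rejection path and reaches a worker only through the drainer thread.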

This is not "fair" scheduling. Tasks that have been submit()ed and rejected and put in the blockingWorkQueue are not, in general, all processed ahead of new tasks arriving via other threads calling ExecutorService.submit(). The latter can cut in line in front of the thread servicing the blockingWorkQueue. This is not a bug per se, but it is surprising. The framework apparently doesn't have any fairness contract for these thread pools.

Why Do We Have This Unusual Arrangement?

It's not clear why, even if the user supplies a non-synchronous queue to the constructor, PooledExecutorWithDMStats insists upon fabricating a synchronous queue for use as the ThreadPoolExecutor's workQueue.

Darrel Schneider thinks maybe this code also has something to do with core threads and wanting to get all the threads busy before we start buffering requests up in a queue. As he recalls, we saw that if we used a non-sync-queue to feed the executor, then it would end up doing all the work in a single thread. Here is one discussion that touches on this issue: https://stackoverflow.com/questions/47650247/synchronousqueue-in-threadpoolexecutor

But this code in AdoptOpenJDK 8 ThreadPoolExecutor.execute() looks like it would perform just fine if the workQueue was a non-synchronous queue:


Update 1:

Well this 2007 commit introduced the arrangement in question: 050a62da28157ba4d146d2acb9521a3a14e557ab

That commit mentions bug 37091 which we were able to dig up:


Interim Bug ID:  070706-111344-darrel-zeus
Bug-Number: 37091
Submitted by: darrel
Date: Friday July 6 2007 11:13
Priority: R2
Customer / HR:
Bug Comment:

If BridgeServer.SELECTOR is enabled server connection threads timeout and are constantly recreated if when system is busy

Product Line:    GemFire
Version: 5.0.2
Build:
Bug Description:

If the SELECTOR is enabled and the actual number of clients is less then the configured thread pool size then extra threads are constantly created and get timed out and recreated. The expected behaviour is for the thread pool to grow to the number of clients, in this case, and hold steady at that size without creating extra threads.

Assign to: darrel
Keywords:
Audit/Repair:
Stone Host/OS:
Gem Host/OS:
Dbf Host/OS:
Private cache:
Module:
Script/QC test:
Gemstone login:
Cc List:
Workaround:
Keep the the thread pool size <= to the number of clients.


It's not clear how or why the synchronous work queue prevents threads from timing out when there are fewer clients than threads in the pool (the problem described in the ticket).

But this article provides insight into the way that the thread pool's "core pool size" relates to queueing behavior: Java ThreadPoolExecutor BlockingQueue Example, from HowToDoInJava. From that article:

BlockingQueue works on following rules:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

Since our core pool size is always 0 or 1 (as of the 2007 commit), with a non-synchronous work queue we'd almost always have the executor preferring queuing over starting a new thread. It's hard to see the relationship between this theory and the subject of bug 37091 above.
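The two rules quoted above are easy to observe directly. The following is a small sketch under assumed names (CorePoolSizeQueueingSketch and distinctThreadsUsed are made up for this page) that counts how many distinct threads end up running tasks for a given corePoolSize when the workQueue is an unbounded LinkedBlockingQueue. It lines up with Darrel's recollection that a non-synchronous queue led to all the work being done in a single thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CorePoolSizeQueueingSketch {

  /** Runs taskCount short tasks and returns the number of distinct threads that ran them. */
  public static int distinctThreadsUsed(int corePoolSize, int maximumPoolSize, int taskCount)
      throws InterruptedException {
    // Unbounded queue with storage: offer() always succeeds, so once corePoolSize
    // threads exist the executor queues instead of creating more threads.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
        1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    CountDownLatch done = new CountDownLatch(taskCount);
    for (int i = 0; i < taskCount; i++) {
      pool.execute(() -> {
        threadNames.add(Thread.currentThread().getName());
        try {
          Thread.sleep(20); // simulate a little work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    boolean finished = done.await(10, TimeUnit.SECONDS);
    pool.shutdown();
    if (!finished) {
      throw new IllegalStateException("tasks did not complete in time");
    }
    return threadNames.size();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("core=1, max=4: threads used = " + distinctThreadsUsed(1, 4, 8));
    System.out.println("core=4, max=4: threads used = " + distinctThreadsUsed(4, 4, 8));
  }
}
```

With corePoolSize = 1 the maximumPoolSize of 4 never comes into play, because the queue never refuses a task. With corePoolSize = 4 each of the first four submissions starts its own thread.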

It is counterintuitive that the commit in question changed the pool's core size from maxSize to 1 (when some maxSize is specified). Offhand it seems as if this logic is working against the built-in JDK core size functionality, the very purpose of which is to keep a minimum number of ready, unused threads available when demand drops.


Update 2:

As of the 2007 commit there were no calls to ThreadPoolExecutor.allowCoreThreadTimeOut(boolean). That method was introduced in Java 6 (1.6), late in 2006. Perhaps Geode at the time of the 2007 commit could not rely on that JDK version. Without that method there was no way to time out core threads. As a result, "large" core pool sizes would leave large numbers of potentially idle threads when demand dropped. Though that issue is not mentioned in ticket 37091, it seems clear that the 2007 commit was addressing that issue (by capping the core pool size at 1 thread).

Another issue not mentioned in ticket 37091, but which seems from discussions to have played a part here, was a desire to prefer immediate execution of a task over queueing, all the way up to maximumPoolSize. But by calling allowCoreThreadTimeOut(true) and setting corePoolSize = maximumPoolSize we could, with a modern JDK, have our cake and eat it too: eliminate the extra queue and thread, and eliminate the unfairness too! With those settings and any caller-supplied BlockingQueue as the workQueue (including a non-synchronous one) we should be able to avoid queueing and immediately dispatch tasks to threads, right up to the maximumPoolSize (since that's also the corePoolSize). Then the thread keepAliveTime would kick in and start aging out any unused threads, eventually bringing us all the way back down to zero if all demand was removed for long enough.

An experiment with code derived from that Stack Overflow question seems to confirm that Geode could now use corePoolSize = maximumPoolSize and allowCoreThreadTimeOut(true) to get immediate dispatch of tasks (without queueing) right up to the maximumPoolSize, and also get pool draining when demand drops.

Here's that experiment:

https://gist.github.com/Bill/3e0587f43171d2d72718a945deb79fb3
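Independently of the gist, the proposed settings can be illustrated with a short sketch. The names here (CoreTimeoutSketch, loadThenIdle) and the 200 ms keepAliveTime are assumptions chosen for the example, not anything taken from Geode or from the gist. Note that allowCoreThreadTimeOut(true) requires a keepAliveTime greater than zero; the JDK throws IllegalArgumentException otherwise.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutSketch {

  /** Returns {pool size under load, tasks queued under load, pool size after idling}. */
  public static int[] loadThenIdle(int maximumPoolSize) throws InterruptedException {
    // corePoolSize == maximumPoolSize, with a non-synchronous queue as the workQueue.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(maximumPoolSize, maximumPoolSize,
        200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    // Let idle core threads time out too (needs keepAliveTime > 0).
    pool.allowCoreThreadTimeOut(true);

    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < maximumPoolSize; i++) {
      pool.execute(() -> {
        try {
          release.await(); // keep every worker busy while we inspect the pool
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    int sizeUnderLoad = pool.getPoolSize();
    int queuedUnderLoad = pool.getQueue().size();
    release.countDown();

    // Demand is gone: wait (bounded) for keepAliveTime to age out the idle threads.
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
      Thread.sleep(50);
    }
    int sizeAfterIdle = pool.getPoolSize();
    pool.shutdown();
    return new int[] {sizeUnderLoad, queuedUnderLoad, sizeAfterIdle};
  }

  public static void main(String[] args) throws InterruptedException {
    int[] result = loadThenIdle(4);
    System.out.println("threads under load: " + result[0]);
    System.out.println("queued under load:  " + result[1]);
    System.out.println("threads after idle: " + result[2]);
  }
}
```

Each of the first maximumPoolSize submissions gets its own thread without touching the queue, and the pool shrinks back to zero once the threads have been idle for longer than keepAliveTime.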

It seems that this "unusual arrangement" (forcing all task submission through a synchronous queue, shunting rejected tasks to another queue, and having an extra thread compete, unfairly, with new task submissions to drain that queue) isn't needed as of this writing.

Fixing this to eliminate the extra queue and the extra thread will entail running performance tests. Along with this work we should investigate whether the similar unusual arrangement in FunctionExecutionPooledExecutor can also be addressed. The latter has slightly different behavior in the backlog-processing thread: instead of put()ing items into the ThreadPoolExecutor's workQueue, it offer()s, and if the offer() fails, the thread runs the task directly.
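For comparison, the offer()-or-run variant described for FunctionExecutionPooledExecutor might look roughly like this. It is a hypothetical sketch of the described behavior only; OfferOrRunSketch and drainOne are illustrative names and none of this is copied from Geode.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class OfferOrRunSketch {

  /**
   * Takes one buffered task. If a worker is waiting on the hand-off queue the task
   * is handed over (returns true); otherwise it runs in the calling thread (returns false).
   */
  public static boolean drainOne(BlockingQueue<Runnable> buffer, BlockingQueue<Runnable> handOff)
      throws InterruptedException {
    Runnable task = buffer.take();
    if (handOff.offer(task)) { // non-blocking, unlike put()
      return true;
    }
    task.run(); // no worker was ready, so the backlog thread does the work itself
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Runnable> buffer = new LinkedBlockingQueue<>();
    buffer.add(() -> System.out.println("ran on " + Thread.currentThread().getName()));
    // No consumer is waiting on this SynchronousQueue, so offer() fails.
    boolean handedOff = drainOne(buffer, new SynchronousQueue<>());
    System.out.println("handedOff=" + handedOff);
  }
}
```

The trade-off relative to put() is that the backlog thread never blocks waiting for a worker, at the cost of being unavailable to drain the buffer while it runs a task itself.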
