To be Reviewed By: September 24th, 2021

Authors: Alberto Gomez (alberto.gomez@est.tech)

Status: Draft | Discussion | Active | Dropped | Superseded

Superseded by: N/A

Related: N/A

Problem

Apache Geode is designed to provide linear-scaling low latency for transactions, reads, writes and query processing of indexed or unindexed data.

Even though the execution of an OQL query is restricted to a maximum of one thread per cluster member, the number of objects generated to be garbage collected by an OQL query may be very high, which could lead to CPU spikes across all cluster members and even to long JVM stop-the-world pauses.
As a result, OQL queries executed on a Geode cluster could heavily penalize the latency of transactions, reads and writes, and make it very unpredictable.

Anti-Goals

This RFC does not intend to resolve the problem of OQL queries generating a lot of objects to be garbage collected.

Solution

A possible way to reduce long GC pauses when running an OQL query is to slow it down (throttle it) by pausing its execution at different points. That way, the garbage collector is not handed too many objects in too short a period of time (it can collect objects at a higher rate than they are created), and long GC stops are avoided.

More specifically, queries could be throttled by sleeping at intervals of entries while the entries are being processed, as they are retrieved via an index or a region iterator.

This RFC proposes that query throttling be controlled by a throttling ratio parameter that specifies the percentage of time the query must be paused with respect to the total time the query would take without pauses.
For example, if a query without throttling normally takes 10 seconds and the throttling ratio is set to 1, the query will be paused for a total of 10 seconds and the time to get results will be 20 seconds. If the throttling ratio is set to 2, the total time to get results will be 30 seconds.
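This arithmetic can be sketched directly (illustrative only; `ThrottleMath` is not part of Geode):

```java
// Illustrative helper (not part of Geode) showing the throttling ratio arithmetic.
public final class ThrottleMath {

    // Total time = execution time + pause time, where pause time = execution time * ratio.
    static long totalTimeMillis(long executionMillis, float throttlingRatio) {
        long pauseMillis = (long) (executionMillis * throttlingRatio);
        return executionMillis + pauseMillis;
    }

    public static void main(String[] args) {
        // A 10-second query: ratio 1 -> 20 seconds in total, ratio 2 -> 30 seconds in total.
        System.out.println(totalTimeMillis(10_000, 1f)); // 20000
        System.out.println(totalTimeMillis(10_000, 2f)); // 30000
    }
}
```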

The throttling will aim at cycles of (execution + pause) of around 1 second. In order to achieve this, the amount of time to pause the query at each cycle, and the moments at which the execution will be paused, will be calculated as follows:

  • Initially, the first pause will be done after 1000 entries have been processed (1000 / number of active buckets in the case of a partitioned region).
  • Once the entries have been processed, the execution will be paused by sleeping for execution_time_to_filter_the_entries * throttling_ratio.
  • Additionally, the number of entries to be processed before pausing in the next cycle will be adjusted so that the cycle (execution + pause) lasts around 1 second:
    • If the cycle time was lower than a second, the number of entries to process before pausing in the next cycle will be increased. Otherwise, it will be decreased.
  • After the pause, the processing of entries will continue until the adjusted value is reached. Then, the execution will be paused again and the number of entries before pausing in the next cycle readjusted. This will be done until there are no more entries to be processed.
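The cycle above could be sketched as follows. This is only an illustration of the algorithm, not the proposed Geode implementation; `ThrottledScan` and `process` are made-up names:

```java
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

// Sketch (not the actual Geode implementation) of the adaptive
// (execution + pause) throttling cycle described in this RFC.
public final class ThrottledScan {
    static final long TARGET_CYCLE_NANOS = TimeUnit.SECONDS.toNanos(1);

    static long processed = 0; // entries seen so far (for illustration only)

    static <T> void scan(Iterator<T> entries, float throttlingRatio) {
        // Initial batch: 1000 entries (divided by active buckets for a partitioned region).
        long entriesPerCycle = 1000;
        while (entries.hasNext()) {
            long start = System.nanoTime();
            for (long i = 0; i < entriesPerCycle && entries.hasNext(); i++) {
                process(entries.next());
            }
            long execNanos = System.nanoTime() - start;
            // Pause for execution_time * throttling_ratio.
            long pauseNanos = (long) (execNanos * throttlingRatio);
            try {
                TimeUnit.NANOSECONDS.sleep(pauseNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Re-adjust the batch so the next (execution + pause) cycle lasts ~1 second.
            long cycleNanos = execNanos + pauseNanos;
            if (cycleNanos > 0) {
                entriesPerCycle = Math.max(1, entriesPerCycle * TARGET_CYCLE_NANOS / cycleNanos);
            }
        }
    }

    static <T> void process(T entry) {
        processed++; // stand-in for filtering/evaluating the entry
    }
}
```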

Changes and Additions to Public Interfaces

Adding throttling to an OQL query will be done by prefixing the query with meta information of the form "<throttle <throttleValue>>", in a similar way to how trace information is requested when executing queries. The <throttle ...> part will be optional.

Example

Executing an OQL query with a throttling ratio of 2.5:

"<throttle 2.5f> select * from portfolios where portfolios.type = '3'"
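Assuming the proposed syntax, the hint could also be prepended programmatically before handing the string to Geode's QueryService (the `withThrottle` helper below is hypothetical, not an existing API):

```java
// Hypothetical helper that prepends the throttle hint proposed by this RFC
// to an OQL string before it is passed to QueryService.newQuery(...).
public final class ThrottleHint {

    static String withThrottle(String oql, float ratio) {
        return "<throttle " + ratio + "f> " + oql;
    }

    public static void main(String[] args) {
        String oql = "select * from portfolios where portfolios.type = '3'";
        // On a running cluster this would be executed as:
        //   cache.getQueryService().newQuery(withThrottle(oql, 2.5f)).execute();
        System.out.println(withThrottle(oql, 2.5f));
    }
}
```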

Performance Impact

The performance of queries making use of the throttling mechanism will decrease, but it is expected that the latency of reads, writes and transactions will improve (lower and more predictable latency) when they coexist with expensive queries.

Backwards Compatibility and Upgrade Path

No impacts as the throttling feature will be optional.

Prior Art

An alternative to the proposal described in this RFC would be to lower the priority of the threads executing OQL queries. Some tests have been done in this respect, but the observation has been that queries are slowed down only when the available CPU is close to 0. Therefore, this approach is not valid.

FAQ

Answers to questions you’ve commonly been asked after requesting comments for this proposal.

Errata

What are minor adjustments that had to be made to the proposal since it was approved?


13 Comments

  1. Since this RFC relates long query times to GC time, I was wondering if any testing has been done with newer garbage collectors such as ZGC. ZGC looks very promising since the current version promises pause times of less than 10ms and future releases will have pause times of <1ms!

    1. We have run tests with ZGC and Shenandoah and they offer good GC pauses, but there are some drawbacks with Geode:
      1. Geode does not yet fully support features like the ResourceManager with ZGC or Shenandoah

      2. ZGC and Shenandoah are CPU and memory hungry, especially in a use case like Geode's. The main reason is that both GCs are not generational, and GC cycles need to mark the whole heap in order to perform collections. In some use cases (Geode with OQLs is a good example), the real memory utilization is really low, because you need a huge amount of free memory to allow OQLs to run and enough buffer space to allow GC cycles to complete. This is pretty well explained in this presentation: https://assets.ctfassets.net/oxjq45e8ilak/5jaNfSH90hHLpSk2gCtIzT/a7e6cb8f20446023ea5953f859be760d/18_30_Kelvin_Nilsen_JPoint2021__-__________________________.pdf

      3. The situation will probably improve greatly when ZGC and Shenandoah become generational. You can find that both GC teams are working on this!!!

      Anyhow, even if these garbage collectors are more efficient, there could always be situations where the application allocates memory faster than the garbage collector frees it. We have seen this happen when running OQL queries that create objects at a higher rate than the GC is able to free memory.

      1. Certifying Geode with ZGC and Shenandoah, in conjunction with profiling and eliminating transient garbage production, will go a long way towards resolving many of these issues. If we still need some throttling after this, then we should consider the alternatives suggested. I fear that simply pausing each query is just a tuning nightmare, especially if you can't guarantee which and how many queries are running concurrently. At any point when the saturation level of all the combined queries is 100%, the GC issues will persist.

  2. This is an interesting feature. If I understand it correctly, we want to be able to continue running queries but throttle them so that they don't take up a large amount of resources at one time. Is there a way to hook into the resource manager and add a new threshold or two? Let's say a query throttle and a query cancel threshold.
    If the query throttle threshold is crossed, we could block query execution until the pressure goes below throttle threshold.

    If the query cancel threshold is crossed, we could then cancel queries and reject them until pressure goes below cancel threshold?

    Another option might be to limit the number of queries that can be executed in total across the system with some semaphore.
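    A rough sketch of that semaphore idea (illustrative only; `QueryGate` and `MAX_CONCURRENT_QUERIES` are made-up names, not existing Geode settings):

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Sketch of the idea above: a member-wide cap on concurrently executing queries.
// QueryGate and MAX_CONCURRENT_QUERIES are illustrative, not existing Geode settings.
public final class QueryGate {
    static final int MAX_CONCURRENT_QUERIES = 4;
    static final Semaphore permits = new Semaphore(MAX_CONCURRENT_QUERIES);

    static <T> T runQuery(Supplier<T> query) {
        permits.acquireUninterruptibly(); // blocks (throttles) when the cap is reached
        try {
            return query.get();
        } finally {
            permits.release();
        }
    }
}
```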

    1. The problem I see with using thresholds in the resource manager is how to detect the problem. If what it measures is the length of GC pauses, then it would be too late to react by the time the threshold is crossed.

      Keep in mind that the problem is created when a lot of objects are allocated (while the OQL is being executed) but by the time we realize we are in trouble the query may have already finished.

      That's why we are proposing to limit beforehand the speed of the query, after having obtained an idea of how demanding a query is by running tests or by estimating the amount of objects it will create.

      1. Hi Alberto,

        I think if the threshold were configured in a way where you have enough buffer (critical threshold > eviction > query cancel > query threshold) it should trigger well before we run out of heap space.


        I think the current proposed sleep solution may have some problems, although with a low chance: if enough queries were to execute, you may have the same burst of objects that occurs prior to them all pausing. This might work better if we allocate some shared configurable resource (semaphore/count etc.) that gets taken from for each query object iteration/creation and released when queries complete. If a query can't acquire the resource, it would then throttle itself and retry after some throttle time?

        Just trying to put other ideas on the table for discussion.



  3. Nice!  Resource management in databases is an important and well-studied topic.  I think adding a capability like this to Geode would be a good addition.  However I think I would go in a slightly different direction.  In other database systems you have something like the following:

    1. A query planner that can estimate the cost of a query
    2. A Resource Manager that knows the capacity of the system (cpu, memory, disk) and can arbitrate access to those resources
    3. A query execution system that coordinates with the Resource Manager

    I would like to see abstractions that introduce these ideas into the OQL engine. Even if we start with simplistic implementations, it gives us a foundation for building more useful behaviors.

    I would argue the injection of "sleep" statements in the execution path is problematic for a few reasons:

    1. Uncontended queries would be slowed down unnecessarily.
    2. Tuning would be quite challenging as it depends on both load and capacity.  Values would be different for dev vs production.  And results on heterogeneous clusters would be quite variable.
    3. Small changes to the query code path (such as a refactoring) could alter the throttling behavior in unpredictable ways.



  4. Alberto Gomez

    Thanks for bringing up this topic. Managing/monitoring resource consumption by a query is a long-debated and hard issue to tackle in data systems. With data as a service becoming more and more prominent, this issue has become critical in providing accepted SLAs for the multiple users/tenants of a SaaS system.

    Small request: the RFC topic name needs to be changed to appropriately reflect the problem to solve (smile) rather than query throttling.

    While implementing a query feature in Geode, we first try to look at how other databases have addressed/implemented it; you may have already done that, but I thought of pointing it out for general purposes.

    Looking through how other databases have managed this, I mostly come across resource monitoring articles/solutions rather than any system providing resource allocation at the query level, except this:

    https://docs.microsoft.com/en-us/sql/relational-databases/resource-governor/resource-governor?view=sql-server-ver15

    Please take a look at it. 

    As others have chimed in, it would be nice to have a well-thought-out solution for this issue that could be implemented in steps, including a resource monitor service and an allocation service.

    The Geode query service provides a Query Monitor capability, which currently supports:

    • Managing long running queries
    • Low memory situations (hooked into the resource manager)

    Adding some hooks to get wait/notify with memory availability (resource manager) may be a way to address this (rather than using sleep) and to expand in the future for other resource constraints.

    As you can see, this is a hard issue to tackle, considering query prioritization and more... And many times this issue is specific to certain use cases/scenarios. Other solutions adopted to tackle these are:

    • Separate out the resource hungry queries and run them on dedicated systems/clusters (with replicated data)
    • Run it during non critical system time (if they are not critical)

    If you are looking to control a query without any product changes, Geode provides a query observer hook, which can be used to intercept a query... Currently this is internal, but you could try it to see if it helps to manage resource utilization.


  5. Here's an example of a db engine that provides a three-level approach, see section 6 (page 16):

    https://arxiv.org/pdf/2103.11080.pdf

  6. It's not a perfect solution, but you can actually implement this sort of throttling without product changes using method invocation in the query as well.

    I really wish OQL had a way to invoke static methods, but you can invoke pretty much anything you want by passing an instance of a helper class as a query parameter.


    Here's an example where you call a method in the WHERE clause that just returns true. It will get called for every entry, so you can count the number of invocations, measure time, do whatever you want in the throttle method.


    cache.getQueryService()
        .newQuery("select count(*) from /Portfolios p where ($1).throttle(2)")
        .execute(new TimeBasedThrottle());


    public static class TimeBasedThrottle implements Serializable {
      int count = 0;

      public boolean throttle(float ratio) {
        count++;
        // Do whatever throttling you want here based on count, elapsed time, etc.
        return true; // must return true so the WHERE clause passes every entry
      }
    }
    1. Dan Smith,

      I had not even thought about this approach. It requires little to no core product changes and it might actually be "tunable" on a per-query basis. NICE!!

      I need to caution that this approach will NOT solve the GC problem... only delay it... Throttling only reduces the rate at which data accumulates. It does not affect the final size of the result. It also does not address the problem of a longer lasting, higher memory effect on the running system whilst it slowly fills (over a longer time). Also, it does not reduce the memory effect when the result has to be serialized/sent, and finally the size of the garbage left behind once it completes.

      Also, I would hate to be the person who decides what the throttle ratio should be... One would almost ALWAYS be wrong... A higher ratio could be great at peak times, but the same ratio is bad when the system is under no load. Either way, one has the incorrect ratio the second after picking it...

      I think memory and resource management problems are hard to solve in a language where you have no control of the memory management.


    2. Thanks for the idea! This is something usable for the use case presented without changes in Geode.

      There are some things to sort out, though. I get a NullPointerException when using indexes in the query. Interestingly, it gets solved if I add a comparison to false after the invocation of the method:

      select count(*) from /Portfolios p where ($1).throttle(2) <> false

      I also needed to pass the region entry to my method because otherwise, when using indexes, I saw that a lot of memory was consumed during the execution of the query that was not consumed without the injected call.

      So, the query using an index ended up as follows:

      select count(*) from /Portfolios p where p.indexed_field > 3 and ($1).throttle(e, 2) <> false

  7. Alberto Gomez,

    I cannot speak to the implementation of the existing query engine or even comment on the different approaches we could investigate on how to solve this problem. What I do know is:

    1. The proposed solution you are describing (the throttling) is not one that I want Geode to have by default (or at all, if possible)
    2. I think that we are looking to solve the problem in the wrong place
    3. There is possibly a different approach to resolve the problem
    4. There is no right solution


    1.

    To me, the core behavior of the system is for it to complete an action as soon as it can, as quickly as it can, whereas the proposed behavior introduces a synthetic query processor limiter, which goes against the "as fast as possible" mantra.

    Can I request that whilst you investigate this feature, that the investigation includes more thought into HOW we make this pluggable. So that the core system can still perform each query "as fast as possible", but with a simple configuration change, the system loads the "throttled" behavior. 

    Now, I'm not suggesting including branching logic in the code that follows the pattern of "if (throttling_enabled){} else{}". This would be completely counterproductive, but rather something more modular. Like some of the work that had to be done to include ClassLoader isolation, where there had to be some extension points, some abstractions introduced, that allowed the system to load one implementation over another without breaking the core system.

    What the modular approach allowed for is to easily trial/implement different approaches to reducing the impact that large queries have on the core system, by toggling a configuration. It also allows for the extensibility of Geode on a case-by-case basis, to specifically make it behave differently, in a way that would solve the particular problem you are trying to resolve, without it having to be built into the core system.


    2.

    From the little I do understand of the query engine, there will be garbage generation regardless of the throttling. So, regardless of region type, there will always be a node that will have to take the query result(s), possibly aggregate all results from all nodes (in the case of partitioned regions), serialize them and send them, which in its own right is memory and CPU intensive. The generated serialized result will take up memory and, depending on JVM settings, will either be kept in ParNew memory, where the GC cycle is "cheap", or be dropped into the old generation, where the GC cycle becomes more expensive.

    This also describes the "good" case, where there is no overflow/eviction involved. If you take the case where overflow is enabled, a query might actually cause the system to load a whole bunch of "cold" data into memory from disk, possibly causing "hot" data to be overflowed to disk to make space for the data loaded. This load/overflow cycle could then be the cause of another GC cycle. Then you run the query, serialize and send data (another possible GC cycle), and finally all the cold data will be overflowed to disk again when more "hot" data has to be loaded to be able to process new incoming data (another GC cycle). To make matters worse, all of this is running in real time and concurrently.

    So, there are multiple places where garbage will be generated regardless of the query throttling.


    3.

    I think the problem that is trying to be solved is not unique: trying to shield the core system from large GCs caused by possibly large queries. In practice, the simplest approach would be to have two clusters with a uni-directional WAN between them: one being the main ingestion cluster, working as fast as it can to consume/process incoming events, and the second being a query cluster. BOTH are tuned differently for the different workloads they would experience.


    4.

    Both Anthony Baker and Anilkumar Gingade have provided possible solutions to the problem. The main flaw I see in all of them is that the proposed solutions sit on top of systems that are written in languages where garbage collection is a manual event and memory usage has to be explicitly controlled. Java, for all its "good" - "you don't have to care about memory" approach, is also the main hindrance here, as whatever we do, we have no control over how much memory is used and when the GC kicks in. I.e. yes, we could impose a synthetic limit on the memory used by each query or query engine, but we in reality have no control over how the JVM behaves regarding when/how GC cycles are activated. The only things we can do are:

    • reduce when/how we generate garbage
    • tune the JVM to handle short-lived data (more OLTP) or longer-lived query/analysis data