Comment: Migrated to Confluence 5.3

...

With the three combined features of Data Partitioning, Data Affinity and Data Aware Routing, Geode can receive a request on any node and route that request to a specific peer server that hosts the data required to execute that function. With Data Affinity (aka data colocation), all of the data needed to execute that function will be hosted by that same node, even when the data is spread over multiple data regions. Because all of the data is hosted locally, there is no need to coordinate a transaction with any other node. If pessimistic locking is required, it is reduced to a thread level semaphore in the local JVM which is much faster than coordinating with a transaction service. With distributed transactions eliminated and processing spread over a series of nodes (that can be increased dynamically) the architecture is well on the way to linear scalability.

Data Affinity in a single partitioned region (custom partitioning using PartitionResolver)

When storing entries in partitioned data regions, Geode uses a 'hashing' policy by default - the entry key is hashed to compute an integer which should be evenly distributed across a large range of values. The integer is used to identify a 'bucket' to which the entry will be assigned. The bucket is in turn mapped to a physical cache member node where the primary copy of the data will be managed. Each data node hosting a partitioned region will normally manage multiple buckets. When capacity is increased (more members are added) it is the buckets that are distributed across the members. Through the Geode APIs and functions, applications are totally abstracted away from the physical location of the entry and (for optimal scalability) should not attempt to control where the data will be managed.
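The key-to-bucket-to-member mapping described above can be sketched in plain Java. This is only an illustrative stand-in, not Geode's actual implementation: the class `BucketSketch`, the helpers `bucketFor` and `memberFor`, the bucket count of 113 and the round-robin placement are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for hash-based bucket assignment; not Geode code.
final class BucketSketch {

    // Hypothetical helper: hash the key into a bucket id in [0, totalBuckets).
    static int bucketFor(Object key, int totalBuckets) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), totalBuckets);
    }

    // Hypothetical helper: place buckets on members round-robin.
    static String memberFor(int bucketId, List<String> members) {
        return members.get(bucketId % members.size());
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("server1", "server2", "server3");
        int totalBuckets = 113; // assumed bucket count for this sketch
        for (String key : Arrays.asList("IBM", "ORCL", "MSFT")) {
            int bucket = bucketFor(key, totalBuckets);
            System.out.println(key + " -> bucket " + bucket
                    + " -> " + memberFor(bucket, members));
        }
    }
}
```

Because the application only ever supplies the key, adding a member changes the bucket-to-member placement without touching application code.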

...

If you want to colocate all Trades by symbol, here is an example of how to do it in Geode.

Data Affinity across multiple partitioned regions

To colocate entries across multiple data regions, the application has to do two things:

...

[Here](Colocating-related-entries-across-multiple-partitioned-regions) is an example which demonstrates how you can achieve colocation for the above scenario.

Example use cases in finance where colocation is useful

Bi-temporal data management for Financial Risk analytics

In many applications, particularly financial ones, data has a temporal nature: it is valid 'at' and possibly 'for' a specific period of time. Bi-temporal modeling includes valid time ranges for every entry in the cache, which causes significant growth in the quantity of data. Every update or delete operation is recorded as a new cache entry that uses a timestamp along with the business (i.e. entry) key to uniquely identify the object. Applications typically want to access the value of a financial instrument or product at a particular point in time ('asOf' some timestamp). Such a time-based request requires Geode to execute a query (relational operators). The 'best practices' approach is to colocate all temporal data corresponding to any given business key (i.e. for a specific financial instrument). The temporal query can then be focused on the target subset of entries that can potentially satisfy it. Growth of the system, in terms of handling additional financial instruments, is best achieved by establishing additional partitions in order to spread the data and processing across more machines.

Pricing engine for Financial Derivative product pricing

A given financial security may have hundreds or thousands of 'derivative' products. Any time the security price changes, the new price of each derivative product is recalculated. These calculations can be complex and computationally expensive, so it makes sense to distribute the securities and the workload across many nodes. Securities and their derivatives need to be colocated. Each calculation may also depend on other reference data, which will typically be colocated and replicated on each node.

Colocating related entries in a single partitioned region

Say, for example, you want to colocate all Trades by Symbol. The key is implemented by the TradeKey class, which also implements the PartitionResolver interface.
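The idea behind routing by symbol can be illustrated without any Geode dependency. In the sketch below, `routingObject()` is a hypothetical stand-in for the value a PartitionResolver would return, and `bucketFor` is an assumed hash-based bucket assignment; neither is Geode API. It only shows that hashing the symbol instead of the whole key places all trades for one symbol in the same bucket.

```java
import java.util.Objects;

// Stand-in sketch of custom partitioning by symbol; not Geode code.
final class TradeKeySketch {

    static final class TradeKey {
        final String symbol;
        final long tradeId;

        TradeKey(String symbol, long tradeId) {
            this.symbol = symbol;
            this.tradeId = tradeId;
        }

        // Hypothetical: plays the role of the resolver's routing object.
        Object routingObject() {
            return symbol;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TradeKey)) return false;
            TradeKey other = (TradeKey) o;
            return tradeId == other.tradeId && symbol.equals(other.symbol);
        }

        @Override
        public int hashCode() {
            return Objects.hash(symbol, tradeId);
        }
    }

    // Assumed bucket assignment: hash the routing object, not the full key.
    static int bucketFor(TradeKey key, int totalBuckets) {
        return Math.floorMod(key.routingObject().hashCode(), totalBuckets);
    }

    public static void main(String[] args) {
        TradeKey first = new TradeKey("IBM", 1L);
        TradeKey second = new TradeKey("IBM", 2L);
        // Distinct keys, same symbol: both land in the same bucket.
        System.out.println(bucketFor(first, 113) == bucketFor(second, 113));
    }
}
```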

...

Code Block
languagexml
<cache>
  <region name="myPrDataRegion">
    <region-attributes>
      <partition-attributes>
        <partition-resolver>
          <class-name>com.gemstone.gemfire.cache.MyPartitionResolver</class-name>
        </partition-resolver>
      </partition-attributes>
    </region-attributes>
  </region>
</cache>
 
Code Block
languagejava
// Create a new PartitionResolver
PartitionResolver resolver = new MyPartitionResolver();

// Set the PartitionResolver on the partition attributes
PartitionAttributes attrs = new PartitionAttributesFactory().setPartitionResolver(resolver).create();

// Create a partitioned data region
Region region = new RegionFactory().setPartitionAttributes(attrs).create("myPrDataRegion");

Colocating related entries across multiple partitioned regions

So, for instance, in an Orders partitioned region, all order entries that return the same CustomerID are guaranteed to reside on the same node.

...

  • The region whose name is passed to the setColocatedWith() method must already have been created; otherwise an IllegalStateException is thrown.
  • Colocated entities should have custom partitioning enabled; otherwise an IllegalStateException is thrown.
  • Colocated partitioned regions should have the same PartitionResolver (it must return the same routing object).
  • Colocated partitioned regions should have the same partition attributes (such as the total number of buckets and the number of redundant copies).

Data aware behavior routing using Geode Function Service

Geode's function execution service enables both cache clients and peer nodes to execute arbitrary application functions on the data fabric. When the data is partitioned across a number of members for scalability, Geode can route the function transparently to the node that carries the data subset required by the function and avoid moving the target data around on the network. This is called 'data aware' function routing. Applications employing data aware routing do not need to have any knowledge of where the data is managed.

...

  1. The application wants to execute a server-side transaction or carry out data updates using the Geode distributed lock service.
  2. The application wants to initialize some of its components once on each server, for later use by executed functions.
  3. Initialization and startup of a third-party service such as a messaging service.
  4. Any arbitrary aggregation operation that requires iteration over local data sets and is done more efficiently through a single call to the cache server.

Registering Functions to FunctionService

Applications can declare and register the functions using declarative means (cache.xml) or through the Geode API. All registered functions have an identifier. Identifying functions allows the administrator to monitor function activity and cancel them on demand.

...

  • Functions that need to be executed across remote members should be registered in each member before they are invoked.
  • Applications may create inline functions, which need not be registered.
  • The Id (returned from Function.getFunctionId()) can be any arbitrary string.
  • Modifying a function instance after registration has no effect on function execution.

Example 1: Data aware routing and colocated transactions

Suppose Customers, Orders and Shipments are colocated as described in the previous example, and a user wants the following behavior:

...

Using FunctionService, this can be achieved as demonstrated [here].

Example 2: Data independent parallel execution on all data nodes

Suppose a user wants to perform an aggregation operation across the partitioned region on all nodes. Specifically, the user is interested in the average sales from the Orders region.

Using FunctionService, this can be achieved as demonstrated [here].

Example 3: Fire-and-forget function execution

Suppose a user wants to execute a function which doesn't return any result.

What is available from Geode to an application function?

An instance of FunctionContext is made available to the function when and where it executes. It is required by Function#execute(FunctionContext) to execute a Function on a particular member. A user can retrieve the following information from FunctionContext:

...

| API | Description |
| --- | --- |
| getLocalData(Region r) | Given a partitioned Region, return a Region providing read access limited to the local heap. Writes using this Region have no constraints and behave the same as on a partitioned Region. |
| getColocatedRegions(Region r) | Given a partitioned Region, return a map of colocated Regions. |
| getLocalColocatedRegions(RegionFunctionContext context) | Given a RegionFunctionContext for a partitioned Region, return a map of colocated Regions with read access limited to the context of the function. |

How does function execution work?

When a user invokes a function, the target nodes for the execution are identified from the filter that was passed. If possible, the targets are pruned to the minimum set of nodes where all the data is present. The function execution message is sent asynchronously to all the target nodes, using a configurable thread pool. Each target node then sends its function execution results to the caller. The caller waits for the result using ResultCollector.getResult().
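The flow just described (identify targets from the filter, prune to the members that own the data, dispatch asynchronously, wait for results) can be sketched with standard Java concurrency utilities. Everything here is a simplified assumption for illustration: `memberFor`, `targets` and `execute` are hypothetical names, members are plain integers, and a local thread pool stands in for the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Simplified simulation of filter-based function routing; not Geode code.
final class ExecutionFlowSketch {

    // Hypothetical ownership rule: which member hosts a given key.
    static int memberFor(String key, int memberCount) {
        return Math.floorMod(key.hashCode(), memberCount);
    }

    // Group the filter keys by owning member; members owning none are pruned.
    static Map<Integer, Set<String>> targets(Set<String> filter, int memberCount) {
        Map<Integer, Set<String>> byMember = new TreeMap<>();
        for (String key : filter) {
            byMember.computeIfAbsent(memberFor(key, memberCount), m -> new TreeSet<>())
                    .add(key);
        }
        return byMember;
    }

    // Run fn once per target member on a thread pool and wait for every result.
    static <R> List<R> execute(Set<String> filter, int memberCount,
                               Function<Set<String>, R> fn) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<R>> pending = new ArrayList<>();
            for (Set<String> localKeys : targets(filter, memberCount).values()) {
                pending.add(CompletableFuture.supplyAsync(() -> fn.apply(localKeys), pool));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : pending) {
                results.add(future.join()); // the caller blocks here, like getResult()
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

A call such as `ExecutionFlowSketch.execute(keys, 3, local -> local.size())` returns one result per member that actually owns at least one of the filter keys.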

The default implementation of ResultCollector, called DefaultResultCollector, waits for each node to respond with a result and returns the unordered set of results to the caller. The results from the target nodes are added using the ResultCollector#add(Serializable oneResult) API. By implementing this API, a user can customize the aggregation of results.
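As an illustration of customized aggregation, the sketch below combines per-node partial sums into a single average instead of returning a raw set of results. It is a plain-Java stand-in, not an implementation of Geode's ResultCollector interface: the class name `AverageCollectorSketch` and the methods `add`, `endResults` and `getResult` are assumptions chosen to mirror the description above.

```java
import java.util.concurrent.CountDownLatch;

// Stand-in for a custom aggregating collector; not Geode's ResultCollector.
final class AverageCollectorSketch {
    private double sum;
    private long count;
    private final CountDownLatch done = new CountDownLatch(1);

    // Called once per responding node with that node's partial aggregate.
    synchronized void add(double partialSum, long partialCount) {
        sum += partialSum;
        count += partialCount;
    }

    // Called when every target node has responded.
    void endResults() {
        done.countDown();
    }

    // Blocks until endResults() has been called, then returns the average.
    double getResult() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        }
        synchronized (this) {
            return count == 0 ? 0.0 : sum / count;
        }
    }
}
```

Sending a (sum, count) pair from each node rather than a local average keeps the combined result correct when nodes hold different numbers of entries.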

What if my function execution result is large?

A user can optionally use the ResultSender to chunk the results and send them back to the caller. The ResultSender class provides methods to send results back to the ResultCollector. Instead of delivering the result only when the execution is complete, ResultSender provides a mechanism to send individual results back to the caller prior to completion. The function may also break a result into multiple chunks and send each chunk back to the caller. To signal the calling thread to stop waiting for results, the function should send its final result using lastResult on the ResultSender.

How does ResultSender play with the ResultCollector?

Each time a function sends a result using ResultSender, it is added to the ResultCollector at the caller node, so partial results are available to the application program as soon as they arrive. This lets the developer work on partial results and decide on application logic without waiting for all the results.
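The interplay between sender and collector can be sketched as follows. This is a simplified in-process model, not Geode code: the nested `Sender` and `Collector` classes and the `sendInChunks` helper are hypothetical, and only the `sendResult`/`lastResult` naming follows the ResultSender description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// In-process model of chunked result streaming; not Geode code.
final class ChunkedResultsSketch {

    static final class Collector<T> {
        private final List<T> results = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);

        // Snapshot of whatever has arrived so far; never blocks.
        List<T> partial() {
            return new ArrayList<>(results);
        }

        // Blocks until the sender has delivered its last result.
        List<T> getResult() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return new ArrayList<>(results);
        }
    }

    static final class Sender<T> {
        private final Collector<T> target;

        Sender(Collector<T> target) {
            this.target = target;
        }

        void sendResult(T chunk) {
            target.results.add(chunk);
        }

        // Delivers the final chunk and releases any waiting caller.
        void lastResult(T chunk) {
            target.results.add(chunk);
            target.done.countDown();
        }
    }

    // Hypothetical function body: split a large result into fixed-size chunks.
    static void sendInChunks(List<Integer> big, int chunkSize, Sender<List<Integer>> sender) {
        if (big.isEmpty()) {
            sender.lastResult(new ArrayList<>()); // still signal completion
            return;
        }
        for (int from = 0; from < big.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, big.size());
            List<Integer> chunk = new ArrayList<>(big.subList(from, to));
            if (to == big.size()) {
                sender.lastResult(chunk);
            } else {
                sender.sendResult(chunk);
            }
        }
    }
}
```

In this model a caller can poll `partial()` while the function is still running, or call `getResult()` to wait for the final chunk.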

What happens when one of the function execution target nodes goes down?

A FunctionException is thrown with a FunctionInvocationTargetException as its cause. This usually indicates that the node that was executing the function failed mid-process. Applications can catch the FunctionInvocationTargetException and choose to re-execute the function. It is the function implementation's responsibility to provide any desired idempotent behavior.

For instance, any state generated while the function is executing should be stored in Geode with redundancy. Then, when the function fails, the client can re-execute it with a flag indicating that the execution is a possible duplicate. The function implementation can check this flag and use the partial state stored in Geode to complete the remainder of the work.
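A minimal sketch of this retry pattern, under stated assumptions: a plain map stands in for redundantly stored state in Geode, `TargetFailed` is a hypothetical stand-in for FunctionInvocationTargetException, the failure is simulated once at the midpoint, and the `possibleDuplicate` parameter plays the role of the flag mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulated re-execution with a possible-duplicate flag; not Geode code.
final class RetrySketch {

    // Hypothetical stand-in for FunctionInvocationTargetException.
    static final class TargetFailed extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // Stand-in for partial state that would be stored in Geode with redundancy.
    private final Map<String, Integer> savedProgress = new HashMap<>();
    private final List<Integer> processed = new ArrayList<>();
    private boolean failOnce = true;

    // The "function": processes items 0..items-1 and records progress as it goes.
    void run(String jobId, int items, boolean possibleDuplicate) {
        // On a possible duplicate, resume from the saved state instead of restarting.
        int start = possibleDuplicate ? savedProgress.getOrDefault(jobId, 0) : 0;
        for (int i = start; i < items; i++) {
            if (failOnce && i == items / 2) {
                failOnce = false;
                throw new TargetFailed(); // simulate the node failing mid-process
            }
            processed.add(i);
            savedProgress.put(jobId, i + 1);
        }
    }

    // The "client": catches the failure and re-executes with the flag set.
    void runWithRetry(String jobId, int items) {
        try {
            run(jobId, items, false);
        } catch (TargetFailed e) {
            run(jobId, items, true);
        }
    }

    List<Integer> processed() {
        return new ArrayList<>(processed);
    }
}
```

Because the second run resumes from the saved progress, each item is processed exactly once even though the function was invoked twice.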

Some useful FunctionService statistics

Geode captures several statistics on each member to allow monitoring application behavior on data nodes.

...