Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This proposal contains a number of improvements to the function service to improve usability issues.

Table of Contents

 

Improvements

Minor changes

There are some minor improvements that already have JIRAs. We should just apply the changes described in these jiras.

Jira
showSummarytrue
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-625

Jira
showSummarytrue
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-393

Jira
showSummarytrue
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-641

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-336

Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-728

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-2217
 

 

RegionFunctionContext should provide the local data set

 

Usability issues

 

Must use ResultCollector returned from FunctionService.execute

Below, we have to use the collector returned by FunctionService.execute, rather than just calling getResult on our own result collector. We run into problems otherwise, maybe because exceptions are not handed to the user supplied result collector?

 

Example:

Code Block
  ResultCollector<TopEntriesCollector, TopEntries> rc = (ResultCollector<TopEntriesCollector, TopEntries>) FunctionService.onRegion(region)
        .withArgs(context)
        .withCollector(collector)
        .execute(LuceneFunction.ID);
    
//This doesn't work
TopEntries entries = collector.getResult()

//This is what you have to do
TopEntries entries = rc.getResult();
  

 

Exceptions are handed to ResultCollector.addResult, causing ClassCastExceptions

On the ResultSender, there is a method to return exceptions using sendException.

However, the ResultCollector only has addResult(). Further, ResultCollector has generic types

Code Block
public interface ResultCollector<T,S> {
  public void addResult(DistributedMember memberID, T resultOfSingleExecution);

Since the type of T is the user result type, trying to call add result will almost invariably result in the ClassCastException, like below

 

No Format
[fatal 2015/12/03 10:58:20.809 PST <Function Execution Processor1> tid=Function Execution Processor1id] Unexpected exception during function execution on local node Partitioned Region
java.lang.ClassCastException: java.lang.Exception cannot be cast to com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector
    at com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesFunctionCollector.addResult(TopEntriesFunctionCollector.java:1)
    at com.gemstone.gemfire.internal.cache.execute.LocalResultCollectorImpl.addResult(LocalResultCollectorImpl.java:85)
    at com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender.lastResult(PartitionedRegionFunctionResultSender.java:216)
    at com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender.lastResult(PartitionedRegionFunctionResultSender.java:174)
    at com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender.sendException(PartitionedRegionFunctionResultSender.java:309)
    at com.gemstone.gemfire.cache.lucene.internal.distributed.LuceneFunction.execute(LuceneFunction.java:60)
    at com.gemstone.gemfire.internal.cache.execute.AbstractExecution.executeFunctionLocally(AbstractExecution.java:367)
    at com.gemstone.gemfire.internal.cache.

Function invoked with onMember cannot be unit tested, because our API requires using a singleton cache

 

FunctionContext does not contain a cache. That means any function that you execute with onMember must contain code like this:

Code Block
public void execute(FunctionContext context) {
    Cache cache = CacheFactory.getAnyInstance();
//...do work with the cache
}

This makes it impossible to write pure unit tests for the function code that mock the cache.

 

Functions invoked with onRegion always have to cast the FunctionContext to RegionFunctionContext

 

Any function that is executed on a region has to do a cast. This is not an intuitive API; the user has to know that there is such a thing as RegionFunctionContext. This can also lead to ClassCastExceptions if the user tries to cast the FunctionContext and it is not actually a REgionFunctionContext

 

All onRegion function code basically has to start this way:

Code Block
public void execute(FunctionContext context) {
    RegionFunctionContext ctx = (RegionFunctionContext) context;
 

...

If you want to operate on the local data set for a function, you have to do this

...

  • It's not clear that ctx.getDataSet does not return the local data.
  • The local data set is not mockable for unit tests, because this static function call extracts it using internal APIs on concrete classes. This also prevents this sort of function from being tested in a pure unit test.
  • The user has to know that PartitionRegionHelper exists; the API is not obvious
  • The various methods on PartitionRegionHelper are confusing, specifically it's unclear to a user why they should use getLocalDataForContext instead of getLocalData

No way to get a spring data repository of the local data set of a function

This is not specifically an issue in the geode codebase, but it affects a lot of geode users.

Spring Gemfire provides an simplified function API. Spring Data Gemfire allows the user to generate a repository object to access a region. However, there is no way to generate a repository that accesses the local data for the function.

 

Here's an example

Code Block
public interface PostRepository extends GemfireRepository<Post, PostId> {
  @Query("select * from /posts where id.person=$1")
  public Collection<Post> findPosts(String personName);
}  

public class Functions {
  
  //This repository accesses the entire data for the region on all members.
  @Autowired
  PostRepository postRepository;
  
  @GemfireFunction(HA=true)
  public SentimentResult getSentimentDoesntWork(Region<PostId, Post> localPosts, 
      @Filter Set<String> personNames) throws Exception {
    String personName = personNames.iterator().next();
    
    //This works, because localPosts is bound to the local data for the region    
    Collection<Post> posts = localPosts.query("id.person='" + personName + "'");

    //But I really want to use spring data repositories. Unfortunately, 
    //I can't get a repository here
    PostRepository localDataRepository == ??
    Collection<Post> posts  = localDataRepository.findPosts(personName);
    
  }

 

FunctionService.onMembers relies on a singleton DistributedSystem

 

FunctionService.onMembers doesn't take a Cache or DistributedSystem as a parameter. That means we can't get rid of the singleton DistributedSystem, because FunctionService will always look up the singleton.

Having a static FunctionService.onXXX means users can't mock the functionService

 

To invoke a function, the user code needs to refer to the static FunctionService.onXXX methods ( see below). That means that code that contains these static calls can't be tested with a test that mocks the function service.

Code Block
FunctionService.onRegion(region)
        .withArgs(context)
        .withCollector(collector)
        .execute(LuceneFunction.ID);

It might be better to have a cache.getFunctionService

 

...

 

RegionFunctionContext should just provide a getLocalDataSet method. We will add this new method to the RegionFunctionContext interface. We will deprecate the PartitionRegionHelper.getLocalDataForContext.

 

See

Jira
showSummaryfalse
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-726

Functions executed on a region should implement a different interface

Any function that is executed on a region has to do a cast:

Code Block
public void execute(FunctionContext context) {
    RegionFunctionContext ctx = (RegionFunctionContext) context;
 

This is not an intuitive API; the user has to know that there is such a thing as RegionFunctionContext. This can also lead to ClassCastExceptions if the user tries to cast the FunctionContext and it is not actually a RegionFunctionContext

Instead, we will add a new RegionFunction interface that provides a RegionFunctionContext. FunctionService.onRegion will return a RegionExecution interface with an execute method that takes a RegionFunction.

See

Jira
showSummaryfalse
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-394
for more details.

The methods isHA, hasResult, optimizeForWrite should move from the Function interface to the Execution interface

isHA, hasResult, and optimizeForWrite are methods on Function that the user can override. However, that makes it harder to write lambda expressions for functions, because if you want to set one of these parameters you can now longer use a lambda.

Also the function service API provides a way to invoke functions using a string id. For example FunctionService.onServer().execute("MY_FUNCTION_ID"). If we remove these extra methods from the Function interface, this code will no longer need to look up the Function on the client and the Function class will not have to exist on the client any more.

We will add isHA(boolean), hasResult(Boolean) and optimizeForWrite(boolean) to the Execution interface. These methods will be deprecated on the function interface. Unfortunately, if someone invokes a function on a client given the string id, we will still need to look up the function class for now in case they have specified these parameters on their function. But in another release we can remove the deprecated parameters and stop using them.

 

See

Jira
showSummaryfalse
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-727

The Function interface should return a value

The function context has a ResultSender with methods like sendResult, lastResult, sendException, etc. Unfortunately, that makes the common case of a function that returns a single result unnecessarily verbose and error prone. For example, if no result is sent that can cause issues:

Code Block
public void execute(FunctionContext context) {
  if(someTest) {
	  context.getResultSender().lastResult("Done");
  }
  //Whoops I forgot to send a last result
}

Function code must exist on the client

The function service API provides a way to invoke functions by id, for example

Code Block
FunctionService.onServer().execute("MY_FUNCTION_ID")

However, internally, we look up the function object itself from the function service in order to read the isHA, optimizeForWrite, and hasResult properties of the function. That means the code must exist on the client and must be registered with the FunctionService.

This is also an opportunity for the user to see an exception if the class on the client does not match the class on the server for these properties.

Lambdas and functions

Lambdas cannot be used for Geode functions, because there are multiple non abstract methods on function like hasResult, isHA, etc.

It would be nice to able to write short functions as lambda expressions, For example

Code Block
FunctionService.onMembers().execute( () -> System.out.println() );

However, that is not possible currently because Function has several non default methods.

We can also get rid of FunctionAdapter in favor of using default methods on Function.

 

Suggested API changes

Instead of a single Function interface, there should be multiple interfaces for different types of functions. TODO - Do we really need VoidFunction (equivalent to hasResult() == false)

Instead, the function interfaces should return a value. For the simple case where there is only one result, the user can just return the result. For cases with multiple streaming results, the user can look up the result sender. For cases with no results the user can just return null. The return value will be ignored anyway if the user invokes the function with hasResult==false.

We can't change the existing Function interface, but we can add a return value to the new RegionFunction interface we're introducing. We can also introduce a MemberFunction interface that returns a value, to distinguish it from the RegionFunction.

Code Block
interface RegionFunction<T> {
  public T execute(RegionFunctionContext context)
}

interface MemberFunction<T> {
  public T execute(FunctionContext context)
}

Combining these changes with

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-2217
 would result in interfaces with two generic types, one for the function argument type and another for the function return type.

Code Block
interface RegionFunction<S, T> {
  public T execute(RegionFunctionContext<S> context)
}

interface MemberFunction<S, T> {
  public T execute(FunctionContext<S> context)
}

interface FunctionContext<S> {
  public S getArguments();
}



See

Jira
showSummaryfalse
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-729

Add Cache.getFunctionService and deprecate the static methods on FunctionService

This proposal might be a bit more controversial. The issue is that the current FunctionService leads people to write code that is harder to test, because they cannot mock the FunctionService if they are making static calls to FunctionService.onRegion. If FunctionService is something that could be provided by the cache, a user can mock FunctionService and assert that functions are invoked.

The other issue this fixes is that FunctionService has methods like onMembers(String ... groups) that internally look up a static cache. Besides making it hard for users to use Mocks with FunctionService code, this also ties us to having a singleton cache and distributed system. Since we're trying to get rid of singletons we should fix this API to not rely on an underlying singleton.

Stop Requiring the user to use the ResultCollector returned by the execute method.

The javadocs for execute say this

Code Block
/** @return ResultCollector to retrieve the results received. This is different
   *         object than the ResultCollector provided in
   *         {@link Execution#withCollector(ResultCollector)}. User has to use
   *         this reference to retrieve results.
*/

It's confusing to users that geode wraps their result collector in another collector that has different behavior.

We should make sure that users don't have to call getResults on the returned ResultCollector. If we want to make it easier for users to implement a ResultCollector that waits for all of the results to arrive, we should provide helper classes like a BlockingResultCollector that waits for all results to arrive in getResults.

Part of the reason is is required right now is the way that exceptions get handled. If a function throws an exception or even just sends an exception with ResultSender.sendResult, the user's ResultCollector will never see it. The only way to get the exception is to call getResult on the result collector returned by execute. See

Jira
showSummaryfalse
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyGEODE-625
.

 

Summary of changes

New interfaces

Code Block
interface RegionFunction<S, T>
Code Block
interface RegionFunction<T> {
  public T execute(RegionFunctionContextRegionFunctionContext<S> context)
}

interface Function<T>MemberFunction<S, T> {
  public T execute(FunctionContextFunctionContext<S> context)
}

 

RegionFunctionContext should have a method to the local data for the context

Code Block
RegionFunctionContext

interface RegionExecution<S, T, U> extends Execution {
  public Region getLocalData(ResultCollector<T, U> execute(RegionFunction<S,T> function);
}

 

 

 

FunctionContext should have a method to return the cache

Code Block
interface FunctionContext

interface MemberExecution<S, T, U> extends Execution {
  getCache(public ResultCollector<T, U> execute(MemberFunction<S,T> function);
}

 

New Methods

 

isHA, optimizeForWrite, and hasResult should be removed from Function and should be added to Execution instead

 

ResultCollector should have an addException method

Code Block
public interface ResultCollector<T,S> {
  public void addException(DistributedMember member, Throwable t);
...
}

 

Example of the improved API

Code Block
FunctionService Cache.getFunctionService()

Region RegionFunctionContext.getLocalDataSet();

Execution.isHA(boolean value)
Execution.optimizeForWrite(boolean value)
Execution.hasResult(boolean value)

//New instance methods, rather than static
FunctionService {
  public RegionExecution onRegion(Region region);
  public MemberExecution onServer(RegionService regionService);
  public MemberExecution onServers(RegionService regionService);
  public MemberExecution onMember(DistributedMember distributedMember);
  public MemberExecution onMembers(Set<DistributedMember> distributedMembers);
  public MemberExecution onMember(String... groups);
  public void registerFunction(Function function);
  public void unregisterFunction(String functionId);
  public boolean isRegistered(String functionId);
}

 

Deprecated Methods

Code Block
FunctionService {
  public static Execution onRegion(Region region);
  public static Execution onServer(Pool pool);
  public static Execution onServers(Pool pool);
  public static Execution onMember(DistributedMember distributedMember);
  public static Execution onMember(String... groups);
  public static Function getFunction(String functionId);
  public static void registerFunction(Function function);
  public static void unregisterFunction(String functionId);
  public static boolean isRegistered(String functionId);
}

Function {
  public boolean hasResult();
  public boolean optimizeForWrite();
  public boolean isHA();
}

 

 

Examples of API changes

 

Code Block
FunctionService functionService = cache.getFunctionService();

//RegionFunction with a single result
ResultCollector<String, Collection<String>> rc = functionService.onRegion(region)
  .execute(ctx -> ctx.getLocalData().get("key"))
Collection<String> values = rc.getResult()

//RegionFunction without a result, that is optimized for write, but is not HA
functionService.onRegion(region)
  .optimizeForWrite(true)
  .isHA(true)
  .hasResult(false)
  .execute(ctx -> ctx.getLocalData().put("key", "value))

//On member function that gets the cache from the context
functionService.onMembers()
   .hasResult(false)
   .execute(ctx -> ctx.getCache().closegetName());

//Function that streams results back using the result sender
functionService.onRegion(region).execute( ctx -> 
  {
    ResultSender<String> s = ctx.getResultSender();
    for(String value : ctx.getRegion().values()) {
      s.addResult(value)
    }
    return null;
  }