...
- LowMemoryException - when one or more of the receiving site's members is low on memory
- CacheWriterException - when a CacheWriter before* method throws an exception
- PartitionOfflineException - when all the members defining a persistent bucket are offline
- RegionDestroyedException - when the region doesn't exist in the remote site
- Malformed data exception (unable to deserialize)
...
- Deprecate (and later remove) the internal system property REMOVE_FROM_QUEUE_ON_EXCEPTION, but detect if it is set to false and support existing behavior (infinite retries)
- Create a new callback API that will be executed when an exception is returned with the acknowledgement from the receiver
- Provide an example implementation of the callback that saves events with exceptions returned from the receiver in a 'dead-letter' queue on the sender (on disk)
- Add a new property for the gateway sender to specify the path to the custom implementation of the callback
  - If no path is provided, use the default (example) implementation
  - If the property is not specified at all, revert to the existing behavior (remove events from the queue when the ack is received and ignore batch exceptions)
- Add 2 new properties for the gateway receiver to control when to send the acknowledgement with the exceptions:
  - the number of retries for events failing with an exception
  - the wait time between retries
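The sender-side fallback rules above can be expressed as a small decision function. This sketch rests on assumptions:

- "no path provided" is taken to mean that the property is present but has an empty value;
- `FailureListenerMode` and `resolveMode` are hypothetical names, not Geode API.

The function only classifies the property value. It does not load or instantiate a callback class.

```java
/** Hypothetical classification of the proposed gateway sender callback property. */
enum FailureListenerMode {
  LEGACY,  // property not specified: remove events when the ack arrives, ignore batch exceptions
  DEFAULT, // property specified without a path: use the default (example) dead-letter implementation
  CUSTOM   // property names a custom callback implementation
}

final class FailureListenerConfig {
  private FailureListenerConfig() {}

  /**
   * Maps the raw property value to the behavior described in the proposal.
   * A null value means the property was not specified at all (assumption).
   */
  static FailureListenerMode resolveMode(String propertyValue) {
    if (propertyValue == null) {
      return FailureListenerMode.LEGACY;
    }
    if (propertyValue.trim().isEmpty()) {
      return FailureListenerMode.DEFAULT;
    }
    return FailureListenerMode.CUSTOM;
  }
}
```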
...
- Create gateway receiver including new options for specifying # of retries and wait time between retries
- Deploy jar on gateway sender(s) containing callback implementation
- Create gateway sender with option to add callback
API Change
...
New API + Changes to existing APIs
Java Definition
The new GatewayEventFailureListener interface is defined as follows:
```java
import org.apache.geode.cache.CacheCallback;
import org.apache.geode.cache.wan.GatewayQueueEvent;

public interface GatewayEventFailureListener extends CacheCallback {
  /**
   * Callback invoked on the GatewaySender when an event fails to be processed by the
   * GatewayReceiver
   *
   * @param event The event that failed
   *
   * @param exception The exception that occurred
   */
  void onFailure(GatewayQueueEvent event, Throwable exception);
}
```
Example:
```java
import java.util.Properties;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.wan.GatewayQueueEvent;

public class LoggingGatewayEventFailureListener implements GatewayEventFailureListener, Declarable {
  private Cache cache;
  @Override
  public void onFailure(GatewayQueueEvent event, Throwable exception) {
    this.cache.getLogger().warning("LoggingGatewayEventFailureListener onFailure: region=" + event.getRegion().getName()
        + "; operation=" + event.getOperation() + "; key=" + event.getKey()
        + "; value=" + event.getDeserializedValue() + "; exception=" + exception);
  }
  @Override
  public void initialize(Cache cache, Properties properties) {
    this.cache = cache;
  }
}
```
This LoggingGatewayEventFailureListener will log warnings like:
```
[warning 2018/11/05 17:30:41.613 PST ln-1 <AckReaderThread for : Event Processor for GatewaySender_ny_3> tid=0x75] LoggingGatewayEventFailureListener onFailure: region=data; operation=CREATE; key=8360; value=Trade[id=8360; cusip=PVTL; shares=100; price=18]; exception=org.apache.geode.cache.persistence.PartitionOfflineException: Region /data bucket 73 has persistent data that is no longer online stored at these locations: [...]
```
Java Configuration
GatewaySender
The GatewaySenderFactory adds the ability to set a GatewayEventFailureListener:
```java
/**
 * Sets the provided <code>GatewayEventFailureListener</code> in this GatewaySenderFactory.
 *
 * @param listener The <code>GatewayEventFailureListener</code>
 */
GatewaySenderFactory setGatewayEventFailureListener(GatewayEventFailureListener listener);
```
The GatewaySender adds the ability to get a GatewayEventFailureListener:
```java
/**
 * Returns this <code>GatewaySender's</code> <code>GatewayEventFailureListener</code>.
 *
 * @return this <code>GatewaySender's</code> <code>GatewayEventFailureListener</code>
 */
GatewayEventFailureListener getGatewayEventFailureListener();
```
Example:
```java
GatewaySender sender = cache.createGatewaySenderFactory()
    .setParallel(true)
    .setGatewayEventFailureListener(new FileGatewayEventFailureListener(new File(...)))
    .create("ln", 2);
```
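The Java example above passes a FileGatewayEventFailureListener, which this document does not define. Assume it is the on-disk "dead-letter" implementation described in the proposal. Its onFailure could then delegate to a small writer like the hypothetical `DeadLetterFile` below. It would pass the same values that LoggingGatewayEventFailureListener logs:

- `event.getRegion().getName()`
- `event.getOperation()`
- `event.getKey()`
- `event.getDeserializedValue()`

The class name, the record layout (one tab-separated line per failed event) and the locking are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

/**
 * Hypothetical helper a file-based GatewayEventFailureListener could delegate to:
 * it appends one line per failed event to a dead-letter file on the sender's disk.
 */
final class DeadLetterFile {
  private final Path path;

  DeadLetterFile(Path path) {
    this.path = path;
  }

  /** Appends a tab-separated record; synchronized so callers sharing this instance don't interleave lines. */
  synchronized void append(String region, String operation, Object key, Object value, Throwable exception) {
    String line = String.join("\t",
        region, operation, String.valueOf(key), String.valueOf(value), String.valueOf(exception));
    try {
      // CREATE + APPEND: create the file on first use, otherwise add to the end of it.
      Files.write(path, Collections.singletonList(line), StandardCharsets.UTF_8,
          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    } catch (IOException e) {
      throw new UncheckedIOException("Could not write dead-letter record to " + path, e);
    }
  }
}
```

This line format is only a sketch:

- Keys or values that contain tabs or newlines would corrupt it.
- Resending events saved in the dead-letter queue, which this document also mentions, would need the serialized values rather than their toString() output.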
GatewayReceiver
The GatewayReceiverFactory adds the ability to set retry attempts and wait time between retry attempts:
```java
/**
 * Sets the number of retry attempts to apply failing events from remote GatewaySenders
 *
 * @param retryAttempts The retry attempts
 */
GatewayReceiverFactory setRetryAttempts(int retryAttempts);

/**
 * Sets the wait time between retry attempts to apply failing events from remote GatewaySenders
 *
 * @param waitTimeBetweenRetryAttempts The wait time in milliseconds
 */
GatewayReceiverFactory setWaitTimeBetweenRetryAttempts(long waitTimeBetweenRetryAttempts);
```
The GatewayReceiver adds the ability to get retry attempts and wait time between retry attempts:
```java
/**
 * Returns the number of times to retry a failing event before throwing an exception.
 *
 * @return the number of times to retry a failing event before throwing an exception
 */
int getRetryAttempts();

/**
 * Returns the amount of time in milliseconds to wait between attempts to apply a failing event.
 *
 * @return the amount of time in milliseconds to wait between attempts to apply a failing event
 */
long getWaitTimeBetweenRetryAttempts();
```
Example:
```java
GatewayReceiver receiver = cache.createGatewayReceiverFactory()
    .setRetryAttempts(10)
    .setWaitTimeBetweenRetryAttempts(100)
    .create();
```
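The receiver-side meaning of these two settings can be sketched as a bounded retry loop. This is a minimal model, not Geode code, and it makes three assumptions:

- `RetryingApplier` is a hypothetical name.
- The Callable stands in for applying one batch event (create, update or destroy).
- retryAttempts counts retries after the first attempt, so an event is tried at most retryAttempts + 1 times.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

/** Hypothetical receiver-side helper illustrating retryAttempts and waitTimeBetweenRetryAttempts. */
final class RetryingApplier {
  private final int retryAttempts;
  private final long waitTimeBetweenRetryAttempts; // milliseconds

  RetryingApplier(int retryAttempts, long waitTimeBetweenRetryAttempts) {
    if (retryAttempts < 0 || waitTimeBetweenRetryAttempts < 0) {
      throw new IllegalArgumentException("retry settings must be non-negative");
    }
    this.retryAttempts = retryAttempts;
    this.waitTimeBetweenRetryAttempts = waitTimeBetweenRetryAttempts;
  }

  /**
   * Runs the operation once, plus up to retryAttempts more times while it keeps failing.
   * Returns empty on success, or the last exception, which the receiver would add to the
   * batch acknowledgement sent back to the GatewaySender.
   */
  Optional<Exception> apply(Callable<?> operation) throws InterruptedException {
    Exception last = null;
    for (int attempt = 0; attempt <= retryAttempts; attempt++) {
      try {
        operation.call();
        return Optional.empty();
      } catch (Exception e) {
        last = e;
        if (attempt < retryAttempts) {
          Thread.sleep(waitTimeBetweenRetryAttempts); // no wait after the final attempt
        }
      }
    }
    return Optional.of(last);
  }
}
```

With the example settings above (10 retries, 100 ms apart), an event that never succeeds would hold up the batch for roughly one second before its exception is returned in the acknowledgement.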
Gfsh Configuration
gateway-sender
The create gateway-sender command defines this new parameter:
| Name | Description |
|---|---|
| gateway-event-failure-listener | The fully qualified class name of GatewayEventFailureListener to be set in the GatewaySender |
Example:
```
Cluster-1 gfsh>create gateway-sender --id=ln --parallel=true --remote-distributed-system-id=2 --gateway-event-failure-listener=LoggingGatewayEventFailureListener
Member | Status
------ | ------------------------------------
ny-1   | GatewaySender "ln" created on "ny-1"
```
gateway-receiver
The create gateway-receiver command defines these new parameters:
| Name | Description |
|---|---|
| retry-attempts | The number of retry attempts for failed events processed by the GatewayReceiver |
| wait-time-between-retry-attempts | The amount of time in milliseconds to wait between retry attempts for failed events processed by the GatewayReceiver |
Example:
```
Cluster-2 gfsh>create gateway-receiver --retry-attempts=10 --wait-time-between-retry-attempts=100
Member | Status | Message
------ | ------ | ---------------------------------------------------------------------------
ln-1   | OK     | GatewayReceiver created on member "ln-1" and will listen on the port "5296"
```
XML Configuration
gateway-sender
The <gateway-sender> element defines the <gateway-event-failure-listener> sub-element. That sub-element is configured like any other Declarable: the implementation class is named in a <class-name> element.
Example:
```xml
<gateway-sender id="...">
  <gateway-event-failure-listener>
    <class-name>FileGatewayEventFailureListener</class-name>
  </gateway-event-failure-listener>
</gateway-sender>
```
gateway-receiver
The <gateway-receiver> element defines the retry-attempts and wait-time-between-retry-attempts attributes.
Example:
```xml
<gateway-receiver retry-attempts="5" wait-time-between-retry-attempts="100"/>
```
Risks and Unknowns
- How to handle a ClassNotFoundException for the sender callback
- Default behavior when no callback is provided for the sender? It should be the same as the current behavior.
- Backward compatibility behavior
  - old sender connected to new receiver using new options
  - new sender with callback implemented connected to old receiver
- Sort out the security privileges needed for deploying the jar vs. installing the callback with the sender vs. reading the values of failed events written to disk
...
- Ability to modify batch removal to remove specific events from the batch
- Ability to resend events saved in dead-letter queue
Current Implementation
```plantuml
@startuml
entity EventProcessor as A
entity RemoteDispatcher as B
entity ServerConnection as C
entity ReceiverCommand as D
box "Site 1" #LightBlue
  participant A
  participant B
endbox
box "Site 2" #LightBlue
  participant C
  participant D
endbox
A -> A: peekBatchFromQueue
A -> B: dispatchBatch
B -> B: getConnection
B -> C: sendBatch
C -> C: readRequest
C -> C: createCommand
C -> D: execute
D -> D: readBatchEvents
loop For Each Batch Event
  loop Retry
    D -> D: determineOperation (create, update, destroy)
    D -> D: executeOperation
    alt Successful executeOperation:
      D -> D: breakRetry
    else Failed executeOperation:
      alt Remove from queue on exception:
        D -> D: storeException
        D -> D: breakRetry
      else Keep in queue on exception:
        D -> D: sleep N milliseconds
        D -> D: continueRetry
      end
    end
  end
end
D -> B: sendAcknowledgement
B -> B: readAcknowledgement
B -> B: logExceptions (if necessary)
A -> A: removeBatchFromQueue
@enduml
```
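The per-event loop in this diagram can be modeled in plain Java to make its two existing outcomes explicit. In this model:

- `LegacyApplier` is a hypothetical name.
- The boolean stands in for the REMOVE_FROM_QUEUE_ON_EXCEPTION system property.
- The sleep interval stands in for the unspecified "N milliseconds".

```java
import java.util.Optional;
import java.util.concurrent.Callable;

/** Hypothetical model of the current receiver-side retry loop for a single batch event. */
final class LegacyApplier {
  private final boolean removeFromQueueOnException; // stand-in for REMOVE_FROM_QUEUE_ON_EXCEPTION
  private final long sleepMillis;                   // stand-in for "sleep N milliseconds"

  LegacyApplier(boolean removeFromQueueOnException, long sleepMillis) {
    this.removeFromQueueOnException = removeFromQueueOnException;
    this.sleepMillis = sleepMillis;
  }

  /** Returns empty on success, or the stored exception when removing from the queue on exception. */
  Optional<Exception> apply(Callable<?> operation) throws InterruptedException {
    while (true) {
      try {
        operation.call();
        return Optional.empty(); // breakRetry
      } catch (Exception e) {
        if (removeFromQueueOnException) {
          return Optional.of(e); // storeException + breakRetry; the sender only logs it
        }
        Thread.sleep(sleepMillis); // keep in queue: continueRetry with no upper bound
      }
    }
  }
}
```

With the flag set to false, an event that can never be applied keeps this loop running indefinitely. A malformed event that cannot be deserialized is one example. This is the infinite-retry behavior that the proposal replaces with a bounded number of retry attempts.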
Proposed Implementation
```plantuml
@startuml
entity EventProcessor as A
entity RemoteDispatcher as B
entity ServerConnection as C
entity ReceiverCommand as D
entity FailedEventHandler as E
box "Site 1" #LightBlue
  participant A
  participant B
  participant E
endbox
box "Site 2" #LightBlue
  participant C
  participant D
endbox
A -> A: peekBatchFromQueue
A -> B: dispatchBatch
B -> B: getConnection
B -> C: sendBatch
C -> C: readRequest
C -> C: createCommand
C -> D: execute
D -> D: readBatchEvents
loop For Each Batch Event
  loop Retry up to retryAttempts times
    D -> D: determineOperation (create, update, destroy)
    D -> D: executeOperation
    alt Successful executeOperation:
      D -> D: breakRetry
    else Failed executeOperation:
      D -> D: storeException
      D -> D: sleep waitTimeBetweenRetryAttempts milliseconds
      D -> D: continueRetry
    end
  end
end
D -> B: sendAcknowledgement
B -> B: readAcknowledgement
loop For Each Failed Batch Event
  B -> E: onFailure
end
A -> A: removeBatchFromQueue
@enduml
```
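The sender-side half of the proposed flow can be sketched with plain collections. That half covers readAcknowledgement, one callback per failed event, and then removeBatchFromQueue. The sketch makes these assumptions:

- `AckProcessor` is a hypothetical name.
- A List stands in for the sender queue.
- The ack's exceptions are modeled as a map from batch position to exception.
- A BiConsumer stands in for GatewayEventFailureListener.onFailure.
- Catching exceptions thrown by the listener is a design choice the proposal does not specify.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical model of the sender-side steps after readAcknowledgement in the proposed flow. */
final class AckProcessor<E> {
  private final BiConsumer<E, Throwable> onFailure; // null means no listener is configured

  AckProcessor(BiConsumer<E, Throwable> onFailure) {
    this.onFailure = onFailure;
  }

  /**
   * Hands every event the receiver reported as failed to the listener (in batch order),
   * then removes the whole batch from the head of the queue.
   *
   * @param failuresByIndex position within the batch mapped to the exception returned in the ack
   */
  void process(List<E> queue, int batchSize, Map<Integer, Throwable> failuresByIndex) {
    List<E> batch = queue.subList(0, batchSize);
    if (onFailure != null) {
      for (Map.Entry<Integer, Throwable> failure : new TreeMap<>(failuresByIndex).entrySet()) {
        E event = batch.get(failure.getKey());
        try {
          onFailure.accept(event, failure.getValue());
        } catch (RuntimeException e) {
          // Assumption: a misbehaving listener should not keep the batch in the queue.
          System.err.println("GatewayEventFailureListener threw: " + e);
        }
      }
    }
    batch.clear(); // removeBatchFromQueue: clearing the subList removes those entries from the queue
  }
}
```

In this model the batch is removed whether or not events failed. Failed events are therefore no longer retried by the sender. They survive only in whatever the listener does with them, for example a dead-letter file.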