Fineract Events

Apache Fineract emits an event for every action that implements the event producer. Fineract can also implement listeners that consume the messages arriving on the topics; for example, the Community Application consumes the messages to present the events to the back-office user. The messages are also persisted in the database.

Apache Fineract Events follow the CQRS pattern, separating the write operations (producing events) from the read operations (consuming them).

The following Events are produced in Kafka in the current implementation:

  • ACTIVATE_CLIENT: client activated
  • APPROVE_LOAN: loan approved
  • DISBURSE_LOAN: loan disbursed
  • READ_LOAN: loan closed
  • READ_Rescheduled: loan has been rescheduled
  • READ_LOAN: repayment made
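For illustration, the event names above can be modeled as a small enum together with a helper that builds the message payload a producer might send. The JSON envelope and field names here are assumptions for the sketch, not Fineract's actual wire format:

```java
// Illustrative only: the enum values mirror the event list above; the payload
// shape is a hypothetical JSON envelope, not Fineract's actual message format.
public class FineractEventPayloads {

    public enum FineractEvent {
        ACTIVATE_CLIENT,
        APPROVE_LOAN,
        DISBURSE_LOAN,
        READ_LOAN
    }

    // Builds a minimal JSON payload for the given event and entity id.
    public static String payload(FineractEvent event, long entityId) {
        return String.format("{\"event\":\"%s\",\"entityId\":%d}", event, entityId);
    }

    public static void main(String[] args) {
        System.out.println(payload(FineractEvent.APPROVE_LOAN, 42L));
        // → {"event":"APPROVE_LOAN","entityId":42}
    }
}
```

A payload like this would be the value of the Kafka record produced when the corresponding action runs.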

Apache Kafka

Apache Kafka is a messaging system that can be used with either of the following approaches:

  • Synchronous
  • Asynchronous

In Apache Fineract, messages are produced asynchronously so that producing never blocks request processing under heavy workloads.
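A minimal sketch of an asynchronous send with the plain Kafka client: send() returns immediately and a callback handles the broker acknowledgement. The topic name and broker address are placeholders, and running this requires a reachable broker:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the producer's buffer and returns at once;
            // the callback runs later, when the broker acks or the send fails,
            // so the calling thread is never blocked.
            producer.send(new ProducerRecord<>("fineract-events", "APPROVE_LOAN"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(); // handle/retry in real code
                        }
                    });
        } // close() flushes any buffered records
    }
}
```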

Persisting the Kafka Messages

Apache Fineract Events are also stored in the database so that they persist beyond the Kafka topics.

These tables are used by the UI (Community Application) to list and retrieve the most recent actions executed by Apache Fineract.

Architecture - High Level - Online Transactions

Architecture - High Level - Batch Transactions

Events - Transactions - Producer

To make producing Kafka events transactional, a transaction-aware producer is required:


Producer properties:

enable.idempotence: true
transactional.id: prod-1
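Under those settings, the producer's lifecycle looks roughly like this. This is a sketch against the plain Kafka client; the topic name and broker address are placeholders, and a running broker is required:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("enable.idempotence", "true");
        props.put("transactional.id", "prod-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // registers transactional.id with the broker
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("fineract-events", "DISBURSE_LOAN"));
                // records become visible to read_committed consumers on commit
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction(); // buffered records are discarded
            }
        }
    }
}
```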


Because idempotence is enabled, the broker deduplicates any message this producer retries, using the producer's internal sequence numbers; the transactional.id additionally identifies the producer across restarts, so the broker can fence stale ("zombie") instances.

Simply put, if the producer accidentally sends the same message to Kafka more than once, these settings enable it to notice.

Events - Transactions - Consumer

To consume Kafka events transactionally, a transaction-aware consumer is required:


Consumer properties:

enable.auto.commit: false
isolation.level: read_committed
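A sketch of a consumer configured this way with the plain Kafka client (topic, group id, and broker address are placeholders; a running broker is required):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "fineract-events-group");    // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");       // no automatic offset commits
        props.put("isolation.level", "read_committed"); // skip open/aborted transactions

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("fineract-events"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
            // Offsets are NOT committed here; in the transactional flow they are
            // committed through the producer as part of the transaction.
        }
    }
}
```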


Using read_committed ensures that we don't read any transactional messages before their transaction completes.

Events - Transactions - Producer/Consumer

With the properties above, the producer and consumer are both configured to write and read transactionally.

The transactional Java implementation covers:

  • Sender/Consumer API
  • Committing Offsets
  • Committing or Aborting the Transaction

Events - Error Handling - Producer

If anything goes wrong during processing, we catch the exception and call abortTransaction:


try {
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}


Aborting drops any buffered messages and removes the transaction from the broker.

If Fineract neither commits nor aborts before the broker-configured transaction.max.timeout.ms elapses, the Kafka broker aborts the transaction itself.

Events - Error Handling

The error-handling implementation can catch the error, or roll back and send the failed messages to a Dead Letter Queue.


@Bean
public ErrorHandler errorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler) {
    // set with a retry policy higher than KafkaListenerErrorHandler
    return new SeekToCurrentErrorHandler((data, thrownException) -> {
        deadLetterQueueHandler.send(data, thrownException);
    }, new FixedBackOff(15000, 20));
}

Events - Rollback Processor


@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(MyDeadLetterQueueHandler deadLetterQueueHandler) {
    // set with a retry policy higher than KafkaListenerErrorHandler
    final var afterRollbackProcessor = new DefaultAfterRollbackProcessor<Object, Object>((data, thrownException) -> {
        deadLetterQueueHandler.send(data, thrownException);
    }, new FixedBackOff(15000, 20));
    afterRollbackProcessor.setCommitRecovered(true);
    return afterRollbackProcessor;
}

Events - Error Handling - Listener


@Bean
@Primary
KafkaListenerErrorHandler kafkaListenerErrorHandler(MyDeadLetterQueueHandler deadLetterQueueHandler,
                                                    MyExceptionHandler exceptionHandler) {
    return (message, exception) -> {
        final var cause = (Exception) exception.getCause();
        final var consumerRecord = message.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
        if (shouldGoToDLT(cause)) {
            sendToDeadLetterTopic(deadLetterQueueHandler, consumerRecord, cause);
            // ends the transaction rollback and moves on to the next transaction
            return new CustomResponse(cause.getMessage());
        } else {
            // retry logic: rethrow so the container's error handler retries the record
            throw exception;
        }
    };
}

References

Apache Fineract GitHub Repository: https://github.com/apache/fineract  

Apache Fineract Release License: https://github.com/apache/fineract/blob/develop/LICENSE_RELEASE

Apache Fineract Source License: https://github.com/apache/fineract/blob/develop/LICENSE_SOURCE

Apache Kafka: https://kafka.apache.org/