...

Produce Response (Version: 8) => [responses] throttle_time_ms 
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time log_start_offset error_records error_message
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
      log_start_offset => INT64
      error_records => [INT32]         // new field, encodes the relative offsets of the records that caused the error
      error_message => STRING          // new field, encodes the error message that the client can use for its own logging
    throttle_time_ms => INT32
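
As an illustration only, the per-partition part of this response could be held on the client in a small value class like the one below. This is a sketch, not Kafka's actual response class: the name PartitionResponseSketch and the helper hasRecordLevelErrors are hypothetical, and it assumes error_records is an array of INT32 relative offsets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for one entry of partition_responses (not a Kafka class).
final class PartitionResponseSketch {
    final int partition;               // INT32
    final short errorCode;             // INT16
    final long baseOffset;             // INT64
    final long logAppendTime;          // INT64
    final long logStartOffset;         // INT64
    final List<Integer> errorRecords;  // assumed [INT32]: relative offsets of the offending records
    final String errorMessage;         // STRING: broker-side message the client can log

    PartitionResponseSketch(int partition, short errorCode, long baseOffset,
                            long logAppendTime, long logStartOffset,
                            List<Integer> errorRecords, String errorMessage) {
        this.partition = partition;
        this.errorCode = errorCode;
        this.baseOffset = baseOffset;
        this.logAppendTime = logAppendTime;
        this.logStartOffset = logStartOffset;
        // Treat a missing list as empty and copy defensively so the holder stays immutable.
        this.errorRecords = errorRecords == null
                ? Collections.<Integer>emptyList()
                : Collections.unmodifiableList(new ArrayList<>(errorRecords));
        this.errorMessage = errorMessage;
    }

    // True when the broker singled out individual records rather than the whole batch.
    boolean hasRecordLevelErrors() {
        return !errorRecords.isEmpty();
    }
}
```

The numeric error code passed to the constructor is left to the caller; the sketch does not assume any particular value for INVALID_RECORD.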

...

  1. Let InvalidRecordException no longer inherit from CorruptRecordException; instead, have it inherit from KafkaException directly.
  2. For the cases above that throw InvalidRecordException to indicate a fatal error (that is, all except CRC checksum failures, whose error code we would change to CORRUPT_MESSAGE), return the new error code INVALID_RECORD.
  3. For every error code, encode the thrown exception's message in the new error_message field; on the client side, use this error_message wherever needed (e.g. in logging) instead of falling back to the default error message from Errors.
    1. More specifically, for errors triggered by individual record(s) rather than by the record batch as a whole, encode an error message stating that a given record was rejected NOT because of itself, but because of some other records.
    2. For this case, also encode error_records as the list of relative offsets of the records that cause the whole batch to be rejected.
  4. On the client side, augment the error handling so that:
    1. If the error_code's corresponding exception is retriable, follow the current behavior and retry the whole batch as-is;
    2. If the error_code's corresponding exception is not retriable, check whether error_records is empty:
      1. If it is empty, follow the current behavior: reject the whole batch and set the exception on all of the records' futures.
      2. If it is not empty, remove only the records listed in the field, then retry by creating a new batch without those error records (for idempotent producers, also reset the sequence number as well as the offset). This way the records in a batch are not rejected as a whole: the culprit records are rejected while the others may still succeed.
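
The client-side handling in step 4 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the producer's actual code: the class BatchErrorHandlingSketch, the Action enum, and both methods are hypothetical names, and a relative offset in error_records is assumed to equal the record's index within the batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the proposed client-side decision (not Kafka producer code).
class BatchErrorHandlingSketch {
    enum Action { RETRY_WHOLE_BATCH, FAIL_WHOLE_BATCH, RETRY_WITHOUT_ERROR_RECORDS }

    // Choose what to do with a failed batch from the exception's retriability
    // and the error_records list returned by the broker.
    static Action decide(boolean retriable, List<Integer> errorRecords) {
        if (retriable) {
            return Action.RETRY_WHOLE_BATCH;       // current behavior, batch resent as-is
        }
        if (errorRecords.isEmpty()) {
            return Action.FAIL_WHOLE_BATCH;        // fail every record's future
        }
        return Action.RETRY_WITHOUT_ERROR_RECORDS; // drop only the culprit records
    }

    // Build the contents of the new batch by skipping the records whose
    // relative offset (assumed to be the index in the batch) is listed.
    static <T> List<T> withoutErrorRecords(List<T> batch, List<Integer> errorRecords) {
        Set<Integer> rejected = new HashSet<>(errorRecords);
        List<T> kept = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (!rejected.contains(i)) {
                kept.add(batch.get(i));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c");
        List<Integer> errorRecords = Arrays.asList(1);
        System.out.println(decide(false, errorRecords));
        System.out.println(withoutErrorRecords(batch, errorRecords));
    }
}
```

The sketch leaves out failing the futures of the removed records and, for idempotent producers, resetting the sequence number and offset of the rebuilt batch; both would have to happen before the new batch is sent.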


Compatibility, Deprecation, and Migration Plan

...