...

  • Each message's magic number must match that of its outer record batch; if not, throw InvalidRecordException.
  • Messages for compacted topics must have keys; if not, throw InvalidRecordException.
  • When magic value >= 1, messages must have monotonically increasing (relative) offsets starting from 0; if not, throw InvalidRecordException.
  • When magic value >= 1, validate that the timestamp type is CREATE_TIME and that the message's timestamp is within the configured DiffMaxMs range; if not, throw InvalidTimestampException.
  • When magic value <= 1, validate the message CRC (for magic > 1 there is no record-level CRC); if validation fails, throw InvalidRecordException.
  • When magic value <= 1, check whether the inner record is itself compressed, i.e. nested inside compressed content (for magic > 1 inner records are never compressed); if it is, throw InvalidTimestampException.
  • When magic value >= 2, check that the record batch has a valid offset range, record count and sequence number, and that it does not contain control records; if not, throw InvalidRecordException.
  • For transactional / idempotent record batches, also validate the following:
    • The configured magic number must be >= 2; if not, throw UnsupportedForMessageFormatException.
    • The producer epoch must be greater than or equal to the epoch bookkept by the broker; if not, throw ProducerFencedException.
    • If the producer epoch is greater than the bookkept epoch, check that the sequence number is 0; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the epoch.
    • If the producer epoch is equal to the bookkept epoch, check that the sequence number is continuous; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the sequence number.
    • NOTE that OutOfOrderSequenceException can only be thrown from the callback, while UnknownProducerIdException can also be thrown directly to the caller of send() / commitTransaction() etc.
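As a rough illustration of the per-record checks listed above, the following Java sketch applies the magic, key, relative-offset and timestamp rules to a simplified record model and throws on the first violation. SimpleRecord, TimestampType, RecordValidationSketch and the two exception classes are hypothetical stand-ins, not Kafka's classes. The CRC, nested-compression and batch-level checks are omitted, and the offset check assumes the strictest reading of the rule (consecutive relative offsets 0, 1, 2, ...).

```java
import java.util.List;

// Local stand-ins for the Kafka exceptions named above (not the real classes).
class InvalidRecordException extends RuntimeException {
    InvalidRecordException(String message) { super(message); }
}

class InvalidTimestampException extends RuntimeException {
    InvalidTimestampException(String message) { super(message); }
}

enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

// Hypothetical, simplified view of one record inside a batch.
final class SimpleRecord {
    final byte magic;
    final byte[] key;        // null when the record has no key
    final long offsetDelta;  // offset relative to the batch's base offset
    final long timestampMs;

    SimpleRecord(byte magic, byte[] key, long offsetDelta, long timestampMs) {
        this.magic = magic;
        this.key = key;
        this.offsetDelta = offsetDelta;
        this.timestampMs = timestampMs;
    }
}

class RecordValidationSketch {
    // Applies a subset of the rules and throws on the first violation, like the broker does.
    static void validate(byte batchMagic, TimestampType timestampType, List<SimpleRecord> records,
                         boolean compactedTopic, long nowMs, long diffMaxMs) {
        long expectedOffsetDelta = 0;
        for (SimpleRecord r : records) {
            if (r.magic != batchMagic)
                throw new InvalidRecordException("Record magic " + r.magic + " does not match batch magic " + batchMagic);
            if (compactedTopic && r.key == null)
                throw new InvalidRecordException("Compacted topic cannot accept a record without a key");
            if (batchMagic >= 1) {
                // Strictest reading of "monotonically increasing from 0": consecutive relative offsets.
                if (r.offsetDelta != expectedOffsetDelta)
                    throw new InvalidRecordException("Expected relative offset " + expectedOffsetDelta + " but got " + r.offsetDelta);
                if (timestampType != TimestampType.CREATE_TIME)
                    throw new InvalidTimestampException("Producer must use CREATE_TIME, not " + timestampType);
                if (Math.abs(r.timestampMs - nowMs) > diffMaxMs)
                    throw new InvalidTimestampException("Timestamp " + r.timestampMs + " is more than " + diffMaxMs + " ms from " + nowMs);
            }
            expectedOffsetDelta++;
        }
    }
}
```

Because the loop throws on the first problem, at most one culprit record is identified per batch. This matches the broker behavior that the error-reporting changes further below depend on.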

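The transactional / idempotent checks above can be sketched the same way. The text does not spell out how the choice between OutOfOrderSequenceException and UnknownProducerIdException is made. This sketch therefore assumes that UnknownProducerIdException is used when the broker has no bookkept state to compare against (the hypothetical sentinels NO_EPOCH and NO_SEQUENCE), and OutOfOrderSequenceException otherwise. All classes are local stand-ins, not Kafka's.

```java
// Local stand-ins for the Kafka exceptions named above (not the real classes).
class UnsupportedForMessageFormatException extends RuntimeException {
    UnsupportedForMessageFormatException(String message) { super(message); }
}

class ProducerFencedException extends RuntimeException {
    ProducerFencedException(String message) { super(message); }
}

class OutOfOrderSequenceException extends RuntimeException {
    OutOfOrderSequenceException(String message) { super(message); }
}

class UnknownProducerIdException extends RuntimeException {
    UnknownProducerIdException(String message) { super(message); }
}

class SequenceValidationSketch {
    static final short NO_EPOCH = -1;   // hypothetical: broker has no bookkept epoch for this producer
    static final int NO_SEQUENCE = -1;  // hypothetical: broker has no bookkept last sequence

    static void validate(byte magic, short bookkeptEpoch, int bookkeptLastSeq,
                         short producerEpoch, int firstSeq) {
        if (magic < 2)
            throw new UnsupportedForMessageFormatException("Idempotent/transactional batches need magic >= 2");
        if (producerEpoch < bookkeptEpoch)
            throw new ProducerFencedException("Epoch " + producerEpoch + " is older than bookkept epoch " + bookkeptEpoch);
        if (producerEpoch > bookkeptEpoch) {
            // A newer epoch (or a producer the broker does not know) must restart at sequence 0.
            if (firstSeq != 0) {
                if (bookkeptEpoch == NO_EPOCH)
                    throw new UnknownProducerIdException("No state for this producer; expected sequence 0, got " + firstSeq);
                throw new OutOfOrderSequenceException("New epoch must start at sequence 0, got " + firstSeq);
            }
            return;
        }
        // Same epoch: the batch must continue right after the last bookkept sequence (wrapping at Integer.MAX_VALUE).
        int expected = (bookkeptLastSeq == NO_SEQUENCE || bookkeptLastSeq == Integer.MAX_VALUE) ? 0 : bookkeptLastSeq + 1;
        if (firstSeq != expected) {
            if (bookkeptLastSeq == NO_SEQUENCE)
                throw new UnknownProducerIdException("No sequence bookkept; expected 0, got " + firstSeq);
            throw new OutOfOrderSequenceException("Expected sequence " + expected + ", got " + firstSeq);
        }
    }
}
```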
...

```java
// New entry in org.apache.kafka.common.protocol.Errors
INVALID_RECORD(85, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new);
```
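The entry above follows the shape of the Errors enum: a numeric code, a default message, and a constructor reference that builds the matching exception. The simplified stand-in below shows how a client could prefer a broker-supplied error_message over the default text, and how a code the client does not recognize falls back to UNKNOWN_SERVER_ERROR. ErrorsSketch and the exception classes are hypothetical, not Kafka's.

```java
import java.util.function.Function;

// Stand-ins for the Kafka exception classes (not the real ones).
class ApiException extends RuntimeException {
    ApiException(String message) { super(message); }
}

class InvalidRecordException extends ApiException {
    InvalidRecordException(String message) { super(message); }
}

class UnknownServerException extends ApiException {
    UnknownServerException(String message) { super(message); }
}

// Simplified, hypothetical version of the Errors enum pattern.
enum ErrorsSketch {
    NONE(0, null, null),
    UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
            UnknownServerException::new),
    INVALID_RECORD(85, "This record has failed the validation on broker and hence will be rejected.",
            InvalidRecordException::new);

    private final short code;
    private final String defaultMessage;
    private final Function<String, ApiException> builder;

    ErrorsSketch(int code, String defaultMessage, Function<String, ApiException> builder) {
        this.code = (short) code;
        this.defaultMessage = defaultMessage;
        this.builder = builder;
    }

    short code() { return code; }

    // Prefer the broker-supplied error_message; use the default text only when it is absent.
    ApiException exception(String errorMessage) {
        if (builder == null)
            return null;
        return builder.apply(errorMessage != null ? errorMessage : defaultMessage);
    }

    // Codes this client does not recognize surface as UNKNOWN_SERVER_ERROR.
    static ErrorsSketch forCode(short code) {
        for (ErrorsSketch error : values())
            if (error.code == code)
                return error;
        return UNKNOWN_SERVER_ERROR;
    }
}
```

The fallback in forCode is also why an old client that does not know a new code can only report it as an unknown server error.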

...

  1. Make InvalidRecordException no longer inherit from CorruptRecordException; instead, have it inherit directly from ApiException (which is non-retriable). Also move it to the "org.apache.kafka.common" package so that it becomes a public class.
  2. On the broker side:
    1. For the above cases that throw InvalidRecordException and indicate fatal errors (i.e. all except CRC checksum failures, for which we would change the error code to CORRUPT_MESSAGE), return the new error code INVALID_RECORD.
    2. When setting the error code, also set the new error_message from the thrown exception's message; on the client side, use this error_message whenever needed (e.g. in logging) instead of falling back to the default error message from Errors.
    3. If there are multiple types of errors within a single batch, we would indicate only one error code: the one for the first error encountered while validating the batch, since in most cases we throw the exception right away.
    4. More specifically, for errors that are triggered by individual record(s) rather than by the record batch as a whole:
      1. Encode the list of error_records as the relative offsets of the records that cause the whole batch to be rejected (again, since we throw immediately after the first error, in most cases this list would contain at most one record).
      2. Encode an error message stating that the other records in the batch were rejected NOT because of themselves, but because of some other records.
      3. Optionally, also encode a customized error message for that error code (in most cases it would be empty).
  3. On the client side, augment the error handling so that:
    1. If the error_code's corresponding exception is retriable, follow the current behavior and retry the whole batch as-is (so far the only such case would be CorruptRecordException);
    2. If the error_code's corresponding exception is not retriable, check whether error_records is empty:
      1. If it is empty, reject the whole batch and set the exception on every record's future (e.g. UnsupportedForMessageFormatException or ProducerFencedException).
      2. If it is not empty, remove only the records listed in error_records, and then retry by creating a new batch without them (for idempotent producers, also reset the sequence number as well as the offsets). This way, the records in a batch are not rejected as a whole: the remaining records may still succeed while only the culprits are rejected (in the scope of this KIP, such cases include InvalidRecordException and InvalidTimestampException).
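On the broker side, the steps in item 2 amount to turning the first validation failure into an error code, an error_records list and an error_message. The following is a minimal sketch. It assumes hypothetical ValidationFailure and PartitionResponse types, which are not Kafka's actual response classes, and it assumes that timestamp failures keep their existing INVALID_TIMESTAMP code.

```java
import java.util.List;

// Hypothetical, simplified types; names are illustrative only.
enum ErrorCode { NONE, CORRUPT_MESSAGE, INVALID_TIMESTAMP, INVALID_RECORD }

enum FailureKind { CRC_MISMATCH, INVALID_TIMESTAMP, INVALID_RECORD }

// The first (and usually only) failure found while validating a batch.
final class ValidationFailure {
    final FailureKind kind;
    final String message;
    final Integer relativeOffset;  // null when the batch as a whole, not a single record, is at fault

    ValidationFailure(FailureKind kind, String message, Integer relativeOffset) {
        this.kind = kind;
        this.message = message;
        this.relativeOffset = relativeOffset;
    }
}

final class PartitionResponse {
    final ErrorCode errorCode;
    final List<Integer> errorRecords;
    final String errorMessage;

    PartitionResponse(ErrorCode errorCode, List<Integer> errorRecords, String errorMessage) {
        this.errorCode = errorCode;
        this.errorRecords = errorRecords;
        this.errorMessage = errorMessage;
    }
}

class BrokerResponseSketch {
    static PartitionResponse toResponse(ValidationFailure failure) {
        if (failure == null)
            return new PartitionResponse(ErrorCode.NONE, List.of(), null);
        ErrorCode code;
        switch (failure.kind) {
            case CRC_MISMATCH:
                code = ErrorCode.CORRUPT_MESSAGE;  // retriable: the client resends the batch as-is
                break;
            case INVALID_TIMESTAMP:
                code = ErrorCode.INVALID_TIMESTAMP;
                break;
            default:
                code = ErrorCode.INVALID_RECORD;   // the new, non-retriable code
                break;
        }
        // Record-level failures name their culprit; batch-level failures leave error_records empty.
        List<Integer> errorRecords = failure.relativeOffset == null
                ? List.<Integer>of()
                : List.of(failure.relativeOffset);
        // The thrown exception's message travels to the client as error_message.
        return new PartitionResponse(code, errorRecords, failure.message);
    }
}
```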

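The client-side rules in item 3 can be sketched as a function that decides, for one failed batch, which records are retried as-is, which fail, and which go into a fresh batch. The exception classes mirror the hierarchy proposed in item 1 but are local stand-ins. Records are plain strings, and a record's relative offset is assumed to equal its index in the batch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-ins mirroring the proposed hierarchy (not the real Kafka classes).
class ApiException extends RuntimeException {
    ApiException(String message) { super(message); }
}

class RetriableException extends ApiException {
    RetriableException(String message) { super(message); }
}

class CorruptRecordException extends RetriableException {
    CorruptRecordException(String message) { super(message); }
}

// Per this KIP: extends ApiException directly, so it is no longer retriable.
class InvalidRecordException extends ApiException {
    InvalidRecordException(String message) { super(message); }
}

// Hypothetical result of handling one partition's error in a produce response.
final class Outcome {
    final List<String> retryAsIs = new ArrayList<>();  // resent unchanged (retriable error)
    final List<String> failed = new ArrayList<>();     // futures completed with the exception
    final List<String> newBatch = new ArrayList<>();   // resent in a fresh batch
}

class ClientErrorHandlingSketch {
    static Outcome handle(List<String> batch, ApiException error, List<Integer> errorRecords) {
        Outcome out = new Outcome();
        if (error instanceof RetriableException) {
            out.retryAsIs.addAll(batch);
        } else if (errorRecords.isEmpty()) {
            out.failed.addAll(batch);
        } else {
            Set<Integer> culprits = new HashSet<>(errorRecords);
            for (int i = 0; i < batch.size(); i++) {
                if (culprits.contains(i))
                    out.failed.add(batch.get(i));
                else
                    out.newBatch.add(batch.get(i));
            }
            // An idempotent producer would also assign fresh sequence numbers to newBatch (not modeled here).
        }
        return out;
    }
}
```

Keeping the retriable check first preserves today's behavior for CorruptRecordException. The split into failed and newBatch only happens for the non-retriable codes introduced here.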
...

Compatibility, Deprecation, and Migration Plan

  • For old clients, brokers would not return the new error code, which an old client could only decode as UNKNOWN_SERVER_ERROR (i.e. an UnknownServerException). Instead of returning the error code of CorruptRecordException, which is concrete but incorrect, brokers would return the error code for InvalidRequestException, which is more general than InvalidRecordException but at least not misleading.
  • Old brokers are not a problem, since new clients can still handle all the existing error codes normally.
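The compatibility rule for old clients could be applied when the broker builds the response, by downgrading the new code based on the request version. In this sketch, the ErrorCode enum and the version cutoff FIRST_VERSION_WITH_INVALID_RECORD are hypothetical. The actual cutoff depends on which ProduceRequest version introduces the new fields.

```java
// Hypothetical subset of error codes, by name only.
enum ErrorCode { NONE, CORRUPT_MESSAGE, INVALID_REQUEST, INVALID_RECORD }

class CompatSketch {
    // Hypothetical: the first ProduceRequest version whose clients understand INVALID_RECORD.
    static final int FIRST_VERSION_WITH_INVALID_RECORD = 8;

    // Old clients get the more general INVALID_REQUEST code instead of a code they cannot decode.
    static ErrorCode errorForClient(ErrorCode error, int requestVersion) {
        if (error == ErrorCode.INVALID_RECORD && requestVersion < FIRST_VERSION_WITH_INVALID_RECORD)
            return ErrorCode.INVALID_REQUEST;
        return error;
    }
}
```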

...