Status

Current state: Under Discussion

Discussion thread: TBD

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, when a ProduceRequest containing data for multiple partitions is received, the following validation logic is executed before the data is appended to the corresponding partition logs:

  • Each message must have a valid magic number that matches its outer record batch; if not, throw InvalidRecordException.
  • Messages for compacted topics must have keys; if not, throw InvalidRecordException.
  • When magic value >= 1, messages must have monotonically increasing (relative) offsets starting from 0; if not, throw InvalidRecordException.
  • When magic value >= 1, validate that the timestamp type is CREATE_TIME and that the message's timestamp is within the range of the configured DiffMaxMs; if not, throw InvalidTimestampException.
  • When magic value <= 1, validate the message CRC (for magic > 1 there is no record-level CRC); if validation fails, throw InvalidRecordException.
  • When magic value <= 1, check whether the record is compressed and hence contains nested record content (for magic > 1 a record is never compressed); if not, throw InvalidTimestampException.
  • When magic value >= 2, check that the record batch has a valid offset range, count, and sequence number, and that its records are not control records; if any check fails, throw InvalidRecordException.
  • For a transactional / idempotent record batch, also validate the following:
    • The configured magic number should be >= 2; if not, throw UnsupportedForMessageFormatException.
    • The producer epoch should be larger than or equal to the book-kept epoch; if not, throw ProducerFencedException.
    • If the producer epoch is larger than the book-kept epoch, check that the sequence is 0; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the epoch.
    • If the producer epoch is equal to the book-kept epoch, check that the sequence is continuous; if not, throw OutOfOrderSequenceException or UnknownProducerIdException depending on the sequence number.

These exceptions cause the whole batch (and therefore the whole partition data) to be rejected with the corresponding error code. One case stands out: InvalidRecordException inherits from CorruptRecordException and hence results in CORRUPT_MESSAGE (2), which is a retriable error, even though many of the InvalidRecordException cases above are not retriable at all.


However, many of the errors above are triggered by a single record rather than by the record batch as a whole. Nevertheless, the whole batch is rejected and every record's Future callback throws the same exception, which is very confusing (think: a send() call for record B fails because another record A is corrupted, yet record B's callback throws the same exception, indicating that B itself is corrupted). So we'd like to 1) include more information in the error returned in the ProduceResponse to improve such cases, and 2) introduce an error code, separate from the retriable CORRUPT_MESSAGE, that indicates fatal errors caused by invalid records.
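
To make the record-level nature of these failures concrete, here is a minimal Java sketch that applies two of the checks listed above (the compacted-topic key check and the timestamp-range check) to a batch and reports the positions of the offending records. It is not the broker's validation code: SketchRecord, RecordValidationSketch, findInvalidRecords and all parameter names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for one record inside a produce batch.
final class SketchRecord {
    final byte[] key;       // null when the record has no key
    final long timestampMs; // CREATE_TIME timestamp set by the producer

    SketchRecord(byte[] key, long timestampMs) {
        this.key = key;
        this.timestampMs = timestampMs;
    }
}

final class RecordValidationSketch {
    // Returns the relative offsets (positions within the batch) of the records
    // that fail the compacted-topic key check or the timestamp-range check.
    static List<Integer> findInvalidRecords(List<SketchRecord> batch,
                                            boolean compactedTopic,
                                            long nowMs,
                                            long maxTimestampDiffMs) {
        List<Integer> invalid = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            SketchRecord r = batch.get(i);
            boolean missingKey = compactedTopic && r.key == null;
            boolean timestampOutOfRange =
                Math.abs(r.timestampMs - nowMs) > maxTimestampDiffMs;
            if (missingKey || timestampOutOfRange) {
                invalid.add(i);
            }
        }
        return invalid;
    }
}
```

In this sketch a single keyless record in a three-record batch yields a one-element result, yet under today's behavior all three send() callbacks would fail with the same exception.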


Public Interfaces

We propose to add the following new fields into the produce response:

Produce Response (Version: 8) => [responses] throttle_time_ms 
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time log_start_offset error_records error_message 
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
      log_start_offset => INT64
      error_records => [INT64]         // new field
      error_message => STRING          // new field
  throttle_time_ms => INT32
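
As a rough illustration of how one partition_responses entry might look on the client once the two new fields are added, here is a small Java value class. The class name PartitionResponseSketch and the helper hasRecordLevelErrors are hypothetical and do not correspond to any existing Kafka class; only the field names and types are taken from the schema above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder mirroring one partition_responses entry of version 8.
final class PartitionResponseSketch {
    final int partition;           // INT32
    final short errorCode;         // INT16
    final long baseOffset;         // INT64
    final long logAppendTime;      // INT64
    final long logStartOffset;     // INT64
    final List<Long> errorRecords; // new field: [INT64], relative offsets
    final String errorMessage;     // new field: STRING

    PartitionResponseSketch(int partition, short errorCode, long baseOffset,
                            long logAppendTime, long logStartOffset,
                            List<Long> errorRecords, String errorMessage) {
        this.partition = partition;
        this.errorCode = errorCode;
        this.baseOffset = baseOffset;
        this.logAppendTime = logAppendTime;
        this.logStartOffset = logStartOffset;
        // Defensive copy so that later changes by the caller do not leak in.
        this.errorRecords = Collections.unmodifiableList(new ArrayList<>(errorRecords));
        this.errorMessage = errorMessage;
    }

    // True when the broker pointed at specific records as the cause of the error.
    boolean hasRecordLevelErrors() {
        return !errorRecords.isEmpty();
    }
}
```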


Also a new error code:

INVALID_RECORD(83, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new);

Proposed Changes

  1. Let InvalidRecordException no longer inherit from CorruptRecordException; instead, let it inherit from FatalException directly.
  2. For the cases above that throw InvalidRecordException to indicate a fatal error (i.e. all except CRC checksum failures), return the new error code INVALID_RECORD.
  3. For all error codes, encode the thrown exception's message in the new error_message field, and on the client side use this error_message whenever needed (e.g. in logging) instead of falling back to the default error message from Errors.
    1. More specifically, for errors that are triggered by individual record(s) rather than by the record batch, encode an error message stating that a record was rejected NOT because of itself, but because of some other records.
    2. For this case, also encode error_records as the list of relative offsets of the records that caused the whole batch to be rejected.
  4. On the client side, augment the error handling so that:
    1. If error_records is not empty and the error code is not an API exception and is not retriable, still retry by creating a new batch with those error records removed (for idempotent producers, also reset the sequence number as well as the offset). In this way, the records in a batch are not rejected as a whole: some records may still succeed while the culprits are rejected.
    2. If error_records is empty, follow the current behavior depending on the exception's retriability.
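
The client-side handling in step 4 can be sketched roughly as follows. This is a minimal illustration, not the producer's actual code: RetryBatchSketch, Action, decide and withoutErrorRecords are hypothetical names, the "not an API exception" condition is left out for brevity, and treating a non-empty error_records with a retriable error like the current behavior is an assumption, since the proposal does not spell that case out. Resetting sequence numbers for idempotent producers is also not shown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RetryBatchSketch {
    enum Action { RETRY_WITHOUT_CULPRITS, RETRY_SAME_BATCH, FAIL_BATCH }

    // Chooses what to do with a failed batch based on the new error_records field.
    static Action decide(List<Long> errorRecords, boolean retriable) {
        if (!errorRecords.isEmpty() && !retriable) {
            return Action.RETRY_WITHOUT_CULPRITS; // step 4.1
        }
        // Step 4.2, and by assumption also non-empty error_records with a
        // retriable error: keep the current behavior.
        return retriable ? Action.RETRY_SAME_BATCH : Action.FAIL_BATCH;
    }

    // Builds the retry batch: every record whose relative offset is not listed
    // in errorRecords, in the original order.
    static <T> List<T> withoutErrorRecords(List<T> batch, List<Long> errorRecords) {
        Set<Long> culprits = new HashSet<>(errorRecords);
        List<T> retry = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (!culprits.contains((long) i)) {
                retry.add(batch.get(i));
            }
        }
        return retry;
    }
}
```

For example, with a batch of four records and error_records of [1, 3], withoutErrorRecords keeps the records at relative offsets 0 and 2, so only the two culprits fail their callbacks.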


Compatibility, Deprecation, and Migration Plan

  • With the new error code, an older client would see UNKNOWN_SERVER_ERROR, which translates to an UnknownServerException, whereas today it sees a confusing CorruptRecordException.
  • An older broker is not a problem, since the client can still handle all existing error codes normally.

Rejected Alternatives

None.
