...
Introduce two new exception classes - `RecordDeserializationException` which extends `SerializationException` and `InoperativeRecordException` which extends `KafkaException`. These new exceptions will hold two additional parameters - `TopicPartition partition` and `long offset`, allowing the user to catch them and access the metadata about the record which caused the exception.
Code Block |
---|
package org.apache.kafka.common.errors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
/**
* Any exception during deserialization in the consumer
*/
public class InoperativeRecordException extends KafkaException {
private static final long serialVersionUID = 1L;
private TopicPartition partition;
private long offset;
public InoperativeRecordException(TopicPartition partition, long offset, String message, Throwable cause) {
super(message, cause);
this.partition = partition;
this.offset = offset;
}
public TopicPartition getPartition() {
return partition;
}
public long getOffset() {
return offset;
}
} |
Code Block |
---|
package org.apache.kafka.common.errors;
import org.apache.kafka.common.TopicPartition;
/**
* Any exception during deserialization in the consumer
*/
public class RecordDeserializationException extends SerializationException {
private static final long serialVersionUID = 1L;
private TopicPartition partition;
private long offset;
public RecordDeserializationException(TopicPartition partition, long offset, String message, Throwable cause) {
super(message, cause);
this.partition = partition;
this.offset = offset;
}
public TopicPartition getPartition() {
return partition;
}
public long getOffset() {
return offset;
}
/* avoid the expensive and useless stack trace for serialization exceptions */
@Override
public Throwable fillInStackTrace() {
return this;
}
}
|
These exceptions will be raised instead of `SerializationException` and `KafkaException` for invalid deserialization and invalid record handling logic, in the `Fetcher.java` class, essentially being propagated to the Consumer and allowing the user to handle them.
...