KIP-367 Introduce close(Duration) to Producer and AdminClient instead of close(long, TimeUnit)
Status
Current state: Accepted
Discussion thread: thread
JIRA: KAFKA-7391
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
KIP-266 replaced the (long, TimeUnit) parameters with Duration in KafkaConsumer. For consistency, KafkaProducer and KafkaAdminClient should migrate their APIs the same way.
Public Interfaces
KafkaProducer
```diff
+     *
+     * @deprecated Since 2.1. Use {@link #close(Duration)} or {@link #close()}.
      */
+    @Deprecated
     @Override
     public void close(long timeout, TimeUnit timeUnit) {
         close(timeout, timeUnit, false);
     }

+    /**
+     * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
+     * <p>
+     * If the producer is unable to complete all requests before the timeout expires, this method will fail
+     * any unsent and unacknowledged records immediately.
+     * <p>
+     * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+     * <code>close(Duration.ofMillis(0))</code>. This is done since no further sending will happen while
+     * blocking the I/O thread of the producer.
+     *
+     * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
+     *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
+     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws IllegalArgumentException If the <code>timeout</code> is negative.
+     */
+    @Override
+    public void close(Duration timeout) {
+        close(timeout.toMillis(), TimeUnit.MILLISECONDS, false);
+    }
```
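The Javadoc above says a negative timeout is rejected and a zero timeout means "do not wait", while the method body converts the Duration with `toMillis()`. The following is a minimal sketch of that contract, not the actual Kafka implementation: the class `CloseTimeoutSketch` and the helper `toTimeoutMs` are hypothetical names, and the KIP does not show where the negative check is performed. It also illustrates that `Duration.toMillis()` drops any sub-millisecond remainder, so a very small positive Duration would presumably behave like a zero timeout.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not part of the Kafka API.
final class CloseTimeoutSketch {
    private CloseTimeoutSketch() {}

    /** Converts a close timeout to whole milliseconds, rejecting negative values. */
    static long toTimeoutMs(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        // Duration.toMillis() discards any sub-millisecond remainder.
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toTimeoutMs(Duration.ofSeconds(5)));     // prints 5000
        System.out.println(toTimeoutMs(Duration.ofNanos(500_000))); // prints 0
        System.out.println(toTimeoutMs(Duration.ZERO));             // prints 0
    }
}
```

Callers that rely on a short but non-zero grace period may therefore want to pass at least one millisecond.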
Producer
```diff
+    @Deprecated
     void close(long timeout, TimeUnit unit);

+    /**
+     * @see KafkaProducer#close(Duration)
+     */
+    void close(Duration timeout);
```
...
```diff
     @Override
-    public void close(long duration, TimeUnit unit) {
-        long waitTimeMs = unit.toMillis(duration);
+    public void close(Duration timeout) {
+        long waitTimeMs = timeout.toMillis();
```
Note: the implementation of close(long, TimeUnit) moves to AdminClient.
AdminClient
```diff
+     * @deprecated Since 2.1. Use {@link #close(Duration)} or {@link #close()}.
      */
-    public abstract void close(long duration, TimeUnit unit);
+    @Deprecated
+    public void close(long duration, TimeUnit unit) {
+        close(Duration.ofMillis(unit.toMillis(duration)));
+    }
+
+    /**
+     * Close the AdminClient and release all associated resources.
+     *
+     * The close operation has a grace period during which current operations will be allowed to
+     * complete, specified by the given duration.
+     * New operations will not be accepted during the grace period. Once the grace period is over,
+     * all operations that have not yet been completed will be aborted with a TimeoutException.
+     *
+     * @param timeout The time to use for the wait time.
+     */
+    public abstract void close(Duration timeout);
```
Note: close(long, TimeUnit) is no longer abstract; it now has a default implementation that delegates to close(Duration).
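The delegation described in the note can be sketched in isolation. This is a simplified illustration rather than the real class: `SketchAdminClient`, `RecordingAdminClient`, and `CloseDelegationDemo` are hypothetical stand-ins, and only the two close overloads from the diff are modelled.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for AdminClient; only the two close overloads are modelled.
abstract class SketchAdminClient {
    /** @deprecated Use {@link #close(Duration)} instead. */
    @Deprecated
    public void close(long duration, TimeUnit unit) {
        // The deprecated overload converts its arguments and delegates to the new one.
        close(Duration.ofMillis(unit.toMillis(duration)));
    }

    /** The new primary overload that concrete clients must implement. */
    public abstract void close(Duration timeout);
}

// Hypothetical concrete client that only records the timeout it was given.
final class RecordingAdminClient extends SketchAdminClient {
    Duration lastTimeout;

    @Override
    public void close(Duration timeout) {
        lastTimeout = timeout;
    }
}

final class CloseDelegationDemo {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        RecordingAdminClient client = new RecordingAdminClient();
        client.close(5, TimeUnit.SECONDS);      // legacy call site keeps compiling
        System.out.println(client.lastTimeout); // prints PT5S
    }
}
```

With this shape, a concrete client implements only close(Duration), while existing callers of close(long, TimeUnit) are routed to the new overload.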
Proposed Changes
New public methods are proposed; see Public Interfaces.
In short, this KIP deprecates close(long, TimeUnit) on KafkaProducer and KafkaAdminClient and adds a replacement method, close(Duration).
Compatibility, Deprecation, and Migration Plan
...