Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted (2.2.0)

Discussion thread: Link here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-7402 [Change the link from KAFKA-1 to your own ticket]

Motivation

Quoting John John Roesler:

"Various components in Streams have close methods but do not implement AutoCloseable. This means that they can't be used in try-with-resources blocks.

...

KafkaStreams itself is a notable example of this, but we can take the opportunity to look for other components that make sense as AutoCloseable as well.".

So these classes can be used in a try-with-resources Statement after implementing AutoCloseable.


From AutoCloseable.java's Javadoc:

...

An

...

object

...

that

...

may

...

hold

...

resources

...

(such

...

as

...

file

...

or

...

socket

...

handles)

...

until

...

it

...

is

...

closed.

...

The

...

{@link

...

#close()}

...

method

...

of

...

an

...

{@code

...

AutoCloseable}

...

object

...

is

...

called

...

automatically

...

when

...

exiting

...

a

...

{@code

...

try}-with-resources

...

block

...

for

...

which

...

the

...

object

...

has

...

been

...

declared

...

in

...

the

...

resource

...

specification

...

header.

...

This

...

construction

...

ensures

...

prompt

...

release,

...

avoiding

...

resource

...

exhaustion

...

exceptions

...

and

...

errors

...

that

...

may

...

otherwise

...

occur.

Public Interfaces

By going over the project, here is a list that I found which can implement AutoCloseable. Suggestions are welcome.

  1. org.apache.kafka.streams.KafkaStreams
  2. org.apache.kafka.streams.processor.internals.RecordCollectorImplRecordCollector
  3. org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup
  4. org.apache.kafka.connect.transforms.TimestampRouter
  5. org.apache.kafka.tools.VerifiableProducer

Proposed Changes

...

  1. org.apache.kafka.connect.runtime.WorkerConnector.ConnectorMetricsGroup
  2. org.apache.kafka.common.

...

  1. record.

...

  1. MemoryRecordsBuilder
  2. org.apache.kafka.common.metrics.MetricsReporter

...

  1. org.apache.kafka.

...

  1. common.security.

...

  1. oauthbearer.internals.expiring.

...

  1. ExpiringCredentialRefreshingLogin
  2. org.apache.kafka.common.network.KafkaChannel
  3. org.apache.kafka.clients.consumer.ConsumerInterceptor
  4. org.apache.kafka.

...

  1. common.

...

  1. network.

...

  1. Selector.

...

  1. SelectorMetrics
  2. org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread

Proposed Changes

Here are an example of how it would look like:

org.apache.kafka.connectstreams.transforms.TimestampRouterKafkaStreams

Code Block
public class TimestampRouter<RKafkaStreams extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {
...
}

org.apache.kafka.tools.VerifiableProduc

Code Block
	public class VerifiableProducer implements Closeable {void close()
...
}


Compatibility, Deprecation, and Migration Plan

  • This change will not have backward compatibility issues. It is legal to implement this AutoClosable interface without declaring "throws Exception" on close(), so the close() method signature won't be changed.  


Rejected Alternatives

  • We decided not to choose Closeable(). Closeable extends AutoCloseable, and AutoCloseable throws a broader exception than Closeable,  so we think that AutoCloseable is a more generic and a more compatible option.N/A