Status

Discussion threadhttps://lists.apache.org/thread.html/r67610aa2d4dfdaf3b027b82edd1a3f46771f0d58902a4258d931e5a5%40%3Cdev.flink.apache.org%3E
Vote thread
JIRA

Unable to render Jira issues macro, execution error.

Release1.14

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka has introduced the Prefixed ACLs feature, by which producers may only be granted permissions to use "transactional.id"s with certain prefixes on a shared multiple-tenant Kafka cluster. Currently, the FlinkKafkaProducer generates "transactional.id" based on the task name and the operator's uid, which makes it hard and not straightforward to determine the "transactional.id" prefix ACL that should be onboard-ed on a Kafka cluster.

The purpose of this improvement is to let the "transactional.id" prefix that is generated and used by FlinkKafkaProducer be deterministic and customizable.

Public Interfaces

A new method `setTransactionalPrefix(String)` is added to the FlinkKafkaProducer class.

FlinkKafkaProducer.java
public class FlinkKafkaProducer {

    /**
      * Specify the transactional.id prefix to be used by the producers when communicating with Kafka.
      *
      * @param transactionalIdPrefix the transactional.id prefix
      */
    public void setTransactionalIdPrefix(String transactionalIdPrefix);
}


Proposed Changes

FlinkKafkaProducer

The FlinkKafkaProducer uses the TransactionalIdsGenerator class to generate the transactional id during its state initialization. By introducing the new method `setTransactionalPrefix`, the prefix, the first argument of the TransactionalIdsGenerator's constructor, will be,

  • taskName + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID() which keeps the behavior as it if `transactionalPrefix` is absent.
  • the transactionalPrefix value if present
FlinkKafkaProducer.java
transactionalIdsGenerator =
        new TransactionalIdsGenerator(
                transactionalIdPrefix != null
                    ? transactionalIdPrefix
                    : taskName
                        + "-"
                        + ((StreamingRuntimeContext) getRuntimeContext())
                                .getOperatorUniqueID(),
                ...);


Compatibility, Deprecation, and Migration Plan

This improvement is compatible and will not impact existing users, because the transactional id will keep the older behavior if it is absent.

Test Plan

It will be covered by unit and integration tests.

Rejected Alternatives

N/A