Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Introduce SubscriptionPattern.

...

Code Block
languagejava
linenumberstrue
package org.apache.kafka.clients.consumer;

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The topics' metadata.
         */
        List<TopicMetadata> topics;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

   		/**
		 * The reason reported by the member.
		 */
		byte reason;  

		/**
		 * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class TopicMetadata {
      	/**
		 * The topic ID.
		 */
		Uuid topicId;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    }

    class Assignment {
        /**
         * The assignment error.
         */
		byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

  		/**
		 * The error reported by the assignor.
		 */
		byte error; 

 		/**
		 * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

	class Metadata {
   		/**
		 * The reason reported by the assignor.
		 */
		byte reason; 

 		/**
		 * The version of the metadata encoded in {{@link Metadata#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata; 
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);
}

Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)

...

New SubscriptionPattern class

We need to differentiate Google RE2/J regular expression from the java.util.regex.Pattern in our public APIs so we propose to introduce the SubscriptionPattern class for this purpose. This class is just a POJO as all the validation is on the server side.

Code Block
languagejava
linenumberstrue
package org.apache.kafka.clients.consumer;

/**
 * Represents a regular expression used to subscribe to topics. The pattern
 * must be a Google RE2/J compatible pattern.
 */
public class SubscriptionPattern {
    final private String pattern;

    public Pattern(final pattern) {
        this.pattern = pattern;
    }

    public String pattern() {
        return this.pattern;
    }
}

New Consumer methods

We introduce two new methods to subscribe with a SubscriptionPattern.

Code Block
languagejava
public interface Consumer<K, V> extends Closeable {
    ...

    /**
     * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
     */
    void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

    /**
     * @see KafkaConsumer#subscribe(Pattern)
     */
    void subscribe(SubscriptionPattern pattern);
}

Deprecate Consumer methods

The following methods will be deprecated:

  • Consumer#enforeRebalance
  • Consumer#enforeRebalance(String)
  • Consumer#subscribe(Pattern)
  • Consumer#subscribe(Pattern, ConsumerRebalanceListener)

Deprecate ConsumerPartitionAssignor interface.

...

  • None (0)
  • Shutdown (1)
  • AssignmentError (2)
  • InconsistentTopology (3)

Streams API

New Topology methods

All the Topology#addSource methods using java.util.regex.Pattern will get a corresponding overload using SubscriptionPattern.

Deprecated methods

The following methods will be deprecated:

  • Topology#addSource(String, Pattern)
  • Topology#addSource(AutoOffsetReset, String, Pattern)
  • Topology#addSource(TimestampExtractor, String, Pattern)
  • Topology#addSource(AutoOffsetReset, TimestampExtractor, String, Pattern)
  • Topology#addSource(String, Deserializer, Deserializer, Pattern)
  • Topology#addSource(AutoOffsetReset, String, Deserializer, Deserializer, Pattern)
  • Topology#addSource(AutoOffsetReset, String, TimestampExtractor, Deserializer, Deserializer, Pattern)

Streams Configurations

NameTypeDefaultDoc
group.protocolenumgeneric

A flag which indicates if the new protocol should be used or not. It could be: generic or consumer

...

The new rebalance protocol relies on the group coordinator to track the metadata changes so the regular expressions is not used locally anymore but remotely. The group coordinator uses the Google RE2/J engine so the regular expression used with either of the methods must be compatible. The Usage of methods subscribing to topics using java.util.regex.Pattern should be replaced by their homolog using SubscriptionPattern. By default, the java.util.regex.Pattern used to subscribe is toString'ed and passed to the group coordinator so simple regular expressions should keep working without any changes. If it is not compatible, the group coordinator will reject the ConsumerGroupHeartbeat request with an INVALID_REQUEST error. For simple regular expressions, we don't expect any changes to be required. It is recommended to test out the regex with the consumer-groups --verify-regex command line tool or with another group before migrating consumers.

...