
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to add support for describing and listing consumer groups to the `KafkaAdminClient` class. This functionality is required by the Streams Resetter Tool, as described in the associated JIRA issue.

Public Interfaces

API


Add the following API in AdminClient:

public abstract class AdminClient implements AutoCloseable {
 	
    public abstract DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options);

    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds) {
        return describeConsumerGroups(groupIds, new DescribeConsumerGroupsOptions());
    }
 
    public abstract ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

    public ListConsumerGroupsResult listConsumerGroups() {
        return listConsumerGroups(new ListConsumerGroupsOptions());
    }
}
 
public class DescribeConsumerGroupsOptions extends AbstractOptions<DescribeConsumerGroupsOptions> {
}
 
public class DescribeConsumerGroupsResult {
    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
    public DescribeConsumerGroupsResult(Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
        this.futures = futures;
    }

    public Map<String, KafkaFuture<ConsumerGroupDescription>> values() {
        return futures;
    }

    public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
            thenApply(new KafkaFuture.Function<Void, Map<String, ConsumerGroupDescription>>() {
                @Override
                public Map<String, ConsumerGroupDescription> apply(Void v) {
                    Map<String, ConsumerGroupDescription> descriptions = new HashMap<>(futures.size());
                    for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {
                        try {
                            descriptions.put(entry.getKey(), entry.getValue().get());
                        } catch (InterruptedException | ExecutionException e) {
                            // This should be unreachable, because allOf ensured that all the futures
                            // completed successfully.
                            throw new RuntimeException(e);
                        }
                    }
                    return descriptions;
                }
            });
    }
}
 
public class ConsumerGroupDescription {
    private final String groupId;
    private final List<ConsumerDescription> consumers;
}
 
public class ConsumerDescription {
    private final String consumerId;
    private final String clientId;
    private final String host;
    private final List<TopicPartition> assignment;
}
 
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
}

public class ListConsumerGroupsResult {
    final KafkaFuture<Map<String, ConsumerGroupListing>> future;

    ListConsumerGroupsResult(KafkaFuture<Map<String, ConsumerGroupListing>> future) {
        this.future = future;
    }

    /**
     * Return a future which yields a map of consumer group names to ConsumerGroupListing objects.
     */
    public KafkaFuture<Map<String, ConsumerGroupListing>> namesToListings() {
        return future;
    }

    /**
     * Return a future which yields a collection of ConsumerGroupListing objects.
     */
    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Collection<ConsumerGroupListing>>() {
            @Override
            public Collection<ConsumerGroupListing> apply(Map<String, ConsumerGroupListing> namesToDescriptions) {
                return namesToDescriptions.values();
            }
        });
    }

    /**
     * Return a future which yields a set of consumer group names.
     */
    public KafkaFuture<Set<String>> names() {
        return future.thenApply(new KafkaFuture.Function<Map<String, ConsumerGroupListing>, Set<String>>() {
            @Override
            public Set<String> apply(Map<String, ConsumerGroupListing> namesToListings) {
                return namesToListings.keySet();
            }
        });
    }
}
 
public class ConsumerGroupListing {
    private final String name;
}
 

Both methods return a result object backed by futures whose results become available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
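
As an illustration of how a caller might consume futures of this shape, the following is a minimal sketch rather than the KIP's implementation. It uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture` so that it runs without Kafka on the classpath. `DescribeGroupsSketch`, `GroupDescription`, and `awaitWithin` are hypothetical names introduced only for this example, and the 50 ms value merely plays the role of RequestTimeoutMs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DescribeGroupsSketch {

    // Hypothetical stand-in for ConsumerGroupDescription.
    static final class GroupDescription {
        final String groupId;
        final List<String> memberIds;

        GroupDescription(String groupId, List<String> memberIds) {
            this.groupId = groupId;
            this.memberIds = memberIds;
        }
    }

    // Same shape as all() above: one future that completes once every per-group future has.
    static CompletableFuture<Map<String, GroupDescription>> all(
            Map<String, CompletableFuture<GroupDescription>> futures) {
        return CompletableFuture
            .allOf(futures.values().toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                Map<String, GroupDescription> descriptions = new HashMap<>(futures.size());
                // This stage only runs after allOf completed normally, so join() does not block.
                futures.forEach((groupId, future) -> descriptions.put(groupId, future.join()));
                return descriptions;
            });
    }

    // Waits at most timeoutMs (assumed to play the role of RequestTimeoutMs).
    // Returns an empty Optional if the future has not completed in time.
    static <T> Optional<T> awaitWithin(CompletableFuture<T> future, long timeoutMs) {
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<GroupDescription>> futures = new HashMap<>();
        futures.put("group-a", CompletableFuture.completedFuture(
            new GroupDescription("group-a", Arrays.asList("member-1", "member-2"))));
        futures.put("group-b", CompletableFuture.completedFuture(
            new GroupDescription("group-b", Arrays.asList("member-3"))));

        Optional<Map<String, GroupDescription>> described = awaitWithin(all(futures), 50);
        described.ifPresent(map -> map.forEach((groupId, description) ->
            System.out.println(groupId + " -> " + description.memberIds)));

        // A future that never completes: the bounded wait gives up instead of hanging.
        CompletableFuture<GroupDescription> neverCompleted = new CompletableFuture<>();
        Optional<GroupDescription> pending = awaitWithin(neverCompleted, 50);
        System.out.println("pending result present: " + pending.isPresent());
    }
}
```

Returning an empty `Optional` on timeout is a choice made for this sketch only. `KafkaFuture.get(timeout, unit)`, like the `CompletableFuture` call standing in for it, throws `TimeoutException`, and a real caller may prefer to propagate that.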

Proposed Changes

  1. Add `#describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options)` and `#describeConsumerGroups(Collection<String> groupIds)` to the `AdminClient` API.
  2. Add `#listConsumerGroups(ListConsumerGroupsOptions options)` and `#listConsumerGroups()` to the `AdminClient` API.
  3. Implement both in `KafkaAdminClient`.
  4. Resolve the JIRA issue referenced in the Motivation section.

Compatibility, Deprecation, and Migration Plan

  • There won't be any impact on existing users.
  • There won't be any change to current behavior.
  • No migration tool required.

Rejected Alternatives

We are moving one more piece of functionality from `core`'s `AdminClient` API to `clients`' `AdminClient` API. The aim is to remove dependencies on the `core` module.

Future Work

  • The Streams Resetter Tool will remove its dependency on the `core` module by using `KafkaAdminClient` instead.

 
