...

  • Collection<KafkaStreamsInstance> KafkaStreams.allInstancesWithStore(String storeName) would return only the KafkaStreamsInstances that include the given store name.

  • <K> KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, Serializer<K> serializer) would return the KafkaStreamsInstance that hosts the store partition the key maps to (the key itself might not exist in any store). The default StreamPartitioner will be used to partition the key.

  • <K> KafkaStreamsInstance KafkaStreams.instanceWithKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) is the same as above, but will use the provided StreamPartitioner.

    An additional configuration parameter, StreamsConfig.USER_ENDPOINT_CONFIG, will be added. This is a host:port pair supplied by the Streams developer, and it should map to a server running in the same instance of the KafkaStreams application. The supplied host:port pair will form part of the KafkaStreamsInstance returned from the above-mentioned API calls.
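Because the USER_ENDPOINT_CONFIG value is handed to other instances, it helps to validate it at startup so that a malformed value fails fast. The following is a minimal sketch of such a check, assuming the value is a plain "host:port" string; UserEndpoint and parse are hypothetical names, and the nested HostInfo record is a simplified stand-in for the HostInfo class proposed below, not part of the proposal.

```java
public class UserEndpoint {
    // Simplified stand-in for the proposed HostInfo (assumption, not the proposed class itself).
    record HostInfo(String host, int port) {}

    // Hypothetical helper: parse and validate a USER_ENDPOINT_CONFIG-style "host:port" value.
    static HostInfo parse(String endpoint) {
        if (endpoint == null) {
            throw new IllegalArgumentException("endpoint must not be null");
        }
        // Split on the last colon; reject a missing host or a missing port.
        int colon = endpoint.lastIndexOf(':');
        if (colon <= 0 || colon == endpoint.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got '" + endpoint + "'");
        }
        int port;
        try {
            port = Integer.parseInt(endpoint.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid port in '" + endpoint + "'", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range in '" + endpoint + "'");
        }
        return new HostInfo(endpoint.substring(0, colon), port);
    }
}
```

Since the endpoint is shared with the other instances, a value such as "localhost:7070" only works when every instance and client run on the same machine; otherwise it needs to be a hostname the other instances can resolve. Splitting on the last colon also means a bare IPv6 address is not handled by this sketch.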

Each KafkaStreams instance will know about all mappings between state store names, keys, and KafkaStreams instances. We propose that instances keep track of this information by piggybacking on the consumer group membership protocol. In particular, we propose adding the above mapping, Map<HostInfo, Set<TopicPartition>>, to StreamPartitionAssignor so that each consumer knows the task metadata and host states of all other instances in the system.
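One way the piggybacking could work: each member advertises its USER_ENDPOINT_CONFIG value in its subscription, and the group leader, after computing the assignment, merges every member's partitions under that member's endpoint and attaches the resulting map to each member's assignment. The sketch below shows only the data handling with plain Java collections; the member-to-endpoint input, the method names, the simplified stand-in records, and the byte layout are all assumptions, not the format StreamPartitionAssignor would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GlobalAssignmentCodec {
    // Simplified stand-ins (assumption): not the real Kafka HostInfo/TopicPartition classes.
    record HostInfo(String host, int port) {}
    record TopicPartition(String topic, int partition) {}

    // Leader side: group each member's partitions under the endpoint that member advertised.
    // Several consumers (stream threads) of one instance share an endpoint, so their sets are unioned.
    static Map<HostInfo, Set<TopicPartition>> buildGlobalMap(Map<String, HostInfo> endpointByMember,
                                                             Map<String, Set<TopicPartition>> partitionsByMember) {
        Map<HostInfo, Set<TopicPartition>> result = new HashMap<>();
        partitionsByMember.forEach((member, partitions) -> {
            HostInfo host = endpointByMember.get(member);
            if (host != null) { // members without a configured endpoint are not discoverable
                result.computeIfAbsent(host, h -> new HashSet<>()).addAll(partitions);
            }
        });
        return result;
    }

    // Serialize the map into bytes that could travel in each member's assignment userData.
    static byte[] encode(Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(partitionsByHost.size());
            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
                out.writeUTF(entry.getKey().host());
                out.writeInt(entry.getKey().port());
                out.writeInt(entry.getValue().size());
                for (TopicPartition tp : entry.getValue()) {
                    out.writeUTF(tp.topic());
                    out.writeInt(tp.partition());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Member side: rebuild the map from the received userData bytes.
    static Map<HostInfo, Set<TopicPartition>> decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int hostCount = in.readInt();
            Map<HostInfo, Set<TopicPartition>> result = new HashMap<>();
            for (int i = 0; i < hostCount; i++) {
                HostInfo host = new HostInfo(in.readUTF(), in.readInt());
                int partitionCount = in.readInt();
                Set<TopicPartition> partitions = new HashSet<>();
                for (int j = 0; j < partitionCount; j++) {
                    partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
                }
                result.put(host, partitions);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keying the map by endpoint rather than by consumer member means all stream threads of one application instance collapse into a single entry. A real wire format would also benefit from a leading version number so the layout can change across releases.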

...

Code Block
public class HostInfo {
    String hostname;    /* hostname for instance that contains state store */
    int port;           /* listening port for instance that contains state store */
}

public class KafkaStreamsInstance {
    private final HostInfo hostInfo; /* hostInfo from above */
    private final Set<String> stateStores; /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}

/**
 * Find all instances of {@link KafkaStreamsInstance} in this {@link KafkaStreams} application.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link KafkaStreamsInstance} in this application
 */
public Collection<KafkaStreamsInstance> allInstances();

/**
 * Find all instances of {@link KafkaStreamsInstance} for a given storeName.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return  A collection containing instances of {@link KafkaStreamsInstance} that have the provided storeName
 */
public Collection<KafkaStreamsInstance> allInstancesWithStore(final String storeName);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
 * this method finds the host on which it would exist.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key used to determine the partition
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName, 
                                                   final K key,
                                                   final Serializer<K> keySerializer);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
 * this method finds the host on which it would exist.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key used to determine the partition
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
                                                   final K key,
                                                   final StreamPartitioner<K, ?> partitioner);
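The two instanceWithKey overloads declared above imply the same lookup: turn the key into a partition of the store's underlying topic, then find the host whose assigned partitions include it. The sketch below assumes the topic name and partition count are already known, uses simplified stand-in records instead of the Kafka classes, and uses a BiFunction in place of StreamPartitioner. The default path reimplements the murmur2 hash that Kafka's default producer partitioner applies to serialized keys; a real implementation would call the library's own partitioner rather than a copy. KeyLocator and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;

public class KeyLocator {
    // Simplified stand-ins for HostInfo and Kafka's TopicPartition (assumption, not the real classes).
    record HostInfo(String host, int port) {}
    record TopicPartition(String topic, int partition) {}

    // murmur2, modeled on the hash Kafka's default producer partitioner applies to serialized keys.
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length; // seed XOR length
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3; // index of the first byte not consumed by the loop
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Default path: serialize the key, hash the bytes, clear the sign bit, then take the modulus.
    static <K> int partitionFor(K key, Function<K, byte[]> serializer, int numPartitions) {
        return (murmur2(serializer.apply(key)) & 0x7fffffff) % numPartitions;
    }

    // Find the host whose assigned partitions contain the given TopicPartition, if any.
    static Optional<HostInfo> hostFor(Map<HostInfo, Set<TopicPartition>> partitionsByHost, TopicPartition tp) {
        return partitionsByHost.entrySet().stream()
                .filter(e -> e.getValue().contains(tp))
                .map(Map.Entry::getKey)
                .findFirst();
    }

    // Mirrors instanceWithKey(storeName, key, keySerializer).
    static <K> Optional<HostInfo> locateWithSerializer(Map<HostInfo, Set<TopicPartition>> partitionsByHost,
            String topic, int numPartitions, K key, Function<K, byte[]> serializer) {
        int partition = partitionFor(key, serializer, numPartitions);
        return hostFor(partitionsByHost, new TopicPartition(topic, partition));
    }

    // Mirrors instanceWithKey(storeName, key, partitioner); the BiFunction stands in for StreamPartitioner.
    static <K> Optional<HostInfo> locateWithPartitioner(Map<HostInfo, Set<TopicPartition>> partitionsByHost,
            String topic, int numPartitions, K key, BiFunction<K, Integer, Integer> partitioner) {
        int partition = partitioner.apply(key, numPartitions);
        return hostFor(partitionsByHost, new TopicPartition(topic, partition));
    }
}
```

The key is never looked up in any store, so the answer is the same whether or not the key exists, which matches the note in the Javadoc above.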
Below is an example of how a developer might use the Discovery API:
Code Block
public static void main(String[] args) throws Exception {
  Properties streamsConfiguration = new Properties();
  ...
  /**
   * To use the Discovery API, the developer must provide a host:port pair that
   * maps to an embedded service listening on this address. That is,
   * it is up to the developer to define the protocol and create the service.
   */
  streamsConfiguration.put(StreamsConfig.USER_ENDPOINT_CONFIG, "localhost:7070");
  ...
  KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
  streams.start();

  /** 
   * Start your embedded service listening on the endpoint
   * provided above
   */
  new QueryableStateProxy(streams).start(7070);
}
 
/**
 * Example REST proxy that uses the Discovery API to locate the
 * KafkaStreamsInstances on which StateStores are running. A developer would first connect to a well-known instance
 * to find where a particular store, or the store holding a given key, is located. They would then use the returned
 * KafkaStreamsInstances to connect to the appropriate instances and perform queries, e.g.:
 * KafkaStreamsInstance instance = http.get("http://well-known-host:8080/state/instance/my-store/my-key");
 * Long result = http.get("http://" + instance.host() + ":" + instance.port() + "/state/stores/my-store/my-key");
 */
@Path("state")
public class QueryableStateProxy {
   /**
    * The KafkaStreams instance knows about all of the other instances
    * in the application. This is maintained by the StreamPartitionAssignor
    * on partition assignments (rebalances).
    */
   private final KafkaStreams streams;
   public QueryableStateProxy(final KafkaStreams streams) {
      this.streams = streams;
   }
   
   @GET()
   @Path("/instances")
   public Response streamsInstances() {
      // get the current collection of KafkaStreamsInstances
      final Collection<KafkaStreamsInstance> instances = streams.allInstances();
      return respondWithInstances(instances);
   }
 
   @GET()
   @Path("/instances/{storeName}")
   public Response streamsInstancesForStore(@PathParam("storeName") String store) {
      // find all the instances that have the provided store
      final Collection<KafkaStreamsInstance> instances = streams.allInstancesWithStore(store);
      return respondWithInstances(instances);
   }
 
   @GET()
   @Path("/instance/{storeName}/{key}")
   public Response streamsInstancesForStoreAndKey(@PathParam("storeName") String store,
                                                  @PathParam("key") String key) {
 
      // locate the instance that would have the store with the provided key (if the key exists)
      final KafkaStreamsInstance instance = streams.instanceWithKey(store, key, new StringSerializer());
      if (instance == null) {
        return Response.noContent().build();
      }
      return Response.ok(instance.toString()).build();
   }
 
   @GET()
   @Path("/stores/{storeName}/{key}")
   public Response byKey(@PathParam("storeName") String storeName, @PathParam("key") String key) {
      // get a handle on the store for the provided storeName
      final ReadOnlyKeyValueStore<String, Long> store = streams.getStore(storeName,
                                                                         QueryableStoreTypes.keyValueStore());
      // the store may not exist, or may not be available yet, e.g. if partitions haven't been
      // assigned or a rebalance is in progress
      if (store == null) {
        return Response.noContent().build();
      }

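      // we have a store, so we can get the result
      final Long result = store.get(key);
      if (result == null) {
        return Response.noContent().build();
      }
      return Response.ok(result).build();
   }

   // Hypothetical helper (not defined in the proposal): renders the instances as the response body
   private Response respondWithInstances(final Collection<KafkaStreamsInstance> instances) {
      return Response.ok(instances.toString()).build();
   }
}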
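The two-hop flow described in the QueryableStateProxy comment can also be sketched from the client side. Because instance metadata is only a point-in-time view, this sketch re-resolves the owning instance when a query comes back empty, which covers a store that moved during a rebalance. The resolver and fetcher are passed in as functions so that no HTTP library is assumed; TwoHopQuery, storeUri, and query are hypothetical names, HostInfo is a simplified stand-in record, and the URL layout follows the /state/stores/{storeName}/{key} path used in the example above.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

public class TwoHopQuery {
    // Simplified stand-in for the proposed HostInfo (assumption).
    record HostInfo(String host, int port) {}

    // Build http://host:port/state/stores/{store}/{key}; the multi-argument URI constructor
    // percent-encodes characters that are illegal in a path, such as spaces (but not '/').
    static URI storeUri(HostInfo host, String store, String key) {
        try {
            return new URI("http", null, host.host(), host.port(), "/state/stores/" + store + "/" + key, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // First hop: ask for the owning instance. Second hop: query that instance.
    // An empty answer triggers a fresh resolution, up to maxAttempts resolutions in total.
    static <V> Optional<V> query(String store, String key,
                                 BiFunction<String, String, Optional<HostInfo>> resolver,
                                 Function<URI, Optional<V>> fetcher,
                                 int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<HostInfo> owner = resolver.apply(store, key);
            if (owner.isPresent()) {
                Optional<V> value = fetcher.apply(storeUri(owner.get(), store, key));
                if (value.isPresent()) {
                    return value;
                }
            }
        }
        return Optional.empty();
    }
}
```

One limitation of this design: the proxy above answers 204 No Content both when the store is unavailable and when the key does not exist, so a client cannot tell the two apart and a missing key costs maxAttempts round trips. Distinct status codes for the two cases would let the client retry only when retrying can help. Keys containing '/' would also need extra escaping before being placed in the path.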
 

 

Proposed implementation outline

...