Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: StoreQueryParams.java
collapse: true
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams;

import java.util.Objects;

/**
 * Represents all the query options that a user can provide to state what kind of stores it is expecting.
* The options would be whether a user would want to enable/disable stale stores
* or whether it knows the list of partitions that it specifically wants to fetch.
* If this information is not provided the default behavior is to fetch the stores for all the
* partitions available on that instance* for that particular store name.
 * It contains a list of partitions, which for a point queries can be populated from the the  KeyQueryMetadata.
 */
public class StoreQueryParams {


    private final List<Integer>Integer partitionspartition;


    private final boolean includeStaleStores;


    public StoreQueryParams(final List<Integer>Integer partitionspartition, final boolean includeStaleStores) {
        this.partitionspartition = partitionspartition;
        this.includeStaleStores = includeStaleStores;
    }


    /**
     * Get the list of partitions to be used to fetch list of Queryable store from QueryableStoreProvider. 
     *
     * @return list of partitions
     */
    public List<Integer>Integer getPartitionsgetPartition() {
        return partitionspartition;
    }

    /**
     * Get the flag includeStaleStores. If true, include standbys and recovering stores along with running stores
     *
     * @return includeStaleStores
     */
    public boolean getIncludeStaleStores() {
        return includeStaleStores;
    }


    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof StoreQueryParams)) {
return false;
}
            return false;
        }
        final StoreQueryParams storeQueryParams = (StoreQueryParams) obj;
        return Objects.equals(storeQueryParams.partitionspartition, partitionspartition)
                && Objects.equals(storeQueryParams.includeStaleStores, includeStaleStores);
    }


    @Override
    public String toString() {
        return "StoreQueryParams {" +
"partitions +
                "partition=" + partitionspartition +
                ", includeStaleStores=" + includeStaleStores +
                '}';
    }


    @Override
    public int hashCode() {
        return Objects.hash(partitionspartition, includeStaleStores);
    }
}



Code Block
language: java
title: KafkaStreams.java
collapse: true
 /**
  * Convenience overload that looks up a store using default query options:
  * no specific partition ({@code null}) and stale stores not included ({@code false}).
  *
  * @param storeName          name of the store to find
  * @param queryableStoreType the type of store expected
  * @param <T>                the type returned for the given {@code queryableStoreType}
  * @return whatever the three-argument {@code store} overload returns for the default options
  */
 public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
     // Defaults: partition left unspecified, stale stores excluded.
     final StoreQueryParams defaultParams = new StoreQueryParams(null, false);
     return store(storeName, queryableStoreType, defaultParams);
 }


/**
 * Looks up a store by name and type, honoring the supplied query options.
 * The check performed by {@code validateIsRunningOrRebalancing()} runs first; the lookup
 * itself is delegated to {@code queryableStoreProvider}.
 *
 * @param storeName          name of the store to find
 * @param queryableStoreType the type of store expected
 * @param storeQueryParams   the query options (partition and stale-store flag) passed through to the provider
 * @param <T>                the type returned for the given {@code queryableStoreType}
 * @return the result of {@code queryableStoreProvider.getStore(...)}
 */
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType, final StoreQueryParams storeQueryParams) {
    // The state check must happen before the provider is consulted.
    validateIsRunningOrRebalancing();
    final T resolvedStore = queryableStoreProvider.getStore(storeName, queryableStoreType, storeQueryParams);
    return resolvedStore;
}


...