Please refer to the Geode 1.2.0 documentation for the final implementation.
*Work in Progress*
...
Requirements
Allow user to create Lucene Indexes on data stored in Geode
- Update the indexes asynchronously to avoid impacting write latency
Allow user to perform text (Lucene) search on Geode data using the Lucene index. Results from the text searches may be stale due to asynchronous index updates.
Provide high availability of indexes using Geode's HA capabilities
Provide high throughput indexing and querying by partitioning index data to match partitioning in Geode
- Scalability
- Performance comparable to RAMFSDirectory
Building next/better Solr/Elasticsearch.
Enhancing the current Geode OQL to use Lucene index.
Related Documents
A previous integration of Lucene and GemFire:
Similar efforts done by other data products
Hibernate Search: Hibernate search
Solandra: Solandra embeds Solr in Cassandra.
Terminology
- Documents: In Lucene, a Document is the unit of search and index. An index consists of one or more Documents.
- Fields: A Document consists of one or more Fields. A Field is simply a name-value pair.
- Indexing involves adding Documents to an IndexWriter, and searching involves retrieving Documents from an index via an IndexSearcher.
API
User Input
- A region and list of to-be-indexed fields
- [ Optional ] A specified Analyzer per field, or a single Analyzer to be used with all the fields in an index; the Standard Analyzer is used if none is specified
- [ Optional ] Field types. A string can be indexed as Text or String in Lucene, and the two behave differently
Key points
- A single index will not support multiple regions (join queries between regions are not supported)
- Heterogeneous objects in a single region will be supported
- Only top-level fields of nested objects can be indexed, not nested collections
- The index needs to be created before the data region is created (for phase 1)
- Pagination of results will be supported
Java API
Now that this feature has been implemented, please refer to the javadocs for details on the Java API.
Examples
Code Block |
---|
// Get LuceneService
LuceneService luceneService = LuceneServiceProvider.get(cache);
// Create Index on fields with default analyzer |
LuceneQueryFactory
Code Block |
---|
public enum ResultType {
/**
* Query results only contain value, which is the default setting.
* If field projection is specified, use projected fields' values instead of whole domain object
*/
VALUE,
/**
* Query results contain score
*/
SCORE,
/**
* Query results contain key
*/
KEY
};
/**
* Set page size for a query result. The default page size is 0, which means no pagination.
* Throws IllegalArgumentException if a negative value is specified.
* @param pageSize
* @return itself
*/
LuceneQueryFactory setPageSize(int pageSize);
/**
* Set the maximum number of results for a query.
* Throws IllegalArgumentException if the limit is less than or equal to zero.
* @param limit
* @return itself
*/
LuceneQueryFactory setResultLimit(int limit);
/**
* Set whether to include SCORE, KEY in the result
*
* @param resultTypes
* @return itself
*/
LuceneQueryFactory setResultTypes(ResultType... resultTypes);
/**
* Set a list of fields for result projection.
*
* @param fieldNames
* @return itself
*/
LuceneQueryFactory setProjectionFields(String... fieldNames);
/**
* Create a wrapper object for Lucene's QueryParser object.
* The queryString uses Lucene QueryParser's syntax, which is easy to use
* and human-readable.
*
* @param indexName index name
* @param regionName region name
* @param queryString query string in Lucene QueryParser's syntax
* @param analyzer Lucene Analyzer to parse the queryString
* @return LuceneQuery object
* @throws ParseException
*/
public LuceneQuery create(String indexName, String regionName, String queryString,
Analyzer analyzer) throws ParseException;
/**
* Create a wrapper object for Lucene's QueryParser object using the default standard analyzer.
* The queryString uses Lucene QueryParser's syntax, which is easy to use
* and human-readable.
*
* @param indexName index name
* @param regionName region name
* @param queryString query string in Lucene QueryParser's syntax
* @return LuceneQuery object
* @throws ParseException
*/
public LuceneQuery create(String indexName, String regionName, String queryString)
throws ParseException;
/**
* Create a wrapper object for Lucene's Query object.
* Advanced Lucene users can customize their own Query object and use it directly in this API.
*
* @param indexName index name
* @param regionName region name
* @param query Lucene Query object
* @return LuceneQuery object
*/
public LuceneQuery create(String indexName, String regionName, Query query); |
LuceneQuery
Code Block |
---|
/**
* Provides a wrapper around Lucene's Query object and executes the search.
* <p>Instances of this interface are created using
* {@link LuceneQueryFactory#create}.
*
*/
public interface LuceneQuery {
/**
* Execute the search and get results.
*/
public LuceneQueryResults<?> search();
/**
* Get page size setting of current query.
*/
public int getPageSize();
/**
* Get limit size setting of current query.
*/
public int getLimit();
/**
* Get result types setting of current query.
*/
public ResultType[] getResultTypes();
/**
* Get projected fields setting of current query.
*/
public String[] getProjectedFieldNames();
} |
LuceneResultStruct
Code Block |
---|
/**
* <p>
* Abstract data structure for one item in query result.
*
* @author Xiaojian Zhou
* @since 8.5
*/
public interface LuceneResultStruct {
/**
* Return the value associated with the given field name
*
* @param fieldName the String name of the field
* @return the value associated with the specified field
* @throws IllegalArgumentException If this struct does not have a field named fieldName
*/
public Object getProjectedField(String fieldName);
/**
* Return key of the entry
*
* @return key
* @throws IllegalArgumentException If this struct does not contain key
*/
public Object getKey();
/**
* Return value of the entry
*
* @return value the whole domain object
* @throws IllegalArgumentException If this struct does not contain value
*/
public Object getValue();
/**
* Return score of the query
*
* @return score
* @throws IllegalArgumentException If this struct does not contain score
*/
public Double getScore();
/**
* Get the ordered list of result names.
* Each item in the list is either a ResultType or a field name.
* @return the array of result types
*/
public Object[] getNames();
/**
* Get the values in same order as result types
* @return the array of values
*/
public Object[] getResultValues();
} |
Examples of using the APIs:
Code Block |
---|
// Get LuceneService
LuceneService luceneService = LuceneServiceProvider.get(cache);
// Create index on fields with the default analyzer
LuceneIndex index = luceneService.createIndex(indexName, regionName, "field1", "field2", "field3");
// Alternatively, create the index on fields with specified analyzers:
// LuceneIndex index = luceneService.createIndex(indexName, regionName, analyzerPerField);
// Create Query (method names follow the LuceneQueryFactory listing above)
LuceneQuery query = luceneService.createLuceneQueryFactory().setResultLimit(200).setPageSize(20)
.setResultTypes(ResultType.SCORE, ResultType.VALUE, ResultType.KEY).setProjectionFields("field1", "field2")
.create(indexName, regionName, querystring, analyzer);
// Search using Query
LuceneQueryResults results = query.search();
// Without pagination (page size 0), all results are returned in one page
List values = results.getNextPage();
// With pagination, results are returned page by page
while (results.hasNextPage()) {
List<LuceneResultStruct> page = results.getNextPage();
for (LuceneResultStruct r : page) {
System.out.println(r.getValue());
}
} |
Gfsh API
Code Block |
---|
// Create Index
gfsh> create lucene-index --name=indexName --region=/drugs --fields=sideEffects,manufacturer
// Destroy Index
gfsh> destroy lucene-index --name=indexName --region=/drugs
// Execute Lucene query
gfsh> luceneQuery --regionName=/drugs --queryStrings="" --limit=100 --page-size=10 |
Code Block |
---|
// Create index on fields with the default analyzer:
luceneService.createIndex(indexName, regionName, "field1", "field2", "field3");
// Create index on fields with specified analyzers:
Map<String, Analyzer> analyzerPerField = new HashMap<String, Analyzer>();
analyzerPerField.put("field1", new StandardAnalyzer());
analyzerPerField.put("field2", new KeywordAnalyzer());
luceneService.createIndex(indexName, regionName, analyzerPerField);
// Create the region after the index has been defined
Region region = cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
// Create Query
LuceneQuery query = luceneService.createLuceneQueryFactory().setLimit(200).setPageSize(20)
.create(indexName, regionName, querystring, "field1" /* default field */);
// Search using Query
PageableLuceneQueryResults<K,Object> results = query.findPages();
// Pagination
while (results.hasNext()) {
results.next().stream().forEach(struct -> {
Object value = struct.getValue();
System.out.println("Key is "+struct.getKey()+", value is "+value);
});
} |
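The paging loop above can be pictured with a small stand-in. The sketch below is not Geode's `PageableLuceneQueryResults`; `SimplePager` is a hypothetical class that slices an already-collected result list into pages, following the page-size rule stated for `setPageSize` (0 means no pagination, negative values are rejected).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a pageable result set; not part of the Geode API.
class SimplePager<T> implements Iterator<List<T>> {
  private final List<T> results;
  private final int pageSize;
  private int position = 0;

  SimplePager(List<T> results, int pageSize) {
    if (pageSize < 0) {
      throw new IllegalArgumentException("pageSize must not be negative");
    }
    this.results = new ArrayList<>(results);
    // A page size of 0 means "no pagination": everything comes back in one page.
    this.pageSize = (pageSize == 0) ? Math.max(1, results.size()) : pageSize;
  }

  @Override
  public boolean hasNext() {
    return position < results.size();
  }

  @Override
  public List<T> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Copy the next slice so callers cannot modify the underlying list.
    int end = Math.min(position + pageSize, results.size());
    List<T> page = new ArrayList<>(results.subList(position, end));
    position = end;
    return page;
  }
}
```

Used as `new SimplePager<>(hits, 20)`, the iterator yields lists of at most 20 entries until the hits are exhausted.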
Gfsh API
Code Block |
---|
// List Index
gfsh> list lucene indexes [--with-stats]
// Create Index
gfsh> create lucene index --name=indexName --region=/orders --field=customer,tags
// Create Index with per-field analyzers
gfsh> create lucene index --name=indexName --region=/orders --field=customer,tags --analyzer=org.apache.lucene.analysis.standard.StandardAnalyzer,org.apache.lucene.analysis.bg.BulgarianAnalyzer
// Execute Lucene query
gfsh> search lucene --regionName=/orders --queryStrings="John*" --defaultField=field1 --limit=100 |
XML Configuration
Code Block |
---|
<cache
xmlns="http://geode.apache.org/schema/cache"
xmlns:lucene="http://geode.apache.org/schema/lucene"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://geode.apache.org/schema/cache
http://geode.apache.org/schema/cache/cache-1.0.xsd
http://geode.apache.org/schema/lucene
http://geode.apache.org/schema/lucene/lucene-1.0.xsd"
version="1.0">
<region name="region" refid="PARTITION">
<lucene:index name="index">
<lucene:field name="a" analyzer="org.apache.lucene.analysis.core.KeywordAnalyzer"/>
<lucene:field name="b" analyzer="org.apache.lucene.analysis.core.SimpleAnalyzer"/>
<lucene:field name="c" analyzer="org.apache.lucene.analysis.standard.ClassicAnalyzer"/>
</lucene:index>
</region>
</cache>
|
REST API
Spring Data GemFire Support
Implementation Flowchart
PlantUML |
---|
[LuceneIndex] --> [RegionDirectory]
() "User"
node "Colocated PR or Replicated Region" {
() User --> [User Data Region] : Puts
[User Data Region] --> [Async Queue]
[Async Queue] --> [LuceneIndex] : Batch Writes
[RegionDirectory] --> [Lucene Regions]
}
|
Inside LuceneIndex
PlantUML |
---|
node "LuceneIndex" {
[Reflective fields]
[AEQ listener]
[RegionDirectory array (one per bucket)]
[Query objects]
}
|
A closer look at Partitioned region data flow
PlantUML |
---|
() User -down-> [User Data Region] : PUTs
[User Data Region] ..> [Bucket 1]
[Bucket 1] -down-> [Async Queue Bucket 1]
node LuceneIndex {
[Async Queue Bucket 1] -down-> [AEQ listener processes events into index documents]:Batch Write
[AEQ listener processes events into index documents] -down-> [RegionDirectory1]
[RegionDirectory1] -down-> [file region bucket 1]
[file region bucket 1] -down-> [chunk region bucket 1]
}
[User Data Region] ..> [Bucket 2]
[Bucket 2] -down-> [Async Queue Bucket 2]
node LuceneIndex {
[Async Queue Bucket 2] -down-> [AEQ listener processes events into index documents]:Batch Write
[AEQ listener processes events into index documents] -down-> [RegionDirectory2]
[RegionDirectory2] -down-> [file region bucket 2]
[file region bucket 2] -down-> [chunk region bucket 2]
} |
Processing Queries
PlantUML |
---|
() User -down-> [LuceneQuery] : fields, Analyzer, query strings, or Query
[LuceneQuery] -down-> [User Data Region]: call search()
[User Data Region] -down-> [Function Execution]
[Function Execution] -down-> [Bucket 1]
[Bucket 1] -down-> [RegionDirectory for bucket 1]
[RegionDirectory for bucket 1] ..> [Bucket 1] : TopDocs, ScoreDocs
[Bucket 1] ..> [Function Execution] : score, key
[Function Execution] -down-> [Bucket 2]
[Bucket 2] -down-> [RegionDirectory for bucket 2]
[RegionDirectory for bucket 2] ..> [Bucket 2] : TopDocs, ScoreDocs
[Bucket 2] ..> [Function Execution] : score, key
|
Implementation Details
Index Storage
- FileRegion: holds the metadata about the index files.
- ChunkRegion: holds the actual data chunks for a given index file.
- Create a document for the indexed fields. Indexed field values are obtained from the AsyncEvent through reflection (for domain objects) or through the PdxInstance interface (for PDX or JSON); a Lucene document object is constructed and added to the LuceneIndex associated with that region.
- Determine the bucket id of the entry.
- Get the RegionDirectory for that bucket and save the document into it.
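A rough illustration of the steps above, using only the JDK. The hypothetical `ReflectiveDocumentBuilder` reads the indexed fields of a domain object through reflection and collects them as name-value pairs, with a `Map` standing in for a Lucene document. The bucket computation is a simplified assumption (hash of the key modulo the bucket count); Geode's actual routing may differ. `Customer` is a made-up sample class.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Made-up domain object used only for this sketch.
class Customer {
  private final String name;
  private final String city;

  Customer(String name, String city) {
    this.name = name;
    this.city = city;
  }
}

// Hypothetical helper; a Map stands in for a Lucene Document here.
class ReflectiveDocumentBuilder {

  // Collects the named fields declared on the object's own class as name-value pairs.
  // Fields the class does not declare are skipped, which tolerates heterogeneous objects.
  static Map<String, Object> toDocument(Object value, String... indexedFields) {
    Map<String, Object> document = new LinkedHashMap<>();
    for (String name : indexedFields) {
      try {
        Field field = value.getClass().getDeclaredField(name);
        field.setAccessible(true);
        document.put(name, field.get(value));
      } catch (NoSuchFieldException e) {
        // This type does not have the field; nothing to index for it.
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return document;
  }

  // Simplified assumption: pick a bucket from the key's hash code.
  static int bucketId(Object key, int totalBuckets) {
    return Math.floorMod(key.hashCode(), totalBuckets);
  }
}
```

For example, `toDocument(new Customer("Ann", "NYC"), "name", "city")` yields a two-entry map, and `bucketId(key, 113)` selects which per-bucket directory would receive the document.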
Storage with different region types
PersistentRegions
Walkthrough creating index in Geode region
A LuceneIndex can be created and destroyed. Creating an index on a region that already contains data is not supported for now.
Handling failures, restarts, and rebalance
The index region and async event queue will be restored with their colocated data region's buckets. So during failover the new primary should be able to read and write the index as usual.
Aggregation
In the case of partitioned regions, the query must be sent out to all the primaries. The results will then need to be aggregated back together. Lucene search will use FunctionService to distribute the query to the primaries.
Input to primaries
- Serialized Query
- CollectorManager to be used for local aggregation
- Result limit
Output from primaries
- Merged collector created from results of search on local bucket indexes.
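One way to picture the aggregation described above: each primary returns its local top hits, and the caller merges them by score and applies the result limit. `ScoredEntry` and `TopEntriesMerger` are hypothetical names for this sketch; the CollectorManager in the actual design may work differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical (key, score) pair returned by a search on one bucket or member.
class ScoredEntry {
  final Object key;
  final float score;

  ScoredEntry(Object key, float score) {
    this.key = key;
    this.score = score;
  }
}

// Hypothetical merger: combines partial results and keeps the highest-scoring entries.
class TopEntriesMerger {
  static List<ScoredEntry> merge(List<List<ScoredEntry>> partials, int limit) {
    if (limit <= 0) {
      throw new IllegalArgumentException("limit must be positive");
    }
    List<ScoredEntry> all = new ArrayList<>();
    for (List<ScoredEntry> partial : partials) {
      all.addAll(partial);
    }
    // Highest score first, then cut off at the limit.
    all.sort(Comparator.comparingDouble((ScoredEntry e) -> e.score).reversed());
    return new ArrayList<>(all.subList(0, Math.min(limit, all.size())));
  }
}
```

The same merge can run twice: once on each member over its local buckets, and once on the caller over the per-member results.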
PlantUML |
---|
participant LuceneQuery
participant FunctionService
participant FunctionCollector
participant CollectorManager
participant M1_LuceneFunction
participant M1_CollectorManager
participant Index_1
participant Index_2
LuceneQuery -> FunctionService: Query
activate FunctionService
FunctionService --> M1_LuceneFunction : LuceneContext
activate M1_LuceneFunction
FunctionService --> M2_LuceneFunction: LuceneContext
activate M2_LuceneFunction
M1_LuceneFunction -> Index_1 : search(Collector_1)
Index_1 -> M1_LuceneFunction : loaded Collector_1
M1_LuceneFunction -> Index_2 : search(Collector_2)
Index_2 -> M1_LuceneFunction : loaded Collector_2
M1_LuceneFunction -> M1_CollectorManager : merge Collectors
activate M1_CollectorManager
M1_CollectorManager -> M1_LuceneFunction : merged Collector
deactivate M1_CollectorManager
activate FunctionCollector
M1_LuceneFunction -> FunctionCollector:Collector_M1
deactivate M1_LuceneFunction
M2_LuceneFunction -> FunctionCollector:Collector_M2
deactivate M2_LuceneFunction
FunctionCollector -> CollectorManager : merge Collectors
activate CollectorManager
CollectorManager -> FunctionCollector : Final Collector
deactivate CollectorManager
FunctionCollector -> FunctionService : Final Collector
deactivate FunctionCollector
FunctionService -> LuceneQuery : QueryResults
deactivate FunctionService |
We are still investigating options for how to aggregate the data, see Text Search Aggregation Options.
In the case of replicated regions, the query will be sent to one of the members and executed there. Aggregation will be handled in that member before the results are returned to the caller.
Result collection and paging
JMX MBean
A Lucene Service MBean is available and accessed through an ObjectName like:
GemFire:service=CacheService,name=LuceneService,type=Member,member=192.168.2.13(59583)<ec><v5>-1026
This MBean provides these operations:
Code Block |
---|
/**
* Returns an array of {@link LuceneIndexMetrics} for the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex}
* instances defined in this member
*
* @return an array of LuceneIndexMetrics for the LuceneIndexes defined in this member
*/
public LuceneIndexMetrics[] listIndexMetrics();
/**
* Returns an array of {@link LuceneIndexMetrics} for the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex}
* instances defined on the input region in this member
*
* @param regionPath The full path of the region to retrieve
*
* @return an array of LuceneIndexMetrics for the LuceneIndex instances defined on the input region
* in this member
*/
public LuceneIndexMetrics[] listIndexMetrics(String regionPath);
/**
* Returns a {@link LuceneIndexMetrics} for the {@link com.gemstone.gemfire.cache.lucene.LuceneIndex}
* with the input index name defined on the input region in this member.
*
* @param regionPath The full path of the region to retrieve
* @param indexName The name of the index to retrieve
*
* @return a LuceneIndexMetrics for the LuceneIndex with the input index name defined on the input region
* in this member.
*/
public LuceneIndexMetrics listIndexMetrics(String regionPath, String indexName); |
A LuceneIndexMetrics data bean includes raw stat values like:
Code Block |
---|
Region=/data2; index=full_index
commitTime->107608255573
commits->5999
commitsInProgress->0
documents->498
queryExecutionTime->0
queryExecutionTotalHits->0
queryExecutions->0
queryExecutionsInProgress->0
updateTime->7313618780
updates->6419
updatesInProgress->0 |
Limitations include:
- no rates or average latencies are available
- no aggregation (which means no rollups across members in the GemFire -> Distributed MBean)
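Since only raw counters are exposed, a client can derive averages and rates itself. The sketch below divides a cumulative time by its operation count and computes a rate from two samples of a counter. `StatMath` is a hypothetical helper, and the time unit of values such as `commitTime` is not stated in this document, so the result is in whatever unit the raw stat uses.

```java
// Hypothetical helper for deriving figures from raw LuceneIndexMetrics values.
class StatMath {

  // Average time per operation, e.g. commitTime / commits. Returns 0 when nothing has run yet.
  static double averagePerOperation(long totalTime, long operations) {
    return operations == 0 ? 0.0 : (double) totalTime / operations;
  }

  // Operations per second between two samples of the same counter.
  static double ratePerSecond(long countBefore, long countAfter, long elapsedMillis) {
    if (elapsedMillis <= 0) {
      throw new IllegalArgumentException("elapsedMillis must be positive");
    }
    return (countAfter - countBefore) * 1000.0 / elapsedMillis;
  }
}
```

With the sample values above, `averagePerOperation(107608255573L, 5999L)` comes to roughly 17.9 million time units per commit.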
XML Configuration
Code Block |
---|
<region name="drugs">
<lucene-index indexName="luceneIndex">
<FieldDefinition name="fieldName" analyzer="KeywordAnalyzer"/>
</lucene-index>
</region> |
REST API
Spring Data GemFire Support
Implementation
Index Storage
PlantUML |
---|
[Lucene Indexer] --> [GeodeFSDirectory]
() "User"
node "Colocated and Replicated" {
() User --> [User Region] : Puts
[User Region] --> [Async Queue]
[Async Queue] --> [Lucene Indexer] : Batch Writes
[GeodeFSDirectory] --> [Lucene Regions]
}
|
Partitioned region data flow
PlantUML |
---|
() User -down-> [Cache] : PUTs
node cluster {
database {
() "indexBucket1Primary"
}
database {
() "indexBucket1Secondary"
}
[Cache] ..> [Bucket 1]
[Bucket 1] -down-> [Async Queue Bucket 1]
[Async Queue Bucket 1] -down-> [FSDirectoryBucket1] : Batch Write
[FSDirectoryBucket1] -> indexBucket1Primary
indexBucket1Primary -right-> indexBucket1Secondary
database {
() "indexBucket2Primary"
}
database {
() "indexBucket2Secondary"
}
[Cache] ..> [Bucket 2]
[Bucket 2] -down-> [Async Queue Bucket 2]
[Async Queue Bucket 2] -down-> [FSDirectoryBucket2] : Batch Write
[FSDirectoryBucket2] -> indexBucket2Primary
indexBucket2Primary -right-> indexBucket2Secondary
} |
Storage with different region types
Index Maintenance
Handling failures, restarts, and rebalance
The index and async event queue will be stored in a region with the same redundancy level as the original region. We will take care to ensure that all updates are written to the index files before removing events from the queue. So during failover the new primary should be able to read index files from disk.
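The ordering guarantee described above (write to the index first, remove from the queue second) can be sketched with plain collections. `IndexBatchProcessor` is a hypothetical class; the in-memory list stands in for the index files and the `Deque` for the async event queue.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of "commit before dequeue"; not Geode's listener API.
class IndexBatchProcessor {
  private final List<String> committedIndex = new ArrayList<>();

  // Writes up to batchSize events to the index and only then removes them from the queue.
  // If the write fails, the events stay queued so they can be retried later.
  // Returns the number of events that were committed and dequeued.
  int processBatch(Deque<String> queue, int batchSize, boolean simulateWriteFailure) {
    List<String> batch = new ArrayList<>();
    for (String event : queue) { // iterate head to tail without removing anything
      if (batch.size() == batchSize) {
        break;
      }
      batch.add(event);
    }
    if (simulateWriteFailure) {
      return 0; // nothing committed, nothing removed
    }
    committedIndex.addAll(batch); // step 1: write to the index
    for (int i = 0; i < batch.size(); i++) {
      queue.removeFirst(); // step 2: remove the events that are now safely indexed
    }
    return batch.size();
  }

  List<String> committed() {
    return committedIndex;
  }
}
```

Because events leave the queue only after a successful write, a failure between the two steps leaves them queued for reprocessing rather than lost.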
Walkthrough creating index in Geode region
Processing Queries
Partitioned regions
In the case of partitioned regions, the query must be sent out to all of the primaries. The results will then need to be aggregated back together. We are still investigating options for how to aggregate the data, see Text / Lucene Search.
Replicated regions
TBD