
Attribute | Alterable? | Purpose
Name | no | A unique identifier for the HDFSStore.
NameNodeURL | no | HDFSStore persists data on an HDFS cluster identified by the cluster's NameNode URL or NameNode service URL. The NameNode URL can also be provided via hdfs-site.xml (see HDFSClientConfigFile); the HDFS client can likewise load HDFS configuration files found on the classpath, and a NameNode URL provided that way is also accepted. If no NameNode URL can be found, HDFSStore creation fails.
HomeDir | no | The HDFS directory path in which HDFSStore stores files. The value must not contain the NameNode URL. The owner of this node's JVM process must have read and write access to this directory. The path may be absolute or relative; a relative HomeDir is created relative to /user/JVM_owner_name or, if specified, relative to the directory set by the hdfs-root-dir property. As a best practice, create HDFS store directories relative to a single HDFS root directory; alternatively, provide an absolute path beginning with the "/" character to override the default root location.
HDFSClientConfigFile | no | The full path to the HDFS client configuration file, for example hdfs-site.xml or core-site.xml. This file must be accessible to every node where an instance of this HDFSStore will be created, and if each node keeps a local copy, all copies must be identical. Alternatively, by default the HDFS client can load HDFS configuration files placed on the classpath.
MaxMemory | yes | The maximum amount of memory, in megabytes, used by HDFSStore. HDFSStore buffers data in memory to optimize HDFS I/O operations; once the configured memory is used up, data may overflow to disk.
ReadCacheSize | no | The maximum amount of memory, in megabytes, used by the HDFSStore read cache. HDFSStore can cache data in memory to optimize HDFS I/O operations. The read cache shares the memory allocated to HDFSStore; increasing read cache memory can improve read performance.
BatchSize | yes | HDFSStore buffer data is persisted to HDFS in batches, and BatchSize defines the maximum size, in megabytes, of each batch written to HDFS. This parameter, along with BatchInterval, determines the rate at which data is persisted to HDFS.
BatchInterval | yes | HDFSStore buffer data is persisted to HDFS in batches, and BatchInterval defines the maximum time that can elapse between writing batches to HDFS. This parameter, along with BatchSize, determines the rate at which data is persisted to HDFS.
DispatcherThreads | no | The maximum number of threads (per region) used to write batches to HDFS. If a large number of clients add or update data in a region, increase the number of dispatcher threads to avoid bottlenecks when writing data to HDFS.
BufferPersistent | no | Whether HDFSStore in-memory buffer data that has not yet been persisted to HDFS should be persisted to a local disk to prevent buffer data loss. Persisting data may impact write performance; if performance is critical and buffer data loss is acceptable, disable persistence.
DiskStore | no | The named DiskStore to use for any local disk persistence needs of HDFSStore, for example the store's buffer persistence and buffer overflow. If you specify a value, the named DiskStore must exist. If you specify a null value or omit this option, the default DiskStore is used.
SynchronousDiskWrite | no | Enables or disables synchronous writes to the local DiskStore.
PurgeInterval | yes | HDFSStore creates new files as part of periodic maintenance activity, and existing files are deleted asynchronously. PurgeInterval defines how long old files remain available for MapReduce jobs; after this interval has passed, old files are deleted.
Compaction | yes | Compaction reorganizes data in HDFS files to improve read performance and reduce the number of data files on HDFS. It also removes old versions and deleted records. The compaction process can be I/O-intensive; tune its performance using CompactionThreads.
CompactionThreads | yes | The maximum number of threads that HDFSStore uses to perform compaction. Increase the number of threads used for compactions on different buckets as necessary to fully utilize the performance of the HDFS cluster and its disks.
MaxWriteOnlyFileSize | yes | For HDFS write-only regions, the maximum size, in megabytes, that an HDFS log file can reach before HDFSStore closes the file and begins writing to a new one. This setting is ignored for HDFS read/write regions. Keep in mind that a file is not available for MapReduce processing until it is closed; you can also set WriteOnlyFileRolloverInterval to specify the maximum amount of time an HDFS log file remains open.
WriteOnlyFileRolloverInterval | yes | For HDFS write-only regions, the maximum time that can elapse before HDFSStore closes an HDFS file and begins writing to a new one. This setting is ignored for HDFS read/write regions.
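Pulled together, a typical store definition sets only a handful of these attributes. Below is a minimal Java sketch of creating such a store through a factory; the factory and setter names simply mirror the attribute names in the table above and are assumptions, not a verbatim API reference.

    // Hypothetical factory sketch: method names mirror the table's attribute
    // names and are assumptions, not a verbatim API reference.
    HDFSStoreFactory factory = cache.createHDFSStoreFactory();
    factory.setNameNodeURL("hdfs://namenode.example.com:8020"); // or rely on hdfs-site.xml
    factory.setHomeDir("myStoreData");             // relative: created under /user/<jvm_owner>
    factory.setHDFSClientConfigFile("/etc/hadoop/conf/hdfs-site.xml");
    factory.setMaxMemory(100);                     // MB of buffer memory (alterable later)
    factory.setBatchSize(32);                      // flush batches of up to 32 MB...
    factory.setBatchInterval(60000);               // ...or once this interval elapses
    factory.setBufferPersistent(true);             // spool unflushed buffers to a local DiskStore
    factory.setDiskStoreName("localBufferStore");  // must already exist if specified
    HDFSStore store = factory.create("myHdfsStore"); // the unique Name attribute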

InputFormat

Read-Write Region

  1. Input
    1. The count of files is known. Each buffer flush operation creates a new oplog file, and periodically some files are merged by compaction. When the InputFormat is initialized, it sees a list of oplog files.
    2. The number of buckets is known, say P
    3. The size of each file is known
    4. The HDFS block size is known, say H
    5. File creation times are known
    6. Records within a file are sorted by key
    7. Files can have overlapping key ranges
    8. Each oplog file carries mini index metadata: a subset of its keys and their offsets in the oplog are saved (see the sketch after this list)
    9. An approximate count of unique keys can be computed
  2. Constraints
    1. The process of split creation needs to be efficient. Split creation should not depend on the contents of the oplog files; it is acceptable to create and read metadata files.
    2. The number of files can change while data is being consumed
    3. Duplicate processing of the same event by two RecordReaders is an error
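To make the mini index in points 1.5, 1.8, and 1.9 concrete, here is one hypothetical shape for the per-oplog metadata; the class and field names are invented for illustration.

    // Hypothetical per-oplog mini index metadata (names invented for illustration).
    public final class OplogMiniIndex {
        public final String oplogPath;      // HDFS path of the oplog file
        public final long creationTimeMs;   // file creation time (point 1.5)
        public final byte[][] sampledKeys;  // sorted subset of the oplog's keys (point 1.8)
        public final long[] offsets;        // offsets[i] = file offset of sampledKeys[i]'s record
        public final long approxUniqueKeys; // approximate unique-key count (point 1.9)

        public OplogMiniIndex(String oplogPath, long creationTimeMs,
                              byte[][] sampledKeys, long[] offsets, long approxUniqueKeys) {
            this.oplogPath = oplogPath;
            this.creationTimeMs = creationTimeMs;
            this.sampledKeys = sampledKeys;
            this.offsets = offsets;
            this.approxUniqueKeys = approxUniqueKeys;
        }
    }

Reading only this small structure, rather than the oplog contents, is what keeps split creation cheap (constraint 2.1).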

Algorithm

  1. Compute how many splits should be created
    1. Compute the total size of all active files of a bucket, say b
    2. Find the HDFS block size (H)
    3. Number of splits to be created: s = b/H. The idea is to split the bucket into s roughly equal parts, each the size of an HDFS block. Each split will be assigned a bucket and will process a disjoint key range i, such that 0 < i <= s (see the sketch after the Drawbacks list)
    4. The total number of splits per region will be S = s x P
    5. Assuming compaction will 
  2. Compute the key range per split
    1. Identify a subset of large oplog files, i.e. the files containing the most data. Locality of these files matters the most
    2. Read keys and offsets from the root index metadata of the selected files
    3. Merge the root keys and construct a RecordReader
      1. Read the root index metadata from each oplog file and construct a sorted array of unique root keys, A = [k1, k2, ..., kN] (N >> s)
      2. Split the N root index keys into s ranges: N/s = [N1, N2, ..., Ns]
      3. Create a scanner for the range [Ni, Ni+1), where i is the range assigned to the split handled by this record reader
      4. If N < s, two record readers will end up with the same range; in that case, the record reader with the minimum i processes the range

  3. Find the preferred HDFS block for each range
    1. Fetch HDFS block metadata from the NameNode for the selected files
    2. For each HDFS block, find the matching range using the root index metadata fetched earlier
    3. For each HDFS block, compute how much data will actually be read. This is a function of the total number of keys in the HDFS block and its overlap with the key range.
    4. For each range, find the location of the HDFS block that will provide the most data. Prefer newer blocks over older ones.
  4. Create the split with the key range, HDFS block location, and oplog file paths

Drawbacks

  1. Duplicate range computation: each record reader will read the index metadata and repeat the same computation.
  2. Each record reader may be reading multiple files, which can cause high network activity. The following diagram illustrates this: the split-1 processor needs to read blocks from three oplogs (T3, T1, and T0) to scan all keys in the range assigned to it.
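The split arithmetic of step 1 and the range partitioning of step 2 fit in a short sketch. Everything below is illustrative: KeyRange is an invented helper, A is the merged, sorted array of root index keys (assumed non-empty), and a real implementation would also attach the preferred block locations chosen in step 3 to each split.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of Algorithm steps 1 and 2 for one bucket.
    static List<KeyRange> planBucketSplits(long b, long H, byte[][] A) {
        // Step 1: one split per HDFS-block-sized slice of the bucket's data.
        int s = (int) Math.max(1, b / H);
        // (Region-wide, the total split count is S = s x P for P buckets.)

        // Step 2: carve the N merged root index keys into s disjoint ranges.
        int N = A.length; // assumes N >= 1; in practice N >> s
        List<KeyRange> ranges = new ArrayList<>();
        for (int i = 0; i < s; i++) {
            byte[] lo = A[(int) ((long) i * N / s)];
            // The last range is open-ended; earlier ranges end at the next boundary key.
            byte[] hi = (i == s - 1) ? null : A[(int) ((long) (i + 1) * N / s)];
            ranges.add(new KeyRange(lo, hi));
        }
        // If N < s, adjacent ranges collapse to identical boundaries; per step 2.3.4,
        // only the record reader with the minimum i processes a duplicated range.
        return ranges;
    }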

[Gliffy diagram: Parallel processing bucket data]

Write-Only Region

Oplog files for write-only HDFS regions have disjoint key ranges. Use Hadoop's CombineFileInputFormat to create splits and record readers, as sketched below.
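Because the files are disjoint, split creation reduces to packing whole files into splits of roughly one HDFS block each, which Hadoop's CombineFileInputFormat family already handles. A minimal driver-side sketch follows; CombineTextInputFormat is used purely as a stand-in (real oplog files would need a format-specific record reader), and the input path is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class WriteOnlyRegionScan {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "scan-write-only-region");
            // Pack many small oplog files into combined splits.
            job.setInputFormatClass(CombineTextInputFormat.class);
            // Cap each combined split at roughly one 128 MB HDFS block.
            FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
            FileInputFormat.addInputPath(job, new Path("/user/gemfire/myStore/region")); // hypothetical HomeDir
            // ... set mapper/reducer and output, then job.waitForCompletion(true)
        }
    }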