...

When a dump is created, each partition's data is stored in a separate file. This is performed concurrently by several dump executor threads. At the same time, user updates might also trigger storing dump entries from transaction threads into any partition, not necessarily a partition currently processed by the dump executor. From the disk-writing point of view, frequent random switching between several files is bad for overall throughput, and throughput is one of the most important characteristics of the dump creation process.

Proposed solution

Main idea

The main idea is to use buffers for writing to disk so that a large number of bytes is written per disk write operation, minimizing switching between files and minimizing other overhead work between writes.


Buffer usage for writing

The file write operation should send a large number of bytes. There are 3 options for this:

  1. Accumulate bytes from serialized dump entries into a large ByteBuffer and send it to disk once it becomes full. This requires in-memory copying from small buffers into the large buffer.
  2. Accumulate ByteBuffers, each containing one serialized dump entry, and send all of them to disk using the write(ByteBuffer[]) operation.
  3. Serialize dump entries directly into a large ByteBuffer. This works well for dump executor threads, but doesn't seem suitable for transaction threads.

Need to investigate: assuming that we want to write the same total number of bytes, at what range of byte buffer array sizes do write(ByteBuffer) and write(ByteBuffer[]) show the same performance?
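For reference, option 2 maps onto the gathering-write API of java.nio. Below is a minimal sketch, assuming an arbitrary file name, batch size and entry size, of how a batch of per-entry buffers could be flushed with a single write(ByteBuffer[]) call:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel ch = FileChannel.open(Path.of("part-0.dump"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // Option 2: many small buffers, one gathering write call per batch.
            ByteBuffer[] entries = new ByteBuffer[64];
            for (int i = 0; i < entries.length; i++) {
                entries[i] = ByteBuffer.allocate(16 * 1024);
                // ... serialize one dump entry into entries[i] here ...
                entries[i].flip();
            }
            long remaining = 0;
            for (ByteBuffer b : entries)
                remaining += b.remaining();
            // write(ByteBuffer[]) is not guaranteed to drain all buffers at once.
            while (remaining > 0)
                remaining -= ch.write(entries);
        }
    }
}
```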

High level view

The ByteBuffer round trip will look like this:

ByteBufferPool → partition/transaction thread → Partition Queue → Partition Queue Dispatcher Thread → Disk Writer Queue → Disk Writer Thread → Free Queue → ByteBuffer Releaser Thread → ByteBufferPool





Components

ByteBuffer pool

The current solution uses thread-local ByteBuffers with expanding size. This is OK for single-threaded usage, but such buffers are not suitable for passing to other threads, and the approach is also not a good fit for transaction threads.

We can use a pool of ByteBuffers which provides newly allocated or reused ByteBuffers and doesn't go beyond its predefined limit. For example,

class ByteBufferPool

  • ByteBufferPool(size) - constructor taking the maximum number of bytes that all buffers together may occupy.

  • ByteBuffer acquire(minSize) - returns a ByteBuffer whose size is a power of 2 and not smaller than minSize. If there is no suitable buffer in the pool, a new buffer is allocated. If the overall size occupied by all buffers has reached the limit, this method blocks until the required buffer(s) are returned to the pool.
  • release(ByteBuffer) - returns a buffer to the pool and signals waiters on acquire

...

  • ByteBuffer acquireAny(preferredSize) - method for compression, which doesn't require an exact buffer size.

Using only power-of-2 buffer sizes simplifies maintenance and lookup of the buffers. The internal structure to use:

List<ByteBuffer>[]

At position i there will be a list of available buffers of size 2^i.

On average the buffers will be filled to 75% (a request receives the next power of 2 at or above its size, so the fill factor ranges from just over 50% to 100%).
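A minimal sketch of such a pool, assuming synchronized methods and direct buffers; the ByteBuffersWrapper fallback and acquireAny are omitted for brevity:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class ByteBufferPool {
    private final long limit;  // max bytes allocated by all buffers together
    private long allocated;    // bytes currently allocated (pooled or handed out)
    private final Deque<ByteBuffer>[] buckets; // buckets[i] holds free buffers of size 2^i

    @SuppressWarnings("unchecked")
    ByteBufferPool(long limit) {
        this.limit = limit;
        this.buckets = new Deque[32];
        for (int i = 0; i < buckets.length; i++)
            buckets[i] = new ArrayDeque<>();
    }

    /** Returns a buffer whose capacity is the smallest power of 2 >= minSize. */
    synchronized ByteBuffer acquire(int minSize) throws InterruptedException {
        int idx = 32 - Integer.numberOfLeadingZeros(minSize - 1); // ceil(log2(minSize))
        int size = 1 << idx;
        while (true) {
            if (!buckets[idx].isEmpty()) {
                ByteBuffer b = buckets[idx].pop();
                b.clear();
                return b;
            }
            if (allocated + size <= limit) {
                allocated += size;
                return ByteBuffer.allocateDirect(size);
            }
            wait(); // block until release() frees capacity
        }
    }

    synchronized void release(ByteBuffer buf) {
        int idx = Integer.numberOfTrailingZeros(buf.capacity()); // exponent of capacity
        buckets[idx].push(buf);
        notifyAll(); // signal waiters in acquire()
    }
}
```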

Some use cases
No suitable buffer allocated

Let's assume there is a request for a 200k buffer, lots of 128k buffers are allocated, but no buffer larger than 128k exists in the pool and there is no capacity remaining. In this case we will take two 128k ByteBuffers and use them in a ByteBuffersWrapper.

Pool limit exceeded

Let's assume there is a request for an 11Mb buffer and the pool limit is 10Mb. In this case we will wait until all buffers return to the pool, take them all, allocate a new 1Mb HeapByteBuffer and wrap everything into a ByteBuffersWrapper. When the buffers are released, only the 10Mb of pooled buffers will be returned to the pool; the new 1Mb buffer will be left to the GC.

ByteBuffersWrapper

Wraps several ByteBuffers and extends ByteBuffer. It is created in ByteBufferPool#acquire and destroyed in ByteBufferPool#release once all internal buffers have been returned to the pool.
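One caveat: java.nio.ByteBuffer has package-private constructors, so application code cannot literally extend it. A practical sketch (the shape below is an assumption) would instead hold the buffer array and hand it to the gathering write:

```java
import java.nio.ByteBuffer;

/** Presents several pooled buffers as one logical buffer for a gathering write. */
class ByteBuffersWrapper {
    private final ByteBuffer[] parts;

    ByteBuffersWrapper(ByteBuffer... parts) {
        this.parts = parts;
    }

    /** Buffers in write order, suitable for FileChannel.write(ByteBuffer[]). */
    ByteBuffer[] parts() {
        return parts;
    }

    long remaining() {
        long sum = 0;
        for (ByteBuffer b : parts)
            sum += b.remaining();
        return sum;
    }
}
```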

Disk Writer Thread

Disk I/O is often the bottleneck in Ignite, so we should try to make writing to disk efficient.

There should be a separate thread that saves data to disk, and it should do minimal work besides writing. For example, it could take buffers from a queue and write them to a file. The buffers should be made ready by another thread, and returning buffers to the pool should possibly also be delegated to another thread.

How many writer threads?

For desktop PCs it doesn't make sense to use more than one writer thread, but for servers using RAID storage, writing to several files in parallel could be faster overall. The solution should be built on the assumption that there could be several writers. We need to make sure that writers don't pick up data for the same file.

Partition queue

...

Other ideas

...

There will be a separate blocking queue for each partition, providing fast information about its fullness (see the sketch after this list):

  • put(ByteBuffer) - blocking method
  • ByteBuffer[] takeAll() - blocking method, takes all elements
  • long bytes() - fast non-blocking method 
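A sketch of such a per-partition queue; the lock-based implementation and the unbounded put are simplifying assumptions:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

class PartitionQueue {
    private final Deque<ByteBuffer> queue = new ArrayDeque<>();
    private final AtomicLong bytes = new AtomicLong();

    /** Blocking put (backpressure on a full queue is omitted in this sketch). */
    synchronized void put(ByteBuffer buf) {
        queue.add(buf);
        bytes.addAndGet(buf.remaining());
        notifyAll();
    }

    /** Blocks until at least one element is present, then takes all elements. */
    synchronized ByteBuffer[] takeAll() throws InterruptedException {
        while (queue.isEmpty())
            wait();
        ByteBuffer[] all = queue.toArray(new ByteBuffer[0]);
        queue.clear();
        bytes.set(0);
        return all;
    }

    /** Fast non-blocking byte count for the dispatcher. */
    long bytes() {
        return bytes.get();
    }
}
```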

Partition Queue Dispatcher Thread

The thread will periodically check the disk writer queue size, making sure that the disk writer always has enough data to process; the minimum number of elements will be 2 for a single disk writer. Once the size drops below that, the thread will scan the sizes of the partition queues and choose the winner with the maximum number of bytes. After that it will take all buffers from that queue and put them, together with file info (channel), into the disk writer queue.

In the case of multiple disk writers, a flag should additionally be passed; it will be considered when choosing the largest partition queue and will be reset on buffer release.
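A sketch of the dispatcher loop for the single-writer case, reusing PartitionQueue from the sketch above; the WriteTask class, polling interval and per-partition channel array are assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;

/** Hypothetical unit of work handed to the disk writer. */
class WriteTask {
    final FileChannel channel;  // destination partition file
    final ByteBuffer[] buffers; // buffers drained from one partition queue

    WriteTask(FileChannel channel, ByteBuffer[] buffers) {
        this.channel = channel;
        this.buffers = buffers;
    }
}

class DispatcherLoop implements Runnable {
    private final PartitionQueue[] partQueues;
    private final FileChannel[] channels; // one dump file per partition
    private final BlockingQueue<WriteTask> diskWriterQueue;

    DispatcherLoop(PartitionQueue[] partQueues, FileChannel[] channels,
                   BlockingQueue<WriteTask> diskWriterQueue) {
        this.partQueues = partQueues;
        this.channels = channels;
        this.diskWriterQueue = diskWriterQueue;
    }

    @Override public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Keep the single disk writer fed: top up when fewer than 2 tasks pend.
                if (diskWriterQueue.size() >= 2) {
                    Thread.sleep(1); // arbitrary polling interval
                    continue;
                }
                // Choose the partition queue holding the maximum number of bytes.
                int winner = 0;
                for (int i = 1; i < partQueues.length; i++)
                    if (partQueues[i].bytes() > partQueues[winner].bytes())
                        winner = i;
                if (partQueues[winner].bytes() == 0) {
                    Thread.sleep(1); // nothing buffered yet
                    continue;
                }
                diskWriterQueue.put(new WriteTask(channels[winner], partQueues[winner].takeAll()));
            }
        }
        catch (InterruptedException ignored) {
            // dispatcher stops on shutdown
        }
    }
}
```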

Disk Writer Queue and Free Queue 

They could be simple ArrayBlockingQueues.

Open question: does it make sense to use a non-blocking queue here for faster put/take operations?

Disk Writer Thread

It performs 3 operations (see the sketch after this list):

  • Take next ByteBuffer[] from queue
  • Save to disk
  • Send ByteBuffer[] to Free Queue
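A sketch of the writer loop under the same assumptions, reusing the hypothetical WriteTask class from the dispatcher sketch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

class DiskWriterLoop implements Runnable {
    private final BlockingQueue<WriteTask> diskWriterQueue;
    private final BlockingQueue<ByteBuffer[]> freeQueue;

    DiskWriterLoop(BlockingQueue<WriteTask> diskWriterQueue,
                   BlockingQueue<ByteBuffer[]> freeQueue) {
        this.diskWriterQueue = diskWriterQueue;
        this.freeQueue = freeQueue;
    }

    @Override public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                WriteTask task = diskWriterQueue.take(); // 1. take next ByteBuffer[]
                long remaining = 0;
                for (ByteBuffer b : task.buffers)
                    remaining += b.remaining();
                while (remaining > 0)                    // 2. save to disk
                    remaining -= task.channel.write(task.buffers);
                freeQueue.put(task.buffers);             // 3. hand off for release
            }
        }
        catch (InterruptedException ignored) {
            // writer stops on shutdown
        }
        catch (IOException e) {
            // per the error handling section: the whole execution stops
            throw new RuntimeException(e);
        }
    }
}
```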

ByteBuffer Releaser Thread

Takes buffers from the queue and returns them to the pool. This operation will probably be fast enough that the extra thread could be redundant.

Some functions

Open file limit

The solution assumes that all partition dump files are open simultaneously. This could break when the number of partitions (default 1024) is bigger than the OS limit on open files (ulimit -n).

We can address this issue as follows (see the sketch after this list):

  • keep track of the number of open files and the last write time for each partition
  • identify the number of open files allowed for dump creation, for example, half of `ulimit -n`
  • when the number of files reaches the threshold, close the least recently written partition file without draining its partition queue
  • reopen a closed file when it is written to again
  • log a warning message about non-optimal configuration of partition count and the open file limit
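A sketch of that bookkeeping, assuming an access-ordered LinkedHashMap as the LRU structure; the file naming and the maxOpen value are illustrative:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashMap;
import java.util.Map;

class DumpFileRegistry {
    private final Map<Integer, FileChannel> open;

    DumpFileRegistry(int maxOpen) { // e.g. half of `ulimit -n`
        // Access-ordered map: the eldest entry is the least recently written file.
        this.open = new LinkedHashMap<Integer, FileChannel>(16, 0.75f, true) {
            @Override protected boolean removeEldestEntry(Map.Entry<Integer, FileChannel> e) {
                if (size() <= maxOpen)
                    return false;
                try {
                    e.getValue().close(); // close least recently written partition file
                }
                catch (IOException ignored) {
                    // a real implementation would fail the dump here
                }
                return true;
            }
        };
    }

    /** Returns the channel for a partition, reopening in append mode if it was evicted. */
    synchronized FileChannel channel(int part) throws IOException {
        FileChannel ch = open.get(part);
        if (ch == null) {
            ch = FileChannel.open(Path.of("part-" + part + ".dump"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
            open.put(part, ch);
        }
        return ch;
    }
}
```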

Encryption

In the case of encryption, 2 buffers will be required. The first one will be used as usual for keeping the serialized entry, and the second one will store the encrypted data from the first buffer. After encryption, the first buffer will be returned to the pool, and the second buffer will go to the partition queue.

Note: avoid deadlock when processing large objects that don't fit into the pool size.
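A sketch of the two-buffer step using javax.crypto.Cipher's ByteBuffer overload, reusing the ByteBufferPool sketch from above; cipher setup is out of scope here:

```java
import java.nio.ByteBuffer;
import javax.crypto.Cipher;

class EncryptStep {
    /** Encrypts one serialized entry and swaps the pooled buffers. */
    static ByteBuffer encrypt(ByteBufferPool pool, Cipher cipher, ByteBuffer plain)
        throws Exception {
        // Output may be larger than input because of padding, IV, etc.
        ByteBuffer encrypted = pool.acquire(cipher.getOutputSize(plain.remaining()));
        cipher.doFinal(plain, encrypted); // reads from plain, writes into encrypted
        encrypted.flip();
        pool.release(plain);              // first buffer returns to the pool
        return encrypted;                 // second buffer goes to the partition queue
    }
}
```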

Compression

Compression is done after encryption (if enabled).

This can remain in the same thread (partition/transaction thread). But because of high CPU usage and the decreased output size, the bottleneck might move from disk writing to compression. If this is observed, we should extract compression into a separate thread.

Compression output is a sequence of buffers which can't be reordered, so compression for a given partition can't be done in 2 threads simultaneously.

The size of the required buffer isn't known when requesting a buffer from the pool. It is preferable to use medium-size buffers, and it is OK to get a slightly smaller buffer, fill it, send it to the queue and request another medium-size buffer from the pool.
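A sketch of that flow with java.util.zip.Deflater, whose ByteBuffer overloads exist since Java 11; the medium buffer size and the reuse of the ByteBufferPool/PartitionQueue sketches are assumptions:

```java
import java.nio.ByteBuffer;
import java.util.zip.Deflater;

class CompressStep {
    static final int MEDIUM = 256 * 1024; // assumed "medium" buffer size

    /** Compresses one input buffer into as many pooled buffers as needed, in order. */
    static void compress(ByteBufferPool pool, PartitionQueue queue,
                         Deflater deflater, ByteBuffer input) throws InterruptedException {
        deflater.setInput(input);
        while (!deflater.needsInput()) {
            // A slightly smaller buffer would also be fine here (acquireAny).
            ByteBuffer out = pool.acquire(MEDIUM);
            deflater.deflate(out, Deflater.SYNC_FLUSH);
            out.flip();
            queue.put(out); // single thread per partition keeps the order intact
        }
        pool.release(input);
    }
}
```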

Shutdown / Execution Completion

Once dump creation is over, all resources will be cleaned up.

  • Newly created threads finish.
  • ByteBufferPool will be cleaned up, returning buffers to the heap (HeapByteBuffer) or to the OS (DirectByteBuffer).

Error handling

In case of error the whole execution will stop. Resuming dump creation isn't considered; the only option is to rerun it.

Questions

What happens if a dump entry doesn't fit into the pool size?

Is the order of entries in the dump important? Is it acceptable to write version 3 (from a transaction thread) before version 2 (from a partition thread)?

HeapByteBuffer or DirectByteBuffer ?

Is there protection against multiple concurrent dump creations? Do we need one?


About SSD storage

There are several important points to consider when writing data to SSD storage:

  • If writes are performed in the size of at least one clustered block, they use all the levels of internal parallelism and reach the same performance as sequential writes.
  • Many concurrent small write requests offer better throughput than a single small write request.
  • A single large write request offers the same throughput as many small concurrent writes.

A typical SSD throughput versus write block size chart looks like this:

[Image: SSD throughput vs. write block size]

One of the configuration aims is to find the minimum block size at which random and sequential writes show the same throughput.

Write measurement

Data write time was measured with different settings on a MacBook.

1Gb of direct memory was allocated and filled with random data. It was logically split into 8 partitions and saved into different files. Different block sizes (16kb, 32kb, ... 16Mb) were used for writing with different numbers of threads (1, 2, 4, 8). The overall time required to fully write the data to disk was measured. For each point, 20 runs were executed and the average time in milliseconds was taken. After every run, the files were deleted, System.gc() was executed, and a 10-second pause was taken. This produced the following diagram.
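A sketch of the measurement loop described above; file names are assumptions and, for brevity, each point is measured once instead of averaging 20 runs:

```java
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static java.nio.file.StandardOpenOption.*;

public class WriteBenchmark {
    static final int PARTS = 8;
    static final int PART_SIZE = 128 * 1024 * 1024; // 8 x 128Mb = 1Gb

    public static void main(String[] args) throws Exception {
        // 1Gb of direct memory filled with random data, split into 8 partitions.
        ByteBuffer[] parts = new ByteBuffer[PARTS];
        byte[] chunk = new byte[1 << 20];
        Random rnd = new Random();
        for (int p = 0; p < PARTS; p++) {
            parts[p] = ByteBuffer.allocateDirect(PART_SIZE);
            while (parts[p].hasRemaining()) {
                rnd.nextBytes(chunk);
                parts[p].put(chunk);
            }
            parts[p].flip();
        }
        for (int threads : new int[] {1, 2, 4, 8}) {
            for (int block = 16 * 1024; block <= 16 * 1024 * 1024; block *= 2) {
                long start = System.nanoTime();
                ExecutorService exec = Executors.newFixedThreadPool(threads);
                for (int p = 0; p < PARTS; p++) {
                    final int part = p, blk = block;
                    exec.submit(() -> writePart(part, blk, parts[part].duplicate()));
                }
                exec.shutdown();
                exec.awaitTermination(1, TimeUnit.HOURS);
                System.out.printf("threads=%d block=%d time=%dms%n",
                    threads, block, (System.nanoTime() - start) / 1_000_000);
                for (int p = 0; p < PARTS; p++)
                    Files.deleteIfExists(Path.of("bench-" + p + ".bin"));
                System.gc();
                Thread.sleep(10_000); // settle before the next point
            }
        }
    }

    static void writePart(int part, int block, ByteBuffer data) {
        try (FileChannel ch = FileChannel.open(Path.of("bench-" + part + ".bin"), CREATE, WRITE)) {
            while (data.hasRemaining()) {
                ByteBuffer slice = data.duplicate();
                slice.limit(Math.min(data.position() + block, data.limit()));
                ch.write(slice);
                data.position(slice.position()); // advance by the bytes actually written
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```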

[Image: write time vs. block size for 1, 2, 4 and 8 threads]

Some conclusions for this run:

  • The minimum execution time was reached at around a 1Mb block size.
  • The single-threaded run was slightly slower even with large blocks.