Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
  1. Table of Contents

    Status

    Current state: UNDER DISCUSSION 

    Discussion thread: <TBD>

    JIRA: SAMZA-2421

    Released:

    Problem 

    Samza jobs can currently produce to Kafka, HDFS, EventHubs and ElasticSearch systems. We wish to enable jobs to send messages to Azure Blob Storage. However, at this point consuming from Azure Blob Storage is out-of-scope and hence no plans are made for a SystemConsumer counterpart.

    Azure Background

    Azure Blob Storage: The storage system consists of a three level hierarchy: A storage account can include an unlimited number of containers, and a container can store an unlimited number of blobs. A container is equivalent to a directory in a file system. However, a container can not contain more containers. 

    Block Blob is comprised of blocks which are first uploaded and then a commit operation is performed to seal all these blocks into forming a blob. If the commit operation is omitted then no blob is formed and the uploaded blocks are purged out of Storage. Each block can be upto 100Mb and there can be upto 50,000 blocks in a blob.

    Proposed Changes

    A native SystemProducer in Samza will be introduced which will send messages to Azure Blob Storage. A Samza job can choose this producer by configuring to use the corresponding SystemFactory and providing Azure account credentials such as account name and key.

    Samza jobs produce to a System Stream Partition (SSP) of the destination "system". This SystemProducer maps the Azure container to the System and blob name is formatted as Stream/Partition/timestamp. 

    The flow of data is as follows.

    1. Messages to be sent to Azure Blob Storage are buffered in-memory. 

    2. Every time the buffer reaches the maxBlock size, the block(stageBlock) is uploaded to Azure Blob Storage asynchronously.  

    3. List of all blocks uploaded for a blob is maintained in memory. 

    4. During flush of SystemProducer, for each stream, remaining buffer is uploaded as a new block and the block list is committed (commitBlockList) which creates the Block Blob with list of blocks uploaded. 

    5. Committing block list has to be a blocking call to wait for all pending block uploads of the blob to finish before the commit as this is expected by Azure Blob Storage. 

    6. Messages sent through the SystemProducer after a flush, are part of a new blob. Hence timestamp is added to the blob name as there could be multiple blobs (one per flush of SystemProducer) for the same SSP. 

    7. Optionally, a random string can be suffixed to the blob name to avoid blob name collisions for cases when two tasks write to the same SSP at the same time. 

    Blocks of different streams can be uploaded in parallel which leads to better network utilization. 


    Gliffy Diagram
    namesequence
    pagePin5



    Public interfaces

    SystemProducer API

    SystemProducer.send(String source, OutgoingMessageEnvelope ome)

    1. Source: is the source of the message. It indicates the origin of this message.

      1. Example: for messages coming from Samza job’s task source is the task name, for coordinator messages source is “SamzaContainer”, for checkpoint messages source is task name, for diagnostics messges it is “DiagnosticManager”, for metrics related messages it is either “ApplicationMaster” or “StreamProcessor” and so on.

    2. ome.systemStream: 

      1. SystemStream.system: Azure container name. Azure account name is provided as a config.

      2. SystemStream.stream: Azure blob name prefix.

      3. Example: to send message to SSP system = Azure container name = “azure-container”, Stream = “azure-stream”, the blob with name “azure-stream/..” is created in Azure container “azure-container”. The blob can be found at https://<azure-account-name>.blob.core.windows.net/azure-container/azure-stream/.../timestamp.avro where the blob name itself is azure-stream/../timestamp.avro

    3. ome.partition : empty or partition. If not empty, then blob name is “ome.stream/ome.partition/timestamp” else it is “ome.stream/timestamp". 

    4. ome.keySerializerName: no-op serde

    5. ome.messageSerializerName: no-op serde

    6. Ome.message: supports a single message as an Avro.IndexedRecord. 

    7. Ome.key: this will be ignored. 



    SystemProducer.flush(String source): flush for all the streams associated with that source. During flush, the SystemProducer waits till all the pending blocks for all streams are asynchronously uploaded and then blobs are created. All messages coming after a flush will go into a new blob.

    Schema of the incoming Avro records is expected to be the same for all messages between two flushes of the SystemProducer. This is because an Avro file expects all of its entries to conform to a schema and here blob is an Avro file.

    Metrics

    This SystemProducer emits the following metrics.

    1. sent messages count
    2. sent bytes count
    3. compressed bytes count
    4. message send error count
    5. block upload count
    6. blob commit count and 
    7. azure connection error count. 

    It puts out these metrics for three groups — "aggregate", system name and source of the OutgoingMessageEnvelope.

    Configs

    This SystemProducer allows the following properties to be configurable.

    1. Azure account name
    2. Azure account key
    3. proxy to be used, its hostname and port
    4. compression type - can be none or gzip
    5. max size of the uncompressed block to be uploaded
    6. max size of the blob
    7. max number of messages per blob
    8. number of threads for the asynchronous uploading of blocks
    9. timeouts for uploads and commits to Azure
    10. Whether a random string should be suffixed to blob name to avoid blob name collisions

    Implementation / Test Plan 

      

    Gliffy Diagram
    nameSystemProducer Component
    pagePin3


    AzureBlobSystemProducer which implements the existing org.apache.samza.system.SystemProducer interface.

    Code Block
    public class AzureBlobSystemProducer implements SystemProducer {
        /**
         * send messages from the source to Azure Blob Storage
         * get or create a writer for the <source, stream-partition> pair 
         * pass the envelope to the writer
         */
        public void send(String source, OutgoingMessageEnvelope messageEnvelope) {
        }
    
       /**
         *  commit all blobs pertaining to this source
         *  close and delete all writers associated with this source
         *  next message written for this source will create new writers.
         */
       public void flush(String source) {
       }  
    }
    

    AzureBlobWriter interface which is used by AzureBlobSystemProducer to send messages to Azure Blob Storage.

    Code Block
    public interface AzureBlobWriter {
    	/**
    	* write given message envelope to the blob opened
    	*/
    	void write(OutgoingMessageEnvelope messageEnvelope);
    
    	/** 
    	* Asynchronously upload messages written as a blob. 
    	* Messages written after this go as a new block.
    	*/
    	void flush();
     
    	/**
    	* Close the writer and all of its underlying components.
    	* At the end of close, all the messages sent to the writer should be persisted in a blob.
    	* flush should be called explicitly before close.
    	* It is not the responsibility of close to upload blocks.
    	* After close, no other operations can be performed.
    	*/
    	void close();	
    }


    AzureBlobAvroWriter is an AzureBlobWriter implementation to write messages which are Avro records.

    Code Block
    public class AzureBlobAvroWriter implements AzureBlobWriter { 
    	// Avro's DataFileWriter is used to write the incoming Avro records. The sink for DataFileWriter is AzureBlobOutputStream.
        /**
         * write the message to blob via DataFileWriter which writes to AzureBlobOutputStream
         */
    	public void write(OutgoingMessageEnvelope messageEnvelope) {
    	}
        
    	/**
         * flush the contents of DataFileWriter and underlying AzureBlobOutputStream 
         * this operation uploads a block.
         */
    	public void flush() {
    	}
    
       /**
        * Close the DataFileWriter and underlying AzureBlobOutputStream
        * this operation commits the blob with all blocks uploaded so far.
        */
    	public void close() {
    	}
    }


    AzureBlobAvroWriter uses AzureBlobOutputStream for buffering a block and uploading it to Azure and committing a blob.

    Code Block
    public class AzureBlobOutputStream implements OutputStream {
    	// ByteArrayOutputStream is used as the in-memory buffer to hold a block until ready to upload
    	// BlockBlobAsyncClient is used to upload blocks and commit blobs on Azure Blob Storage 
    
    	/**
    	* write is invoked by DataFileWriter.append() while writing an Avro record.
    	* the byte[] is buffered in ByteArrayOutputStream 
        * if buffer size reaches max block size, buffer contents are uploaded as a block
    	*/
    	public void write(byte[] b, int off, int len) {
    	}
    
    	/**
    	* flush is invoked by DataFileWriter.flush().
        * uploads buffer contents as a block
    	*/
    	public void flush() {
    	}
    
    	/**
    	* close is invoked by DataFileWriter.close()
    	* The list of blocks uploaded so far are committed to form a blob.
    	*/
    	public void close() {
    	}
    }


    Test Plan

    1. Unit tests for AzureBlobSystemProducer, AzureBlobAvroWriter and AzureBlobOutputStream.
    2. Sample Samza job which produces to Azure Blob Storage using this SystemProducer natively.

    Migration Plan and Compatibility

    A Samza job can use this SystemProducer by configuring  systems.<Azure-container-name>.samza.factory  to be the SystemFactory of this producer. The job configs also need to include other essential Azure connection info such as account name, account key and port details. Thus any Samza job can migrate to this SystemProducer by just changing its configs.

    Since this is a new SystemProducer being added, it is fully backward compatible.

    Rejected Alternatives

    Buffering and uploads

    Design 1: No Buffering

    In this design, in-memory buffering of messages is not required and each message is uploaded as a block. 

    Problem with this approach is poor QPS and very high network usage. Additionally, the number of transactions to Azure Blob Storage will be very high which results in high cost to serve.

    Design 2: File Buffering

    In this design, the messages are buffered in the file system. A file is created per SSP and messages are written to it in one of the formats Azure Blob Storage accepts. During SystemProducer.flush(), Azure Blob Storage API TransferManager.uploadFileToBlockBlob can be leveraged which takes max block size as input and split the file into multiple blocks, stage all the blocks and commit the block list to the blockBlob. This api uploads the blocks in parallel and then wait for their completion to commit the block list.

    Problem with this approach is the use of file system and overhead associated with it like disk corruption, reading from and writing to disk.


    Design 3: In-Memory Buffering with BlockingGet

    In this design, in-memory buffer is used for messages but blocks are uploaded to Azure in a blocking manner.

    Problem with this approach is to wait for each operation to be finished which will slow the QPS across multiple streams. 

    SystemProducer API 

    1. AzureBlobSystemProducer extends samza-azure’s AsyncSystemProducer instead of natively implementing SystemProducer

      1. AsyncSystemProducer is an abstract class requiring its subclass to implement sendAsync which performs the actual sending of the OutgoingMessageEnvelope to the underlying system and returns a CompletableFuture. This future is then tracked by the AsyncSystemProducer and waited upon during flush(). 

      2. Con: The proposed design for buffering and upload of blocks requires that an asynchronous upload of the block does not happen at every message send. Hence, a CompletableFuture can not be generated for every message send making it infeasible to extend AsyncSystemProducer.

    2. Granularity of OutgoingMessageEnvelope: SystemProducer accepts entire blob in the OutgoingMessageEnvelope. 

      1. Con: it would make user code in Samza job very complex with need to buffer and create stage blocks and appropriately stitch them.

    3. Serialization: user of SystemProducer serializes and SystemProducer always gets byte array. 

      1. Con: if SystemProducer gets a byte[] then it is hard to specify a delimiter between two messages and the schema of the message to ensure it can be read from Azure blob storage. 

    4. Partition key optional: user of SystemProducer can omit the partition key in the OutgoingMessageEnvelope to improve performance.

      1. Consider the two cases: (case 1) blob name based on stream/timestamp and  (case 2) blob name based on stream/partition/timestamp. In case 1, there is a single blob at a time for a stream while there are as many blob as partitions in case 2. This affects the upload performance and RAM usage as follows. Suppose the block size for upload is set to 10mb and number of partitions is 1000 and qps per partition is 5mb/min. For case 1, since its a single block per stream and blocks are uploaded as soon as full, there is a continuous upload throughout the 1 min. At flush at the end of a min, there is only a 10mb block to be uploaded and the blob is created. The RAM usage is ~10Mb as only that much is kept in memory before upload. For case 2, with the same block size of 10mb, now there are 1000 blobs to be created and 1000 blocks sitting in memory as none of them hit the 10 mb limit before 1 min. At flush, there are now 1000 blocks of 5mb each to be uploaded which saturates the upload channels and causes timeouts. Apart from that, there were no uploads empty until flush and RAM usage grows to 5GB within a min. To partially overcome this issue, the block size can be tuned to 1mb which will still mean a RAM usage of 1GB by flush. 

    Azure blob type

    Azure Blob Storage supports three kinds of blobs - Page blob, Append blob and Block blob. 

    Page blob is a blob which is divided into 512 byte page and supports random read and write operations.

    Append blob is comprised of blocks and is optimized for append operations. It does not expose its blockIds and maximum block size supported is 4MB

    Block blob is also comprised of blocks but exposes its blockIds and maximum block size is 100MB. 

    Con: if Page or Append blob is chosen then the number of transactions to Azure Blob Storage will be much higher and also result in increased network calls.

    References

    1. https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction
    2. https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
    3. https://azure.microsoft.com/en-us/pricing/details/storage/blobs/