...

Azure Blob Storage: The storage system consists of a three-level hierarchy: a storage account can include an unlimited number of containers, and a container can store an unlimited number of blobs. A container is equivalent to a directory in a file system; however, a container cannot contain other containers.
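As an illustration of this hierarchy, below is a minimal sketch assuming the azure-storage-blob v12 Java SDK; the endpoint, credential, and names are placeholders rather than part of this design.

Code Block
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;

public class HierarchySketch {
	public static void main(String[] args) {
		// Level 1: the storage account, addressed by its endpoint and credential.
		BlobServiceClient account = new BlobServiceClientBuilder()
			.endpoint("https://<azure-account-name>.blob.core.windows.net")
			.credential(new StorageSharedKeyCredential("<azure-account-name>", "<azure-account-key>"))
			.buildClient();

		// Level 2: a container inside the account; containers cannot be nested.
		BlobContainerClient container = account.getBlobContainerClient("azure-container");

		// Level 3: a blob inside the container. Slashes in the blob name only simulate
		// directories; the namespace under a container is flat.
		BlobClient blob = container.getBlobClient("azure-stream/partition-0/timestamp.avro");
	}
}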

...

  1. Messages to be sent to Azure Blob Storage are buffered in-memory. 

  2. Every time the buffer reaches the maxBlock size, the block is uploaded to Azure Blob Storage asynchronously using the stageBlock API provided by Azure.

  3. A list of all blocks uploaded for a blob is maintained in memory.

  4. During a flush of the SystemProducer, for each stream-partition, the remaining buffer is uploaded as a new block and the block list is committed, which creates the Block Blob from the list of uploaded blocks using the commitBlockList API provided by Azure (see the sketch after this list).

  5. Committing the block list has to be a blocking call that waits for all pending block uploads of the blob to finish before the commit, as this is expected by Azure Blob Storage.

  6. Messages sent through the SystemProducer after a flush are part of a new blob. Hence a timestamp is added to the blob name, as there could be multiple blobs (one per flush of the SystemProducer) for the same SSP. This timestamp corresponds to the time when the first message is received after a commit.

  7. Optionally, a random string can be suffixed to the blob name to avoid blob name collisions in cases where two tasks write to the same SSP at the same time.
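Below is a minimal sketch of steps 2-5 using the azure-storage-blob v12 SDK's BlockBlobAsyncClient; the class and field names are illustrative and do not reflect the actual Samza implementation.

Code Block
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class BlockUploadSketch {
	private final BlockBlobAsyncClient blobClient;                            // one client per blob
	private final List<String> blockIds = new ArrayList<>();                  // step 3: block list kept in memory
	private final List<CompletableFuture<Void>> pendingUploads = new ArrayList<>();

	BlockUploadSketch(BlockBlobAsyncClient blobClient) {
		this.blobClient = blobClient;
	}

	/** Step 2: when the in-memory buffer reaches the maxBlock size, stage it asynchronously as a block. */
	void stageBlockAsync(byte[] buffer) {
		String blockId = Base64.getEncoder()
			.encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
		blockIds.add(blockId);
		// toFuture() subscribes to the Mono returned by stageBlock, so the upload starts immediately.
		pendingUploads.add(
			blobClient.stageBlock(blockId, Flux.just(ByteBuffer.wrap(buffer)), buffer.length).toFuture());
	}

	/**
	 * Steps 4-5: at flush, block until every staged upload has finished, then commit the block list,
	 * which is what actually creates the Block Blob in Azure Blob Storage.
	 */
	void commitBlob() {
		CompletableFuture.allOf(pendingUploads.toArray(new CompletableFuture[0])).join();
		blobClient.commitBlockList(blockIds).block();
	}
}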

...

Gliffy diagram: Sequence



Public interfaces

SystemProducer API

...

  1. Source: indicates the origin of the message.

    1. Example: for messages coming from a Samza job’s task, the source is the task name; for coordinator messages, the source is “SamzaContainer”; for checkpoint messages, the source is the task name; for diagnostics messages it is “DiagnosticManager”; for metrics-related messages it is either “ApplicationMaster” or “StreamProcessor”; and so on.

  2. ome.systemStream:

    1. SystemStream.system: Azure container name. The Azure account name is provided as a config.

    2. SystemStream.stream: Azure blob name prefix.

    3. Example: to send a message to the SSP with system = Azure container name = “azure-container” and stream = “azure-stream”, a blob with name “azure-stream/..” is created in the Azure container “azure-container”. The blob can be found at https://<azure-account-name>.blob.core.windows.net/azure-container/azure-stream/.../timestamp.avro, where the blob name itself is azure-stream/../timestamp.avro. (A sketch of constructing such an envelope follows this list.)

  3. ome.partition: empty or a partition. If not empty, then the blob name is “ome.stream/ome.partition/timestamp”; otherwise it is “ome.stream/timestamp”.

  4. ome.keySerializerName: no-op serde

  5. ome.messageSerializerName: no-op serde

  6. ome.message: supports a single message as an Avro IndexedRecord.

  7. ome.key: this will be ignored.
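To make the field mapping above concrete, here is a hedged sketch of constructing such an envelope with Samza's OutgoingMessageEnvelope; the Avro schema and record are placeholders.

Code Block
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;

public class EnvelopeSketch {
	public static void main(String[] args) {
		// Placeholder Avro record; any org.apache.avro.generic.IndexedRecord can serve as ome.message.
		Schema schema = SchemaBuilder.record("PageView").fields()
			.requiredString("memberId")
			.requiredString("page")
			.endRecord();
		GenericRecord record = new GenericData.Record(schema);
		record.put("memberId", "1234");
		record.put("page", "home");

		// system = Azure container name, stream = Azure blob name prefix.
		SystemStream systemStream = new SystemStream("azure-container", "azure-stream");

		// ome.key is ignored by this SystemProducer, so the two-argument constructor suffices.
		OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(systemStream, record);
		// The resulting blob would appear under
		// https://<azure-account-name>.blob.core.windows.net/azure-container/azure-stream/...
	}
}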

...

SystemProducer.flush(String source): flushes all the streams associated with that source. During a flush, the SystemProducer waits until all pending blocks for all streams have been uploaded asynchronously, and then the blobs are created. All messages coming after a flush go into a new blob.

The schema of the incoming Avro records is expected to be the same for all messages between two flushes of the SystemProducer. This is because an Avro file expects all of its entries to conform to a single schema, and here each blob is an Avro file.
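A short sketch of the flush boundary, assuming a registered SystemProducer and pre-built envelopes (all names below are illustrative):

Code Block
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;

public class FlushBoundarySketch {
	// All parameters are assumed to be prepared by the caller.
	static void sendAcrossFlush(SystemProducer producer, String source,
			OutgoingMessageEnvelope envelopeSchemaA1,
			OutgoingMessageEnvelope envelopeSchemaA2,
			OutgoingMessageEnvelope envelopeSchemaB) {
		producer.send(source, envelopeSchemaA1);
		producer.send(source, envelopeSchemaA2); // same Avro schema as the previous send: same blob
		producer.flush(source);                  // waits for pending block uploads, then commits the blob(s)

		// Messages sent after the flush start a new blob, so a different Avro schema is acceptable here.
		producer.send(source, envelopeSchemaB);
	}
}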

Metrics

This SystemProducer emits the following metrics.

  1. sent messages count
  2. sent bytes count
  3. compressed bytes count
  4. message send error count
  5. block upload count
  6. blob commit count
  7. Azure connection error count

...

This SystemProducer makes the following properties configurable; a sample configuration sketch follows the list.

  1. Azure account name
    1. config: systems.%s.azureblob.account.name
    2. default: none
  2. Azure account key
    1. config: systems.%s.azureblob.account.key
    2. default: none
  3. proxy to be used, its hostname and port
    1. config: systems.%s.azureblob.proxy.use
      1. default: false
    2. config: systems.%s.azureblob.proxy.hostname
      1. default: none
    3. config: systems.%s.azureblob.proxy.port
      1. default: none
  4. compression type - can be none or gzip
    1. config: systems.%s.azureblob.compression.type
    2. default: "none". other values: "gzip"
  5. max size of the uncompressed block to be uploaded in bytes
    1. config: systems.%s.azureblob.maxFlushThresholdSize
    2. default: 10485760 (10 MB); max is 100 MB
  6. max size of the uncompressed blob in bytes
    1. config: systems.%s.azureblob.maxBlobSize
    2. default: Long.MAX_VALUE - unlimited, capped only by the Azure Block Blob limit of 4.75 TB (100 MB per block x 50,000 blocks)
  7. max number of messages per blob
    1. config: systems.%s.azureblob.maxMessagesPerBlob
    2. default: Long.MAX_VALUE - unlimited
  8. number of threads for the asynchronous uploading of blocks
    1. config: systems.%s.azureblob.threadPoolCount
    2. default: 2
  9. timeouts for uploads and commits to Azure
    1. config: systems.%s.azureblob.flushTimeoutMs
      1. default: 3 mins
    2. config: systems.%s.azureblob.closeTimeoutMs
      1. default: 5 mins
  10. whether a random string should be suffixed to the blob name to avoid blob name collisions
    1. config: systems.%s.azureblob.suffixRandomStringToBlobName
    2. default: false
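A sample configuration sketch for a system named “azure-container”; the factory class name below is an assumption based on the samza-azure module, and the account values are placeholders.

Code Block
# Hypothetical job config for this SystemProducer; the factory class name is an assumption.
systems.azure-container.samza.factory=org.apache.samza.system.azureblob.AzureBlobSystemFactory
systems.azure-container.azureblob.account.name=<azure-account-name>
systems.azure-container.azureblob.account.key=<azure-account-key>
systems.azure-container.azureblob.compression.type=gzip
systems.azure-container.azureblob.maxFlushThresholdSize=10485760
systems.azure-container.azureblob.suffixRandomStringToBlobName=true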

Implementation / Test Plan

...

Code Block
public interface AzureBlobWriter {
	/**
	 * Write the given message envelope to the blob that is currently open.
	 */
	void write(OutgoingMessageEnvelope messageEnvelope);

	/**
	 * Asynchronously upload the messages written so far as a block.
	 * Messages written after this go into a new block.
	 */
	void flush();

	/**
	 * Close the writer and all of its underlying components.
	 * At the end of close, all the messages sent to the writer should be persisted in a blob.
	 * flush should be called explicitly before close.
	 * It is not the responsibility of close to upload blocks.
	 * After close, no other operations can be performed.
	 */
	void close();
}


AzureBlobAvroWriter is an AzureBlobWriter implementation for writing messages that are Avro records.

The schema of the incoming Avro records is expected to be the same for all messages between two flushes of the SystemProducer. This is because an Avro file expects all of its entries to conform to a single schema, and here each blob is an Avro file.

Code Block
public class AzureBlobAvroWriter implements AzureBlobWriter {
	// Avro's DataFileWriter is used to write the incoming Avro records. The sink for DataFileWriter is AzureBlobOutputStream.
	/**
	 * Write the message to the blob via DataFileWriter, which writes to AzureBlobOutputStream.
	 */
	public void write(OutgoingMessageEnvelope messageEnvelope) {
	}

	/**
	 * Flush the contents of DataFileWriter and the underlying AzureBlobOutputStream;
	 * this operation uploads a block.
	 */
	public void flush() {
	}

	/**
	 * Close the DataFileWriter and the underlying AzureBlobOutputStream;
	 * this operation commits the blob with all blocks uploaded so far.
	 */
	public void close() {
	}
}


AzureBlobAvroWriter uses AzureBlobOutputStream for buffering a block, uploading it to Azure, and committing the blob.
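For completeness, below is a skeletal sketch of how AzureBlobOutputStream could expose these responsibilities, with method bodies omitted in the style of the writer sketches above; it is not the actual implementation.

Code Block
public class AzureBlobOutputStream extends java.io.OutputStream {
	// Buffers bytes in memory; every time the buffer reaches the max block size,
	// the buffered bytes are staged asynchronously as a block via Azure's stageBlock API.
	public void write(int b) {
	}

	public void write(byte[] b, int off, int len) {
	}

	/**
	 * Upload the current buffer as a block asynchronously.
	 * Bytes written after this call become part of a new block.
	 */
	public void flush() {
	}

	/**
	 * Wait for all pending block uploads to finish and commit the block list,
	 * which creates the Block Blob in Azure Blob Storage.
	 */
	public void close() {
	}
}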

...

1. Unit tests for AzureBlobSystemProducer, AzureBlobAvroWriter and AzureBlobOutputStream.
2. Sample Samza job which produces to Azure Blob Storage natively using this SystemProducer.

...