Status

Current state: ACCEPTED

Discussion thread: mail of vote result

JIRA: SAMZA-2421

Released:

...

Samza jobs produce to a System Stream Partition (SSP) of the destination system. This SystemProducer maps the system to an Azure container and formats the blob name as stream/partition/timestamp.

A Samza SystemProducer receives each message as an OutgoingMessageEnvelope, which carries an SSP. This producer uses the SSP fields to identify the Azure container and to form the blob name. The partition field of the envelope is optional; when it is empty, the blob name is formatted as stream/timestamp. If two tasks produce to the same SSP, two different blobs are created with the same stream/partition prefix but different timestamps. However, it is recommended to turn on the config that adds a random string suffix, to avoid blob name collisions.


For example, an SSP with system = "oss-testcontainer", stream = "OSSWikipediaFeedStreamEvent" and an empty partition gives the blob name shown below.

[Image: blob name for an SSP with an empty partition] Here 2019/12/20/16/45-16-0 is the timestamp.

Similarly, an SSP with system = "oss-testcontainer", stream = "OSSWikipediaFeedStreamEvent" and partition = "partition-0" gives the blob name shown below.

[Image: blob name for an SSP with partition "partition-0"]

If there are several blobs for the same SSP over a period of time, they look as shown below.

[Image: several blobs for the same SSP]

Azure treats "/" in the blob name as a virtual folder separator when organizing the UI. The blobs above all share the same prefix, and their timestamps differ only in the last few seconds.
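
The naming scheme described above can be sketched in a few lines of Java. This is only an illustration: the class BlobNameSketch, its method names, and the "-" separator before the random suffix are assumptions and are not taken from the Samza code base. The timestamp is passed in already formatted, since its exact format is not specified here.

```java
import java.security.SecureRandom;

/** Hypothetical helper, not part of Samza: builds blob names as described above. */
final class BlobNameSketch {
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    private BlobNameSketch() { }

    /**
     * Returns stream/partition/timestamp, or stream/timestamp when the partition is empty.
     * The timestamp is expected to be formatted already, e.g. "2019/12/20/16/45-16-0".
     */
    static String blobName(String stream, String partition, String timestamp) {
        if (partition == null || partition.isEmpty()) {
            return stream + "/" + timestamp;
        }
        return stream + "/" + partition + "/" + timestamp;
    }

    /** Appends a random suffix of the given length; the "-" separator is an assumption. */
    static String withRandomSuffix(String blobName, int length) {
        StringBuilder sb = new StringBuilder(blobName).append('-');
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ts = "2019/12/20/16/45-16-0";
        System.out.println(blobName("OSSWikipediaFeedStreamEvent", "", ts));
        System.out.println(blobName("OSSWikipediaFeedStreamEvent", "partition-0", ts));
        System.out.println(withRandomSuffix(blobName("OSSWikipediaFeedStreamEvent", "partition-0", ts), 8));
    }
}
```

Because "/" is only a naming convention, no folders need to be created up front; the virtual hierarchy follows from the name itself.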


The flow of data is as follows.

  1. Messages to be sent to Azure Blob Storage are buffered in memory.

  2. Every time the buffer reaches the maxBlock size, the block is uploaded to Azure Blob Storage asynchronously using the stageBlock API provided by Azure.

  3. The list of all blocks uploaded for a blob is maintained in memory.

  4. During a flush of the SystemProducer, for each stream-partition, the remaining buffer is uploaded as a new block and the block list is committed using the commitBlockList API provided by Azure. The commit creates the Block Blob from the list of uploaded blocks.

  5. Committing the block list is a blocking call: it waits for all pending block uploads of the blob to finish before the commit, as Azure Blob Storage expects.

  6. Messages sent through the SystemProducer after a flush are part of a new blob. Hence a timestamp is added to the blob name, as there can be multiple blobs (one per flush of the SystemProducer) for the same SSP. This timestamp corresponds to the time when the SystemProducer receives the first message after a commit.

  7. Optionally, a random string can be suffixed to the blob name to avoid blob name collisions when two tasks write to the same SSP at the same time.

Blocks of different streams can be uploaded in parallel, which leads to better network utilization.
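
The steps above can be sketched with JDK classes only. Everything here is an assumption made for illustration: BlockStore is a hypothetical stand-in for the Azure block blob client (it is not the Azure SDK), and BufferedBlockUploader, InMemoryBlockStore and the block id format are invented names. The sketch stages a block once the buffer reaches the configured size, so a block can slightly exceed that size; a real implementation would need to respect Azure's block size limit.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the Azure block blob client; not the Azure SDK. */
interface BlockStore {
    void stageBlock(String blockId, byte[] data);
    void commitBlockList(List<String> blockIds);
}

/** In-memory BlockStore so the sketch runs without Azure. */
final class InMemoryBlockStore implements BlockStore {
    final Map<String, byte[]> staged = new ConcurrentHashMap<>();
    volatile List<String> committed = new ArrayList<>();

    @Override public void stageBlock(String blockId, byte[] data) { staged.put(blockId, data); }
    @Override public void commitBlockList(List<String> blockIds) { committed = new ArrayList<>(blockIds); }
}

/** Buffers bytes, stages full blocks asynchronously, and commits the block list on flush. */
final class BufferedBlockUploader {
    private final BlockStore store;
    private final ExecutorService executor;
    private final int maxBlockSize;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<String> blockIds = new ArrayList<>();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    BufferedBlockUploader(BlockStore store, ExecutorService executor, int maxBlockSize) {
        this.store = store;
        this.executor = executor;
        this.maxBlockSize = maxBlockSize;
    }

    /** Steps 1 and 2: buffer the message and stage a block once the buffer is full. */
    void write(byte[] message) {
        buffer.write(message, 0, message.length);
        if (buffer.size() >= maxBlockSize) {
            stageBufferedBlock();
        }
    }

    /** Steps 4 and 5: stage the remainder, wait for every pending upload, then commit. */
    void flush() {
        if (buffer.size() > 0) {
            stageBufferedBlock();
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        store.commitBlockList(new ArrayList<>(blockIds));
        pending.clear();
        blockIds.clear(); // step 6: later messages belong to a new blob
    }

    private void stageBufferedBlock() {
        byte[] block = buffer.toByteArray();
        buffer.reset();
        String blockId = String.format("block-%06d", blockIds.size());
        blockIds.add(blockId); // step 3: remember the block for the commit
        pending.add(CompletableFuture.runAsync(() -> store.stageBlock(blockId, block), executor));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        InMemoryBlockStore store = new InMemoryBlockStore();
        BufferedBlockUploader uploader = new BufferedBlockUploader(store, executor, 4);
        uploader.write(new byte[] {1, 2, 3});
        uploader.write(new byte[] {4, 5});
        uploader.write(new byte[] {6});
        uploader.flush();
        executor.shutdown();
        System.out.println("committed blocks: " + store.committed);
    }
}
```

The sketch assumes write and flush are called from a single thread; only the uploads run on the executor.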


[Gliffy diagram: Sequence]



Public interfaces

SystemProducer API

SystemProducer.send(String source, OutgoingMessageEnvelope ome)

  1. source: the origin of the message.

    1. Example: for messages coming from a Samza job’s task, the source is the task name; for coordinator messages it is “SamzaContainer”; for checkpoint messages it is the task name; for diagnostics messages it is “DiagnosticManager”; for metrics-related messages it is either “ApplicationMaster” or “StreamProcessor”; and so on.

  2. ome.systemStream:

    1. SystemStream.system: Azure container name. The Azure account name is provided as a config.

    2. SystemStream.stream: Azure blob name prefix.

    3. ome.partition: empty or a partition. If not empty, the blob name is “ome.stream/ome.partition/timestamp”; otherwise it is “ome.stream/timestamp”.

    4. Example: to send a message to the SSP with system = Azure container name = “azure-container” and stream = “azure-stream”, a blob with name “azure-stream/...” is created in the Azure container “azure-container”. The blob can be found at https://<azure-account-name>.blob.core.windows.net/azure-container/azure-stream/.../timestamp, where the blob name itself is azure-stream/.../timestamp.

  3. ome.keySerializerName: no-op serde

  4. ome.messageSerializerName: no-op serde

  5. ome.message: supports a single message.

  6. ome.key: key for the message.



SystemProducer.flush(String source): flushes all the streams associated with that source. During a flush, the SystemProducer waits until all pending blocks for all streams have been uploaded (the uploads themselves run asynchronously) and then creates the blobs. All messages arriving after a flush go into a new blob.

Metrics

This SystemProducer emits the following metrics.

  1. sent messages count
  2. sent bytes count
  3. compressed bytes count
  4. message send error count
  5. block upload count
  6. blob commit count
  7. azure connection error count

These metrics are emitted for three groups: "aggregate", the system name, and the source of the OutgoingMessageEnvelope.
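
As a rough illustration of the three groups, the sketch below keeps one counter per group and metric name. It does not use Samza's metrics classes; GroupedCounters and the metric name "sentMessagesCount" are hypothetical, and the sketch assumes the system name, the source and "aggregate" are distinct strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical counter registry; Samza's own metrics classes are not used here. */
final class GroupedCounters {
    static final String AGGREGATE = "aggregate";
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Adds delta to the named metric in the aggregate, system, and source groups. */
    void increment(String systemName, String source, String metric, long delta) {
        for (String group : new String[] {AGGREGATE, systemName, source}) {
            counters.computeIfAbsent(group + "." + metric, k -> new AtomicLong()).addAndGet(delta);
        }
    }

    /** Returns the current value, or 0 if the metric was never incremented for the group. */
    long get(String group, String metric) {
        AtomicLong counter = counters.get(group + "." + metric);
        return counter == null ? 0L : counter.get();
    }

    public static void main(String[] args) {
        GroupedCounters metrics = new GroupedCounters();
        metrics.increment("azure-container", "task-0", "sentMessagesCount", 1);
        metrics.increment("azure-container", "task-1", "sentMessagesCount", 2);
        System.out.println(metrics.get(AGGREGATE, "sentMessagesCount"));
        System.out.println(metrics.get("task-0", "sentMessagesCount"));
    }
}
```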

Configs

This SystemProducer allows the following properties to be configurable.

| Name | Default | Description |
| systems.<azure-container-name>.azureblob.account.name | | Azure account name. Is not optional. |
| systems.<azure-container-name>.azureblob.account.key | | Azure account key. Is not optional. |
| systems.<azure-container-name>.azureblob.proxy.use | "false" | If true, a proxy will be used to connect to Azure. |
| systems.<azure-container-name>.azureblob.proxy.hostname | | If proxy.use is true, the host name of the proxy. |
| systems.<azure-container-name>.azureblob.proxy.port | | If proxy.use is true, the port of the proxy. |
| systems.<azure-container-name>.azureblob.compression.type | "none" | Type of compression to be used before uploading blocks. Can be "none" or "gzip". |
| systems.<azure-container-name>.azureblob.maxFlushThresholdSize | 10485760 (10 MB) | Max size of the uncompressed block to be uploaded, in bytes. The maximum size allowed by Azure is 100 MB. |
| systems.<azure-container-name>.azureblob.maxBlobSize | Long.MAX_VALUE (unlimited) | Max size of the uncompressed blob, in bytes. With the default value the size is unlimited, capped only by the Azure BlockBlob size of 4.75 TB (100 MB per block x 50,000 blocks). |
| systems.<azure-container-name>.azureblob.maxMessagesPerBlob | Long.MAX_VALUE (unlimited) | Max number of messages per blob. |
| systems.<azure-container-name>.azureblob.threadPoolCount | 2 | Number of threads for the asynchronous uploading of blocks. |
| systems.<azure-container-name>.azureblob.blockingQueueSize | Thread Pool Count * 2 | Size of the queue that holds blocks ready to be uploaded by the asynchronous threads. If all threads are busy uploading, blocks are queued; if the queue is full, the main thread starts uploading, which blocks processing of incoming messages. |
| systems.<azure-container-name>.azureblob.flushTimeoutMs | 3 mins | Timeout to finish uploading all blocks before committing a blob. |
| systems.<azure-container-name>.azureblob.closeTimeoutMs | 5 mins | Timeout to finish committing all the blobs currently being written to. This does not include the flush timeout per blob. |
| systems.<azure-container-name>.azureblob.suffixRandomStringToBlobName | false | If true, a random string of 8 chars is suffixed to the blob name to prevent name collisions when more than one Samza task is writing to the same SSP. It is advisable to set this to true. |
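
To make the defaults concrete, here is a minimal sketch of how three of these properties could be resolved, and of an upload executor that shows the described queue behaviour. AzureBlobConfigSketch is a hypothetical class that reads a plain java.util.Properties rather than Samza's Config. Using ThreadPoolExecutor.CallerRunsPolicy is one way to make the submitting thread run an upload when the queue is full; that the actual implementation does it this way is an assumption.

```java
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical config reader; key names follow the table above, defaults included. */
final class AzureBlobConfigSketch {
    private final Properties props;
    private final String systemName;

    AzureBlobConfigSketch(Properties props, String systemName) {
        this.props = props;
        this.systemName = systemName;
    }

    private String key(String suffix) {
        return "systems." + systemName + ".azureblob." + suffix;
    }

    /** Number of upload threads; defaults to 2. */
    int threadPoolCount() {
        return Integer.parseInt(props.getProperty(key("threadPoolCount"), "2"));
    }

    /** Queue size; defaults to twice the thread pool count. */
    int blockingQueueSize() {
        String value = props.getProperty(key("blockingQueueSize"));
        return value == null ? threadPoolCount() * 2 : Integer.parseInt(value);
    }

    /** Max uncompressed block size in bytes; defaults to 10 MB. */
    long maxFlushThresholdSize() {
        return Long.parseLong(props.getProperty(key("maxFlushThresholdSize"), "10485760"));
    }

    /**
     * Fixed-size pool with a bounded queue. When the queue is full, CallerRunsPolicy
     * makes the submitting thread run the task itself, which blocks further sends.
     */
    ThreadPoolExecutor newUploadExecutor() {
        int threads = threadPoolCount();
        return new ThreadPoolExecutor(threads, threads, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(blockingQueueSize()),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("systems.azure-container.azureblob.threadPoolCount", "3");
        AzureBlobConfigSketch config = new AzureBlobConfigSketch(props, "azure-container");
        System.out.println("queue size: " + config.blockingQueueSize());
        ThreadPoolExecutor executor = config.newUploadExecutor();
        executor.shutdown();
    }
}
```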


Implementation / Test Plan
 

  

[Gliffy diagram: SystemProducer Components]


AzureBlobSystemProducer implements the existing org.apache.samza.system.SystemProducer interface.

Code Block
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;

public class AzureBlobSystemProducer implements SystemProducer {
    // start(), stop() and register(String source) from SystemProducer are omitted in this outline.

    /**
     * Sends a message from the source to Azure Blob Storage:
     * gets or creates a writer for the <source, stream-partition> pair
     * and passes the envelope to the writer.
     */
    @Override
    public void send(String source, OutgoingMessageEnvelope messageEnvelope) {
    }

    /**
     * Commits all blobs pertaining to this source, then closes and deletes
     * all writers associated with this source. The next message written
     * for this source will create new writers.
     */
    @Override
    public void flush(String source) {
    }
}
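
The get-or-create and flush behaviour in the comments above can be sketched as a map of writers keyed by source and stream-partition. This is an assumption-laden illustration, not the Samza implementation: WriterRegistry, SketchWriter and RecordingWriter are hypothetical, messages are plain byte arrays instead of OutgoingMessageEnvelope, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal writer; stands in for AzureBlobWriter with byte[] messages. */
interface SketchWriter {
    void write(byte[] message);
    void flush();
    void close();
}

/** Records the calls it receives so the behaviour can be inspected without Azure. */
final class RecordingWriter implements SketchWriter {
    final String streamPartition;
    final List<String> calls = new ArrayList<>();

    RecordingWriter(String streamPartition) { this.streamPartition = streamPartition; }

    @Override public void write(byte[] message) { calls.add("write"); }
    @Override public void flush() { calls.add("flush"); }
    @Override public void close() { calls.add("close"); }
}

/** Keeps one writer per <source, stream-partition> pair and drops them on flush. */
final class WriterRegistry {
    // source -> (stream-partition -> writer)
    private final Map<String, Map<String, RecordingWriter>> writers = new HashMap<>();
    int writersCreated = 0;

    /** Gets or creates the writer for the pair and passes the message to it. */
    void send(String source, String streamPartition, byte[] message) {
        writers.computeIfAbsent(source, s -> new HashMap<>())
            .computeIfAbsent(streamPartition, sp -> {
                writersCreated++;
                return new RecordingWriter(sp);
            })
            .write(message);
    }

    /** Flushes and closes every writer of the source, then forgets them. */
    List<RecordingWriter> flush(String source) {
        Map<String, RecordingWriter> forSource = writers.remove(source);
        List<RecordingWriter> closed = new ArrayList<>();
        if (forSource != null) {
            for (RecordingWriter writer : forSource.values()) {
                writer.flush();
                writer.close();
                closed.add(writer);
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        WriterRegistry registry = new WriterRegistry();
        registry.send("task-0", "azure-stream/partition-0", new byte[] {1});
        registry.flush("task-0");
        registry.send("task-0", "azure-stream/partition-0", new byte[] {2});
        System.out.println("writers created: " + registry.writersCreated);
    }
}
```

Removing the writers on flush is what makes the next message for the same source start a new blob.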

The AzureBlobWriter interface is used by AzureBlobSystemProducer to send messages to Azure Blob Storage.

Code Block
import org.apache.samza.system.OutgoingMessageEnvelope;

public interface AzureBlobWriter {
    /**
     * Writes the given message envelope to the open blob.
     */
    void write(OutgoingMessageEnvelope messageEnvelope);

    /**
     * Asynchronously uploads the messages written so far as a block.
     * Messages written after this go into a new block.
     */
    void flush();

    /**
     * Closes the writer and all of its underlying components.
     * At the end of close, all the messages sent to the writer should be persisted in a blob.
     * flush should be called explicitly before close.
     * It is not the responsibility of close to upload blocks.
     * After close, no other operations can be performed.
     */
    void close();
}


AvroAzureBlobWriter is an AzureBlobWriter implementation to write messages which are Avro records.

  1. The schema of the incoming Avro records is expected to be the same for all messages between two flushes of the SystemProducer. This is because an Avro file expects all of its entries to conform to one schema, and here each blob is an Avro file.
  2. OutgoingMessageEnvelope.key is ignored here. This is because the Avro blobs consist only of the schema, taken from OutgoingMessageEnvelope.message.getSchema, and the messages, which are Avro records. There is no way to insert a key into the records, as that would make the record incompatible with the schema. Other implementations of AzureBlobWriter can use the key in a meaningful fashion.
  3. Optionally, OutgoingMessageEnvelope.message can be a byte array, provided that the first message to the SystemProducer, or the first message sent after a flush, is an Avro record. The first message must be an Avro record so that the schema for the Avro file can be obtained. If subsequent messages are byte arrays, they are appended to the Avro file directly, without any check that the byte array is indeed an encoded Avro record adhering to the schema of the Avro file. It is the responsibility of the user of the SystemProducer to ensure that the byte arrays are valid; otherwise the resulting blob will be unreadable.
  4. The SystemProducer is agnostic to schema evolution as long as the user adheres to the same schema between two flushes. Hence, if the user wants to change the schema, a flush prior to the change is mandatory. This is because all messages sent between two flushes go into a single Avro file and are expected to adhere to the schema of the file.

Code Block
public class AvroAzureBlobWriter implements AzureBlobWriter {
    // Avro's DataFileWriter is used to write the incoming Avro records.
    // The sink for DataFileWriter is AzureBlobOutputStream.

    /**
     * Writes the message to the blob via DataFileWriter, which writes to AzureBlobOutputStream.
     */
    public void write(OutgoingMessageEnvelope messageEnvelope) {
    }

    /**
     * Flushes the contents of DataFileWriter and the underlying AzureBlobOutputStream.
     * This operation uploads a block.
     */
    public void flush() {
    }

    /**
     * Closes the DataFileWriter and the underlying AzureBlobOutputStream.
     * This operation commits the blob with all blocks uploaded so far.
     */
    public void close() {
    }
}
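
The schema rules for Avro blobs (same schema between two flushes, first message must be a record, byte arrays appended unchecked) can be illustrated without the Avro library. The sketch below is hypothetical throughout: SketchRecord stands in for an Avro record, SchemaPinnedWriter for the writer, and the flush method here stands for the point at which one file ends and the next begins. Rejecting a changed schema with an exception is an assumption; the text above only states that the schema is expected to stay the same.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical stand-in for an Avro record: a schema name plus already encoded bytes. */
final class SketchRecord {
    final String schema;
    final byte[] encoded;

    SketchRecord(String schema, byte[] encoded) {
        this.schema = schema;
        this.encoded = encoded;
    }
}

/** Pins the schema of the first record after a flush and appends everything else to one file. */
final class SchemaPinnedWriter {
    private String schema; // null until the first record after a flush
    private final ByteArrayOutputStream file = new ByteArrayOutputStream();

    void write(Object message) {
        if (message instanceof SketchRecord) {
            SketchRecord record = (SketchRecord) message;
            if (schema == null) {
                schema = record.schema; // the first record defines the schema of the file
            } else if (!schema.equals(record.schema)) {
                // Assumption: this sketch rejects a schema change that was not preceded by a flush.
                throw new IllegalStateException("schema changed without a flush");
            }
            file.write(record.encoded, 0, record.encoded.length);
        } else if (message instanceof byte[]) {
            if (schema == null) {
                throw new IllegalStateException("first message after a flush must be a record");
            }
            byte[] bytes = (byte[]) message;
            file.write(bytes, 0, bytes.length); // appended as-is, with no validation
        } else {
            throw new IllegalArgumentException("unsupported message type");
        }
    }

    /** Ends the current file and returns its size in bytes; the next record may use a new schema. */
    int flush() {
        int size = file.size();
        file.reset();
        schema = null;
        return size;
    }

    public static void main(String[] args) {
        SchemaPinnedWriter writer = new SchemaPinnedWriter();
        writer.write(new SketchRecord("v1", new byte[] {1, 2}));
        writer.write(new byte[] {3});
        System.out.println("bytes in first file: " + writer.flush());
        writer.write(new SketchRecord("v2", new byte[] {9}));
        System.out.println("bytes in second file: " + writer.flush());
    }
}
```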

...