Target release
Epic
Document status: FINAL
Document owner: Koji Kawamura

Designer
Developers
QA

Goals

  • Exchange flow files between two NiFi environments using Site-to-Site via HTTP(S) 

Background and strategic fit

Some environments, typically multi-datacenter deployments, only allow network communication through the HTTP(S) port. To exchange data between NiFi environments using Site-to-Site in such restricted deployments, we should add HTTP(S) as a transport protocol for Site-to-Site.

Assumptions

Requirements

1. Minimize required network ports to go through the firewall
  • User Story: The target NiFi server only allows access for HTTP/HTTPS. Raw Socket Site-to-Site requires an additional port (typically 9990).
  • Importance: Must Have
  • Notes: To minimize required open ports, the new HTTP endpoints are added under /nifi-api/site-to-site, using the same port as the existing NiFi API.

2. Selectable transport protocol
  • User Story: A DFM can select the transport protocol to use from the NiFi Web UI. Available protocols are 'RAW' and 'HTTP'.
  • Importance: Must Have

3. Support HTTPS and auth
  • User Story: Network communications can be secured by HTTPS. When HTTPS is used, the source NiFi sends its certificate and the target NiFi validates whether it is registered in a trust store.
  • Importance: Must Have

4. Support HTTP Proxy
  • User Story: To reach the target NiFi, all communications have to go through an HTTP proxy server.
  • Importance: Must Have
  • Notes: There is an existing JIRA issue to allow enabling security per port (NIFI-304), but this proposal doesn't address it; it provides security on a per-server basis, as Raw Socket does.

5. Same level of transaction characteristics as RAW Socket
  • User Story: For flow-files transferred from NiFi-A to NiFi-B, the transaction should be committed on NiFi-A and NiFi-B only if NiFi-A confirms that NiFi-B received all of the sent data intact. The flow-file retrieval operation is similar. Details are described below.
  • Importance: Must Have

6. Same level of port availability check as RAW Socket
  • User Story: The availability checks for data transport should be the same as for RAW socket, covering cases such as the following:
    • If the target port doesn't exist
    • If the target port is not running
    • If the target port's destination is full
    If the target port fails validation, the peer (the host owning the port) should be penalized for a while so that other peers are used.
  • Importance: Must Have

7. Load balancing
  • User Story: The same load balancing capability as RAW Socket should be provided. A target port which has more data in its queue will receive less than others when sending flow-files, and it will be pulled from more often than others when receiving. See Peer Selection for details.
  • Importance: Must Have

8. Follow target NiFi environment topology change
  • User Story: If the target NiFi cluster adds or removes nodes and its topology changes, the source NiFi environment should detect the change automatically, meaning it can use newly added nodes and stops sending requests to removed nodes.
  • Importance: Must Have

9. Protocol version management
  • User Story: To provide backward compatibility in the future, the client and server components should negotiate the protocol version, and downgrade their behavior when the counterpart only supports an older version. The RAW Socket implementation already has protocol versions from 1 to 5 as of this writing. To let the HTTP transport protocol version evolve independently, yet reuse the same existing logic as the Socket implementation, this proposal uses two protocol versions: 'transport protocol version' and 'transaction protocol version'.
  • Importance: Must Have
  • Notes: Since this is the first time the HTTP Site-to-Site protocol is introduced:
    • transport protocol ver: 1
    • transaction protocol ver: 5

10. Batch up multiple files transport
  • User Story: The batch transport mechanism is the same as in the RAW socket protocol. How NiFi controls batch count, size and duration can be specified by HTTP headers.
  • Importance: Must Have

11. Compression
  • User Story: Whether to compress data packets can be specified by an HTTP header; refer to HTTP headers.
  • Importance: Must Have

User interaction and design

This proposal adds new inputs to the Remote Process Group configuration dialog, as shown in the following image:

 

 

  • Transport Protocol: defaults to RAW
  • HTTP Proxy server hostname: Specify the proxy server's hostname to use. If not specified, HTTP traffic is sent directly to the target NiFi instance.
  • HTTP Proxy server port: Specify the proxy server's port number, optional. If not specified, the default port 80 is used.
  • HTTP Proxy user: Specify a user name to connect to the proxy server, optional.
  • HTTP Proxy password: Specify the user's password to connect to the proxy server, optional.

nifi.properties

This proposal uses the following configuration properties in nifi.properties:

  • nifi.web.http.port
    • Default value: 8080
  • nifi.web.https.port
    • Default value: (blank)
  • nifi.remote.input.host (renamed from nifi.remote.input.socket.host)
    • Default value: (blank)
    • Description: Specify a hostname that clients can use to reach this host. This is used by both RAW socket and HTTP.
  • nifi.remote.input.socket.port
    • Default value: (blank)
    • Description: Specify a port number to listen on. RAW socket Site-to-Site is enabled when this property is set.
  • nifi.remote.input.secure
    • Default value: true
    • Description: If it is true, then both RAW socket and HTTP should be secured, hence the HTTPS protocol will be used.
  • nifi.remote.input.http.enabled (new)
    • Default value: true
    • Description: Specify true if HTTP Site-to-Site should be enabled on this host. This defaults to true, so that Site-to-Site can be used without any property configuration.
  • nifi.remote.input.http.transaction.ttl (new)
    • Default value: 30 sec
    • Description: Specify how long a transaction can live on the server, measured from the point of transaction creation.
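
As a minimal sketch of how these properties combine, the Python snippet below derives which Site-to-Site transports a host would offer. The helper names parse_properties and enabled_transports, the example hostname, and the "plain socket" / "secure socket" labels are assumptions made for illustration; this is not NiFi code.

```python
def parse_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def enabled_transports(props):
    """Map each enabled transport ('RAW', 'HTTP') to how it is carried."""
    # Fallback defaults mirror the ones listed in this section:
    # nifi.remote.input.secure=true, nifi.remote.input.http.enabled=true.
    secure = props.get("nifi.remote.input.secure", "true").lower() == "true"
    transports = {}
    # RAW socket Site-to-Site is enabled only when a socket port is set.
    if props.get("nifi.remote.input.socket.port", ""):
        transports["RAW"] = "secure socket" if secure else "plain socket"
    if props.get("nifi.remote.input.http.enabled", "true").lower() == "true":
        transports["HTTP"] = "https" if secure else "http"
    return transports


# Hypothetical nifi.properties fragment: HTTP Site-to-Site only, not secured.
EXAMPLE = """
nifi.web.http.port=8080
nifi.remote.input.host=nifi-b.example.com
nifi.remote.input.socket.port=
nifi.remote.input.secure=false
nifi.remote.input.http.enabled=true
"""

print(enabled_transports(parse_properties(EXAMPLE)))
```

With an empty property set, the sketch reports HTTP over https only, which matches the intent that Site-to-Site works without any property configuration.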

 

Deployment examples

The following diagrams illustrate some deployment options to describe key features; they are not meant to limit the deployment patterns. Although the diagrams only show a single Site-to-Site client server, the client can be one of the NiFi nodes within a NiFi cluster. Site-to-Site supports cluster-to-cluster data transport.

To Standalone NiFi : Socket

This is an existing deployment option using RAW Socket, shown here to contrast with HTTP Site-to-Site. It also supports secure communication and NiFi clusters. With RAW Socket, the client first retrieves the remote NiFi site info by sending an HTTP request to /nifi-api/site-to-site. After that, it uses Socket networking to exchange data.

NiFi-Site-to-Site-deployment-patterns

 

To Standalone NiFi : HTTP

If HTTP is used for Transport Protocol, then all communications between Site-to-Site client and the remote NiFi instance are done with HTTP protocol.

NiFi-Site-to-Site-deployment-http

To Standalone NiFi : HTTPS

Network traffic to a remote NiFi can be secured by setting nifi.remote.input.secure to true. When it's true, a remote NiFi instance is only accessible with HTTPS protocol.

NiFi-Site-to-Site-deployment-https

To Standalone NiFi : HTTP using Proxy

If a remote NiFi instance is behind a firewall which only exposes the HTTP port to a proxy server, its Site-to-Site client can be configured as shown in this diagram to use that proxy server.

NiFi-Site-to-Site-deployment-http-proxy

To NiFi Cluster : HTTP

If the target NiFi is a cluster, its client chooses which NiFi node to transport data to, based on Peer Selection, each time it transfers data. For example, if the Site-to-Site client component is a Remote Process Group, it does peer selection when it is scheduled.

To allow a NiFi cluster to use HTTPS for Site-to-Site while using HTTP for communications within the cluster, siteToSiteHttpApiPort is added to NodeIdentifier. This is needed because the existing apiPort is determined by whether the communication from the cluster protocol manager to the node is secure.

NiFi-Site-to-Site-deployment-http-cluster

 

Peer Selection

If the remote NiFi forms a cluster, a Site-to-Site client has to determine which NiFi node to transfer data to or from. We call this decision-making process 'Peer Selection'. It has two aspects: flow file count and port status. A Site-to-Site client does Peer Selection when the startTransaction method is called.

Based on Flow file count

The first step is done before connecting to any particular peer. The Site-to-Site client uses peer statuses retrieved from the NCM. Peer statuses contain the number of flow files queued on each peer, which is used to calculate how often a peer will be selected.

The frequency calculation logic is implemented in PeerSelector.java, and both the RAW socket and HTTP transport protocols use it.

PeerSelector provides the following load balancing characteristic when its getNextPeerStatus method is called: "The target port which has more data in its queue will receive less than others for sending flow-files, and it will be pulled more often than others for receiving."
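
The load balancing characteristic quoted above can be illustrated with a small Python sketch. The weighting formulas below are an assumption chosen only to reproduce the stated behavior (busier peers receive less when sending and are pulled from more when receiving); they are not the actual logic in PeerSelector.java, and the names selection_weights and next_peer are hypothetical.

```python
import random


def selection_weights(queued_counts, direction):
    """Return {peer: probability} from {peer: queued flow file count}.

    direction is 'send' (push to the remote) or 'receive' (pull from it).
    """
    total = sum(queued_counts.values())
    n = len(queued_counts)
    if total == 0 or n == 1:
        # Nothing to distinguish the peers: pick uniformly.
        return {peer: 1.0 / n for peer in queued_counts}
    if direction == "receive":
        # Peers holding more data are pulled from more often.
        return {peer: count / total for peer, count in queued_counts.items()}
    # Sending: peers holding more data receive less. The shares (1 - c/total)
    # sum to n - 1, so dividing by n - 1 normalizes them to 1.
    return {peer: (1 - count / total) / (n - 1)
            for peer, count in queued_counts.items()}


def next_peer(queued_counts, direction, rng=random):
    """Pick one peer at random according to the weights."""
    weights = selection_weights(queued_counts, direction)
    peers = list(weights)
    return rng.choices(peers, weights=[weights[p] for p in peers], k=1)[0]


counts = {"node-1": 0, "node-2": 50, "node-3": 50}
print(selection_weights(counts, "send"))
print(selection_weights(counts, "receive"))
```

In this sketch an idle node gets the largest share of sends and no pulls, while the two equally loaded nodes split the remainder.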

Based on Port Status

The second step is done during the handshaking phase with the peer selected by PeerSelector.getNextPeerStatus().

In a clustered NiFi, the state of a data flow is synchronized. This means that if there is a port named 'input-1', the port exists on every NiFi node with the same name and port identifier. Likewise, if the port is stopped, it is stopped on every NiFi node. So when the Site-to-Site client receives UNKNOWN_PORT or PORT_NOT_IN_VALID_STATE, it stops looking for another peer, because the same response would be returned from every NiFi node. Site-to-Site penalizes the peer and raises an exception in these cases.

If a Site-to-Site client receives PORTS_DESTINATION_FULL, it only means that the port running on that particular NiFi node is full. So the client penalizes the peer, but continues looking for another peer. If the destinations of all peers are full, the Site-to-Site client returns null from the startTransaction method.
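
The port-status rules above can be sketched as follows in Python. The function start_transaction here is a simplified stand-in, not the real client method: handshake and penalize are hypothetical callbacks, and any status other than the three named above is assumed to mean the handshake succeeded.

```python
class PortNotAvailable(Exception):
    """Raised when every node would return the same failing port status."""


# Statuses that are identical on every node because the flow is synchronized.
CLUSTER_WIDE_FAILURES = {"UNKNOWN_PORT", "PORT_NOT_IN_VALID_STATE"}


def start_transaction(candidate_peers, handshake, penalize):
    """Return the first usable peer, or None if every destination is full.

    candidate_peers: peers in the order produced by peer selection.
    handshake(peer): returns the status string reported by that peer.
    penalize(peer): marks the peer as temporarily unusable.
    """
    for peer in candidate_peers:
        status = handshake(peer)
        if status in CLUSTER_WIDE_FAILURES:
            # Trying another node is pointless: it would answer the same.
            penalize(peer)
            raise PortNotAvailable(f"{peer}: {status}")
        if status == "PORTS_DESTINATION_FULL":
            # Only this node is full; keep looking.
            penalize(peer)
            continue
        return peer
    return None


statuses = {"node-1": "PORTS_DESTINATION_FULL", "node-2": "OK"}
penalized = []
print(start_transaction(["node-1", "node-2"], statuses.get, penalized.append))
print(penalized)
```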

 

REST endpoints

The following REST endpoints will be added by this proposal:

  • /site-to-site/
    • GET: Returns the Site-to-Site information required by the source NiFi environment. It represents the Controller of the target NiFi environment.
  • /site-to-site/peers/
    • GET: Returns the available peers of this NiFi environment.
  • /site-to-site/input-ports/{portId}/transactions/
    • POST: Initiates a new transaction to send data from the source to the target NiFi. A new transaction ID is issued and returned.
  • /site-to-site/input-ports/{portId}/transactions/{transactionId}
    • PUT: Extends the transaction's TTL; used to let the server know the client is still working.
    • DELETE: Commits the transaction which is held on the server side.
  • /site-to-site/input-ports/{portId}/transactions/{transactionId}/flow-files
    • POST: Transfers data from the source to the target NiFi. The transaction is held on the server side instead of being committed immediately, in order to provide a 2-phase style commit. Returns a Checksum calculated on the server side.
  • /site-to-site/output-ports/{portId}/transactions/
    • POST: Initiates a new transaction to receive data from the target to the source NiFi. A new transaction ID is issued and returned.
  • /site-to-site/output-ports/{portId}/transactions/{transactionId}
    • PUT: Extends the transaction's TTL; used to let the server know the client is still working.
    • DELETE: Commits the transaction which is held on the server side. The client sends a Checksum calculated on the client side.
  • /site-to-site/output-ports/{portId}/transactions/{transactionId}/flow-files
    • GET: Transfers data from the target to the source NiFi. The transaction is held on the server side instead of being committed immediately, in order to provide a 2-phase style commit.
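
To make the URL layout concrete, here is a small Python sketch that assembles the transaction paths listed above. It assumes the endpoints live under the /nifi-api prefix mentioned in the requirements; the helper names and the example identifiers are hypothetical.

```python
from urllib.parse import quote

# Assumed base path: the requirements place the endpoints under /nifi-api.
BASE_PATH = "/nifi-api/site-to-site"


def transactions_path(direction, port_id):
    """Path to POST to when initiating a transaction."""
    if direction not in ("input-ports", "output-ports"):
        raise ValueError(f"unknown direction: {direction}")
    return f"{BASE_PATH}/{direction}/{quote(port_id, safe='')}/transactions"


def transaction_path(direction, port_id, transaction_id):
    """Path for PUT (extend TTL) and DELETE (commit) on one transaction."""
    base = transactions_path(direction, port_id)
    return f"{base}/{quote(transaction_id, safe='')}"


def flow_files_path(direction, port_id, transaction_id):
    """Path for POST (input-ports) or GET (output-ports) of flow files."""
    return transaction_path(direction, port_id, transaction_id) + "/flow-files"


print(transactions_path("input-ports", "port-1"))
print(transaction_path("input-ports", "port-1", "tx-9"))
print(flow_files_path("output-ports", "port-2", "tx-9"))
```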

 

Refer to the sequence diagrams and 'REST interactions and scenarios' below for the details of how these REST endpoints are used.

 

Site-to-Site specific HTTP Headers

The following HTTP headers are used in Site-to-Site REST requests:

  • x-nifi-site-to-site-protocol-version
    • Description: Specify a preferred protocol version. If the version is supported by the remote NiFi, then it will be used. If not, the remote NiFi tries to find the closest supported version. The negotiated protocol version is returned as a response HTTP header with this header name. This is designed to provide backward compatibility: for example, when a newer client sends a request to an older version of a remote NiFi instance, the client would prefer to use ver 3, but if the remote only supports ver 2, then the client has to downgrade its behavior to ver 2.
    • Default value (if not specified): It is required. If not specified, 400 Bad Request is returned.
  • x-nifi-site-to-site-use-compression
    • Description: Set "true" if data packets should be compressed during transport.
    • Default value (if not specified): false
  • x-nifi-site-to-site-request-expiration
    • Description: Specify how long this communication can last, in milliseconds. StandardRootGroupPort checks whether the protocol is expired based on this value.
    • Default value (if not specified): StandardRootGroupPort doesn't expire if it's not set.
  • x-nifi-site-to-site-batch-count
    • Description: NiFi limits the number of flow files for a batch operation up to this setting. If it is 0, NiFi doesn't limit based on flow file count.
    • Default value (if not specified): 0
  • x-nifi-site-to-site-batch-size
    • Description: NiFi limits the total size of flow files for a batch operation up to this setting. If it is 0, NiFi doesn't limit based on total flow file size.
    • Default value (if not specified): 0
  • x-nifi-site-to-site-batch-duration
    • Description: Specify the batch duration in milliseconds. If it is greater than 0, NiFi limits the batch operation time window up to this setting.

If none of batch-count, batch-size, and batch-duration is specified, NiFi batches up flow files for 5 secs.
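
The header semantics above can be sketched in Python as follows. Two points are assumptions rather than statements from this proposal: "closest supported version" is interpreted as the highest supported version not greater than the requested one, and a request that cannot be satisfied is modeled as a ValueError. The function names are hypothetical.

```python
PREFIX = "x-nifi-site-to-site-"


def build_headers(protocol_version, use_compression=False, expiration_ms=None,
                  batch_count=None, batch_size=None, batch_duration_ms=None):
    """Build request headers; optional ones are omitted when not specified."""
    headers = {PREFIX + "protocol-version": str(protocol_version)}
    if use_compression:
        headers[PREFIX + "use-compression"] = "true"
    optional = {
        "request-expiration": expiration_ms,
        "batch-count": batch_count,
        "batch-size": batch_size,
        "batch-duration": batch_duration_ms,
    }
    for name, value in optional.items():
        if value is not None:
            headers[PREFIX + name] = str(value)
    return headers


def negotiate_version(requested, supported):
    """Server side: pick the version to answer with (assumed interpretation)."""
    candidates = [v for v in supported if v <= requested]
    if not candidates:
        raise ValueError("no supported version at or below the requested one")
    return max(candidates)


def effective_batch_duration_ms(headers):
    """Server side: 5 secs when no batch header is present at all."""
    names = [PREFIX + n for n in ("batch-count", "batch-size", "batch-duration")]
    if not any(n in headers for n in names):
        return 5000
    return int(headers.get(PREFIX + "batch-duration", "0"))


request = build_headers(3, use_compression=True)
print(request)
print(negotiate_version(3, supported=[1, 2]))
print(effective_batch_duration_ms(request))
```

The second print mirrors the example in the header description: a client preferring ver 3 is downgraded to ver 2 when the remote supports only versions 1 and 2.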

 

REST interaction sequence diagrams

Here are sequence diagrams describing the complete HTTP request/response sequence of the successful scenario for both input-ports and output-ports. Other semi-normal and error cases are described in Interaction Scenarios.

The A_component in the diagrams is a component which uses SiteToSiteClient class, such as:

  • NiFiReceiver for Apache Spark
  • NiFiBolt and NiFiSpoutReceiver for Apache Storm
  • StandardRemoteGroupPort, this is the Remote Process Group processor in a NiFi data flow

 

input-ports/

The input-ports endpoint is used for sending data from the source NiFi to a remote target NiFi. When a transaction is created, an HTTP POST request is initiated to the flow-files URL under the transactionId. While there are more data packets to send, the same HTTP POST request is used, so that data is sent in a streaming manner. When the client finishes writing all data packets to the output stream, it flushes the stream. Finally, the client sends a DELETE request to the transactionId URL to complete the transaction.

Sequence diagram (input-ports). Participants: A_component, HttpClient, HttpClientTransaction, SiteToSiteRestApiClient, SiteToSiteResource. Messages in order:

  1. createTransaction
  2. initiateTransaction
  3. POST /site-to-site/input-ports/{portId}/transactions
  4. transactionUrl, transactionProtocolVersion
  5. new; state = TRANSACTION_STARTED
  6. openConnectionForSend
  7. POST /site-to-site/input-ports/{portId}/transactions/{transactionId}/flow-files
  8. Transaction
  9. alt [while there is data packet to send]: send; writes data to outputstream; state = DATA_EXCHANGED
  10. confirm
  11. finishTransferFlowFiles
  12. 202 Accepted: returns serverChecksum
  13. validate server Checksum; state = TRANSACTION_CONFIRMED
  14. complete
  15. commitTransferFlowFiles
  16. DELETE /site-to-site/input-ports/{portId}/transactions/{transactionId}
  17. 200 OK; state = TRANSACTION_COMPLETED

output-ports/

The output-ports endpoint is used for receiving data from a remote target NiFi into the source NiFi. When a transaction is created, an HTTP GET request is initiated to the flow-files URL under the transactionId. While there are more data packets to receive, the same HTTP GET request is used, so that data is received in a streaming manner. When the client finishes consuming all data packets from the input stream and the confirm() method is called, the client sends a DELETE request with a Checksum calculated on the client side to complete the transaction.

The complete() method doesn't do anything other than update the state to TRANSACTION_COMPLETED.

Sequence diagram (output-ports). Participants: A_component, HttpClient, HttpClientTransaction, SiteToSiteRestApiClient, SiteToSiteResource. Messages in order:

  1. createTransaction
  2. initiateTransaction
  3. POST /site-to-site/output-ports/{portId}/transactions
  4. transactionUrl, transactionProtocolVersion
  5. new; state = TRANSACTION_STARTED
  6. openConnectionForReceive
  7. GET /site-to-site/output-ports/{portId}/transactions/{transactionId}/flow-files
  8. 202 Accepted
  9. Transaction
  10. alt [while there is data packet to receive]: receive; read from inputstream; state = DATA_EXCHANGED; data packet
  11. confirm
  12. commitReceivingFlowFiles(checksum)
  13. DELETE /site-to-site/output-ports/{portId}/transactions/{transactionId}
  14. validate client Checksum
  15. 200 OK; state = TRANSACTION_CONFIRMED
  16. complete; state = TRANSACTION_COMPLETED
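
The client-side states named in both sequences can be modeled as a small state machine. The Python sketch below is illustrative only: the class ClientTransaction and its method names are hypothetical, and the rule that cancel is allowed from any non-terminal state is an assumption, since this proposal does not specify it.

```python
from enum import Enum


class State(Enum):
    TRANSACTION_STARTED = "TRANSACTION_STARTED"
    DATA_EXCHANGED = "DATA_EXCHANGED"
    TRANSACTION_CONFIRMED = "TRANSACTION_CONFIRMED"
    TRANSACTION_COMPLETED = "TRANSACTION_COMPLETED"
    TRANSACTION_CANCELED = "TRANSACTION_CANCELED"


# Allowed transitions. DATA_EXCHANGED may repeat, one step per data packet.
ALLOWED = {
    State.TRANSACTION_STARTED: {State.DATA_EXCHANGED,
                                State.TRANSACTION_CANCELED},
    State.DATA_EXCHANGED: {State.DATA_EXCHANGED,
                           State.TRANSACTION_CONFIRMED,
                           State.TRANSACTION_CANCELED},
    State.TRANSACTION_CONFIRMED: {State.TRANSACTION_COMPLETED,
                                  State.TRANSACTION_CANCELED},
    State.TRANSACTION_COMPLETED: set(),
    State.TRANSACTION_CANCELED: set(),
}


class ClientTransaction:
    """Tracks state only; no network calls are made in this sketch."""

    def __init__(self):
        self.state = State.TRANSACTION_STARTED

    def _move(self, target):
        if target not in ALLOWED[self.state]:
            raise RuntimeError(
                f"cannot go from {self.state.name} to {target.name}")
        self.state = target

    def exchange(self):   # send() or receive() of one data packet
        self._move(State.DATA_EXCHANGED)

    def confirm(self):    # checksums matched
        self._move(State.TRANSACTION_CONFIRMED)

    def complete(self):
        self._move(State.TRANSACTION_COMPLETED)

    def cancel(self):
        self._move(State.TRANSACTION_CANCELED)


tx = ClientTransaction()
tx.exchange()
tx.exchange()
tx.confirm()
tx.complete()
print(tx.state.name)
```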

 

REST interactions and scenarios

input-ports/

Each scenario type below lists the interactions on three endpoints: {portId}/transactions, {portId}/transactions/{transactionId}/flow-files, and {portId}/transactions/{transactionId}.

Transaction Initiation Failure

  • {portId}/transactions
    • Client sends a POST with handshake parameters
    • Server responds:
      • 400 Bad Request: if the protocol version is not specified, or not supported
      • 401 Unauthorized: if the request is not authorized for the port
      • 403 Forbidden: if it is a NiFi Cluster Manager
      • 404 Not Found: if no port is found with the portId
      • 503 Service Unavailable: if the port is not running, is not in a valid state, or the port's destination is full
  • {portId}/transactions/{transactionId}/flow-files: N/A
  • {portId}/transactions/{transactionId}: N/A

Normal Case

  • {portId}/transactions
    • Client sends a POST with handshake parameters
    • Server creates a transaction
    • Server responds:
      • 201 Created: the created transaction's URL is returned in the 'Location' header
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a POST to the flow-files URL
    • Client streams data packets
    • Client state -> DATA_EXCHANGED
    • Server consumes the input stream to receive incoming data packets
    • Server responds 202 Accepted: a Checksum is returned in the response body
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client compares the client Checksum and the server Checksum
      • If the Checksums are identical
      • Client state -> TRANSACTION_CONFIRMED
    • complete():
      • Client sends a DELETE to the transaction with CONFIRM_TRANSACTION
      • Server commits its session
      • Server responds 200 OK with TRANSACTION_FINISHED
      • Client state -> TRANSACTION_COMPLETED

Normal Case - Destination becomes full

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId} (branched from above interactions)
    • Server responds 200 OK with TRANSACTION_FINISHED_BUT_DESTINATION_FULL if there are no available relationships
    • Client state -> TRANSACTION_COMPLETED
    • Client penalizes the peer. TODO: only implemented in Socket, EndpointConnectionPool.java

BAD_CHECKSUM

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client compares the client Checksum and the server Checksum
      • If the Checksums are not identical
      • Client sends a DELETE to the transaction with BAD_CHECKSUM
      • Client rolls back its session
      • Server rolls back its session
      • Server responds 200 OK with CANCEL_TRANSACTION

Cancel transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • cancel():
      • Client cancels the transaction for some reason (there is no component which cancels a transaction as of this writing, though)
      • TODO: not implemented yet
      • Client sends a DELETE to the transaction with CANCEL_TRANSACTION
      • Client state -> TRANSACTION_CANCELED

Defunct transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • Client stops working
    • After a certain amount of time, the transaction will be removed from the server side
    • The transaction on the client might be rolled back; however, we can't be sure, since the client is not working properly
    • The transaction on the server will be rolled back when it expires

Expired transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • complete():
      • Client sends a DELETE to the transaction
      • Server responds 404 Not Found, if the transaction is already expired
      • The transaction on the client should be rolled back
      • The transaction on the server has already been rolled back

Transaction Initiation Failure

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • complete():
      • Client sends a DELETE to the transaction
      • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
      • Client rolls back its session
      • The transaction on the server will be rolled back when it expires

Defunct transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client stops working
    • After a certain amount of time, the receiving transaction will be removed from the server side
  • {portId}/transactions/{transactionId}: N/A

Expired transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a POST to the flow-files URL
    • Server responds 404 Not Found, if the receiving transaction has already been removed because it expired
  • {portId}/transactions/{transactionId}: N/A

Transaction Initiation Failure

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a POST to the flow-files URL
    • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
    • Client rolls back its session
    • Server session hasn't started yet
  • {portId}/transactions/{transactionId}: N/A
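
The confirm() step of the input-ports scenarios, where the client compares its own Checksum with the one returned by the server, can be sketched as below. This proposal does not name the checksum algorithm, so the use of CRC32 here is an assumption, and the helper names are hypothetical.

```python
import zlib


def checksum_of(packets):
    """Running CRC32 over all data packets, rendered as a decimal string."""
    crc = 0
    for packet in packets:
        crc = zlib.crc32(packet, crc)
    return str(crc)


def confirm_code(sent_packets, server_checksum):
    """Decide which code the client sends with the DELETE request."""
    if checksum_of(sent_packets) == server_checksum:
        return "CONFIRM_TRANSACTION"   # client state -> TRANSACTION_CONFIRMED
    return "BAD_CHECKSUM"              # both sides roll back their sessions


packets = [b"flow-file-1", b"flow-file-2"]
server_side = checksum_of(packets)            # what an intact transfer yields
print(confirm_code(packets, server_side))
print(confirm_code(packets, checksum_of([b"flow-file-1"])))
```

The second call simulates a server that received only part of the data, which leads the client down the BAD_CHECKSUM path.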

 

output-ports/

Each scenario type below lists the interactions on three endpoints: {portId}/transactions, {portId}/transactions/{transactionId}/flow-files, and {portId}/transactions/{transactionId}.

Transaction Initiation Failure

  • {portId}/transactions
    • Same implementation as input-ports
  • {portId}/transactions/{transactionId}/flow-files: N/A
  • {portId}/transactions/{transactionId}: N/A

Normal Case

  • {portId}/transactions
    • Client sends a POST with handshake parameters
    • Server creates a transaction
    • Server responds:
      • 201 Created: the created transaction's URL is returned in the 'Location' header
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a GET to the flow-files URL
    • Client state -> DATA_EXCHANGED
    • Server responds 202 Accepted
    • Client consumes the input stream to receive incoming data packets
    • Server writes data packets into the output stream
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client calculates the Checksum of the received data
      • Client sends a DELETE to the transaction with the Checksum
      • Server compares the client Checksum and the server Checksum
      • If the Checksums are identical
      • Server commits its session
      • Server responds 200 OK with TRANSACTION_FINISHED
      • Client state -> TRANSACTION_CONFIRMED
    • complete():
      • Client state -> TRANSACTION_COMPLETED

Normal Case - Destination becomes full

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId} (branched from above interactions)
    • TODO: The Destination Full case is not implemented yet. Is another roundtrip required? There is no client-side implementation sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL. Besides that, the server side doesn't have to penalize a peer, does it?

BAD_CHECKSUM

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client calculates the Checksum of the received data
      • Client sends a DELETE to the transaction with the Checksum
      • Server compares the client Checksum and the server Checksum
      • If the Checksums are not identical
      • Server rolls back its session
      • Server responds 400 Bad Request with BAD_CHECKSUM
      • Client raises an IOException
      • Client rolls back its session

Cancel transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • cancel():
      • Client cancels the transaction for some reason (there is no component which cancels a transaction as of this writing, though)
      • TODO: not implemented yet
      • Client sends a DELETE to the transaction with CANCEL_TRANSACTION
      • Client state -> TRANSACTION_CANCELED

Defunct transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • Client stops working
    • After a certain amount of time, the transaction will be removed from the server side
    • The transaction on the client might be rolled back; however, we can't be sure, since the client is not working properly
    • The transaction on the server should be rolled back when it expires

Expired transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client sends a DELETE to the transaction
      • Server responds 404 Not Found, if the transaction has already been removed because it expired
      • The transaction on the client should be rolled back
      • The transaction on the server has already been rolled back

Transaction Initiation Failure

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files: (after above interactions)
  • {portId}/transactions/{transactionId}
    • confirm():
      • Client sends a DELETE to a holding transaction
      • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
      • Client rolls back its session
      • The transaction on the server will be rolled back when it expires

Defunct transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client stops working
    • After a certain amount of time, the transaction will be removed from the server side
  • {portId}/transactions/{transactionId}: N/A

Expired transaction

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a GET to the flow-files URL
    • Server responds 404 Not Found, if the transaction has already been removed because it expired
  • {portId}/transactions/{transactionId}: N/A

Transaction Initiation Failure

  • {portId}/transactions: (after above interactions)
  • {portId}/transactions/{transactionId}/flow-files
    • Client sends a GET to the flow-files URL
    • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
    • Client rolls back its session
    • Server session hasn't started yet
  • {portId}/transactions/{transactionId}: N/A
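
The defunct and expired transaction scenarios rely on the server dropping transactions whose TTL has passed. A minimal Python sketch of such a holder follows. The class TransactionStore is hypothetical; it assumes that a PUT restarts the TTL from the current time, and it returns bare status codes instead of HTTP responses. The 30 second default mirrors nifi.remote.input.http.transaction.ttl.

```python
import time
import uuid


class TransactionStore:
    """Holds transaction ids on the server until they finish or expire."""

    def __init__(self, ttl_seconds=30.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock          # injectable so tests can control time
        self._expires_at = {}

    def _purge(self):
        now = self._clock()
        expired = [t for t, exp in self._expires_at.items() if exp <= now]
        for tx_id in expired:
            # A real server would roll back the held session here.
            del self._expires_at[tx_id]

    def create(self):
        """POST .../transactions: issue a new transaction id."""
        self._purge()
        tx_id = str(uuid.uuid4())
        self._expires_at[tx_id] = self._clock() + self._ttl
        return tx_id

    def extend(self, tx_id):
        """PUT .../transactions/{id}: the client is still working."""
        self._purge()
        if tx_id not in self._expires_at:
            return 404
        self._expires_at[tx_id] = self._clock() + self._ttl
        return 200

    def finish(self, tx_id):
        """DELETE .../transactions/{id}: 404 if already expired."""
        self._purge()
        return 200 if self._expires_at.pop(tx_id, None) is not None else 404


fake_now = [0.0]
store = TransactionStore(ttl_seconds=30.0, clock=lambda: fake_now[0])
tx = store.create()
fake_now[0] = 45.0                   # the client went silent past the TTL
print(store.finish(tx))
```

A client that expects a long transfer would call extend periodically, which is the role of the PUT requests in the endpoint list.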

Questions

Below is a list of questions to be addressed as a result of this requirements document:

QuestionOutcome

Not Doing


1 Comment

  1. Koji Kawamura - This is fantastic and a model for how feature proposals should be written. Thank you! While I have not reviewed all of the aspects in full detail, one thing that caught my eye was the nifi.properties change from 'nifi.remote.input.socket.host' to 'nifi.remote.input.host'. I think it is certainly the right thing to do; however, we must account for backward compatibility. We either need to document this in our migration guide for moving from 0.x to 1.x, or we should support the older property for a period of time as a deprecated property.