Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: grammar nits

 

This page discusses the implementation of Flink's distributed communication via Akka, which has been adopted in version 0.9. With Akka, all remote procedure calls are now realized as asynchronous messages. This mainly affects the components JobManager, TaskManager and JobClient. In the future, it is likely that even more components will be transformed into an actor, allowing them to send and process asynchronous messages.

...

Akka is a framework to develop concurrent, fault-tolerant and scalable applications. It is an implementation of the actor model and thus similar to Erlang's concurrency model. In the context of the actor model, all acting entities are considered independent actors. Actors communicate with other actors by sending asynchronous messages to each other. The strength of the actor model arises from this asynchronism. It is also possible to explicitly wait for a response which allows you to perform synchronous operations. Synchronous messages are strongly discouraged, though, because they limit the scalability of the system. Each actor has a mailbox in which the received messages are stored. Furthermore, each actor maintains its own isolated state. An example network of several actors is given below.

 


An actor has a single processing thread which polls the actor's mailbox and processes the received messages successively. As a result of a processed message, the actor can change its internal state, send new messages or spawn new actors. If the internal state of an actor is exclusively manipulated from within its processing thread, then there is no need to make the actor's state thread safe. Even though an individual actor is sequential by nature, a system consisting of several actors is highly concurrent and scalable, because the processing threads are shared among all actors. This sharing is also the reason why one should never call blocking calls from within an actor thread. Such a call would block the thread from being used by other actors to process their own messages.

...

An actor system is the container in which all actors live. It provides shared services such as scheduling, configuration, and logging. The actor system also contains the thread pool from where all actor threads are recruited.
Multiple actor system systems can coexist on a single machine. If the actor system is started with a RemoteActorRefProvider, then it can be reached from another actor system possibly residing on a remote machine. The actor system automatically recognises whether actor messages are addressed to an actor living in the same actor system or in a remote actor system. In the case of local communication, the message is efficiently transmitted using shared memory. In the case of remote communication, the message is sent through the network stack.

All actors are organized in a hierarchy. Each newly created actor gets its creating actor assigned as its parent assigned. The hierarchy is used for supervision. Each parent is responsible for the supervision of its children. If an error occurs in one of its children, then he the parent gets notified. If the actor can resolve the problem, then he the parent can resume or restart his its child. In case of a problem which that is out of his scope to deal with, he it can escalate the error to his its own parent. Escalating an error simply means that a hierarchy layer above the current one is now responsible for resolving the problem. Details about Akka's supervision and monitoring can be found here.

...

An actor is itself a container for state and behaviour.  It's Its actor thread sequentially processes the incoming messages. It alleviates the user from the error-prone task of locking and thread management because only one thread at a time is active for one actor. However, one must make sure that the internal state of an actor is only accessed from this actor thread. The behaviour of an actor is defined by a receive function which contains for each message some logic which is executed upon receiving this message.

The Flink system consists of three distributed components which have to communicate: The JobClient, the JobManager and the TaskManager. The JobClient takes a Flink job from the user and submits it to the JobManager. The JobManager is then responsible for orchestrating the job execution. First of all, it allocates the required amount of resources. This mainly includes the execution slots on the TaskManagers.

After resource allocation, the JobManager deploys the individual tasks of the job to the respective TaskManagers Upon receiving a task, the TaskManager spawns a thread which executes the task. State changes such as starting the calculation or finishing it are sent back to the JobManager. Based on these state updates, the JobManager will steer the job execution until it is finished. Once the job is finished, the result of it will be sent back to the JobClient which tells the user about it. The job execution process is depicted in the figure below.

...

Wherever possible, Flink tries to use asynchronous messages and to handle responses as futures. Futures and the few existing blocking calls have a timeout after which the operation is considered failed. This prevents the system from getting deadlocked in case a message gets lost or a distributed component crashes. However, if you happen to have a really large cluster or a slow network, timeouts might be triggered wrongly. Therefore, the timeout for these operations can be specified via "akka.ask.timeout" in the configuration.

Before an actor can talk to another actor it has to retrieve an ActorRef for it. The lookup for this operation requires also a timeout. In order to make the system fail fast if an actor is not started, the lookup timeout is set to a
smaller a smaller value than the regular timeout. In case that you experience lookup timeouts, you can increase the lookup time via "akka.lookup.timeout" in the configuration.

Another peculiarity of Akka is that it sets a limit for the maximum message size it can send. The reason for this is that it reserves a serialization buffer of the same size and does not want to waste memory. If you should ever encounter a transmission error because the message exceeded the maximum size, you can increase the framesize via "akka.framesize" in the configuration.

...

  • akka.ask.timeout: Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT: 100 s).

...

  • akka.

...

  • lookup.

...

  • timeout

...

  • Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d) (DEFAULT:

...

  •  10 s

...

  • ).

...

  • akka.

...

  • framesize:

...

  •  Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier (DEFAULT:

...

  •  10485760b

...

  • ).

...

  • akka.watch.heartbeat.

...

  • interval:

...

  •  Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value. A thorough description of Akka's DeathWatch can be

...

...

  •  (DEFAULT: akka.ask.timeout/10

...

  • ).

...

  • akka.watch.heartbeat.

...

  • pause:

...

  •  Acceptable heartbeat pause for Akka's DeathWatch mechanism. A low value does not allow a irregular heartbeat. A thorough description of Akka's DeathWatch can be

...

  • found here (DEFAULT:

...

  •  akka.ask.timeout

...

  • ).

...

  • akka.watch.

...

  • threshold:

...

  •  Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to

...

  • detect a dead TaskManager. A thorough description of Akka's DeathWatch can be found here (DEFAULT: 12).
  • akka.transport.heartbeat.

...

  • interval:

...

  •  Heartbeat interval for Akka's transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT:

...

  •  1000 s

...

  • ).

...

  • akka.transport.heartbeat.

...

  • pause:

...

  •  Acceptable heartbeat pause for Akka's transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d) (DEFAULT:

...

  •  6000 s

...

  • ).

...

  • akka.transport.

...

  • threshold:

...

  •  Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value (DEFAULT:

...

  •  300

...

  • ).

...

  • akka.tcp.

...

  • timeout:

...

  •  Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value (DEFAULT:

...

  •  akka.ask.timeout

...

  • ).

...

  • akka.

...

  • throughput:

...

  •  Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness (DEFAULT:

...

  •  15

...

  • ).

...

  • akka.log.lifecycle.

...

  • events:

...

  •  Turns on the Akka's remote logging of events. Set this value to 'on' in case of debugging (DEFAULT:

...

  •  off

...

  • ).

...

  • akka.startup-

...

  • timeout:

...

  •  Timeout after which the startup of a remote component is considered being failed (DEFAULT:

...

  •  akka.ask.timeout

...

  • ).