1. Definition of Scheduling
1.1 Key factors for performance
I will define "scheduling" in OpenWhisk as the process of assigning activations to proper invokers in order to maximize performance.
In the container-based serverless world, creating and removing a container is an expensive operation. It can take 200 ms ~ 1 s under heavy loads. Considering that a short-running action can finish in only 2 ms, container creation takes 100 ~ 500 times longer than the execution itself. This is because Docker is not designed to manipulate many containers concurrently. This is why it is important to minimize container operations (especially "run/rm") to maximize throughput.
Moreover, invoking an activation comprises two steps: 1. initializing the code, 2. running the code. Once a container is initialized, it can be reused and the initialization step can be skipped. So maximizing the reuse of existing containers is also a key factor for performance.
In short, we need to minimize container operations and maximize container reuse for better performance.
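To put rough numbers on this, here is an illustrative back-of-the-envelope comparison using the figures above (the initialization time is an assumed value for the sketch; actual numbers vary per environment):

```scala
// Illustrative arithmetic only, using the rough figures mentioned above.
val createMs = 500.0 // container creation ("docker run"): somewhere in the 200 ms ~ 1 s range
val initMs   = 30.0  // code initialization (assumed value for this sketch)
val runMs    = 2.0   // a short-running action

val coldStart = createMs + initMs + runMs // ~532 ms: create + init + run
val warmStart = runMs                     // ~2 ms: reuse an already initialized container
// The cold start is two orders of magnitude slower, so minimizing container
// operations and maximizing reuse dominates latency and throughput.
```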
1.2 Difficulties of optimal scheduling
Ideally, it would be best to schedule activations based on real-time resource states. In practice, however, this is not possible in OpenWhisk. Actions live in the world of single-digit milliseconds; a simple action can finish in only 2 ~ 3 ms. This means the resource states of invokers can change every 2 ~ 3 ms. Also, while one controller decides to schedule an activation to an invoker, other controllers can schedule activations to the same invoker. So each controller would have to monitor extremely frequently changing resources, take into account the decisions made by other controllers, and finally pick the proper (optimal) invoker to send the activation to, all within 3 ms.
If we take into account the network round-trip time to collect resource states and decision results from other controllers, it is clearly not possible to finish all of these steps within 3 ms. The more containers and invokers are utilized, the worse it gets.
In short, it is impossible to optimally schedule activations based on real-time resource states.
1.3 How OpenWhisk currently schedules actions
Now let's look at how OpenWhisk currently deals with these difficulties.
Each action has a "HomeInvoker", an index that is deterministically calculated from the namespace and the action name using a hash function. It indicates the preferred target invoker for the action. Controllers will try to send activations for a given action to the HomeInvoker first. If there are not enough resources on that invoker, they will send activations to invokers other than the HomeInvoker. Since activations for the same action are normally sent to the same invoker, containers are highly likely to be reused.
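As a minimal sketch of this idea (the actual balancer code also probes other invokers with a step size when the home invoker is busy, which is omitted here):

```scala
// Sketch only: deterministically map an action to a "home" invoker index.
def homeInvoker(namespace: String, actionName: String, numInvokers: Int): Int = {
  val fqn = s"$namespace/$actionName"
  math.abs(fqn.hashCode) % numInvokers
}

homeInvoker("guest", "hello", 16) // always the same invoker for this action
```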
Also, since it is not viable to take into account all decisions made by other controllers at scheduling time, OpenWhisk statically divides invoker resources into several pieces and assigns one piece to each controller. This is the ShardingPoolBalancer. Each controller thus owns a portion of dedicated resources on each invoker. Controllers do not need to care about each other; they can simply schedule based on the resources assigned to them.
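Roughly, the static sharding looks like the following (assuming, for illustration, that each invoker's capacity is a number of memory slots split evenly among controllers):

```scala
// Sketch only: every controller owns an equal, dedicated share of each invoker.
case class InvokerShard(invoker: Int, slots: Int)

def shardPerController(numControllers: Int, invokerSlots: Seq[Int]): Seq[InvokerShard] =
  invokerSlots.zipWithIndex.map { case (slots, invoker) =>
    InvokerShard(invoker, slots / numControllers) // static division, no coordination needed
  }

shardPerController(numControllers = 2, invokerSlots = Seq(8, 8, 8))
// => List(InvokerShard(0,4), InvokerShard(1,4), InvokerShard(2,4))
```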
But this solution has intrinsic limitations such as "action intervention", "inaccurate throttling", the "need for vertical invoker sharding", and so on. I already shared these issues in the previous proposal, so I will not describe them here in detail.
In short, invoker resources are evenly distributed among controllers, and each controller schedules activations based only on its own share of resources.
2. Design considerations
After many benchmarks and much deliberation, I reached the following conclusions.
2.1 Push vs Pull
In terms of passing activations from controllers to containers, there are two options.
- Push: Controllers push activations to the invokers (containers).
- Pull: Each container pulls activations for the given action.
Currently, OpenWhisk uses push-based activation passing.
The pull-based method has a few advantages compared to the push-based one.
- Controllers do not need to care about the location of existing containers.
- All containers are fully reused as long as there are containers for the given action.
- Container creation does not affect the performance of existing containers.
So I chose the pull-based method to pass activations to containers.
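From a container's point of view, the pull loop could look roughly like this (the queue interface and names are hypothetical, just to illustrate the flow):

```scala
// Sketch only: each container repeatedly asks the buffer for work for its action.
// Any idle container for the action can take the next activation, so warm
// containers are fully reused and controllers never track container locations.
case class Activation(id: String, params: Map[String, String])

trait ActivationQueue {
  def poll(actionFqn: String): Option[Activation] // long-poll/blocking in practice
}

def pullLoop(actionFqn: String, queue: ActivationQueue, run: Activation => Unit): Unit =
  while (true) {
    queue.poll(actionFqn).foreach(run) // execute inside this warm container
  }
```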
2.2 Single activation buffer vs dedicated buffer for each action
If one buffer is shared among different actions, the execution of some actions can delay others. If a few activations are not handled properly and keep waiting in the queue, subsequent activations may not be processed in time or may be delayed.
To minimize interference among actions, I use a dedicated buffer for each action.
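A minimal sketch of such a per-action buffer (the class and method names are illustrative, not the proposed component's API):

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

// Sketch only: one independent FIFO queue per action, so a slow or stuck action
// only grows its own queue and never delays activations of other actions.
class PerActionBuffer[A] {
  private val queues = new ConcurrentHashMap[String, ConcurrentLinkedQueue[A]]()

  def enqueue(actionFqn: String, activation: A): Unit =
    queues.computeIfAbsent(actionFqn, _ => new ConcurrentLinkedQueue[A]()).add(activation)

  def poll(actionFqn: String): Option[A] =
    Option(queues.get(actionFqn)).flatMap(q => Option(q.poll()))
}
```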
2.3 Separation of Execution path and Container creation path
This was already briefly mentioned above. Currently, the execution path includes container creation. Since container creation is much slower than execution, it can hinder action execution when it takes a long time.
So I separated the execution path from the container creation path. Since the pull-based method is used, invokers now only receive container creation requests; activations are pulled by each container.
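Sketched in code, the two paths carry different message types and never block each other (all names here are hypothetical):

```scala
// Sketch only: two independent message types flowing on two independent paths.
sealed trait SchedulerMessage
case class ActivationMessage(actionFqn: String, activationId: String) extends SchedulerMessage
case class ContainerCreationMessage(actionFqn: String, howMany: Int)  extends SchedulerMessage

trait Buffer  { def enqueue(actionFqn: String, msg: ActivationMessage): Unit }
trait Invoker { def createContainers(actionFqn: String, howMany: Int): Unit  }

// Execution path: controller -> per-action buffer -> container (pull, see 2.1/2.2).
def onActivation(msg: ActivationMessage, buffer: Buffer): Unit =
  buffer.enqueue(msg.actionFqn, msg)

// Creation path: scheduler -> invoker. Slow (200 ms ~ 1 s) but never blocks executions.
def onCreation(msg: ContainerCreationMessage, invoker: Invoker): Unit =
  invoker.createContainers(msg.actionFqn, msg.howMany)
```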
2.4 A new concept of resources
Currently, OpenWhisk has two action throttling limits: 1. the number of activations per minute, and 2. the number of concurrent activations. But if we take the pull-based method, throttling is simplified to "the number of containers". With this method, containers execute activations in a best-effort manner as long as there are activations in the queue. It means these containers are fully assigned to the given namespace. As containers are dedicated to the given action and keep running, we no longer need to care about the number of activations per minute; we only need to care about the number of containers in the namespace. If one container can handle all incoming requests, we will not create more containers, because creating a new container can take more time than just waiting for the running activation to finish under heavy loads. Obviously, if an action runs for only 10 ms, it is better to wait for the previous run to finish than to waste 200 ms ~ 1 s creating a new container.
With this change, deciding when and how many containers to create becomes critical; performance can vary considerably depending on this decision.
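Purely as an illustration of that trade-off, a naive heuristic might compare the expected queue waiting time against the cold start cost (the real decision logic would need more inputs, such as queue growth rate and namespace limits):

```scala
// Sketch only: create another container when waiting would cost more than a cold start.
def shouldAddContainer(queuedActivations: Int,
                       runningContainers: Int,
                       avgExecMs: Double,
                       coldStartMs: Double = 500.0): Boolean =
  if (runningContainers == 0) queuedActivations > 0
  else (queuedActivations * avgExecMs) / runningContainers > coldStartMs

shouldAddContainer(queuedActivations = 3,  runningContainers = 1, avgExecMs = 10) // false: just wait
shouldAddContainer(queuedActivations = 80, runningContainers = 1, avgExecMs = 10) // true: ~800 ms > 500 ms
```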
2.5 Rule out Kafka
This may be controversial. Kafka is a great solution: it supports horizontal scaling, high throughput on commodity hardware, high reliability, and so on. I have used it for many years for many purposes. But I reached the conclusion that Kafka does not fit well with OpenWhisk and that it becomes a hurdle for controllability and flexibility, because we cannot control the message routing. While Kafka supports some ways to partition messages, they are not enough for OpenWhisk.
If I implement a new system with the above design, I need a dedicated topic for each action, and containers for the same action will consume from that topic. Since the number of parallel consumers is limited by the number of partitions, if we want to utilize 100 containers we need 100 partitions in the topic, and for 200 containers, 200 partitions are required. Partitions are not designed to be changed dynamically, and changing them takes a relatively long time, so it is not practical to change them on the fly. This is the first limitation of Kafka.
For this reason, I introduced the MessageDistributor in AutonomousContainerSchedulerV2. It can limit the number of partitions to the number of invokers. But a second limitation arose from it.
We cannot dynamically change the "MaxPeek". As containers are dynamically created and deleted, there can be a mismatch between the number of containers and the "MaxPeek" of the MessageDistributor. If there are fewer containers than MaxPeek, many messages can end up waiting for containers to become free. If we configure MaxPeek to a small enough value, it cannot fully utilize all containers when more containers are created.
Even if we do not take the design I described above, we will need complex message routing at some point as more and more use cases and features are implemented in OpenWhisk. This is clearly not possible with Kafka, as we do not have full control over the routing; we cannot implement OpenWhisk-specific logic inside Kafka.
This is why I think we need to rule out Kafka and introduce a new component for buffering and routing.
2.6 The meaning of At-most-once and circuit breaking
This may also be controversial. I have been thinking about the at-most-once semantics and circuit breaking in OpenWhisk.
Let's think about the at-most-once semantics that OpenWhisk supports. For this, OpenWhisk mostly relies on Kafka. As soon as an invoker fetches activations from Kafka, it immediately commits the offsets. Once the offsets are committed, the activations reside only in the invoker's memory. If the invoker crashes for some reason while holding activations, all "not-yet-finished" activations are lost. Since the invoker already committed the offsets, none of those activations is executed again. This is the at-most-once semantics that OpenWhisk supports. It is meaningful because in some cases it is better to skip failed activations than to run the same activations again.
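The fetch-then-commit ordering is exactly what makes this at-most-once; roughly (plain Kafka consumer API, for illustration only):

```scala
import java.time.Duration
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch only: committing offsets before executing means a crash after the commit
// loses the in-memory activations permanently (at-most-once).
def pollOnce(consumer: KafkaConsumer[String, String], execute: String => Unit): Unit = {
  val records = consumer.poll(Duration.ofMillis(100))
  consumer.commitSync()                            // offsets committed first ...
  records.asScala.foreach(r => execute(r.value())) // ... a crash here loses these activations
}
```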
As an example of why re-running can be harmful, assume there is an action that increments an existing value and puts the result in a DB. If it crashes right after the value is successfully stored in the DB, running it again would store a completely different value. So, to be precise, activations can fail at any time and OpenWhisk does not guarantee at-least-once semantics.

Instead, OpenWhisk guarantees that a response is always sent to clients, whether it is successful or not. This is achieved by the controllers. A controller starts a timer when it sends an activation to an invoker. If there is no response from the invoker within the given time, the activation is timed out and a failed response is sent to the client (or stored in the DB). This is circuit breaking in OpenWhisk. No matter how many invokers fail, clients still receive (failed) responses from the controllers. The failure of invokers should not make the entire system unresponsive. The worst case would be for clients to never receive any response from the controllers, leaving nothing on the client side about what happened to their requests. With these ideas, we can finally reach the following conclusions.
- The system (OpenWhisk) should not be brought down by the failure of one component.
- Clients should always be able to receive responses. At the very least, they should be able to notice that their requests failed when a system failure happens.
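To make the circuit breaking above concrete, here is a minimal sketch of the controller-side timeout (names and the timer mechanism are illustrative, not the actual controller code):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch only: the controller always completes the activation with *some* response,
// even if the invoker never answers, so clients are never left hanging.
case class ActivationResponse(activationId: String, success: Boolean, body: String)

val timer = Executors.newSingleThreadScheduledExecutor()

def awaitWithCircuitBreaker(activationId: String,
                            invokerResponse: Future[ActivationResponse],
                            timeoutMs: Long): Future[ActivationResponse] = {
  val promise = Promise[ActivationResponse]()
  invokerResponse.onComplete(promise.tryComplete) // invoker answered in time: use its result
  timer.schedule(new Runnable {                   // otherwise the timer wins ...
    def run(): Unit =
      promise.trySuccess(ActivationResponse(activationId, success = false, "activation timed out"))
  }, timeoutMs, TimeUnit.MILLISECONDS)
  promise.future                                  // ... and a failed response is returned/stored
}
```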
With the above two requirements in mind, let's think further about ruling out Kafka. Kafka is being used for activation buffering and message delivery. Message delivery in OpenWhisk is a little different from message delivery in traditional systems: messages are delivered in real time, and no message is fetched again once it has been delivered (executed). This means activation messages are transient. We do not need to keep all data for a full retention period. Messages just pass through the system, and once they are delivered, they become stale and useless. This implies that our message delivery component does not necessarily need to support persistence for all messages. In case incoming messages overwhelm the system capacity and some messages must be buffered until resources become available, we definitely need persistence for those. But I think it is not critical to support persistence for all in-flight messages. Even if we lose in-flight messages, OpenWhisk already supports circuit breaking and clients will receive some response anyway. I think this is why Markus proposed to use Kafka only for overflowing workloads in the OpenWhisk future architecture. Even if we lose some messages, it would be the same level of guarantee as the current version, since invokers can also lose already-committed messages on failure.
Similarly, when I designed the new component for message delivery, I did not treat persistence as a requirement.
In short, with at-most-once semantics and the circuit breaking process in OpenWhisk, we do not necessarily need persistence for message delivery.
2.7 Transactional support
There is a limitation in the current throttling method, as the limit is divided by the number of controllers. Invocations can be throttled in one controller even though there is still enough capacity in other controllers, because the throttler does not consider the invocations in the other controllers.
For better throttling, throttlers should consider cluster-wide resources. According to 2.4, the resource is now the number of containers. The throttler will throttle a namespace when the number of containers used by the namespace reaches the throttling limit. As throttling works on cluster-wide resources, it is important to make sure the number of containers does not exceed the throttling limit. Since multiple controllers receive invocation requests and trigger container creation, we need transactional support when creating containers. Whenever multiple controllers try to create containers at the same time, only one of them should succeed; the others will fail to perform the transaction and retry it. At any time, once the number of containers reaches the limit, all subsequent transactions will fail and no further retries will happen.
Since the invocation path is separated from the container creation path, the transaction in the container creation path should not cause any significant performance disadvantage in the invocation path.
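A minimal sketch of that transaction, with an atomic compare-and-set standing in for whatever shared store ends up providing the cluster-wide count:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Sketch only: the real count must live in a shared store with transactional
// (compare-and-set) semantics; an AtomicInteger stands in for it here.
class NamespaceContainerCounter(limit: Int) {
  private val count = new AtomicInteger(0)

  /** Returns true if this controller won the transaction and may create a container. */
  def tryReserve(): Boolean = {
    val current = count.get()
    if (current >= limit) false                               // limit reached: fail, no retry
    else if (count.compareAndSet(current, current + 1)) true  // this controller wins
    else false                                                // lost the race: caller retries
  }

  def release(): Unit = count.decrementAndGet()               // a container was removed
}
```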