
...

In order to manage these hot-spots, it is necessary to change the scheduling algorithm to take these constraints into account. But Connect is not situated to manage resources directly, as the shared-JVM model does not permit strong isolation between threads.

If instead each job can be assigned to its own worker, resource constraints can be specified at the process boundary. Existing tooling for managing process resources can then monitor and enforce resource utilization for a job by applying limits to the worker containing that job. To make this possible, Connect should provide a mechanism for the user or management layer to assign jobs to specific workers, and a scheduling algorithm that respects these assignments.

...

If static assignments are not specified, or at least one worker in the cluster is not using the static protocol, the assignments are ignored and the worker may receive an arbitrary assignment.

If a job specified in the static assignments does not exist, it will be ignored. A static.tasks value for a connector may be specified without a corresponding static.connectors value.

Choosing static assignments

Users and management frameworks are expected to choose the values for these configurations with knowledge of their workload (i.e. set of connectors and tasks).

The workload can be read from the GET /connectors and GET /connectors/{connector} endpoints, which together reveal both the set of connector IDs and the set of task IDs. For closed-loop control, these endpoints should be polled regularly to discover changes in the number of tasks requested by each connector.
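As a sketch of this discovery step, the job IDs can be derived from the payloads of those two endpoints. The function below assumes the standard response shapes (GET /connectors returns a list of connector names; GET /connectors/{connector} includes a "tasks" list of {"connector": ..., "task": ...} entries) and the conventional `<connector>-<task>` spelling of task IDs; the HTTP polling itself is omitted.

```python
def jobs_from_status(connector_names, connector_info):
    """Build the workload as (connector IDs, task IDs) from REST payloads.

    connector_names: response body of GET /connectors (a list of names)
    connector_info:  dict mapping connector name to the response body of
                     GET /connectors/{connector}
    """
    connectors = set(connector_names)
    tasks = set()
    for name in connector_names:
        for t in connector_info[name].get("tasks", []):
            # Task IDs are conventionally written as "<connector>-<task index>".
            tasks.add(f"{t['connector']}-{t['task']}")
    return connectors, tasks


# Example payloads mimicking the REST responses:
names = ["mirror-source"]
info = {
    "mirror-source": {
        "name": "mirror-source",
        "tasks": [
            {"connector": "mirror-source", "task": 0},
            {"connector": "mirror-source", "task": 1},
        ],
    }
}
connectors, tasks = jobs_from_status(names, info)
# connectors == {"mirror-source"}
# tasks == {"mirror-source-0", "mirror-source-1"}
```

A closed-loop controller would re-run this derivation on each polling interval and adjust worker assignments when the resulting job set changes.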

Alternatively, if a fixed set of workers is desirable, the set of connectors and tasks can be inferred from the connector configurations alone. Each connector will provision at most tasks.max tasks, so workers for all of the tasks can be provisioned ahead-of-time. This has the downside that connectors which spawn fewer than the maximum number of tasks may produce static assignments for jobs which don't exist, possibly leaving some static workers idle.
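The ahead-of-time variant can be sketched as follows. This assumes only the connector configurations are available, that each connector spawns at most tasks.max tasks (Connect's default for that property is 1), and that task IDs follow the `<connector>-<index>` convention; as noted above, some of these provisioned jobs may never materialize.

```python
def provision_jobs(connector_configs):
    """Enumerate the maximum possible job set from configs alone.

    connector_configs: dict mapping connector name to its config dict.
    Returns a list of ("connector", id) and ("task", id) jobs.
    """
    jobs = []
    for name, config in connector_configs.items():
        jobs.append(("connector", name))
        # Provision one task slot per possible task, up to tasks.max.
        for i in range(int(config.get("tasks.max", 1))):
            jobs.append(("task", f"{name}-{i}"))
    return jobs


configs = {"mirror-source": {"tasks.max": "2"}}
jobs = provision_jobs(configs)
# jobs == [("connector", "mirror-source"),
#          ("task", "mirror-source-0"),
#          ("task", "mirror-source-1")]
```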

For a new cluster without any connectors, a worker may be started with an empty assignment, assignments for the first connector to be created if they are known in advance, or an unsatisfiable assignment if they are not known. For subsequent connectors, the connector can be created before, during, or after static workers for that connector are added.

Proposed Changes

If the connect.protocol is set to static, each worker will send its static.connectors and static.tasks to the coordinator during rebalances.
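As a concrete illustration, a worker opting into this protocol might carry properties like the following. This is a hypothetical sketch: the connector name is invented, and the exact value syntax for these properties is whatever this proposal specifies, with task IDs shown in the usual `<connector>-<task>` form.

```properties
# Opt this worker into the static assignment protocol.
connect.protocol=static
# This worker should exclusively host the connector job for mirror-source...
static.connectors=mirror-source
# ...and the first two of its tasks.
static.tasks=mirror-source-0,mirror-source-1
```

During a rebalance, the coordinator receives these values from each member and can schedule the named jobs onto this worker only.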

...

Measuring and enforcing resource constraints is not practical when multiple jobs share the JVM, as it is difficult to track which threads, memory, and I/O are used by each job. Without the ability to measure or enforce these constraints as hard limits, misconfigured resource limits would be difficult to track down as resource exhaustion on a single worker could not be easily attributed to a single job.

Spawn additional JVM processes attached to a single worker for individual connectors and tasks

This has the benefit of preserving the ability to assign multiple pieces of work to a single worker, while providing JVM isolation between connectors and tasks.

However, this separates the lifetime of the connectors and tasks, and leaves the possibility of a connector or task running while being completely out of sight of the rest of the cluster. Similar bugs within the current single-JVM model have been very harmful, and so the opportunity for such bugs should be avoided.

Replace the existing scheduling algorithm with a "weighted" algorithm using weights as a proxy for resource utilization.

...

This represents a much larger investment, and has a much more difficult upgrade path for users of the Distributed deployment model, as it would require migrating connectors between Connect clusters. It would also require re-examining many of the abstractions used in Distributed mode, such as the config topic, connector config forwarding, zombie worker fencing, etc. Implementing an opt-in extension to Distributed mode which can force jobs to reside exclusively on certain nodes is a much smaller incremental change, but still empowers users and external management layers to solve the resource isolation problems which are most painful.

Isolate ephemeral jobs in addition to persistent jobs

The Connect framework performs a number of ephemeral jobs using plugins, such as configuration validation, offset resets, and version gathering. These happen on whichever worker needs the information, most commonly the worker servicing a REST request or the leader of the cluster.

Because any worker can be expected to service any REST request, and any worker can become the leader, isolating these ephemeral jobs to specific nodes is not practical, and would require significant changes to the framework, such as the additional-process model or a change in request forwarding or leadership.