Discussion thread: https://lists.apache.org/thread/zs9n9p8d7tyvnq4yyxhc8zvq1k2c1hvs
Vote thread: https://lists.apache.org/thread/jo4m68m9lml6bbs3zcd43zv5vqdxmq5j
JIRA

Release: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This FLIP proposes a pluggable interface that allows users to implement custom failure enrichers using Flink’s generic plugin framework.

Failure enrichers hold custom user logic and may also emit labels (string key-value pairs) for pre-defined keys; these labels are exposed via the JobMaster's REST interface.
This enables use cases such as assigning particular types to failures (e.g., User or System), emitting custom metrics per type (e.g., application or platform), and exposing failures to downstream consumers (e.g., notification systems).

Public interfaces

The public facing changes this FLIP introduces include:

  1. A config option for FailureEnrichers.

    1. Key: jobmanager.failure-enrichers

    2. Default: empty

    3. Description: An optional list of failure enricher names. If empty, NO failure enrichers will be started. If configured, only enrichers whose name matches any of the names in the list will be started.
      Example:

      jobmanager.failure-enrichers = org.apache.flink.test.plugin.jar.failure.TypeFailureEnricher



  2. Introducing FailureEnricher/Context and FailureEnricherFactory interfaces for defining and loading (using the plugin manager) custom Enricher implementations respectively.

    /**
     * Failure Enricher enabling custom logic and attaching metadata in the form of labels to each type
     * of failure as tracked in the job manager.
     */
    @Experimental
    public interface FailureEnricher {
    
        /**
         * Method to list all the label Keys the enricher can associate with Values in case of a failure
         * {@code processFailure}. Note that Keys must be unique and properly defined per enricher
         * implementation, otherwise they will be ignored.
         *
         * @return the unique label Keys of the FailureEnricher
         */
        Set<String> getOutputKeys();
    
        /**
         * Method to handle a failure as part of the enricher and optionally return a map of KV pairs
         * (labels). Note that Values should only be associated with Keys from the
         * {@code getOutputKeys} method, otherwise they will be ignored.
         *
         * @param cause the exception that caused this failure
         * @param context the context that includes extra information (e.g., if it was a global failure)
         * @return map of KV pairs (labels) associated with the failure
         */
        CompletableFuture<Map<String, String>> processFailure(
                final Throwable cause, final Context context);
    
        /**
         * An interface used by the {@link FailureEnricher}. Context includes an executor pool for the
         * enrichers to run heavy operations, the Classloader used for code gen, and other metadata.
         */
        @Experimental
        interface Context {
    
            /** Type of failure. */
            enum FailureType {
                /* happened in the scheduler context */
                GLOBAL,
                /* happened in the task manager context */
                LOCAL,
                /* caused by task manager disconnection/HB timeout */
                DISCONNECT
            }
    
            /**
             * Return the type of the failure e.g., global failure that happened in the scheduler
             * context.
             *
             * @return FailureType
             */
            FailureType getFailureType();
    
            /**
             * Get the user {@link ClassLoader} used for code generation, UDF loading and other
             * operations requiring reflections on user code.
             *
             * @return the user ClassLoader
             */
            ClassLoader getUserClassLoader();
    
            /**
             * Get an Executor pool for the Enrichers to run async operations that can potentially be
             * IO-heavy.
             *
             * @return the Executor pool
             */
            Executor getIOExecutor();
        }
    }
    /** Factory class for creating {@link FailureEnricher}. */
    @Experimental
    public interface FailureEnricherFactory extends Plugin {
    
        /**
         * Construct a FailureEnricher.
         *
         * @param conf configuration for this failure enricher
         * @param jobId the ID of the job
         * @param jobName the name of the job
         * @param metricGroup the metric group of the JobMaster
         * @return the FailureEnricher
         */
        FailureEnricher createFailureEnricher(
                Configuration conf, JobID jobId, String jobName, MetricGroup metricGroup);
    }
  3. Runtime REST API snapshot update -- job exceptions now include the labels emitted by failure enricher implementations.

    GET /jobs/:jobid/exceptions
    
    RESPONSE BODY:
    {
    ...
      "exceptionHistory": {
        "type": "object",
        "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:JobExceptionHistory",
        "properties": {
          "entries": {
            "type": "array",
            "items": {
              "type": "object",
              "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:RootExceptionInfo",
              "properties": {
                "exceptionName": {
                  "type": "string"
                },
                "stacktrace": {
                  "type": "string"
                },
                "timestamp": {
                  "type": "integer"
                },
                "taskName": {
                  "type": "string"
                },
                "location": {
                  "type": "string"
                },
                "taskManagerId": {
                  "type": "string"
                },
                "labels": {
                  "type": "object",
                  "additionalProperties": {
                    "type": "string"
                  }
                }
                ...
              }
            }
          }
        }
       ...
      }
    }
    
    

Proposed Changes

Overview

Currently, all local/disconnect streaming task failures, whether from the Adaptive or the Default scheduler, go through the JobMaster and eventually trigger the ExecutionFailureHandler.
The handler returns a FailureHandlingResult containing the reason for the failure (as a Throwable) and the vertices to restart if the failure is recoverable.
FailureHandlingResults are then stored as a snapshot and exposed as part of the ExceptionHistory through the JobMaster (UI and REST) as well as the HistoryServer when enabled.

In a similar manner, global failures (failures that happened in the scheduler context) go through the GlobalFailureHandler, create FailureHandlingResults, and end up stored as part of a snapshot.

This FLIP proposes the addition of a pluggable interface that will allow users to add custom logic and optionally enrich such failures with custom metadata labels.
In short, every Enricher:

  • Is triggered on every global/local/disconnect failure
  • Receives a Throwable cause and an immutable Context
  • Executes asynchronously (on a separate IO executor) to avoid blocking the JobMaster's main thread, which serves RPCs
  • Is completely independent from other Enrichers
  • Emits failure labels/tags for its unique, pre-defined keys (defined at startup time; see getOutputKeys() above)

Implementation

Flink core

We first introduce FailureEnricher/Context and FailureEnricherFactory as described in the Public Interfaces section above.

Flink runtime


Then, as part of the runtime, we provide a default implementation of the Context that is passed down to the FailureEnrichers. It holds an executor pool for the enrichers to run heavy operations, the user ClassLoader, and the type of failure.

/** The default implementation of {@link Context} class. */
public class DefaultFailureEnricherContext implements FailureEnricher.Context {
    private final Executor ioExecutor;
    private final ClassLoader userClassLoader;
    private final FailureType failureType;

    private DefaultFailureEnricherContext(
            FailureType failureType, Executor ioExecutor, ClassLoader classLoader) {
        this.failureType = failureType;
        this.ioExecutor = checkNotNull(ioExecutor);
        this.userClassLoader = classLoader;
    }

    @Override
    public FailureType getFailureType() {
        return failureType;
    }

    @Override
    public ClassLoader getUserClassLoader() {
        return this.userClassLoader;
    }

    @Override
    public Executor getIOExecutor() {
        return ioExecutor;
    }

    /** Factory method returning a Local failure Context for the given params. */
    public static Context forLocalFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.LOCAL, ioExecutor, classLoader);
    }

    /** Factory method returning a Global failure Context for the given params. */
    public static Context forGlobalFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.GLOBAL, ioExecutor, classLoader);
    }

    /** Factory method returning a Disconnection failure Context for the given params. */
    public static Context forDisconnectFailure(Executor ioExecutor, ClassLoader classLoader) {
        return new DefaultFailureEnricherContext(FailureType.DISCONNECT, ioExecutor, classLoader);
    }
}


We also extend the JobMaster to hold a Set of loaded Enrichers that are initialized using the FailureEnricherFactory. Keep in mind that each FailureEnricher can emit labels for specific keys that must be unique.
Thus, as part of initialization, we run a validation step to make sure these keys don't overlap. If they do, we log an error message and remove the Enricher from the returned Set.
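The key-overlap validation described above could look roughly like the following. This is only a sketch under stated assumptions: `SketchEnricher` is a hypothetical stand-in for `FailureEnricher` so the example compiles without Flink, and the choice to drop every enricher involved in a conflict (rather than keeping the first one) is an assumption, not something this FLIP prescribes.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a key-overlap check; not the actual Flink implementation. */
final class KeyOverlapValidationSketch {

    /** Minimal stand-in exposing only the part of FailureEnricher needed here. */
    interface SketchEnricher {
        String name();

        Set<String> getOutputKeys();
    }

    /** Returns the enrichers whose output keys are not declared by any other enricher. */
    static Set<SketchEnricher> filterOverlapping(Collection<SketchEnricher> enrichers) {
        // Record, for every key, which enrichers declare it.
        Map<String, Set<SketchEnricher>> ownersByKey = new HashMap<>();
        for (SketchEnricher enricher : enrichers) {
            for (String key : enricher.getOutputKeys()) {
                ownersByKey.computeIfAbsent(key, k -> new HashSet<>()).add(enricher);
            }
        }
        // Assumption: every enricher that shares a key with another one is removed.
        Set<SketchEnricher> invalid = new HashSet<>();
        for (Map.Entry<String, Set<SketchEnricher>> entry : ownersByKey.entrySet()) {
            if (entry.getValue().size() > 1) {
                System.err.println("Key '" + entry.getKey() + "' is declared by several enrichers.");
                invalid.addAll(entry.getValue());
            }
        }
        Set<SketchEnricher> valid = new LinkedHashSet<>(enrichers);
        valid.removeAll(invalid);
        return valid;
    }

    /** Convenience helper that builds a stand-in enricher with the given keys. */
    static SketchEnricher enricher(String name, String... keys) {
        Set<String> keySet = Set.of(keys);
        return new SketchEnricher() {
            public String name() {
                return name;
            }

            public Set<String> getOutputKeys() {
                return keySet;
            }
        };
    }
}
```

With enrichers declaring `{type}`, `{type, team}`, and `{severity}`, only the last one survives in this sketch, because the first two both claim `type`.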

/** Utils class for loading and running pluggable failure enrichers. */
public class FailureEnricherUtils {

    public static Set<FailureEnricher> getFailureEnrichers(
            final Configuration configuration,
            final JobID jobId,
            final String jobName,
            final MetricGroup metricGroup,
            final PluginManager pluginManager) {

        Set<String> includedEnrichers = getIncludedFailureEnrichers(configuration);
        //  When empty, NO enrichers will be started.
        if (includedEnrichers.isEmpty()) {
            return Collections.emptySet();
        }
        final Iterator<FailureEnricherFactory> factoryIterator =
                pluginManager.load(FailureEnricherFactory.class);
        final Set<FailureEnricher> failureEnrichers = new HashSet<>();
        while (factoryIterator.hasNext()) {
            try {
                final FailureEnricherFactory failureEnricherFactory = factoryIterator.next();
                final FailureEnricher failureEnricher =
                        failureEnricherFactory.createFailureEnricher(
                                configuration, jobId, jobName, metricGroup);
                if (includedEnrichers.contains(failureEnricher.getClass().getName())) {
                    failureEnrichers.add(failureEnricher);
                    LOG.debug(
                            "Found failure enricher {} at {} ",
                            failureEnricherFactory.getClass().getName(),
                            new File(
                                            failureEnricher
                                                    .getClass()
                                                    .getProtectionDomain()
                                                    .getCodeSource()
                                                    .getLocation()
                                                    .toURI())
                                    .getCanonicalPath());
                } else {
                    LOG.info(
                            "Excluding failure enricher {}, not configured in enricher list ({}).",
                            failureEnricherFactory.getClass().getName(),
                            includedEnrichers);
                }
            } catch (Exception e) {
                LOG.warn("Error while loading failure enricher factory.", e);
            }
        }

        return filterInvalidEnrichers(failureEnrichers);
    }
    ...
}
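The helper `getIncludedFailureEnrichers` is called above but not shown. A minimal sketch of what it might do follows, assuming the `jobmanager.failure-enrichers` value is a comma-separated list of class names. The separator and the use of a plain `Map<String, String>` in place of Flink's `Configuration` are assumptions made so the example is self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of parsing the enricher list; not the actual Flink implementation. */
final class IncludedEnrichersSketch {

    static final String FAILURE_ENRICHERS_KEY = "jobmanager.failure-enrichers";

    /** Parses the configured enricher names; an empty result means no enricher is started. */
    static Set<String> getIncludedFailureEnrichers(Map<String, String> configuration) {
        String raw = configuration.getOrDefault(FAILURE_ENRICHERS_KEY, "");
        if (raw.trim().isEmpty()) {
            return Collections.emptySet();
        }
        // Assumption: names are separated by commas; surrounding whitespace is ignored.
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                Map.of(
                        FAILURE_ENRICHERS_KEY,
                        "org.apache.flink.test.plugin.jar.failure.TypeFailureEnricher");
        System.out.println(getIncludedFailureEnrichers(conf));
    }
}
```

Matching on the fully qualified class name mirrors the `includedEnrichers.contains(failureEnricher.getClass().getName())` check in the loader above.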


Enrichers are actually triggered as part of JobMaster#updateTaskExecutionState for local failures, as part of JobMaster#disconnectTaskManager for disconnections, and as part of DefaultScheduler#handleGlobalFailure for global ones.
This makes the implementation generic enough to work with all Flink schedulers and easier to guard from race conditions e.g., concurrent processing of task failures.

@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
        final TaskExecutionState taskExecutionState) {
        checkNotNull(taskExecutionState, "taskExecutionState");

        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
            return labelFailure(taskExecutionState)
                    .thenApplyAsync(
                            taskStateWithLabels -> {
                                try {
                                    return doUpdateTaskExecutionState(taskStateWithLabels);
                                } catch (FlinkException e) {
                                    throw new CompletionException(e);
                                }
                            },
                            getMainThreadExecutor());
        }
        ...
}
 	
/**
 * Creates a new task execution state copy of the original but with the provided labels
 * (currently used for failures).
 */
public TaskExecutionState withLabels(Map<String, String> failureLabels) {..}
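The hand-off pattern used in `updateTaskExecutionState` (compute labels off the main thread, then continue on the main thread executor) can be illustrated with plain `java.util.concurrent` types. This is a sketch only: the two executors and the thread names are hypothetical stand-ins for the IO executor and `getMainThreadExecutor()`, and the fixed label replaces a real enricher call.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of the IO-executor to main-thread hand-off. */
final class MainThreadHandoffSketch {

    /** Computes labels on ioExecutor, then applies them on mainThreadExecutor. */
    static String labelThenUpdate(
            ExecutorService ioExecutor, ExecutorService mainThreadExecutor) {
        // Stand-in for labelFailure(...): runs on the IO executor.
        CompletableFuture<Map<String, String>> labels =
                CompletableFuture.supplyAsync(
                        () -> Collections.singletonMap("type", "USER"), ioExecutor);
        // Stand-in for doUpdateTaskExecutionState(...): always runs on the main thread executor.
        return labels
                .thenApplyAsync(
                        l -> Thread.currentThread().getName() + " applied " + l,
                        mainThreadExecutor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(2, r -> new Thread(r, "io-sketch"));
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));
        try {
            // prints "main-thread-sketch applied {type=USER}"
            System.out.println(labelThenUpdate(io, mainThread));
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
    }
}
```

Because the continuation is registered with `thenApplyAsync` and an explicit executor, state updates stay confined to a single thread regardless of which thread completed the labeling future.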


The collection of labels characterizing the failure is performed using a ConjunctFuture, which also validates the Enrichers' output keys at runtime. The labels are then propagated as part of the ErrorInfo within a FailedExecution of a FailureHandlingResult, eventually making their way to ExceptionInfo and ExceptionHistoryEntry.

public ErrorInfo(@Nonnull Throwable exception, long timestamp, @Nullable Map<String, String> labels) {...}


@JsonCreator
public ExceptionInfo(
        @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
        @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
        @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
        @JsonProperty(FIELD_NAME_EXCEPTION_LABELS) @Nullable Map<String, String> labels,
        @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
        @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location,
        @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) { ... }
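The label collection step described above (wait for all enrichers, drop labels for undeclared keys, merge the rest) might be sketched as follows. `CompletableFuture.allOf` is used here in place of Flink's ConjunctFuture, `SketchEnricher` is a hypothetical stand-in for `FailureEnricher`, and treating a failed enricher as contributing no labels is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of merging enricher labels; not the actual Flink implementation. */
final class LabelAggregationSketch {

    /** Minimal stand-in for FailureEnricher (the Context parameter is omitted). */
    interface SketchEnricher {
        Set<String> getOutputKeys();

        CompletableFuture<Map<String, String>> processFailure(Throwable cause);
    }

    /** Runs all enrichers and merges the labels whose keys were declared up front. */
    static CompletableFuture<Map<String, String>> labelFailure(
            Throwable cause, List<SketchEnricher> enrichers) {
        List<CompletableFuture<Map<String, String>>> results = new ArrayList<>();
        for (SketchEnricher enricher : enrichers) {
            results.add(
                    enricher.processFailure(cause)
                            .thenApply(
                                    labels -> {
                                        // Keep only labels for keys listed in getOutputKeys().
                                        Map<String, String> declared = new HashMap<>();
                                        labels.forEach(
                                                (key, value) -> {
                                                    if (enricher.getOutputKeys().contains(key)) {
                                                        declared.put(key, value);
                                                    }
                                                });
                                        return declared;
                                    })
                            // Assumption: a failing enricher contributes no labels.
                            .exceptionally(t -> new HashMap<String, String>()));
        }
        return CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
                .thenApply(
                        ignored -> {
                            Map<String, String> merged = new TreeMap<>();
                            results.forEach(result -> merged.putAll(result.join()));
                            return merged;
                        });
    }

    /** Convenience helper: an enricher that always returns the given labels. */
    static SketchEnricher fixed(Set<String> keys, Map<String, String> labels) {
        return new SketchEnricher() {
            public Set<String> getOutputKeys() {
                return keys;
            }

            public CompletableFuture<Map<String, String>> processFailure(Throwable cause) {
                return CompletableFuture.completedFuture(labels);
            }
        };
    }

    public static void main(String[] args) {
        SketchEnricher type = fixed(Set.of("type"), Map.of("type", "USER", "rogue", "x"));
        SketchEnricher team = fixed(Set.of("team"), Map.of("team", "platform"));
        // prints "{team=platform, type=USER}"; the undeclared "rogue" key is dropped
        System.out.println(labelFailure(new RuntimeException("boom"), List.of(type, team)).join());
    }
}
```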


Example implementations of the FailureEnrichers may include:

  1. a simple TypeFailureEnricher that attempts to categorize failures as USER or SYSTEM based on the class of the Throwable (e.g., an OutOfMemoryError maps to SYSTEM, while an ArithmeticException maps to USER):

    /**
     * Type implementation of {@link FailureEnricher} that aims to categorize failures as USER or
     * SYSTEM based on the class of the failure.
     */
    public class TypeFailureEnricher implements FailureEnricher {
    
        private static final String typeKey = "type";
        private static final Set<String> labelKeys =
                Stream.of(typeKey).collect(Collectors.toCollection(HashSet::new));
    
        @Override
        public Set<String> getOutputKeys() {
            return labelKeys;
        }
    
        @Override
        public CompletableFuture<Map<String, String>> processFailure(
                final Throwable cause, final Context context) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        final Map<String, String> labels = new HashMap<>();
                        if (cause == null) {
                            return labels;
                        }
                        if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(cause)) {
                            labels.put(typeKey, "SYSTEM");
                        } else if (ExceptionUtils.findThrowable(cause, ArithmeticException.class)
                                .isPresent()) {
                            labels.put(typeKey, "USER");
                        }
                        if (!labels.containsKey(typeKey)) {
                            labels.put(typeKey, "UNKNOWN");
                        }
                        return labels;
                    },
                    context.getIOExecutor());
        }
    }
  2. a CountingFailureEnricher counting the number of job failures using its own counter:

    /**
     * Counting implementation of {@link FailureEnricher} that records the count of job failures.
     * Counter includes failures that ignore restarts thus may be larger than numRestarts.
     */
    public class CountingFailureEnricher implements FailureEnricher {
        private final Counter failureCount;
    
        public CountingFailureEnricher(final MetricGroup metricGroup) {
            this.failureCount = metricGroup.counter(MetricNames.NUM_JOB_FAILURES);
        }
    
        @Override
        public Set<String> getOutputKeys() {
            return Collections.emptySet();
        }
    
        @Override
        public CompletableFuture<Map<String, String>> processFailure(
                final Throwable cause, final Context context) {
            failureCount.inc();
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
    }


Flink runtime-web

Finally, we are going to extend the history of previous job failures to include the FailureEnricher labels.
Labels will be shown in a separate column in the Job Web dashboard, ready for direct consumption from the UI.



Compatibility, Deprecation, and Migration Plan

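No backwards-incompatible changes. The new config option defaults to empty, so no enrichers run unless configured, and the REST response only gains an additional labels field.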


Test Plan

The change will be covered with unit and integration tests.


Rejected Alternatives

  • Synchronous Enricher execution (could block the JM's main thread, making the JM unavailable)
  • Mutable Enricher Context


Credit

This effort was heavily influenced by Flink Exception Classifier for Downtime Cause Classification and by related Jira issues, including for the UI changes.