Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Enums are used throughout the Kafka codebase and are heavily utilized in the Kafka Connect ecosystem. Supporting parsing and validation of enum values would provide a better user experience by producing error messages that point to a clear corrective action. For example, a typo while configuring security.protocol produces the following error message.

...

The second version of this error message is specific to the version of the software the user is running. It reads the enum that is present in the JVM and lists the constants available for that enum.

Public Interfaces

The changes to the API consist of two parts. First, the AbstractConfig class is modified to include methods for reading config values as enums. Second, a validator and a recommender are added to ConfigDef to provide validation and recommended values for enum-backed settings.

...

Code Block
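import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AbstractConfig {

  // Converts a string to the matching enum constant, or fails with a
  // message that lists every constant of the enum.
  private static <T extends Enum<T>> T parseEnum(String key, String value, Class<T> enumType) {
    try {
      return Enum.valueOf(enumType, value);
    } catch (IllegalArgumentException ex) {
      String message = String.format(
          "Enum value not found. Valid values are: %s",
          Stream.of(enumType.getEnumConstants())
              .map(Enum::name)
              .collect(Collectors.joining(", "))
      );
      throw new ConfigException(key, value, message);
    }
  }

  // Reads a list-typed config and converts each entry to an enum constant.
  public <T extends Enum<T>> List<T> getEnums(String key, Class<T> enumType) {
    List<String> values = getList(key);
    List<T> result = new ArrayList<>(values.size());
    for (String entry : values) {
      result.add(parseEnum(key, entry, enumType));
    }
    return result;
  }

  // Reads a string-typed config and converts it to an enum constant.
  public <T extends Enum<T>> T getEnum(String key, Class<T> enumType) {
    String value = getString(key);
    return parseEnum(key, value, enumType);
  }
}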

class ConfigDef {

  public static class ValidEnum<T extends Enum<T>> implements ConfigDef.Validator {
    final Set<String> validEnums;
    final String message;

    // Accepts every constant of enumClass except those listed in excludes.
    @SafeVarargs
    public static <T extends Enum<T>> ValidEnum<T> of(Class<T> enumClass, T... excludes) {
      return new ValidEnum<>(enumClass, excludes);
    }

    @SafeVarargs
    private ValidEnum(Class<T> enumClass, T... excludes) {
      Set<T> exclude = new HashSet<>(Arrays.asList(excludes));
      this.validEnums = Stream.of(enumClass.getEnumConstants())
          .filter(e -> !exclude.contains(e))
          .map(Enum::name)
          .collect(Collectors.toSet());
      // Sorted, comma-separated list used in error messages and toString().
      this.message = this.validEnums.stream()
          .sorted()
          .collect(Collectors.joining(", "));
    }

    @Override
    public void ensureValid(String name, Object value) {
      if (value instanceof String) {
        if (!validEnums.contains(value)) {
          throw new ConfigException(
              name,
              value,
              String.format("'%s' not found. Valid values are: %s.", value, message)
          );
        }
      } else if (value instanceof List) {
        // List-typed configs are validated entry by entry.
        for (Object entry : (List<?>) value) {
          ensureValid(name, entry);
        }
      } else {
        throw new ConfigException(
            name,
            value,
            "Must be a String or List"
        );
      }
    }

    @Override
    public String toString() {
      return this.message;
    }
  }

  public static class EnumRecommender<T extends Enum<T>> implements ConfigDef.Recommender {
    final Set<String> validEnums;

    // Recommends every constant of enumClass except those listed in excludes.
    @SafeVarargs
    public static <T extends Enum<T>> EnumRecommender<T> of(Class<T> enumClass, T... excludes) {
      return new EnumRecommender<>(enumClass, excludes);
    }

    @SafeVarargs
    private EnumRecommender(Class<T> enumClass, T... excludes) {
      Set<T> exclude = new HashSet<>(Arrays.asList(excludes));
      this.validEnums = Stream.of(enumClass.getEnumConstants())
          .filter(e -> !exclude.contains(e))
          .map(Enum::name)
          .collect(Collectors.toSet());
    }

    @Override
    public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
      // Copy the set into a list, since Recommender returns List<Object>.
      return Collections.unmodifiableList(new ArrayList<Object>(validEnums));
    }

    @Override
    public boolean visible(String name, Map<String, Object> parsedConfig) {
      return true;
    }
  }
}
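
To illustrate the behaviour a caller would see from the parsing helper above, here is a minimal standalone sketch. It is not the proposed implementation: the Compression enum and the compression.type key are hypothetical, and IllegalArgumentException stands in for ConfigException so that the example runs without Kafka on the classpath.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class EnumConfigExample {

  // Hypothetical enum standing in for a connector setting; not part of Kafka.
  enum Compression { NONE, GZIP, SNAPPY }

  // Same idea as the proposed parseEnum, but throws IllegalArgumentException
  // (an assumption made here to keep the sketch free of Kafka dependencies).
  static <T extends Enum<T>> T parseEnum(String key, String value, Class<T> enumType) {
    try {
      return Enum.valueOf(enumType, value);
    } catch (IllegalArgumentException ex) {
      // getEnumConstants() returns the constants in declaration order.
      String valid = Arrays.stream(enumType.getEnumConstants())
          .map(Enum::name)
          .collect(Collectors.joining(", "));
      throw new IllegalArgumentException(String.format(
          "Invalid value %s for configuration %s: Valid values are: %s",
          value, key, valid));
    }
  }

  public static void main(String[] args) {
    // A correctly spelled value resolves to the constant.
    System.out.println(parseEnum("compression.type", "GZIP", Compression.class));

    // A typo fails with a message that lists the constants present in the JVM.
    try {
      parseEnum("compression.type", "GZP", Compression.class);
    } catch (IllegalArgumentException ex) {
      System.out.println(ex.getMessage());
    }
  }
}
```

Because the list of valid values is read from the enum class at runtime, the message always reflects the constants in the version of the software that is actually running.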

Compatibility, Deprecation, and Migration Plan

Additional methods and classes will be added to ConfigDef and AbstractConfig. Kafka Connect plugin developers who use this functionality could encounter a NoSuchMethodError (for the new AbstractConfig methods) or a NoClassDefFoundError (for the new ConfigDef classes) if their plugin is run on an older Connect worker.
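
A plugin that must also run on older workers could probe for the new method before relying on it. The following is only a sketch of one possible guard, not part of this proposal: the hasPublicMethod helper is hypothetical, and the check assumes the getEnum(String, Class) signature shown in the Public Interfaces section.

```java
class EnumSupportCheck {

  // Returns true when the named class can be loaded and declares a public
  // method with the given name and parameter types; false otherwise.
  static boolean hasPublicMethod(String className, String methodName,
                                 Class<?>... parameterTypes) {
    try {
      Class.forName(className).getMethod(methodName, parameterTypes);
      return true;
    } catch (ClassNotFoundException | NoSuchMethodException | LinkageError ex) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Assumed signature from this proposal: getEnum(String key, Class<T> enumType).
    boolean supported = hasPublicMethod(
        "org.apache.kafka.common.config.AbstractConfig",
        "getEnum", String.class, Class.class);
    System.out.println("getEnum available: " + supported);
  }
}
```

The probe has to run before any code path that calls the new methods directly, so a plugin using it would typically fall back to getString plus its own parsing when the result is false.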

Rejected Alternatives

One possibility was to extend ConfigDef.Type to include ENUM as a type, similar to CLASS. This would require extensive modification to the ConfigDef classes, because ConfigDef currently has no way to specify which class or enum a CLASS or ENUM value must be; that is validated later. Providing a Validator and a method to read an enum from a string requires less modification while achieving acceptable results.