Status

Current state: Adopted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-6755

...

A new optional parameter 'replacement' will be added to the MaskField SMT config. Because the parameter is optional, this change is backward compatible.

name: replacement
description: Custom replacement value that will be applied to all 'fields' values (numeric or non-empty string values convertible to the 'fields' type(s)).
type: String
default: null
valid values: non-empty string with a numeric or text value that can be converted to the 'fields' type(s)
importance: low


Example config: 

config.properties:
transforms=SSNMask,IPMask,PhoneMask

transforms.SSNMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.SSNMask.fields=ssn
transforms.SSNMask.replacement=***-***-****

transforms.IPMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.IPMask.fields=ipAddress
transforms.IPMask.replacement=xxx.xxx.xxx.xxx


transforms.PhoneMask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.PhoneMask.fields=office,mobile
transforms.PhoneMask.replacement=+0-000-000-0000
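
To make the effect of such a configuration concrete, here is a minimal sketch of what a single masking step could do to a schemaless record. It is only an illustration under stated assumptions: the class `ReplacementMaskSketch` and its `mask` method are hypothetical names, the record is modeled as a plain `Map`, and only string-valued fields are considered. It is not the MaskField code itself.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the MaskField implementation.
class ReplacementMaskSketch {

    // Returns a copy of the record in which every listed field that is present
    // has its value replaced by the configured replacement string.
    // Fields that are not listed are copied unchanged.
    static Map<String, Object> mask(Map<String, Object> record,
                                    List<String> fields,
                                    String replacement) {
        Map<String, Object> masked = new LinkedHashMap<>(record);
        for (String field : fields) {
            if (masked.containsKey(field)) {
                masked.put(field, replacement);
            }
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "Alice");
        record.put("office", "+1-555-010-0000");
        record.put("mobile", "+1-555-010-0001");

        // Mirrors fields=office,mobile and replacement=+0-000-000-0000
        Map<String, Object> masked =
                mask(record, Arrays.asList("office", "mobile"), "+0-000-000-0000");
        System.out.println(masked);
    }
}
```

In this sketch each transform in the chain would apply its own `fields` and `replacement` pair, which is why the example config declares one MaskField instance per replacement pattern.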

...

Proposed Changes

Add 'replacement' field param to config definition:

MaskField.java:
private static final String REPLACEMENT_CONFIG = "replacement";

public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask.")
        .define(REPLACEMENT_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW, "Custom value replacement, that will be applied to all"
                + " 'fields' values (numeric or non-empty string values only).");

...

Define mapping functions for custom replacement value:

MaskField.java:
private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = new HashMap<>();

static {
    REPLACEMENT_MAPPING_FUNC.put(Byte.class, v -> Values.convertToByte(null, v));
    REPLACEMENT_MAPPING_FUNC.put(Short.class, v -> Values.convertToShort(null, v));
    REPLACEMENT_MAPPING_FUNC.put(Integer.class, v -> Values.convertToInteger(null, v));
    REPLACEMENT_MAPPING_FUNC.put(Long.class, v -> Values.convertToLong(null, v));
    REPLACEMENT_MAPPING_FUNC.put(Float.class, v -> Values.convertToFloat(null, v));
    REPLACEMENT_MAPPING_FUNC.put(Double.class, v -> Values.convertToDouble(null, v));
    REPLACEMENT_MAPPING_FUNC.put(String.class, Function.identity());
    REPLACEMENT_MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
    REPLACEMENT_MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
}
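
One way such a mapping table could be consulted is sketched below: look up the converter by the runtime class of the original field value, apply it to the configured replacement string, and fail clearly when the type is unsupported or the string cannot be parsed. This is a sketch under assumptions, not the code from the PR: `ReplacementLookupSketch` and `convert` are hypothetical names, the JDK parsers stand in for the `Values.convertTo...` helpers so that the example has no Kafka dependency, and `IllegalArgumentException` stands in for whatever exception type the SMT would actually throw.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; JDK parsers replace the Kafka Connect helpers.
class ReplacementLookupSketch {

    private static final Map<Class<?>, Function<String, ?>> MAPPING = new HashMap<>();

    static {
        MAPPING.put(Byte.class, v -> Byte.valueOf(v));
        MAPPING.put(Short.class, v -> Short.valueOf(v));
        MAPPING.put(Integer.class, v -> Integer.valueOf(v));
        MAPPING.put(Long.class, v -> Long.valueOf(v));
        MAPPING.put(Float.class, v -> Float.valueOf(v));
        MAPPING.put(Double.class, v -> Double.valueOf(v));
        MAPPING.put(String.class, Function.identity());
        MAPPING.put(BigDecimal.class, v -> new BigDecimal(v));
        MAPPING.put(BigInteger.class, v -> new BigInteger(v));
    }

    // Converts the replacement string to the same type as the original value.
    static Object convert(Object original, String replacement) {
        Function<String, ?> mapper = MAPPING.get(original.getClass());
        if (mapper == null) {
            throw new IllegalArgumentException(
                    "Cannot mask value of type " + original.getClass() + " with a custom replacement");
        }
        try {
            return mapper.apply(replacement);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Unable to convert '" + replacement + "' to " + original.getClass(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(42, "0"));           // Integer field, numeric replacement
        System.out.println(convert("secret", "****"));  // String field, text replacement
    }
}
```

Keying the table by class keeps the per-type conversion in one place, and a missing entry (for example a boolean field) surfaces as an explicit error rather than a silently unmasked value.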

...

Compatibility, Deprecation, and Migration Plan

...

The new configuration property in this proposal is backward compatible, so any existing connector configurations that use the MaskField SMT will continue to work as-is with no behavioral changes. To use the replacement feature, such existing connector configurations will need to be modified to add a new `replacement` property.

Rejected Alternatives

The first version of the improvement, proposed in the PR, used a JSON map of maskField → replacement. This approach had both pros and cons:

...