Status
Current state: Adopted
Discussion thread: here
Vote thread: here
JIRA: KAFKA-6755
PULL REQUEST: #6284
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The existing MaskField SMT always masks a field with the null-equivalent value for its type, whatever the field type is.
...
The 'replacement' param value, if specified in the config, will be applied to all fields in the 'fields' param list. This way the user can mask any string or numeric value with any literal of their choice.
Public Interfaces
A new 'replacement' param will be added to the MaskField SMT config. The param is optional, so this change is backward compatible.
name | description | type | default | valid values | importance |
---|---|---|---|---|---|
replacement | Custom replacement value that will be applied to all 'fields' values (numeric or non-empty string values convertible to the 'fields' type(s)). | String | null | non-empty string with a numeric or text value that can be converted to the 'fields' type(s) | low |
Example config:

    transforms=SSNMask,IPMask,PhoneMask
    transforms.SSNMask.type=org.apache.kafka.connect.transforms.MaskField$Value
    transforms.SSNMask.fields=ssn
    transforms.SSNMask.replacement=***-***-****
    transforms.IPMask.type=org.apache.kafka.connect.transforms.MaskField$Value
    transforms.IPMask.fields=ipAddress
    transforms.IPMask.replacement=xxx.xxx.xxx.xxx
    transforms.PhoneMask.type=org.apache.kafka.connect.transforms.MaskField$Value
    transforms.PhoneMask.fields=office,mobile
    transforms.PhoneMask.replacement=+0-000-000-0000

With the config described above there will be 3 transformations: the first and second each replace the value of a single field, while the third is applied to both the 'office' and 'mobile' fields.
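To illustrate how a single 'replacement' literal is applied to every field listed in 'fields', here is a minimal sketch on a schemaless record represented as a plain `Map`. The class `MaskConfigSketch` and the method `maskFields` are hypothetical names used only for this illustration; they are not part of MaskField, and the sketch handles string values only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the transform; not the actual MaskField class.
class MaskConfigSketch {

    // Returns a copy of the record in which every listed field that is present
    // has its value replaced by the same literal.
    static Map<String, Object> maskFields(Map<String, Object> record,
                                          List<String> fields,
                                          String replacement) {
        Map<String, Object> masked = new LinkedHashMap<>(record);
        for (String field : fields) {
            if (masked.containsKey(field)) {
                masked.put(field, replacement);
            }
        }
        return masked;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "Alice");
        record.put("office", "+1-555-010-0001");
        record.put("mobile", "+1-555-010-0002");

        // Mirrors transforms.PhoneMask.fields=office,mobile
        // and transforms.PhoneMask.replacement=+0-000-000-0000
        Map<String, Object> masked =
                maskFields(record, Arrays.asList("office", "mobile"), "+0-000-000-0000");
        System.out.println(masked);
    }
}
```

Fields that are not listed (here 'name') pass through unchanged, and the input record itself is not modified.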
Restrictions
If the provided replacement cannot be converted to the target field type, or the field type is not supported, an org.apache.kafka.connect.errors.DataException will be thrown.
...
Replacing a Date, Map or List with a custom value seems to be of little use and would make the conversion logic more complicated.
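The two failure cases described in this section can be sketched without a Kafka dependency. In the sketch below, `ConversionException` is a hypothetical stand-in for org.apache.kafka.connect.errors.DataException, and the plain JDK parsers (`Integer.valueOf` and so on) are assumed substitutes for whatever converters the transform actually uses. Only a subset of types is covered.

```java
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, dependency-free sketch of the restrictions described above.
class ReplacementConversionSketch {

    // Stand-in for DataException so the example compiles without Kafka.
    static class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Supported field types and how the replacement string is converted to each.
    private static final Map<Class<?>, Function<String, ?>> CONVERTERS = new HashMap<>();
    static {
        CONVERTERS.put(Integer.class, v -> Integer.valueOf(v));
        CONVERTERS.put(Long.class, v -> Long.valueOf(v));
        CONVERTERS.put(Double.class, v -> Double.valueOf(v));
        CONVERTERS.put(String.class, v -> v);
        CONVERTERS.put(BigDecimal.class, v -> new BigDecimal(v));
    }

    // Converts the replacement to the runtime type of the value being masked.
    static Object convert(Object currentValue, String replacement) {
        Function<String, ?> converter = CONVERTERS.get(currentValue.getClass());
        if (converter == null) {
            // Unsupported field type, e.g. Date, Map or List.
            throw new ConversionException(
                    "Cannot mask value of type " + currentValue.getClass(), null);
        }
        try {
            return converter.apply(replacement);
        } catch (NumberFormatException ex) {
            // Replacement is not a valid literal for a numeric field.
            throw new ConversionException("Unable to convert " + replacement + " to number", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(42, "0"));                       // numeric field, numeric literal
        System.out.println(convert("123-45-6789", "***-**-****"));  // string field, any literal
        try {
            convert(42, "***");                                     // numeric field, text literal
        } catch (ConversionException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
        try {
            convert(new Date(), "1970-01-01");                      // unsupported field type
        } catch (ConversionException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

Note that the failure surfaces only when a record is processed, because the target type is taken from the value being masked rather than from the configuration.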
Proposed Changes
Add the 'replacement' param to the config definition:

    private static final String REPLACEMENT_CONFIG = "replacement";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(),
                    ConfigDef.Importance.HIGH, "Names of fields to mask.")
            .define(REPLACEMENT_CONFIG, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
                    ConfigDef.Importance.LOW, "Custom value replacement, that will be applied to all"
                            + " 'fields' values (numeric or non-empty string values only).");

...
Define mapping functions for the custom replacement value:

    private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = new HashMap<>();

    static {
        REPLACEMENT_MAPPING_FUNC.put(Byte.class, v -> Values.convertToByte(null, v));
        REPLACEMENT_MAPPING_FUNC.put(Short.class, v -> Values.convertToShort(null, v));
        REPLACEMENT_MAPPING_FUNC.put(Integer.class, v -> Values.convertToInteger(null, v));
        REPLACEMENT_MAPPING_FUNC.put(Long.class, v -> Values.convertToLong(null, v));
        REPLACEMENT_MAPPING_FUNC.put(Float.class, v -> Values.convertToFloat(null, v));
        REPLACEMENT_MAPPING_FUNC.put(Double.class, v -> Values.convertToDouble(null, v));
        REPLACEMENT_MAPPING_FUNC.put(String.class, Function.identity());
        REPLACEMENT_MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
        REPLACEMENT_MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
    }

...

    private static Object maskWithCustomReplacement(Object value, String replacement) {
        Function<String, ?> replacementMapper = REPLACEMENT_MAPPING_FUNC.get(value.getClass());
        if (replacementMapper == null) {
            throw new DataException("Cannot mask value of type " + value.getClass() + " with custom replacement.");
        }
        try {
            return replacementMapper.apply(replacement);
        } catch (NumberFormatException ex) {
            throw new DataException("Unable to convert " + replacement + " (" + replacement.getClass() + ") to number", ex);
        }
    }

Compatibility, Deprecation, and Migration Plan
...
The new configuration property in this proposal is backward compatible, so any existing connector configurations that use the MaskField SMT will continue to work as-is with no behavioral changes. To use the replacement feature, such existing connector configurations will need to be modified to add a new `replacement` property.
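The backward-compatible behavior can be pictured as a simple branch on whether 'replacement' is set. The sketch below is an illustration only: `BackwardCompatSketch`, `mask`, and the `NULL_EQUIVALENTS` table are hypothetical names, and the table assumes that "null-equivalent" means 0 for numeric types and the empty string for strings, for a small subset of types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the fallback logic; not the actual MaskField code.
class BackwardCompatSketch {

    // Assumed null-equivalent values for a few types (illustrative subset).
    private static final Map<Class<?>, Object> NULL_EQUIVALENTS = new HashMap<>();
    static {
        NULL_EQUIVALENTS.put(Integer.class, 0);
        NULL_EQUIVALENTS.put(Long.class, 0L);
        NULL_EQUIVALENTS.put(String.class, "");
    }

    static Object mask(Object value, String replacement) {
        if (replacement == null) {
            // 'replacement' not configured: keep the pre-existing masking behavior.
            Object nullEquivalent = NULL_EQUIVALENTS.get(value.getClass());
            if (nullEquivalent == null) {
                throw new IllegalArgumentException("Unsupported type " + value.getClass());
            }
            return nullEquivalent;
        }
        // 'replacement' configured: convert it to the type of the masked value.
        if (value instanceof String) {
            return replacement;
        }
        if (value instanceof Integer) {
            return Integer.valueOf(replacement);
        }
        if (value instanceof Long) {
            return Long.valueOf(replacement);
        }
        throw new IllegalArgumentException("Unsupported type " + value.getClass());
    }

    public static void main(String[] args) {
        System.out.println(mask(12345, null));          // existing config, no replacement
        System.out.println(mask(12345, "-1"));          // modified config with replacement
        System.out.println(mask("secret", "REDACTED")); // string field with replacement
    }
}
```

Because the default for 'replacement' is null, configurations that never mention the property always take the first branch.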
Rejected Alternatives
The first version of the improvement, proposed in the PR, used a JSON map of maskField → replacement. This approach had both pros and cons:
...