THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
name | description | type | default | valid values | importance | |||||
---|---|---|---|---|---|---|---|---|---|---|
struct.field | target fieldName In Case struct input | String | message | medium | regex | Ordered Regex Group Mapping Keys ( with :{TYPE} ) | String | regular expression string | medium | |
mapping | String Regex Group Pattern | String | comma seperated names | medium |
...
Code Block |
---|
"transforms": "RegexTransform", "transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value", "transforms.RegexTransform.struct.field": "url", "transforms.RegexTransform.regex": "^(https?):\\/\\/([^/]*)/(.*)" "transforms.RegexTransform.mapping": "protocol,domain,path" |
...
Code Block | ||
---|---|---|
| ||
public abstract class ToStructByRegexTransfo rm<R extends ConnectRecord<R>> implements Transformation<R> { public static final String OVERVIEW_DOC = "Generate key/value Struct objects supported by ordered Regex Group" + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) " + "or value (<code>" + Value.class.getName() + "</code>)."; private static final String TYPE_DELIMITER = ":"; private interface ConfigName { String REGEX = "regex"; String MAPPING_KEY = "mapping"; String STURCT_INPUT_KEY_NAME = "struct.field"; } public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new GroupRegexValidator(), ConfigDef.Importance.MEDIUM, "String Regex Group Pattern.") .define(ConfigName.MAPPING_KEY, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Ordered Regex Group Mapping Keys ( with :{TYPE} )") .define(ConfigName.STURCT_INPUT_KEY_NAME, ConfigDef.Type.STRING, "message", ConfigDef.Importance.MEDIUM, "target fieldName In Case struct input"); private static final String PURPOSE = "Transform Struct by regex group mapping" ... |
...
Code Block | ||
---|---|---|
| ||
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
...
}
private R applyWithSchema(R record) {
...
} |
3. Additional Type Convert Support
You can use TypeCase below with regex config
...
Code Block | ||||
---|---|---|---|---|
| ||||
public enum TYPE{ STRING ,NUMBER ,FLOAT ,BOOLEAN ,TIMEMILLIS } private static class KeyData{ private String name; private TYPE type; private KeyData(String name, String type){ this.name = name; this.type = type != null ? TYPE.valueOf(type) : TYPE.STRING; } public String getName(){ return this.name; } public TYPE type(){ return this.type; } private Object castJavaType(String value){ try { switch (this.type) { case STRING: return value; case NUMBER: return Long.valueOf(value); case FLOAT: return Float.valueOf(value); case BOOLEAN: return Boolean.valueOf(value); case TIMEMILLIS: return new Date(Long.valueOf(value)); default: return value; } }catch (Exception e){ return value; } } private Schema getTypeSchema(){ switch (this.type){ case STRING: return Schema.STRING_SCHEMA; case NUMBER: return Schema.INT64_SCHEMA; case FLOAT: return Schema.FLOAT64_SCHEMA; case BOOLEAN: return Schema.BOOLEAN_SCHEMA; case TIMEMILLIS: return Timestamp.SCHEMA; default: return Schema.STRING_SCHEMA; } } } |
Compatibility, Deprecation, and Migration Plan
...