Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

name

description

type

default

valid values

importance

struct.field

target fieldName In Case struct input

String

message

medium

regex

String Regex Group Pattern

String

regular expression string

medium

mapping

Ordered Regex Group Mapping Keys ( with :{TYPE} )

String

comma separated names

medium

...

Code Block
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value",

"transforms.RegexTransform.struct.field": "url",
"transforms.RegexTransform.regex": "^(https?):\\/\\/([^/]*)/(.*)"

"transforms.RegexTransform.mapping": "protocol,domain,path"

...

Code Block
languagejava
public abstract class ToStructByRegexTransform<R extends ConnectRecord<R>> implements Transformation<R> {
    // Overview text for this transformation. It embeds the fully qualified names of the
    // concrete Key and Value classes so readers know which type to configure.
    public static final String OVERVIEW_DOC = "Generate key/value Struct objects supported by ordered Regex Group"
        + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
        + "or value (<code>" + Value.class.getName() + "</code>).";

    // Delimiter string ":". Presumably separates a mapping key name from its type
    // (the mapping doc mentions "with :{TYPE}") - TODO confirm against the parsing code.
    private static final String TYPE_DELIMITER = ":";

    /**
     * Names of the configuration properties understood by this transformation.
     *
     * NOTE(review): STURCT_INPUT_KEY_NAME looks like a misspelling of "STRUCT"; it is
     * left unchanged here because other code refers to the constant by this name.
     */
    private interface ConfigName {
        /** Property holding the field name to use when the input is a Struct. */
        String STURCT_INPUT_KEY_NAME = "struct.field";

        /** Property holding the regular expression with capture groups. */
        String REGEX = "regex";

        /** Property holding the ordered mapping keys for the regex groups. */
        String MAPPING_KEY = "mapping";
    }


    /**
     * Configuration definition for this transformation. It declares three properties,
     * all of MEDIUM importance:
     * the regex (STRING, no default value, checked by GroupRegexValidator),
     * the mapping (LIST, no default value), and
     * the struct input field name (STRING, default "message").
     */
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(
            ConfigName.REGEX,
            ConfigDef.Type.STRING,
            ConfigDef.NO_DEFAULT_VALUE,
            new GroupRegexValidator(),
            ConfigDef.Importance.MEDIUM,
            "String Regex Group Pattern.")
        .define(
            ConfigName.MAPPING_KEY,
            ConfigDef.Type.LIST,
            ConfigDef.NO_DEFAULT_VALUE,
            ConfigDef.Importance.MEDIUM,
            "Ordered Regex Group Mapping Keys ( with :{TYPE} )")
        .define(
            ConfigName.STURCT_INPUT_KEY_NAME,
            ConfigDef.Type.STRING,
            "message",
            ConfigDef.Importance.MEDIUM,
            "target fieldName In Case struct input");


    private static final String PURPOSE = "Transform Struct by regex group mapping"


...

...

Code Block
languagejava
@Override
    public R apply(R record) {
        // A null operating schema means the record carries no schema, so it takes the
        // schemaless path; every other record takes the schema-aware path.
        final boolean schemaless = operatingSchema(record) == null;
        return schemaless ? applySchemaless(record) : applyWithSchema(record);
    }

    private R applySchemaless(R record) {
        ...
    }

    private R applyWithSchema(R record) {
        ...
    }

3. Additional Type Convert Support

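You can append one of the types below to a mapping key (for example `port:NUMBER`) to convert the value captured by the corresponding regex group.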

...

Code Block
languagejava
titleToStructByRegexTransform.java innerClass
/**
 * Target types a captured value can be converted to.
 * The declaration order is kept stable: STRING, NUMBER, FLOAT, BOOLEAN, TIMEMILLIS.
 */
public enum TYPE {
        /** Value is kept as text. */
        STRING,
        /** Value is an integral number (INT64 schema in KeyData). */
        NUMBER,
        /** Value is a floating-point number (FLOAT64 schema in KeyData). */
        FLOAT,
        /** Value is a boolean. */
        BOOLEAN,
        /** Value is a millisecond count turned into a Date (Timestamp schema in KeyData). */
        TIMEMILLIS
    }

    /**
     * Pairs an output field name with the {@link TYPE} its text value is converted to,
     * and supplies the matching Connect schema for that type.
     */
    private static class KeyData{
        private String name;
        private TYPE type;

        /**
         * @param name field name
         * @param type name of a {@link TYPE} constant, or {@code null} to default to STRING
         * @throws IllegalArgumentException if {@code type} is non-null and names no TYPE constant
         */
        private KeyData(String name, String type){
            this.name = name;
            this.type = type != null ? TYPE.valueOf(type) : TYPE.STRING;
        }

        /** @return the field name given at construction */
        public String getName(){
            return this.name;
        }

        /** @return the conversion type of this field */
        public TYPE type(){
            return this.type;
        }

        /**
         * Converts the given text to the Java type that corresponds to this field's TYPE.
         *
         * Conversion is best-effort: if parsing fails, the original string is returned
         * unchanged instead of throwing.
         * NOTE(review): for non-STRING types that fallback String does not match the
         * schema from getTypeSchema() - confirm callers tolerate this.
         *
         * @param value text to convert
         * @return the converted value, or {@code value} itself when conversion fails
         */
        private Object castJavaType(String value){
            try {
                switch (this.type) {
                    case STRING: return value;
                    case NUMBER: return Long.valueOf(value);
                    // getTypeSchema() declares FLOAT64 for FLOAT, which is backed by
                    // java.lang.Double, so produce a Double rather than a Float.
                    case FLOAT: return Double.valueOf(value);
                    case BOOLEAN: return Boolean.valueOf(value);
                    case TIMEMILLIS: return new Date(Long.parseLong(value));
                    default: return value;
                }
            }catch (Exception e){
                // Deliberate fallback: keep the raw text when it cannot be parsed.
                return value;
            }
        }


        /**
         * @return the Connect schema matching this field's TYPE; STRING_SCHEMA for STRING
         *         and as the default
         */
        private Schema getTypeSchema(){
            switch (this.type){
                case STRING: return Schema.STRING_SCHEMA;
                case NUMBER: return Schema.INT64_SCHEMA;
                case FLOAT: return Schema.FLOAT64_SCHEMA;
                case BOOLEAN: return Schema.BOOLEAN_SCHEMA;
                case TIMEMILLIS: return Timestamp.SCHEMA;
                default: return Schema.STRING_SCHEMA;
            }
        }
    }



Compatibility, Deprecation, and Migration Plan

...