

Status

Current state: Voting

Discussion thread: discuss mail    discuss mail2

JIRA: KAFKA-9436

Pull request: https://github.com/apache/kafka/pull/12219 (earlier PR, before review: https://github.com/apache/kafka/pull/7965)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

1. Parsing an access log

Input

Code Block
{
  "message" : "111.61.73.113 - - [08/Aug/2019:18:15:29 +0900] \"OPTIONS /api/v1/service_config HTTP/1.1\" 200 - 101989 \"http://local.test.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36\""
}


Output


Code Block
{
   "IP" : "111.61.73.113"
   ,"RemoteUser" : "-"
   ,"AuthedRemoteUser" : "-"
   ,"DateTime" : "08/Aug/2019:18:15:29 +0900"
   ,"Method" : "OPTIONS"
   ,"Request" : "/api/v1/service_config"
   ,"Protocol" : "HTTP/1.1"
   ,"Response" : "200"
   ,"BytesSent" : "-"
   ,"Ms" : 101989
   ,"Referrer" : "http://local.test.com/"
   ,"UserAgent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
}



2. Parsing a patterned string (such as a URL)


Input

Code Block
{
  "url" : "https://kafka.apache.org/documentation/#connect"
}


Output

Code Block
{
  "protocol" : "https"
  ,"domain" : "kafka.apache.org"
  ,"path" : "documentation/#connect"
}

...

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.


| name | description | type | default | valid values | importance |
|---|---|---|---|---|---|
| transforms.RegexTransform.struct.field | target fieldName In Case struct input | String | message | | medium |
| transforms.RegexTransform.regex | String Regex Group Pattern | String | | grouped regular expression | medium |
| transforms.RegexTransform.mapping | Ordered Regex Group Mapping Keys ( with :{TYPE} ) | String | | comma separated names | medium |


Example Config

Usecase1. Transform Config

Code Block
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.regex": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) (.+?) (.+?)\" (\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"",
"transforms.RegexTransform.mapping": "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,Response,BytesSent,Ms:NUMBER,Referrer,UserAgent"

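To make the interaction between the regex and mapping settings of Usecase1 concrete, the following is a minimal sketch that uses only java.util.regex. It is not the transform's actual implementation: the class AccessLogMappingSketch and its parse method are hypothetical, each mapping entry is assumed to be either name or name:TYPE, only NUMBER is converted (to a Long) for brevity, and returning null for a non-matching line is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of the proposed transform.
final class AccessLogMappingSketch {
    // Same pattern as "transforms.RegexTransform.regex" in Usecase1, as a Java string literal.
    static final String REGEX =
        "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
        + "\"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) (.+?) (.+?)\" "
        + "(\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"";

    // Same value as "transforms.RegexTransform.mapping" in Usecase1.
    static final String MAPPING =
        "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,"
        + "Response,BytesSent,Ms:NUMBER,Referrer,UserAgent";

    // Pairs the i-th capture group with the i-th mapping key.
    // Returns null when the line does not match (an assumption of this sketch).
    static Map<String, Object> parse(String line) {
        Matcher m = Pattern.compile(REGEX).matcher(line);
        if (!m.find()) {
            return null;
        }
        String[] keys = MAPPING.split(",");
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < keys.length && i < m.groupCount(); i++) {
            // "Ms:NUMBER" -> name "Ms", type "NUMBER"; "IP" -> name "IP", no type.
            String[] nameAndType = keys[i].trim().split(":", 2);
            String raw = m.group(i + 1);
            boolean isNumber = nameAndType.length == 2 && nameAndType[1].equals("NUMBER");
            out.put(nameAndType[0], isNumber ? (Object) Long.valueOf(raw) : raw);
        }
        return out;
    }
}
```

Calling AccessLogMappingSketch.parse with the access-log line from the first input example yields twelve entries in mapping order, with Ms as a Long and every other value as a String.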

Usecase2. Transform Config

Code Block
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.struct.field": "url",
"transforms.RegexTransform.regex": "^(https?):\\/\\/([^/]*)/(.*)",
"transforms.RegexTransform.mapping": "protocol,domain,path"

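The role of struct.field in Usecase2 can be illustrated with a small, dependency-free sketch: the named field is read from a map-shaped value, matched against the regex, and each capture group is stored under the corresponding mapping key. The class UrlFieldSketch and its parse method are hypothetical, and throwing IllegalArgumentException for a missing field or a non-matching value is an assumption of this sketch, not documented behavior of the transform.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not the transform's actual code.
final class UrlFieldSketch {
    // Same pattern and keys as the Usecase2 "regex" and "mapping" settings.
    static final Pattern REGEX = Pattern.compile("^(https?):\\/\\/([^/]*)/(.*)");
    static final String[] MAPPING = {"protocol", "domain", "path"};

    // Reads the configured field from a map-shaped value and maps regex groups to keys.
    static Map<String, Object> parse(Map<String, Object> value, String structField) {
        Object target = value.get(structField);
        if (!(target instanceof String)) {
            // Error handling here is an assumption of this sketch.
            throw new IllegalArgumentException("field '" + structField + "' is missing or not a string");
        }
        Matcher m = REGEX.matcher((String) target);
        if (!m.find()) {
            throw new IllegalArgumentException("value does not match the configured regex");
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < MAPPING.length; i++) {
            out.put(MAPPING[i], m.group(i + 1));
        }
        return out;
    }
}
```

Passing a map whose url entry is https://kafka.apache.org/documentation/#connect together with the field name url produces the protocol, domain, and path values shown in the second output example.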
...

Proposed Changes


Only one main abstract class and one validator class are added: ParseStructByRegex and GroupRegexValidator.

This includes:

  1. describe/declare part
  2. main functions
  3. type case support
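This page does not spell out what GroupRegexValidator checks. As a rough illustration of the kind of check a validator for a grouped regular expression could perform, the sketch below assumes two rules: the pattern must compile, and it must declare at least one capture group. The class GroupRegexCheckSketch and the method requireGroups are hypothetical names, and the validator in the PR may behave differently.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical stand-in for a grouped-regex check; not the PR's GroupRegexValidator.
final class GroupRegexCheckSketch {
    // Returns the number of capture groups, or throws if the pattern is unusable.
    static int requireGroups(String regex) {
        Pattern pattern;
        try {
            pattern = Pattern.compile(regex);
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException("invalid regex: " + e.getDescription(), e);
        }
        // groupCount() reports the capture groups declared in the pattern,
        // independent of the input being matched.
        int groups = pattern.matcher("").groupCount();
        if (groups == 0) {
            throw new IllegalArgumentException("regex must contain at least one capture group");
        }
        return groups;
    }
}
```

In a Kafka Connect transform, a check like this would typically sit behind ConfigDef.Validator#ensureValid and report problems with a ConfigException; the sketch uses IllegalArgumentException only to stay free of dependencies.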


1. describe/declare part

Code Block
public abstract class ParseStructByRegex<R extends ConnectRecord<R>> implements Transformation<R> {
    public static final String OVERVIEW_DOC = "Generate key/value Struct objects supported by ordered Regex Group"
        + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
        + "or value (<code>" + Value.class.getName() + "</code>).";

    private static final String TYPE_DELIMITER = ":";

    private interface ConfigName {
        String REGEX = "regex";
        String MAPPING_KEY = "mapping";
        String STRUCT_INPUT_KEY_NAME = "struct.field";
    }


    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new GroupRegexValidator(), ConfigDef.Importance.MEDIUM,
            "String Regex Group Pattern.")
        .define(ConfigName.MAPPING_KEY, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
            "Ordered Regex Group Mapping Keys ( with :{TYPE} )")
        .define(ConfigName.STRUCT_INPUT_KEY_NAME, ConfigDef.Type.STRING, "message", ConfigDef.Importance.MEDIUM,
            "target fieldName In Case struct input");


    private static final String PURPOSE = "Parse Struct by regex group mapping";


 ...



2. main functions interface

Detail Code


Code Block
    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            return applySchemaless(record);
        } else {
            return applyWithSchema(record);
        }
    }

    private R applySchemaless(R record) {
        ...
    }

    private R applyWithSchema(R record) {
        ...
    }

3. Additional Type Convert Support

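You can append one of the types below to a mapping key, in the form name:TYPE (for example Ms:NUMBER), to convert the captured value. Keys without a type are treated as STRING.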

...

Code Block
// ToStructByRegexTransform.java innerClass
    // Requires java.util.Date, org.apache.kafka.connect.data.Schema
    // and org.apache.kafka.connect.data.Timestamp.
    public enum TYPE {
        STRING,
        NUMBER,
        FLOAT,
        BOOLEAN,
        TIMEMILLIS
    }

    private static class KeyData {
        private final String name;
        private final TYPE type;

        // A null type means the mapping key had no ":{TYPE}" suffix, so it defaults to STRING.
        private KeyData(String name, String type) {
            this.name = name;
            this.type = type != null ? TYPE.valueOf(type) : TYPE.STRING;
        }

        public String getName() {
            return this.name;
        }

        public TYPE type() {
            return this.type;
        }

        // Converts a captured group to the Java type that matches getTypeSchema().
        private Object castJavaType(String value) {
            try {
                switch (this.type) {
                    case STRING: return value;
                    case NUMBER: return Long.valueOf(value);
                    // FLOAT64_SCHEMA expects a Double, not a Float.
                    case FLOAT: return Double.valueOf(value);
                    case BOOLEAN: return Boolean.valueOf(value);
                    case TIMEMILLIS: return new Date(Long.parseLong(value));
                    default: return value;
                }
            } catch (Exception e) {
                // Conversion failed: fall back to the raw string.
                return value;
            }
        }

        // Returns the Connect schema that corresponds to the declared type.
        private Schema getTypeSchema() {
            switch (this.type) {
                case STRING: return Schema.STRING_SCHEMA;
                case NUMBER: return Schema.INT64_SCHEMA;
                case FLOAT: return Schema.FLOAT64_SCHEMA;
                case BOOLEAN: return Schema.BOOLEAN_SCHEMA;
                case TIMEMILLIS: return Timestamp.SCHEMA;
                default: return Schema.STRING_SCHEMA;
            }
        }
    }
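The name:TYPE convention can be tried outside Kafka Connect with a dependency-free sketch. It mirrors the type names of the inner class above, but the class TypedKeySketch and its of and cast methods are hypothetical. Unlike the code above, this sketch lets conversion errors propagate instead of falling back to the raw string, which is a deliberate simplification.

```java
import java.util.Date;

// Hypothetical, dependency-free sketch of the "name:TYPE" convention; not the PR's code.
final class TypedKeySketch {
    enum Type { STRING, NUMBER, FLOAT, BOOLEAN, TIMEMILLIS }

    final String name;
    final Type type;

    private TypedKeySketch(String name, Type type) {
        this.name = name;
        this.type = type;
    }

    // "Ms:NUMBER" -> (Ms, NUMBER); "IP" -> (IP, STRING).
    static TypedKeySketch of(String mappingEntry) {
        String[] parts = mappingEntry.trim().split(":", 2);
        Type type = parts.length == 2 ? Type.valueOf(parts[1].trim()) : Type.STRING;
        return new TypedKeySketch(parts[0].trim(), type);
    }

    // Converts a captured group to the Java type matching the declared type.
    // A malformed number raises NumberFormatException in this sketch.
    Object cast(String value) {
        switch (type) {
            case NUMBER: return Long.valueOf(value);
            case FLOAT: return Double.valueOf(value);
            case BOOLEAN: return Boolean.valueOf(value);
            case TIMEMILLIS: return new Date(Long.parseLong(value));
            default: return value;
        }
    }
}
```

For example, TypedKeySketch.of("Ms:NUMBER").cast("101989") returns the Long 101989, while TypedKeySketch.of("IP").cast("111.61.73.113") returns the string unchanged.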



Compatibility, Deprecation, and Migration Plan

...