
Status

Current state: Voting

Discussion thread: discuss mail, discuss mail2

JIRA: KAFKA-9436

PULL REQUEST: https://github.com/apache/kafka/pull/12219 (earlier PR, before review: https://github.com/apache/kafka/pull/7965)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

1. Parsing an access log

Input

```json
{
  "message" : "111.61.73.113 - - [08/Aug/2019:18:15:29 +0900] \"OPTIONS /api/v1/service_config HTTP/1.1\" 200 - 101989 \"http://local.test.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36\""
}
```


Output


```json
{
  "IP" : "111.61.73.113",
  "RemoteUser" : "-",
  "AuthedRemoteUser" : "-",
  "DateTime" : "08/Aug/2019:18:15:29 +0900",
  "Method" : "OPTIONS",
  "Request" : "/api/v1/service_config",
  "Protocol" : "HTTP/1.1",
  "Response" : "200",
  "BytesSent" : "-",
  "Ms" : 101989,
  "Referrer" : "http://local.test.com/",
  "UserAgent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
}
```



2. Parsing a patterned string (such as a URL)


Input

```json
{
  "url" : "https://kafka.apache.org/documentation/#connect"
}
```


Output

```json
{
  "protocol" : "https",
  "domain" : "kafka.apache.org",
  "path" : "documentation/#connect"
}
```

...

name | description | type | default | valid values | importance
regex | Regex pattern whose ordered capture groups become output fields | String | (no default) | grouped regular expression string | medium
mapping | Ordered names for the regex groups, each optionally suffixed with :{TYPE} | List | (no default) | comma-separated names | medium
struct.field | Name of the input field to parse when the record is a Struct | String | message | - | medium

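Each mapping entry pairs a group with a field name and, optionally, a type after the ":" delimiter. The following is a minimal sketch of how such entries might be split into (name, type) pairs, assuming a missing suffix means STRING, as the proposal's KeyData class does. MappingKeyParser, Key and Type are hypothetical names; since mapping is declared as ConfigDef.Type.LIST, Kafka would already hand the transformation a list, so the sketch splits the raw string only to stay self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: turn "IP,Ms:NUMBER,..." into ordered (name, type) pairs.
class MappingKeyParser {
    enum Type { STRING, NUMBER, FLOAT, BOOLEAN, TIMEMILLIS }

    record Key(String name, Type type) {}

    static List<Key> parse(String mapping) {
        List<Key> keys = new ArrayList<>();
        for (String entry : mapping.split(",")) {
            String trimmed = entry.trim();
            int idx = trimmed.indexOf(':'); // the proposal's TYPE_DELIMITER
            if (idx < 0) {
                // No ":{TYPE}" suffix: default to STRING
                keys.add(new Key(trimmed, Type.STRING));
            } else {
                // Type.valueOf throws IllegalArgumentException for unknown types
                keys.add(new Key(trimmed.substring(0, idx),
                        Type.valueOf(trimmed.substring(idx + 1))));
            }
        }
        return keys;
    }
}
```

Key order matters: the first name goes to group 1, the second to group 2, and so on.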
...

```json
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.regex": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) (.+?) (.+?)\" (\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"",
"transforms.RegexTransform.mapping": "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,Response,BytesSent,Ms:NUMBER,Referrer,UserAgent"
```

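The access-log regex and mapping can be sanity-checked with plain java.util.regex. The sketch below is not the proposal's implementation: it applies the same pattern to one log line and assigns the 12 capture groups, in order, to the 12 mapping names. For brevity the "Ms:NUMBER" cast is hard-coded rather than derived from the mapping, and returning null on a non-matching line is an assumption, since the proposal does not say what happens in that case. AccessLogRegexSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: apply the use case 1 regex and map groups to names in order.
class AccessLogRegexSketch {
    // Same pattern as the "regex" config, written as a Java string literal
    static final Pattern PATTERN = Pattern.compile(
        "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "
        + "\"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) (.+?) (.+?)\" "
        + "(\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"");

    // Names from the "mapping" config, with the ":NUMBER" suffix stripped
    static final String[] NAMES = {
        "IP", "RemoteUser", "AuthedRemoteUser", "DateTime", "Method", "Request",
        "Protocol", "Response", "BytesSent", "Ms", "Referrer", "UserAgent"
    };

    static Map<String, Object> parse(String line) {
        Matcher m = PATTERN.matcher(line);
        if (!m.matches()) {
            return null; // assumption: no-match behavior is not specified
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            String group = m.group(i + 1); // group 0 is the whole match
            // "Ms:NUMBER": this field is cast to a Long, the others stay strings
            out.put(NAMES[i], NAMES[i].equals("Ms") ? Long.valueOf(group) : group);
        }
        return out;
    }
}
```

Applied to the sample message from use case 1, this yields the field values shown in that example's output.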

Use case 2. Transform config

```json
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.struct.field": "url",
"transforms.RegexTransform.regex": "^(https?):\\/\\/([^/]*)/(.*)",
"transforms.RegexTransform.mapping": "protocol,domain,path"
```

...

Proposed Changes


Only one main abstract class and one validator class are added: ParseStructByRegex and GroupRegexValidator.

This includes:

  1. the describe/declare part
  2. the main functions and type case support


1. describe/declare part

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class ParseStructByRegex<R extends ConnectRecord<R>> implements Transformation<R> {
    public static final String OVERVIEW_DOC = "Generate key/value Struct objects supported by ordered Regex Group. "
        + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
        + "or value (<code>" + Value.class.getName() + "</code>).";

    // Separates a mapping key from its optional type, e.g. "Ms:NUMBER"
    private static final String TYPE_DELIMITER = ":";

    private interface ConfigName {
        String REGEX = "regex";
        String MAPPING_KEY = "mapping";
        String STRUCT_INPUT_KEY_NAME = "struct.field";
    }

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        // GroupRegexValidator is the validator class added by this proposal
        .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new GroupRegexValidator(),
            ConfigDef.Importance.MEDIUM, "String Regex Group Pattern.")
        .define(ConfigName.MAPPING_KEY, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
            "Ordered Regex Group Mapping Keys ( with :{TYPE} )")
        .define(ConfigName.STRUCT_INPUT_KEY_NAME, ConfigDef.Type.STRING, "message", ConfigDef.Importance.MEDIUM,
            "target fieldName In Case struct input");

    private static final String PURPOSE = "Parse Struct by regex group mapping";

    ...
}
```

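The proposal names GroupRegexValidator but does not show it. As a hedged sketch, assuming its job is to reject a regex that does not compile or has no capture groups, the check could look like the class below. A real Kafka validator would implement ConfigDef.Validator (whose method is ensureValid(String name, Object value)) and throw ConfigException; this hypothetical GroupRegexCheck throws IllegalArgumentException instead so it runs without Kafka on the classpath.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical stand-in for GroupRegexValidator (not the proposal's code).
class GroupRegexCheck {
    static void ensureValid(String name, Object value) {
        if (!(value instanceof String regex)) {
            throw new IllegalArgumentException(name + " must be a string");
        }
        final Pattern pattern;
        try {
            pattern = Pattern.compile(regex);
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException(name + " is not a valid regex: " + e.getDescription());
        }
        // Without capture groups there is nothing to map to field names
        if (pattern.matcher("").groupCount() == 0) {
            throw new IllegalArgumentException(name + " must contain at least one capture group");
        }
    }
}
```

A validator attached to a single config key only sees that key's value, so checking that the group count equals the number of mapping keys would have to happen later, for example when the transformation is configured.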


2. main functions interface

Detail Code


```java
    @Override
    public R apply(R record) {
        if (operatingSchema(record) == null) {
            // No schema: handle the key/value as schemaless data
            return applySchemaless(record);
        } else {
            // Schema present: produce a Struct with a matching schema
            return applyWithSchema(record);
        }
    }

    private R applySchemaless(R record) {
        ...
    }

    private R applyWithSchema(R record) {
        ...
    }
```

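The bodies of applySchemaless and applyWithSchema are elided above. The sketch below shows one way the schemaless input could be selected, under stated assumptions: a String value is parsed directly, while a Map value is parsed from the entry named by struct.field, matching use case 2. SchemalessParseSketch is a hypothetical name, and returning null for unsupported or non-matching input is an assumption. The schema-based branch would instead build a Struct whose field schemas come from KeyData.getTypeSchema(), which needs the Kafka Connect API and is not shown.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of input selection for the schemaless path.
class SchemalessParseSketch {
    static Map<String, Object> parse(Object value, String structField,
                                     Pattern regex, List<String> names) {
        String text;
        if (value instanceof String s) {
            text = s; // the whole value is the text to parse
        } else if (value instanceof Map<?, ?> map && map.get(structField) instanceof String field) {
            text = field; // parse only the configured field
        } else {
            return null; // assumption: unsupported input is left to the caller
        }
        Matcher m = regex.matcher(text);
        if (!m.find()) {
            return null;
        }
        Map<String, Object> out = new LinkedHashMap<>();
        // Groups are assigned to names in order; extra names or groups are ignored
        for (int i = 0; i < names.size() && i < m.groupCount(); i++) {
            out.put(names.get(i), m.group(i + 1));
        }
        return out;
    }
}
```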
...


...

The types below can be used as :{TYPE} suffixes on the mapping keys (for example, Ms:NUMBER).

...

ToStructByRegexTransform.java (inner classes)

```java
    // Requires java.util.Date, org.apache.kafka.connect.data.Schema and
    // org.apache.kafka.connect.data.Timestamp to be imported by the enclosing class.
    public enum TYPE {
        STRING,
        NUMBER,
        FLOAT,
        BOOLEAN,
        TIMEMILLIS
    }

    private static class KeyData {
        private final String name;
        private final TYPE type;

        private KeyData(String name, String type) {
            this.name = name;
            // A mapping key without a ":{TYPE}" suffix defaults to STRING
            this.type = type != null ? TYPE.valueOf(type) : TYPE.STRING;
        }

        public String getName() {
            return this.name;
        }

        public TYPE type() {
            return this.type;
        }

        // Converts the matched group text into the Java type expected by getTypeSchema()
        private Object castJavaType(String value) {
            try {
                switch (this.type) {
                    case STRING: return value;
                    case NUMBER: return Long.valueOf(value);
                    // FLOAT64 values must be java.lang.Double, not Float
                    case FLOAT: return Double.valueOf(value);
                    case BOOLEAN: return Boolean.valueOf(value);
                    // The Timestamp logical type is backed by java.util.Date (epoch millis)
                    case TIMEMILLIS: return new Date(Long.valueOf(value));
                    default: return value;
                }
            } catch (Exception e) {
                // Unparseable input falls back to the raw string
                return value;
            }
        }

        private Schema getTypeSchema() {
            switch (this.type) {
                case STRING: return Schema.STRING_SCHEMA;
                case NUMBER: return Schema.INT64_SCHEMA;
                case FLOAT: return Schema.FLOAT64_SCHEMA;
                case BOOLEAN: return Schema.BOOLEAN_SCHEMA;
                case TIMEMILLIS: return Timestamp.SCHEMA;
                default: return Schema.STRING_SCHEMA;
            }
        }
    }
```



Compatibility, Deprecation, and Migration Plan

...