Status
Current state: Voting
Discussion thread: discuss mail discuss mail2
JIRA:
PULL REQUEST: https://github.com/apache/kafka/pull/12219 (earlier PR, before review: https://github.com/apache/kafka/pull/7965)
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
1. Parsing AccessLog
Input
Code Block |
---|
{ "message" : "111.61.73.113 - - [08/Aug/2019:18:15:29 +0900] \"OPTIONS /api/v1/service_config HTTP/1.1\" 200 - 101989 \"http://local.test.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36\"" } |
Output
Code Block |
---|
{ "IP" : "111.61.73.113" ,"RemoteUser" : "-" ,"AuthedRemoteUser" : "-" ,"DateTime" : "08/Aug/2019:18:15:29 +0900" ,"Method" : "OPTIONS" ,"Request" : "/api/v1/service_config" ,"Protocol" : "HTTP/1.1" ,"Response" : "200" ,"BytesSent" : "-" ,"Ms" : 101989 ,"Referrer" : "http://local.test.com/" ,"UserAgent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"} |
2. Parsing Patterned String ( such as URL )
Input
Code Block |
---|
{ "url" : "https://kafka.apache.org/documentation/#connect" } |
Output
Code Block |
---|
{ "protocol" : "https" ,"domain" : "kafka.apache.org" ,"path" : "documentation/#connect" } |
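Both use cases come down to matching one regex against the input string and naming each capture group in order. The following is a minimal sketch of that idea in plain Java, not the SMT itself: the class `RegexGroupSketch` and its `parse` helper are hypothetical names, and using a full-string match (`Matcher.matches()`) with an empty result on no match is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the proposed SMT.
class RegexGroupSketch {

    // Names each capture group of `regex` with the key at the same position.
    static Map<String, String> parse(String regex, List<String> keys, String input) {
        Matcher matcher = Pattern.compile(regex).matcher(input);
        Map<String, String> result = new LinkedHashMap<>();
        if (!matcher.matches()) {
            return result; // assumption: no match yields an empty result
        }
        // Group 0 is the whole match, so the named groups start at index 1.
        int count = Math.min(keys.size(), matcher.groupCount());
        for (int i = 0; i < count; i++) {
            result.put(keys.get(i), matcher.group(i + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(
                "^(https?)://([^/]*)/(.*)",
                Arrays.asList("protocol", "domain", "path"),
                "https://kafka.apache.org/documentation/#connect");
        // prints {protocol=https, domain=kafka.apache.org, path=documentation/#connect}
        System.out.println(parsed);
    }
}
```

The insertion-ordered map keeps the fields in the same order as the mapping keys, which matches how the outputs above are listed.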
...
name | description | type | default | valid values | importance |
---|---|---|---|---|---|
struct.field | target fieldName In Case struct input | String | message | | medium |
regex | String Regex Group Pattern | String | | grouped regular expression string | medium |
mapping | Ordered Regex Group Mapping Keys ( with :{TYPE} ) | String | | comma separated names | medium |
...
Code Block |
---|
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.regex": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) (.+?) (.+?)\" (\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"",
"transforms.RegexTransform.mapping": "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,Response,BytesSent,Ms:NUMBER,Referrer,UserAgent" |
Usecase2. Transform Config
Code Block |
---|
"transforms": "RegexTransform",
"transforms.RegexTransform.type": "org.apache.kafka.connect.transforms.ParseStructByRegex$Value",
"transforms.RegexTransform.struct.field": "url",
"transforms.RegexTransform.regex": "^(https?):\\/\\/([^/]*)/(.*)",
"transforms.RegexTransform.mapping": "protocol,domain,path" |
...
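The mapping values in the configs above are comma separated names, each optionally followed by `:` and a type (for example `Ms:NUMBER`). As a rough illustration of how such a value could be split into name/type pairs, here is a sketch in plain Java. The class `MappingSketch` and the method `parseMapping` are hypothetical, and trimming whitespace around entries is an assumption; the `STRING` default follows the type handling described in this KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the proposed SMT.
class MappingSketch {

    // Splits "IP,Ms:NUMBER" into {name, type} pairs; the type defaults to STRING.
    static List<String[]> parseMapping(String mapping) {
        List<String[]> keys = new ArrayList<>();
        for (String entry : mapping.split(",")) {
            // Split on the first ":" only, so the name and type are at most two parts.
            String[] parts = entry.trim().split(":", 2);
            String type = parts.length > 1 ? parts[1].trim() : "STRING";
            keys.add(new String[] {parts[0].trim(), type});
        }
        return keys;
    }

    public static void main(String[] args) {
        for (String[] key : parseMapping("IP,Ms:NUMBER,UserAgent")) {
            System.out.println(key[0] + " -> " + key[1]);
        }
    }
}
```

Because the position of each entry decides which regex group it names, the list keeps the entries in their configured order.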
Proposed Changes
Only one main abstract class and one validator class are added: ParseStructByRegex and GroupRegexValidator.
This includes:
- describe/declare part
- main functions
- type case support
1. describe/declare part
Code Block | ||
---|---|---|
| ||
public abstract class ParseStructByRegex<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC =
            "Generate key/value Struct objects supported by ordered Regex Group"
            + "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
            + "or value (<code>" + Value.class.getName() + "</code>).";

    private static final String TYPE_DELIMITER = ":";

    private interface ConfigName {
        String REGEX = "regex";
        String MAPPING_KEY = "mapping";
        String STURCT_INPUT_KEY_NAME = "struct.field";
    }

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new GroupRegexValidator(), ConfigDef.Importance.MEDIUM, "String Regex Group Pattern.")
            .define(ConfigName.MAPPING_KEY, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Ordered Regex Group Mapping Keys ( with :{TYPE} )")
            .define(ConfigName.STURCT_INPUT_KEY_NAME, ConfigDef.Type.STRING, "message", ConfigDef.Importance.MEDIUM, "target fieldName In Case struct input");

    private static final String PURPOSE = "Parse Struct by regex group mapping";
    ... |
2. main functions interface
Code Block | ||
---|---|---|
| ||
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
...
}
private R applyWithSchema(R record) {
...
} |
...
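The types below can be attached to entries of the mapping config with the :{TYPE} suffix (for example, Ms:NUMBER). An entry without a suffix is treated as STRING.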
...
Code Block | ||||
---|---|---|---|---|
| ||||
public enum TYPE{
STRING
,NUMBER
,FLOAT
,BOOLEAN
,TIMEMILLIS
}
private static class KeyData{
private String name;
private TYPE type;
private KeyData(String name, String type){
this.name = name;
this.type = type != null ? TYPE.valueOf(type) : TYPE.STRING;
}
public String getName(){
return this.name;
}
public TYPE type(){
return this.type;
}
private Object castJavaType(String value){
try {
switch (this.type) {
case STRING: return value;
case NUMBER: return Long.valueOf(value);
case FLOAT: return Double.valueOf(value); // FLOAT64_SCHEMA expects a java.lang.Double, not a Float
case BOOLEAN: return Boolean.valueOf(value);
case TIMEMILLIS: return new Date(Long.valueOf(value));
default: return value;
}
}catch (Exception e){
return value;
}
}
private Schema getTypeSchema(){
switch (this.type){
case STRING: return Schema.STRING_SCHEMA;
case NUMBER: return Schema.INT64_SCHEMA;
case FLOAT: return Schema.FLOAT64_SCHEMA;
case BOOLEAN: return Schema.BOOLEAN_SCHEMA;
case TIMEMILLIS: return Timestamp.SCHEMA;
default: return Schema.STRING_SCHEMA;
}
}
} |
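To make the casting behaviour concrete, the sketch below reproduces the same type switch in plain Java without any Connect classes, so it can be run on its own. `TypeCastSketch` and `cast` are hypothetical names used only for illustration. It uses `Double` for FLOAT because the Connect FLOAT64 type corresponds to `java.lang.Double`, and it falls back to the raw string when a value cannot be parsed, as the code above does.

```java
import java.util.Date;

// Hypothetical, stdlib-only illustration of the type casting; not the SMT code itself.
class TypeCastSketch {

    // Mirrors the type names accepted after ":" in the mapping config.
    enum Type { STRING, NUMBER, FLOAT, BOOLEAN, TIMEMILLIS }

    // Converts a captured group to the Java type for `type`.
    static Object cast(Type type, String value) {
        try {
            switch (type) {
                case NUMBER: return Long.valueOf(value);
                case FLOAT: return Double.valueOf(value);
                case BOOLEAN: return Boolean.valueOf(value);
                case TIMEMILLIS: return new Date(Long.parseLong(value));
                default: return value;
            }
        } catch (NumberFormatException e) {
            // Unparseable input (for example "-") is returned unchanged.
            return value;
        }
    }

    public static void main(String[] args) {
        System.out.println(cast(Type.NUMBER, "101989")); // a Long
        System.out.println(cast(Type.NUMBER, "-"));      // falls back to the string "-"
    }
}
```

One consequence worth keeping in mind: with a schema-based record, a fallback string placed in a field declared as INT64 would likely be rejected when the Struct is validated, so numeric types are safest on groups whose regex only admits digits.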
Compatibility, Deprecation, and Migration Plan
...