Status

Current state: Under Discussion

Discussion thread: here

Proposed Pull Request: #9492

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The TimestampConverter transform can convert only one field per use of the transform (via the field configuration parameter).  In a real environment, however, an event often carries multiple timestamps (such as Created On, Last Updated On, Approved On, etc.), and if one of them needs to be converted using TimestampConverter, then probably more than one (if not all of them) needs to be converted.  For large messages that may already be passing through several other transforms, performance degrades noticeably when more than a few TimestampConverter transforms are chained just to cover all of the different fields.

At the same time, when parsing strings to timestamps in "real" environments, it is not always possible to strictly control timestamp formats if multiple different services produce messages to the same topic.  For example, some may specify a time zone and some may not, some may include milliseconds and some may not, etc.  All of these variations can be "valid" within the ISO 8601 standard, but even the slightest deviation from the exact specified format pattern causes TimestampConverter to fail.  So it would be better if an input pattern could allow different variations to be parsed from a string into a proper Date/Time type.

Public Interfaces

From the perspective of using this transform in Connect, the following things will be changed:

  • Rename the configuration parameter field to fields, since it will now support multiple comma-separated field names (the old name can remain supported for backward compatibility for some time).
  • Add the new configuration parameters format.input, which allows a pattern that supports multiple variations when parsing a string, and format.output, which specifies the exact string format to output when converting from a Date/Time to a string.
  • The configuration parameter format could be removed (but remain for backward compatibility), or could be used to specify both format.input and format.output at the same time (assuming you have just a single string input format).

Proposed Changes

Supporting Multiple Fields

For supporting multiple fields, we can create a new configuration parameter called fields which is of type ConfigDef.Type.LIST.
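
As a rough, standard-library-only sketch of the intended semantics (not the actual ConfigDef wiring), the helper below resolves the list of field names from a comma-separated fields value and falls back to the legacy field parameter. The class name FieldsConfigSketch, the method resolveFields, and the fallback order are assumptions made for illustration only; in Connect, a parameter of type ConfigDef.Type.LIST would be expected to handle the splitting itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only: not part of TimestampConverter.
final class FieldsConfigSketch {
    static final String FIELDS = "fields";      // new parameter proposed in this KIP
    static final String LEGACY_FIELD = "field"; // existing single-field parameter

    // Returns the configured field names, preferring "fields" over the legacy "field".
    static List<String> resolveFields(Map<String, String> props) {
        String raw = props.get(FIELDS);
        if (raw == null || raw.trim().isEmpty()) {
            raw = props.get(LEGACY_FIELD); // assumed backward-compatible fallback
        }
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptyList();
        }
        List<String> names = new ArrayList<>();
        // Split on commas, ignoring whitespace around each name.
        for (String name : raw.trim().split("\\s*,\\s*")) {
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }
}
```

With this sketch, a value such as "created_on, updated_on" resolves to two field names, while an old configuration that only sets field still resolves to a single-element list.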


And then for actually performing the transformation on multiple fields, instead of checking field.name().equals(config.field) as applyValueWithSchema does today:

TimestampConverter.java
    private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
        //...
        for (Field field : value.schema().fields()) {
            final Object updatedFieldValue;
            if (field.name().equals(config.field)) {
                //...

... it should check if the new fields configuration parameter contains the field name.  The rest of this logic can be the same.

And applySchemaless should loop over the entries of the Map rather than doing a single put keyed on the old single field name.  Something like this:


TimestampConverter.java
            for (Map.Entry<String, Object> field : value.entrySet()) {
                if (config.fields.contains(field.getKey()))
                    updatedValue.put(field.getKey(), convertTimestamp(field.getValue()));
            }
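
To make the loop above easier to try in isolation, here is a minimal self-contained sketch of the same idea. The class SchemalessMultiFieldSketch, the method convertFields, and the convert function parameter (standing in for convertTimestamp) are hypothetical names introduced for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-alone version of the schemaless loop, for illustration only.
final class SchemalessMultiFieldSketch {
    // Copies the record and converts every entry whose key is one of the configured fields.
    static Map<String, Object> convertFields(Map<String, Object> value,
                                             Set<String> fields,
                                             Function<Object, Object> convert) {
        Map<String, Object> updatedValue = new HashMap<>(value);
        for (Map.Entry<String, Object> field : value.entrySet()) {
            if (fields.contains(field.getKey())) {
                updatedValue.put(field.getKey(), convert.apply(field.getValue()));
            }
        }
        return updatedValue;
    }
}
```

One consequence of iterating over the record's entries in this sketch is that a configured field which is absent from the record is simply skipped rather than added to the output.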

Supporting Multiple String Input Formats

When a Date/Time field is output as a string, the format must be exact.  For this reason we need to split the format configuration parameter into two:  one parameter that gives the exact format for output to strings, and one parameter that gives the input format for strings to be parsed into the target.type, which can be a pattern covering different variations of string-based dates or timestamps.

So now there will be two new parameters added:  format.input and format.output.

It is also possible to keep the format parameter in place as a "single" configuration that provides both the input and output formats and works exactly as it did before this change.  In this scenario, it would not support multiple different input formats (so again, the same as before).  However, setting a mix of the old and the new format parameters should not be allowed.
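
The rule about not mixing the old and new parameters could be enforced along these lines. This is only a sketch under stated assumptions: the class FormatConfigSketch and its resolve method are hypothetical, and it throws IllegalArgumentException to stay within the standard library, where a real Connect transform would more likely raise a configuration error of its own.

```java
import java.util.Map;

// Hypothetical holder for the resolved patterns, for illustration only.
final class FormatConfigSketch {
    final String inputPattern;
    final String outputPattern;

    private FormatConfigSketch(String inputPattern, String outputPattern) {
        this.inputPattern = inputPattern;
        this.outputPattern = outputPattern;
    }

    static FormatConfigSketch resolve(Map<String, String> props) {
        String legacy = props.get("format");
        String input = props.get("format.input");
        String output = props.get("format.output");
        // Reject any mix of the old parameter with either of the new ones.
        if (legacy != null && (input != null || output != null)) {
            throw new IllegalArgumentException(
                "Use either 'format' or 'format.input'/'format.output', not both");
        }
        if (legacy != null) {
            // The old parameter acts as both the input and the output pattern.
            return new FormatConfigSketch(legacy, legacy);
        }
        return new FormatConfigSketch(input, output);
    }
}
```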

In order to support multiple input patterns, the suggestion is to make the string-to-target-type parsing use some of the features of java.time, such as DateTimeFormatter, instead of relying on the much older and more limited java.text.SimpleDateFormat.  java.time was added in Java 8, which as I understand it is the oldest version of Java supported by Kafka, so it should be fine to introduce these newer libraries.

With DateTimeFormatter we can simply use the ofPattern method to build an input formatter from a pattern string given in the new format.input configuration parameter.  The pattern syntax is regex-like in that square brackets mark optional sections.  For example, patterns like this would be supported:  "[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]"
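
To illustrate how optional sections let one pattern accept several variations, the sketch below uses a simplified pattern (an assumption, not the exact pattern quoted above) and reports which java.time type parseBest selects for each input. The class OptionalSectionsSketch and the method bestType are hypothetical names.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

// Hypothetical demo class, for illustration only.
final class OptionalSectionsSketch {
    // Simplified pattern: the date is required; the time, the milliseconds,
    // and the offset are each optional. 'T' or a space may separate date and time.
    static final DateTimeFormatter INPUT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSS][XXX]]");

    // Returns the simple name of the richest type the text can be parsed into.
    static String bestType(String text) {
        TemporalAccessor parsed = INPUT.parseBest(
            text, OffsetDateTime::from, LocalDateTime::from, LocalDate::from);
        return parsed.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        String[] samples = {
            "2020-10-28",
            "2020-10-28 12:34:56.789",
            "2020-10-28T12:34:56+02:00"
        };
        for (String sample : samples) {
            System.out.println(sample + " -> " + bestType(sample));
        }
    }
}
```

Text that does not match the pattern at all (for example a day-first date) still fails with a DateTimeParseException, so the optional sections widen what is accepted without making parsing arbitrarily lenient.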

This also means that within the Config instance there should be two separate formatters, one for input and one for output (instead of just the single Config.format that exists today).

The output formatter can work exactly the same as today, basically...

TimestampConverter.java
SimpleDateFormat outputFormat = new SimpleDateFormat(outputFormatPattern);
outputFormat.setTimeZone(UTC);
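
As a small runnable illustration of the output side, the sketch below formats a Date with an exact pattern in UTC. The class OutputFormatSketch and the method format are hypothetical; passing Locale.ROOT is an addition made here so that the result does not depend on the JVM's default locale.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class, for illustration only.
final class OutputFormatSketch {
    // Formats the timestamp with the exact output pattern, always in UTC.
    static String format(Date timestamp, String outputFormatPattern) {
        // Locale.ROOT is an assumption of this sketch, not part of the proposal.
        SimpleDateFormat outputFormat = new SimpleDateFormat(outputFormatPattern, Locale.ROOT);
        outputFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        return outputFormat.format(timestamp);
    }

    public static void main(String[] args) {
        System.out.println(format(new Date(0L), "yyyy-MM-dd HH:mm:ss.SSS"));
    }
}
```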

But the input formatter will come from DateTimeFormatter instead.  Something like this:

TimestampConverter.java
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern(inputFormatPattern).withZone(ZoneOffset.UTC);

Then there will need to be changes in the TRANSLATORS Map which actually performs the conversion (via call to convertTimestamp()).  Namely:

  • String toType() will need to make sure to use the SimpleDateFormat output formatter based on the format.output parameter.
  • toRaw() from String types will need to use the new DateTimeFormatter instead of calling SimpleDateFormat.parse() like it is doing currently.
    • The proposal is to create a TemporalAccessor from DateTimeFormatter.parseBest() to help with different variations like in case sometimes the string could include only a date and sometimes the string could include both a date and a time.
    • In the end, since Kafka uses java.util.Date, the return value will need to be a Date as well.  So java.time is used only to help with more advanced string parsing, and the "raw" format stays the same as before because Kafka is so dependent on Date.  One day it would be great to see a migration to java.time for all of Kafka, but that is quite a huge change.
    • We can use the toInstant() method to convert a java.time object to an Instant, and then Date.from() to convert the Instant to a Date.
    • Here is a proposed example:

TimestampConverter.java
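// Prefer a full date-time; fall back to a date-only value.
TemporalAccessor temporalAccessor = config.inputFormat.parseBest((String) orig, ZonedDateTime::from, LocalDate::from);
if (temporalAccessor instanceof ZonedDateTime)
    return Date.from(((ZonedDateTime) temporalAccessor).toInstant());
else if (temporalAccessor instanceof LocalDate)
    return Date.from(((LocalDate) temporalAccessor).atStartOfDay(ZoneOffset.UTC).toInstant());
// parseBest only returns one of the requested types, so this is just a guard for the compiler.
throw new DataException("Could not parse timestamp: " + orig);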
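
For experimenting with the proposed example outside of Connect, here is a self-contained sketch of the same flow. The class ParseToDateSketch, the method toDate, and the simplified input pattern are assumptions made for this example. It only exercises inputs that have no offset or that are already in UTC; how a parsed non-UTC offset combines with withZone is worth verifying on the target Java version.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Date;

// Hypothetical demo class, for illustration only.
final class ParseToDateSketch {
    // Simplified pattern (an assumption). withZone(UTC) supplies a zone
    // when the text itself does not carry one.
    static final DateTimeFormatter INPUT = DateTimeFormatter
        .ofPattern("yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSS][XXX]]")
        .withZone(ZoneOffset.UTC);

    // Parses the text into the best matching type and converts it to java.util.Date.
    static Date toDate(String text) {
        TemporalAccessor parsed = INPUT.parseBest(text, ZonedDateTime::from, LocalDate::from);
        if (parsed instanceof ZonedDateTime) {
            return Date.from(((ZonedDateTime) parsed).toInstant());
        }
        // Date-only input: interpret it as midnight UTC.
        return Date.from(((LocalDate) parsed).atStartOfDay(ZoneOffset.UTC).toInstant());
    }

    public static void main(String[] args) {
        System.out.println(toDate("2020-10-28").getTime());
        System.out.println(toDate("2020-10-28 12:34:56.789").getTime());
    }
}
```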


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

If the transform configuration parameter field is renamed to fields then users will need to update their connectors.

If the transform configuration parameter format is removed and instead you must specify both format.output and format.input, then users will need to update their connectors.

  • If we are changing behavior how will we phase out the older behavior?

We can either leave the existing configuration parameters how they are and make them "fit" into the new parameters, or they can be deprecated after a few versions.

  • If we need special migration tools, describe them here.

No migration tool should be necessary.  Users will just need to update their config files or send a PUT request to the Connect REST API to update the configuration of connectors that use the TimestampConverter transform.

  • When will we remove the existing behavior?

Presumably this will follow the standard: "2 versions later".

Rejected Alternatives

One initial thought was to change the entire transform from using java.util.Date to using java.time classes.  However, after a bit of investigation it quickly became apparent that Kafka and Connect have a huge list of dependencies on dates and times being a java.util.Date, so the easiest thing to do is to focus on the core problem: parsing strings into a Date in a smarter way with the help of something like DateTimeFormatter, and then continuing to return a Date for use by the rest of Connect.

