Status
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA:
Proposed Pull Request: #9492
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The TimeStampConverter
transform only allows to convert one field at a time for each usage of the transform (by use of the field
configuration parameter). But in a real environment you will often have multiple timestamps on an event (such as Created On, Last Updated On, Approved On, etc), and if you are in a position that one of them need to be converted using TimeStampConverter
then probably more than one (if not all of them) need to be transformed. For large messages which may already be going through multiple other transforms, then the performance goes down quite a bit if you end up chaining more than just a few TimeStampConverter
transforms just to catch all of the different fields.
At the same time, in the case of parsing strings to timestamps, in "real" environments it is not always possible to strictly control timestamp formats if multiple different services are producing messages to the same topic. For example, maybe some have specified a time zone and some have not, some give milliseconds, and some do not, etc. All of these variations could even be "valid" within the ISO 8601 standard but even the slightest difference in format of any event that does not match the exact specified format
pattern will produce a failure with TimeStampConverter
. So it would be better if it was possible to give an input pattern that allowed for different variations to be parsed from string into a proper Date/Time type.
Public Interfaces
From the perspective of using this transform in Connect, the following things will be changed:
- Change the configuration parameter
field
to be calledfields
since it will now support multiple comma-separated field names (but can support backward compatibility for some time). - Add new configuration parameters
format.input
to allow for a pattern format which supports multiple variations to parse a string, andformat.output
to specify the exact string format to output in the case of converting from a Date/Time to a string. - The configuration parameter
format
could be removed (but remain for backwards compatibility), or could also be used to specify bothformat.input
andformat.output
at the same time (assuming you just have a single string input format).
Proposed Changes
Supporting Multiple Fields
For supporting multiple fields, we can create a new configuration parameter called fields
which is of type ConfigDef.Type.LIST
.
And then for actually performing the transformation on multiple fields, instead of checking field.name().equals(config.field)
like seen in applyValueWithSchema
:
private Struct applyValueWithSchema(Struct value, Schema updatedSchema) { //... for (Field field : value.schema().fields()) { final Object updatedFieldValue; if (field.name().equals(config.field)) { //...
... it should check if the new fields
configuration parameter contains the field name. The rest of this logic can be the same.
And in applySchemaless
it should instead do a for-each loop on the entries of the Map instead of just doing a put
based on the old single field
name.
Instead of this:
Something like this:
for (Map.Entry<String, Object> field : value.entrySet()) { if (config.fields.contains(field.getKey())) updatedValue.put(field.getKey(), convertTimestamp(field.getValue())); }
Supporting Multiple String Input Formats
For output of a Date/Time field to a string, then it must be given in an exact format. So because of this, we need to separate the format configuration parameter into two: one parameter for output to strings with an exact format, and one parameter for input format of strings to be parsed into the target.type
that can support a pattern of different variations of the string-based date or timestamps.
So now there will be two new parameters added: format.input
and format.output
.
It is possible also to keep the format
parameter in place, and sort of allow it as a "single" configuration which will provide both the input and output formats, and work exactly as it did before this change. In this scenario, it would not support multiple different input formats (so again, the same as before). But it should not allow to set a mix of both the old and the new format parameters.
In order to support multiple input patterns the suggestion is to make the string to target type parsing use some of the features of java.time
such as DateTimeFormatter
instead of relying on the much older and more limited java.text.SimpleDateFormat
. java.time
was added in Java 8 which as I understand is the oldest version of Java supported by Kafka, so it should be fine to introduce these newer libraries I think.
With DateTimeFormatter
then we can simply use the ofPattern
method to build an input formatter using a regex-like pattern string that can be given in the new format.input
configuration parameter. For example patterns like this would be supported: "[yyyy-MM-dd[['T'][ ]HH:mm:ss[.SSSSSSSz][.SSS[XXX][X]]]]"
This also means that within the Config
instance there should be two separate formatters for intput and output (instead of just one called Config.format
today).
The output formatter can work exactly the same as today, basically...
SimpleDateFormat outputFormat = new SimpleDateFormat(outputFormatPattern); outputFormat.setTimeZone(UTC);
But the input formatter will come from DateTimeFormatter
instead. Something like this:
DateTimeFormatter inputFormat = DateTimeFormatter.ofPattern(inputFormatPattern).withZone(ZoneOffset.UTC);
Then there will need to be changes in the TRANSLATORS
Map which actually performs the conversion (via call to convertTimestamp()
). Namely:
String toType()
will need to make sure to use theSimpleDateFormat
output formatter based on theformat.output
parameter.toRaw()
from String types will need to use the newDateTimeFormatter
instead of callingSimpleDateFormat.parse()
like it is doing currently.- The proposal is to create a
TemporalAccessor
fromDateTimeFormatter.parseBest()
to help with different variations like in case sometimes the string could include only a date and sometimes the string could include both a date and a time. - In the end, since Kafka uses
java.util.Date
then the return value will need to be aDate
as well. So we will only usejava.time
to help with more advanced string parsing, but in the end since Kafka is so dependent onDate
then we will keep the "raw" format the same as it is before. One day it would be great to see a migration tojava.time
for all of Kafka but this is quite a huge change. - We can use the
toInstant()
method to convert a java.time object to an Instant, and then Date.from() to convert the Instant to a Date. - Here is a proposed example:
- The proposal is to create a
TemporalAccessor temporalAccessor = config.inputFormat.parseBest((String) orig, ZonedDateTime::from, LocalDate::from); if (temporalAccessor instanceof ZonedDateTime) return Date.from(((ZonedDateTime) temporalAccessor).toInstant()); else if (temporalAccessor instanceof LocalDate) return Date.from(((LocalDate) temporalAccessor).atStartOfDay(ZoneOffset.UTC).toInstant());
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
If the transform configuration parameter field
is renamed to fields
then users will need to update their connectors.
If the transform configuration parameter format
is removed and instead you must specify both format.output
and format.input
, then users will need to update their connectors.
- If we are changing behavior how will we phase out the older behavior?
We can either leave the existing configuration parameters how they are and make them "fit" into the new parameters, or they can be deprecated after a few versions.
- If we need special migration tools, describe them here.
No migration tool should be necessary, users will just need to update their config files or send a PUT request to the Connect REST API to update the configuration of connectors which are using the TimestampConverter transform.
- When will we remove the existing behavior?
Assuming that it will be based on the standard: "2 versions later".
Rejected Alternatives
One initial thought was to change the entire transform from using java.util.Date
to instead use java.time
classes instead. However, after a bit of investigation I quickly found that since Kafka and Connect have a huge list of dependencies on dates and times being a java.util.Date
, then it quickly became apparent that the easiest thing to do would be to focus on the core problem: parsing strings into a Date
in a smarter way with the help of something like DateTimeFormatter
. and then continue returning a Date
for use by the rest of Connect.