THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Connector descriptor for the Apache Kafka message queue. */ @PublicEvolving public class KafkaConnector extends TableDescriptor { public static KafkaConnectorBuilder newBuilder() { return new KafkaConnectorBuilder(); } public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector, KafkaConnectorBuilder> { public KafkaConnectorBuilder() { super(KafkaConnector.class); } /** * Sets the Kafka version to be used. * * @param version Kafka version. E.g., "0.8", "0.11", etc. */ public KafkaConnectorBuilder version(String version) { Preconditions.checkNotNull(version); option(CONNECTOR.key(), "kafka-" + version); return this; } /** * Sets the topic from which the table is read. * * @param topic The topic from which the table is read. */ public KafkaConnectorBuilder topic(String topic) { Preconditions.checkNotNull(topic); option("topic", topic); return this; } // TODO: add more methods ... @Override protected KafkaConnectorBuilder self() { return this; } } } |
FormatDescriptor & FormatDescriptorBuilder
Code Block | ||
---|---|---|
| ||
/**
 * Describes the format of data.
 *
 * <p>Please use a specific {@link FormatDescriptorBuilder} to build the {@link FormatDescriptor}.
 */
@PublicEvolving
public abstract class FormatDescriptor {

    /** Format options collected for this descriptor, keyed by option name. */
    protected final Map<String, String> formatOptions = new HashMap<>();

    /**
     * Converts this descriptor into a set of format options.
     *
     * @return a newly allocated map holding a copy of the current entries of
     *         {@link #formatOptions}; changes to the returned map do not affect this descriptor
     */
    protected Map<String, String> toFormatOptions() {
        final Map<String, String> snapshot = new HashMap<>();
        snapshot.putAll(formatOptions);
        return snapshot;
    }
}
/**
 * A basic builder implementation to build {@link FormatDescriptor}.
 *
 * @param <FD>  concrete descriptor type produced by this builder
 * @param <FDB> concrete builder type, returned by the fluent methods
 */
@PublicEvolving
public abstract class FormatDescriptorBuilder<FD extends FormatDescriptor, FDB extends FormatDescriptorBuilder<FD, FDB>> {

    /** The descriptor being populated; created once in the constructor. */
    private final FD descriptor;

    /**
     * Instantiates the descriptor that this builder fills in.
     *
     * @param descriptorClass class handed to {@code InstantiationUtil.instantiate} to create the
     *                        descriptor
     */
    protected FormatDescriptorBuilder(Class<? extends FD> descriptorClass) {
        this.descriptor = InstantiationUtil.instantiate(descriptorClass, FormatDescriptor.class);
    }

    /**
     * Returns the object that the fluent methods of this builder hand back to the caller.
     */
    protected abstract FDB self();

    /**
     * Stores a single option on the descriptor under construction.
     *
     * @param key   option key
     * @param value option value
     * @return the result of {@link #self()}
     */
    protected FDB option(String key, String value) {
        final FD target = this.descriptor;
        target.formatOptions.put(key, value);
        return self();
    }

    /**
     * Returns the descriptor held by this builder.
     *
     * <p>No copy is made: the returned object is the same instance that later
     * {@link #option(String, String)} calls write to.
     */
    public FD build() {
        return this.descriptor;
    }
}
/**
* Format descriptor for JSON.
*/
public class JsonFormat extends FormatDescriptor {
public static JsonFormat newInstance() {
return new JsonFormat();
}
public static JsonFormatBuilder newBuilder() {
return new JsonFormatBuilder();
}
public static class JsonFormatBuilder extends FormatDescriptorBuilder<JsonFormat, JsonFormatBuilder> {
protected JsonFormatBuilder() {
super(JsonFormat.class);
}
/**
* Sets flag whether to fail if a field is missing or not.
*
* @param failOnMissingField If set to true, the operation fails if there is a missing field.
* If set to false, a missing field is set to null.
*/
public JsonFormatBuilder failOnMissingField(boolean failOnMissingField) {
option("fail-on-missing-field", String.valueOf(failOnMissingField));
return this;
}
/**
* Sets flag whether to fail when parsing json fails.
*
* @param ignoreParseErrors If set to true, the operation will ignore parse errors.
* If set to false, the operation fails when parsing json fails.
*/
public JsonFormatBuilder ignoreParseErrors(boolean ignoreParseErrors) {
option("ignore-parse-errors", String.valueOf(ignoreParseErrors));
return this;
}
@Override
protected JsonFormatBuilder self() {
return this;
}
}
} |
Schema
The current `Rowtime` class will be removed. The current `Schema` class will be refactored into:
...