...
Code Block |
---|
|
// Register a temporary table named "MyTable" using a connector-specific descriptor.
tEnv.createTemporaryTable(
    "MyTable",
    KafkaConnector.newBuilder()
        .version("0.11")
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        // method name follows the FLIP-122 property name scan.startup.mode=earliest-offset
        .scanStartupModeEarliest()
        .sinkPartitionerRoundRobin()
        .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.STRING())
                .column("part_field_0", DataTypes.STRING())
                .column("part_field_1", DataTypes.INT())
                .column("proc", proctime()) // define a processing-time attribute with column name "proc"
                .column("ts", toTimestamp($("log_ts"))) // computed column derived from "log_ts"
                .watermark("ts", $("ts").minus(lit(3).seconds()))
                .primaryKey("user_id")
                .build())
        .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API
        .build()
); |
...
Code Block |
---|
|
// The same schema is shared by the Kafka source table and the filesystem sink table.
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("score", DataTypes.DECIMAL(10, 2))
    .column("ts", DataTypes.TIMESTAMP(3))
    .build();

// Create a Table directly from a descriptor, without registering it first.
Table myKafka = tEnv.from(
    KafkaConnector.newBuilder()
        .version("0.11")
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        // method name follows the FLIP-122 property name scan.startup.mode=earliest-offset
        .scanStartupModeEarliest()
        .sinkPartitionerRoundRobin()
        .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
        .schema(schema)
        .build()
);

// reading from kafka table and write into filesystem table
myKafka.executeInsert(
    Connector.of("filesystem")
        .option("path", "/path/to/whatever")
        .option("format", "json")
        .schema(schema)
        .build()
); |
...
Code Block |
---|
language | java |
---|
title | TableDescriptor |
---|
|
/**
* Describes a table to connect. It is a same representation of SQL CREATE TABLE DDL. It onlywraps the wrapsneeded meta information about a catalog table.
* Please use a specific {@link TableDescriptorBuilder} to build the {@link TableDescriptor}.
*/
@PublicEvolving
public abstract classinterface TableDescriptor {
// package visible variables used internally to build a CatalogTable
protected List<String> partitionedFields;
protected Schema schema;
protectedList<String> getPartitionedFields();
Schema getSchema();
Map<String, String> optionsgetOptions();
protected LikeOption[] likeOptionsgetLikeOptions();
protected String likePathgetLikePath();
}
/**
* A basic builder implementation to build a {@link TableDescriptor}.
*/
@PublicEvolving
public abstract class TableDescriptorBuilder<DESCRIPTORTableDescriptorBuilder<BUILDER extends TableDescriptor, BUILDER extends TableDescriptorBuilder<DESCRIPTOR, BUILDER>> {
TableDescriptorBuilder<BUILDER>> {
private final DESCRIPTORInternalTableDescriptor descriptor;
protected TableDescriptorBuilder(Class<DESCRIPTOR> descriptorClass) {
descriptor = InstantiationUtil.instantiate(descriptorClass, TableDescriptor.class);
} = new InternalTableDescriptor();
/**
* Returns the this builder instance in the type of subclass.
*/
protected abstract BUILDER self();
/**
* Specifies the table schema.
*/
public BUILDER schema(Schema schema) {
descriptor.schema = schema;
return self();
}
/**
* Specifies the partition keys of this table.
*/
public BUILDER partitionedBy(String... fieldNames) {
checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once.");
descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
return self();
}
/**
* Extends some parts from the original registered table path.
*/
public BUILDER like(String tablePath, LikeOption... likeOptions) {
descriptor.likePath = tablePath;
descriptor.likeOptions = likeOptions;
return self();
}
protected BUILDER option(String key, String value) {
descriptor.options.put(key, value);
return self();
}
/**
* Returns created table descriptor.
*/
public DESCRIPTOR build() {
return descriptor;
}
}
|
...
Code Block |
---|
language | java |
---|
title | Connector |
---|
|
public class Connector extends TableDescriptor {
public static ConnectorBuilder of(String identifier) {
return new ConnectorBuilder(identifier);
}
public static class ConnectorBuilder extends TableDescriptorBuilder<Connector, ConnectorBuilder>TableDescriptorBuilder<ConnectorBuilder> {
private ConnectorBuilder(String identifier) {
super(Connector.class);
option(CONNECTOR.key(), identifier);
}
public ConnectorBuilder option(String key, String value) {
String lowerKey = key.toLowerCase().trim();
if (CONNECTOR.key().equals(lowerKey)) {
throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
}
return super.option(key, value);
}
@Override
protected ConnectorBuilder self() {
return this;
}
}
} |
...
Code Block |
---|
|
/**
* Connector descriptor for the Apache Kafka message queue.
*/
@PublicEvolving
public class KafkaConnector extends TableDescriptor {
public static KafkaConnectorBuilder newBuilder() {
return new KafkaConnectorBuilder();
}
public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector,TableDescriptorBuilder<KafkaConnectorBuilder> KafkaConnectorBuilder> {
public KafkaConnectorBuilder() {/**
super(KafkaConnector.class);
}
/**
* * Sets the Kafka version to be used.
*
* @param version Kafka version. E.g., "0.8", "0.11", etc.
*/
public KafkaConnectorBuilder version(String version) {
Preconditions.checkNotNull(version);
option(CONNECTOR.key(), "kafka-" + version);
return this;
}
/**
* Sets the topic from which the table is read.
*
* @param topic The topic from which the table is read.
*/
public KafkaConnectorBuilder topic(String topic) {
Preconditions.checkNotNull(topic);
option("topic", topic);
return this;
}
// TODO: add more methods
...
@Override
protected KafkaConnectorBuilder self() {
return this;
}
}
} |
NOTE: When we refactor classes such as `KafkaConnector` or `ElasticsearchConnector`, we should align the method names with the new property names introduced in FLIP-122. For example, the method for the property `scan.startup.mode=earliest-offset` should be `scanStartupModeEarliest` instead of `startFromEarliest`.
FormatDescriptor & FormatDescriptorBuilder
Code Block |
---|
|
/**
* Describes the format of data.
* Please use a specific {@link FormatDescriptorBuilder} to build thea {@link FormatDescriptor}.
*/
@PublicEvolving
public abstract classinterface FormatDescriptor {
protected final Map<String, String> formatOptions = new HashMap<>();
/**
* Converts this descriptor into a set of connector options.
*/
protected Map<String, String> toFormatOptions() {
return new HashMap<>(formatOptions);
}
}
/**
* A basic builder implementation to build {@link FormatDescriptor}.
*/
@PublicEvolving
public abstract class FormatDescriptorBuilder<FD extends FormatDescriptor, FDB extends FormatDescriptorBuilder<FD, FDB>> {
private final FD descriptor;
protected FormatDescriptorBuilder(Class<? FormatDescriptorBuilder<BUILDER extends FD> descriptorClass)FormatDescriptorBuilder<BUILDER>> {
private final InternalFormatDescriptor descriptor = InstantiationUtil.instantiate(descriptorClass, FormatDescriptor.classnew InternalFormatDescriptor();
}
protected abstract FDBBUILDER self();
protected FDBBUILDER option(String key, String value) {
descriptor.formatOptions.put(key, value);
return self();
}
public FDFormatDescriptor build() {
return descriptor;
}
}
/**
* Format descriptor for JSON.
*/
public class JsonFormat extends FormatDescriptor {
public static JsonFormatFormatDescriptor newInstance() {
return new JsonFormatnewBuilder().build();
}
public static JsonFormatBuilder newBuilder() {
return new JsonFormatBuilder();
}
public static class JsonFormatBuilder extends FormatDescriptorBuilder<JsonFormat, JsonFormatBuilder>FormatDescriptorBuilder<JsonFormatBuilder> {
protected JsonFormatBuilder() {
super(JsonFormat.class);
}
/**
* Sets flag whether to fail if a field is missing or not.
*
* @param failOnMissingField If set to true, the operation fails if there is a missing field.
* If set to false, a missing field is set to null.
*/
public JsonFormatBuilder failOnMissingField(boolean failOnMissingField) {
option("fail-on-missing-field", String.valueOf(failOnMissingField));
return this;
}
/**
* Sets flag whether to fail when parsing json fails.
*
* @param ignoreParseErrors If set to true, the operation will ignore parse errors.
* If set to false, the operation fails when parsing json fails.
*/
public JsonFormatBuilder ignoreParseErrors(boolean ignoreParseErrors) {
option("ignore-parse-errors", String.valueOf(ignoreParseErrors));
return this;
}
@Override
protected JsonFormatBuilder self() {
return this;
}
}
} |
...
Code Block |
---|
|
// NOTE(review): the original snippet left connect(...) and column("my_ts", ...) unclosed; one
// closing parenthesis was added to each. Confirm the intended nesting against the design.
tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            // method name follows the FLIP-122 property name scan.startup.mode=earliest-offset
            .scanStartupModeEarliest()
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false)))
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime())
            .column("my_ts", toTimestamp($("log_ts")))
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds())))
    .primaryKey("user_id")
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable"); |
...