Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
// register a table using specific descriptor
tEnv.createTemporaryTable(
	"MyTable",
	KafkaConnector.newBuilder()
		.version("0.11")
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.scanStartupModeEarliest() // method name follows the 'scan.startup.mode' = 'earliest-offset' property
		.sinkPartitionerRoundRobin()
		.format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.column("proc", proctime()) // define a processing-time attribute with column name "proc"
				.column("ts", toTimestamp($("log_ts"))) // computed column derived from "log_ts"
				.watermark("ts", $("ts").minus(lit(3).seconds())) // watermark expression: "ts" minus 3 seconds
				.primaryKey("user_id")
				.build())
		.partitionedBy("part_field_0", "part_field_1")  // Kafka doesn't support partitioned table yet, this is just an example for the API
		.build()
);

...

Code Block
languagejava
// schema shared by the Kafka source table and the filesystem sink table
Schema schema = Schema.newBuilder()
   .column("user_id", DataTypes.BIGINT())
   .column("score", DataTypes.DECIMAL(10, 2))
   .column("ts", DataTypes.TIMESTAMP(3))
   .build();
// create a Table directly from a descriptor, without registering it first
Table myKafka = tEnv.from(
   KafkaConnector.newBuilder()
      .version("0.11")
      .topic("user_logs")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "test-group")
      .scanStartupModeEarliest() // method name follows the 'scan.startup.mode' = 'earliest-offset' property
      .sinkPartitionerRoundRobin()
      .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
      .schema(schema)
      .build()
);
// reading from kafka table and write into filesystem table
myKafka.executeInsert(
   Connector.of("filesystem")
      .option("path", "/path/to/whatever")
      .option("format", "json")
      .schema(schema)
      .build()
);

...

Code Block
languagejava
titleTableDescriptor
/**
 * Describes a table to connect. It is a same representation of SQL CREATE TABLE DDL. It onlywraps the wrapsneeded meta information about a catalog table.
 * Please use a specific {@link TableDescriptorBuilder} to build the {@link TableDescriptor}.
 */
@PublicEvolving
public abstract classinterface TableDescriptor {
    // package visible variables used internally to build a CatalogTable
	protected List<String> partitionedFields;
	protected Schema schema;
	protectedList<String> getPartitionedFields();
    Schema getSchema();
    Map<String, String> optionsgetOptions();
	protected    LikeOption[] likeOptionsgetLikeOptions();
	protected    String likePathgetLikePath();
}

/**
 * A basic builder implementation to build a {@link TableDescriptor}.
 */
@PublicEvolving
public abstract class TableDescriptorBuilder<DESCRIPTORTableDescriptorBuilder<BUILDER extends TableDescriptor, BUILDER extends TableDescriptorBuilder<DESCRIPTOR, BUILDER>> {

TableDescriptorBuilder<BUILDER>> {

	private final DESCRIPTORInternalTableDescriptor descriptor;

	protected TableDescriptorBuilder(Class<DESCRIPTOR> descriptorClass) {
		descriptor = InstantiationUtil.instantiate(descriptorClass, TableDescriptor.class);
	} = new InternalTableDescriptor();

	/**
	 * Returns the this builder instance in the type of subclass.
	 */
	protected abstract BUILDER self();

	/**
	 * Specifies the table schema.
	 */
	public BUILDER schema(Schema schema) {
		descriptor.schema = schema;
		return self();
	}

	/**
	 * Specifies the partition keys of this table.
	 */
	public BUILDER partitionedBy(String... fieldNames) {
		checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once.");
		descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
		return self();
	}

	/**
	 * Extends some parts from the original registered table path.
	 */
	public BUILDER like(String tablePath, LikeOption... likeOptions) {
		descriptor.likePath = tablePath;
		descriptor.likeOptions = likeOptions;
		return self();
	}

	protected BUILDER option(String key, String value) {
		descriptor.options.put(key, value);
		return self();
	}

	/**
	 * Returns created table descriptor.
	 */
	public DESCRIPTOR build() {
		return descriptor;
	}
}

...

Code Block
languagejava
titleConnector
public class Connector extends TableDescriptor {

	public static ConnectorBuilder of(String identifier) {
		return new ConnectorBuilder(identifier);
	}

	public static class ConnectorBuilder extends TableDescriptorBuilder<Connector, ConnectorBuilder>TableDescriptorBuilder<ConnectorBuilder> {

		private ConnectorBuilder(String identifier) {
			super(Connector.class);
			option(CONNECTOR.key(), identifier);
		}

		public ConnectorBuilder option(String key, String value) {
			String lowerKey = key.toLowerCase().trim();
			if (CONNECTOR.key().equals(lowerKey)) {
				throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
			}
			return super.option(key, value);
		}

		@Override
		protected ConnectorBuilder self() {
			return this;
		}
	}
}

...

Code Block
languagejava
titleKafka
/**
 * Connector descriptor for the Apache Kafka message queue.
 */
@PublicEvolving
public class KafkaConnector extends TableDescriptor {

	public static KafkaConnectorBuilder newBuilder() {
		return new KafkaConnectorBuilder();
	}

	public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector,TableDescriptorBuilder<KafkaConnectorBuilder> KafkaConnectorBuilder> {

		public KafkaConnectorBuilder() {/**
			super(KafkaConnector.class);
		}

		/**
		 *  * Sets the Kafka version to be used.
		 *
		 * @param version Kafka version. E.g., "0.8", "0.11", etc.
		 */
		public KafkaConnectorBuilder version(String version) {
			Preconditions.checkNotNull(version);
			option(CONNECTOR.key(), "kafka-" + version);
			return this;
		}

		/**
		 * Sets the topic from which the table is read.
		 *
		 * @param topic The topic from which the table is read.
		 */
		public KafkaConnectorBuilder topic(String topic) {
			Preconditions.checkNotNull(topic);
			option("topic", topic);
			return this;
		}
 
        // TODO: add more methods
		... 

		@Override
		protected KafkaConnectorBuilder self() {
			return this;
		}
	}
}

NOTE: When refactoring classes such as KafkaConnector or ElasticsearchConnector, we should align the method names with the new property names introduced in FLIP-122. For example, the builder method for the property scan.startup.mode=earliest-offset should be scanStartupModeEarliest instead of startFromEarliest.

FormatDescriptor & FormatDescriptorBuilder

Code Block
languagejava
/**
 * Describes the format of data.
 * Please use a specific {@link FormatDescriptorBuilder} to build thea {@link FormatDescriptor}.
 */
@PublicEvolving
public abstract classinterface FormatDescriptor {

	protected final Map<String, String> formatOptions = new HashMap<>();

	/**
	 * Converts this descriptor into a set of connector options.
	 */
	protected Map<String, String> toFormatOptions() {
		return new HashMap<>(formatOptions);
	}
}

/**
 * A basic builder implementation to build {@link FormatDescriptor}.
 */
@PublicEvolving
public abstract class FormatDescriptorBuilder<FD extends FormatDescriptor, FDB extends FormatDescriptorBuilder<FD, FDB>> {

	private final FD descriptor;

	protected FormatDescriptorBuilder(Class<? FormatDescriptorBuilder<BUILDER extends FD> descriptorClass)FormatDescriptorBuilder<BUILDER>> {

		private final InternalFormatDescriptor descriptor = InstantiationUtil.instantiate(descriptorClass, FormatDescriptor.classnew InternalFormatDescriptor();
	}

	protected abstract FDBBUILDER self();

	protected FDBBUILDER option(String key, String value) {
		descriptor.formatOptions.put(key, value);
		return self();
	}

	public FDFormatDescriptor build() {
		return descriptor;
	}
}

/**
  * Format descriptor for JSON.
  */
public class JsonFormat extends FormatDescriptor {

	public static JsonFormatFormatDescriptor newInstance() {
		return new JsonFormatnewBuilder().build();
	}

	public static JsonFormatBuilder newBuilder() {
		return new JsonFormatBuilder();
	}

	public static class JsonFormatBuilder extends FormatDescriptorBuilder<JsonFormat, JsonFormatBuilder>FormatDescriptorBuilder<JsonFormatBuilder> {
		protected JsonFormatBuilder() {
			super(JsonFormat.class);
		}

		/**
		 * Sets flag whether to fail if a field is missing or not.
		 *
		 * @param failOnMissingField If set to true, the operation fails if there is a missing field.
		 *                           If set to false, a missing field is set to null.
		 */
		public JsonFormatBuilder failOnMissingField(boolean failOnMissingField) {
			option("fail-on-missing-field", String.valueOf(failOnMissingField));
			return this;
		}

		/**
		 * Sets flag whether to fail when parsing json fails.
		 *
		 * @param ignoreParseErrors If set to true, the operation will ignore parse errors.
		 *                          If set to false, the operation fails when parsing json fails.
		 */
		public JsonFormatBuilder ignoreParseErrors(boolean ignoreParseErrors) {
			option("ignore-parse-errors", String.valueOf(ignoreParseErrors));
			return this;
		}

		@Override
		protected JsonFormatBuilder self() {
			return this;
		}
	}
}

...

Code Block
languagejava
// connector descriptor: everything up to format(...) is the argument of connect(...)
tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .scanStartupModeEarliest() // method name follows the 'scan.startup.mode' = 'earliest-offset' property
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false)))
    // schema(...), partitionedBy(...) and createTemporaryTable(...) are chained on the result of connect(...)
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime())
            .column("my_ts", toTimestamp($("log_ts")))
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds()))
            .primaryKey("user_id"))
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable");

...