Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafka
/**
 * Connector descriptor for the Apache Kafka message queue.
 */
@PublicEvolving
public class KafkaConnector extends TableDescriptor {

	public static KafkaConnectorBuilder newBuilder() {
		return new KafkaConnectorBuilder();
	}

	public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector, KafkaConnectorBuilder> {

		public KafkaConnectorBuilder() {
			super(KafkaConnector.class);
		}

		/**
		 * Sets the Kafka version to be used.
		 *
		 * @param version Kafka version. E.g., "0.8", "0.11", etc.
		 */
		public KafkaConnectorBuilder version(String version) {
			Preconditions.checkNotNull(version);
			option(CONNECTOR.key(), "kafka-" + version);
			return this;
		}

		/**
		 * Sets the topic from which the table is read.
		 *
		 * @param topic The topic from which the table is read.
		 */
		public KafkaConnectorBuilder topic(String topic) {
			Preconditions.checkNotNull(topic);
			option("topic", topic);
			return this;
		}
 
        // TODO: add more methods
		... 

		@Override
		protected KafkaConnectorBuilder self() {
			return this;
		}
	}
}


FormatDescriptor & FormatDescriptorBuilder

Code Block
languagejava
/**
 * Describes the format of data.
 * Please use a specific {@link FormatDescriptorBuilder} to build the {@link FormatDescriptor}.
 */
@PublicEvolving
public abstract class FormatDescriptor {

	protected final Map<String, String> formatOptions = new HashMap<>();

	/**
	 * Converts this descriptor into a set of connector options.
	 */
	protected Map<String, String> toFormatOptions() {
		return new HashMap<>(formatOptions);
	}
}

/**
 * A basic builder implementation to build {@link FormatDescriptor}.
 */
@PublicEvolving
public abstract class FormatDescriptorBuilder<FD extends FormatDescriptor, FDB extends FormatDescriptorBuilder<FD, FDB>> {

	private final FD descriptor;

	protected FormatDescriptorBuilder(Class<? extends FD> descriptorClass) {
		descriptor = InstantiationUtil.instantiate(descriptorClass, FormatDescriptor.class);
	}

	protected abstract FDB self();

	protected FDB option(String key, String value) {
		descriptor.formatOptions.put(key, value);
		return self();
	}

	public FD build() {
		return descriptor;
	}
}

/**
  * Format descriptor for JSON.
  */
public class JsonFormat extends FormatDescriptor {

	public static JsonFormat newInstance() {
		return new JsonFormat();
	}

	public static JsonFormatBuilder newBuilder() {
		return new JsonFormatBuilder();
	}

	public static class JsonFormatBuilder extends FormatDescriptorBuilder<JsonFormat, JsonFormatBuilder> {
		protected JsonFormatBuilder() {
			super(JsonFormat.class);
		}

		/**
		 * Sets flag whether to fail if a field is missing or not.
		 *
		 * @param failOnMissingField If set to true, the operation fails if there is a missing field.
		 *                           If set to false, a missing field is set to null.
		 */
		public JsonFormatBuilder failOnMissingField(boolean failOnMissingField) {
			option("fail-on-missing-field", String.valueOf(failOnMissingField));
			return this;
		}

		/**
		 * Sets flag whether to fail when parsing json fails.
		 *
		 * @param ignoreParseErrors If set to true, the operation will ignore parse errors.
		 *                          If set to false, the operation fails when parsing json fails.
		 */
		public JsonFormatBuilder ignoreParseErrors(boolean ignoreParseErrors) {
			option("ignore-parse-errors", String.valueOf(ignoreParseErrors));
			return this;
		}

		@Override
		protected JsonFormatBuilder self() {
			return this;
		}
	}
}



Schema

The current Rowtime class will be removed. The current Schema class will be refactored into:

...