Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: LookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows that match the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously looks up rows matching the lookup keys.
     *
     * @param keyRow a {@link RowData} that wraps the keys to look up
     * @return a collection of all matching rows in the lookup table; {@link #eval} treats a
     *     {@code null} result the same as an empty collection
     * @throws IOException declared so that implementations can signal a failed lookup; {@link
     *     #eval} rethrows it wrapped in a {@link RuntimeException}
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /**
     * Invokes {@link #lookup} with the given keys wrapped in a row and collects every returned
     * row.
     *
     * @param keys the lookup key values, wrapped into a single key row via {@code
     *     GenericRowData.of(keys)}
     * @throws RuntimeException wrapping the {@link IOException} thrown by {@link #lookup}
     */
    public final void eval(Object... keys) {
        try {
            final Collection<RowData> matches = lookup(GenericRowData.of(keys));
            // An implementation may return null to signal "no match"; emit nothing in that
            // case instead of failing with a bare NullPointerException.
            if (matches == null) {
                return;
            }
            matches.forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}

...

Code Block
language: java
title: AsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collections of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
            asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

...

Code Block
language: java
title: RescanRuntimeProvider
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link RescanRuntimeProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();

    /**
     * Builder class for {@link RescanRuntimeProvider}.
     *
     * <p>Both the scan runtime provider and the rescan interval must be set before calling
     * {@link #build()}.
     */
    class Builder {
        private ScanTableSource.ScanRuntimeProvider scanRuntimeProvider;
        private Duration rescanInterval;

        /**
         * Sets the scan runtime provider.
         *
         * @param scanRuntimeProvider the provider used for executing the re-scan
         * @return this builder
         */
        public Builder withScanRuntimeProvider(
                ScanTableSource.ScanRuntimeProvider scanRuntimeProvider) {
            this.scanRuntimeProvider = scanRuntimeProvider;
            return this;
        }

        /**
         * Sets the rescan interval.
         *
         * @param rescanInterval the interval between two re-scans
         * @return this builder
         */
        public Builder withRescanInterval(Duration rescanInterval) {
            this.rescanInterval = rescanInterval;
            return this;
        }

        /**
         * Builds the {@link RescanRuntimeProvider}.
         *
         * @return a provider exposing the values configured on this builder at the time of the
         *     call
         * @throws IllegalStateException if the scan runtime provider or the rescan interval has
         *     not been set
         */
        public RescanRuntimeProvider build() {
            if (scanRuntimeProvider == null) {
                throw new IllegalStateException("Scan runtime provider must be set");
            }
            if (rescanInterval == null) {
                throw new IllegalStateException("Rescan interval must be set");
            }
            // Snapshot the fields so that later builder mutations do not leak into the
            // provider that has already been built.
            final ScanTableSource.ScanRuntimeProvider builtScanProvider = scanRuntimeProvider;
            final Duration builtInterval = rescanInterval;
            // NOTE(review): assumes LookupTableSource.LookupRuntimeProvider declares no
            // abstract methods of its own; confirm against that interface.
            return new RescanRuntimeProvider() {
                @Override
                public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                    return builtScanProvider;
                }

                @Override
                public Duration getRescanInterval() {
                    return builtInterval;
                }
            };
        }
    }
}

TableFunctionProvider / AsyncTableFunctionProvider

...