THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys * from external system. * * <p>The output type of this table function is fixed as {@link RowData}. */ @PublicEvolving public abstract class LookupFunction extends TableFunction<RowData> { /** * Synchronously lookup rows matching the lookup keys. * * @param keyRow - A {@link RowData} that wraps keys to lookup. * @return A collections of all matching rows in the lookup table. */ public abstract Collection<RowData> lookup(RowData keyRow) throws IOException; /** Invoke {@link #lookup} and handle exceptions. */ public final void eval(Object... keys) { try { lookup(GenericRowData.of(keys)).forEach(this::collect); } catch (IOException e) { throw new RuntimeException("Failed to lookup values with given key", e); } } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup * keys from external system. * * <p>The output type of this table function is fixed as {@link RowData}. */ @PublicEvolving public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> { /** * Asynchronously lookup rows matching the lookup keys. * * @param keyRow - A {@link RowData} that wraps keys to lookup. * @return A collections of all matching rows in the lookup table. */ public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow); /** Invokes {@link #asyncLookup} and chains futures. */ public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) { asyncLookup(GenericRowData.of(keys)) .whenCompleteAsync( (result, exception) -> { if (exception != null) { future.completeExceptionally(exception); return; } future.complete(result); }); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Runtime provider for periodically re-scanning all entries of the lookup table and storing the * table locally for lookup. */ @PublicEvolving public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider { /** Creates builder of {@link RescanRuntimeProvider}. */ static Builder newBuilder() { return new Builder(); } /** * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider} * for executing the periodically re-scan. */ ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(); /** Gets the interval between two re-scans. */ Duration getRescanInterval(); /** Builder class for {@link RescanRuntimeProvider}. */ class Builder { private ScanTableSource.ScanRuntimeProvider scanRuntimeProvider; private Duration rescanInterval; /** Sets scan runtime provider. */ public Builder withScanRuntimeProvider( ScanTableSource.ScanRuntimeProvider scanRuntimeProvider) { this.scanRuntimeProvider = scanRuntimeProvider; return this; } /** Sets rescan interval. */ public Builder withRescanInterval(Duration rescanInterval) { this.rescanInterval = rescanInterval; return this; } /** Build {@link RescanRuntimeProvider}. */ public RescanRuntimeProvider build() { // Build RescanRuntimeProvider } } } |
TableFunctionProvider / AsyncTableFunctionProvider
...