Discussion thread: https://lists.apache.org/thread/dcwgv0gmngqt40fl3694km53pykocn5s
Vote thread: https://lists.apache.org/thread/8hto6oxpf3fnjnhqwoyr69fscl58vx8h
Release: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As Flink SQL expands its range of connectors, it becomes crucial to support lazy initialization of catalogs and persistence of catalog configurations in order to manage a growing number of catalogs. Lazy initialization reduces the overhead of registering catalogs and simplifies their management. Persistence allows catalogs to be retrieved and used across multiple Flink clusters, so that they can be reused in various applications and use cases.

Public Interfaces

CatalogDescriptor

Add a new class to describe what should be stored in the CatalogStore.

@PublicEvolving
public class CatalogDescriptor {
    /* Catalog name */
    private final String catalogName;

    /* The configuration used to discover and construct the catalog. */
    private final Configuration configuration;

    public String getCatalogName() {
        return catalogName;
    }

    public Configuration getConfiguration() {
        return configuration;
    }

    private CatalogDescriptor(String catalogName, Configuration configuration) {
        this.catalogName = catalogName;
        this.configuration = configuration;
    }

    public static CatalogDescriptor of(String catalogName, Configuration configuration) {
        return new CatalogDescriptor(catalogName, configuration);
    }
}


TableEnvironment

Add a new method for creating a catalog and deprecate the old registration method.

@PublicEvolving
public interface TableEnvironment {
    /**
     * Creates a {@link Catalog} from the given {@link CatalogDescriptor} and registers it under a
     * unique name. All tables registered in the {@link Catalog} can be accessed. The catalog is
     * initialized immediately and its descriptor is then saved to the CatalogStore.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalogDescriptor The descriptor used to create and register the catalog.
     */
    void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor);

    /**
     * Registers a {@link Catalog} under a unique name. All tables registered in the {@link Catalog}
     * can be accessed.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalog The catalog to register.
     */
    @Deprecated
    void registerCatalog(String catalogName, Catalog catalog); 
}


CatalogStore

Adding the CatalogStore interface enables the storage and retrieval of the configurations linked to catalogs.

CatalogStore
@PublicEvolving
public interface CatalogStore {

    /**
     * Stores a catalog under the given name. The catalog name must be unique.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalog descriptor of the catalog to store
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException;

    /**
     * Unregisters the catalog with the given name. The catalog must exist.
     *
     * @param catalogName name of the catalog to unregister
     * @param ignoreIfNotExists if false, an exception is thrown when the catalog to be removed
     *     does not exist
     * @throws CatalogException if the unregistration of the catalog under the given name failed
     */
    void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException;

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     *
     * @return the requested catalog or empty if it does not exist
     */
    Optional<CatalogDescriptor> getCatalog(String catalogName);


    /**
     * Retrieves names of all registered catalogs.
     *
     * @return a set of names of registered catalogs
     */
    Set<String> listCatalogs();

	
    /**
     * Checks whether the CatalogStore contains a catalog with the given name.
     *
     * @return whether there is a corresponding catalog with the given name
     */
    boolean contains(String catalogName);

    /**
     * Initialization method for the CatalogStore.
     */
    void open();

    /**
     * Tear-down method for the CatalogStore.
     */
    void close();
}


CatalogStoreFactory

To enable plug-in management of CatalogStore implementations through the service provider interface (SPI), it would be beneficial to introduce a CatalogStoreFactory interface. This interface would include a method for creating a CatalogStore instance.

Typically, a factory only creates a single instance. However, in the Flink SQL gateway scenario, multiple sessions are maintained, and each session creates its own CatalogStore instance. This can result in a significant number of CatalogStore instances. For example, we may have a MySQLCatalogStore that stores catalog information in MySQL, with each CatalogStore establishing its own connection to MySQL. To address this issue, we can create a connection pool in the CatalogStoreFactory: the pool is initialized in the open method, passed to each CatalogStore in the createCatalogStore method, and destroyed in the close method.

CatalogStoreFactory
@PublicEvolving
public interface CatalogStoreFactory extends Factory {
	
    /**
     * Creates a {@link CatalogStore} instance from context information.
     */
    CatalogStore createCatalogStore(Context context);

    /**
     * Initialization method for the CatalogStoreFactory.
     */
    void open();

    /**
     * Tear-down method for the CatalogStoreFactory.
     */
    void close();
}
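
The shared-resource lifecycle described for the factory can be sketched in plain Java as follows. This is only an illustration under stated assumptions: SharedPool, PooledCatalogStore and PooledCatalogStoreFactory are hypothetical stand-ins that do not exist in Flink, and the pool merely counts borrows instead of managing real MySQL connections.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a pooled resource such as a JDBC connection pool. */
final class SharedPool {
    private volatile boolean open = true;
    private final AtomicInteger borrowed = new AtomicInteger();

    /** Hands out a "connection" (here just a running counter) while the pool is open. */
    int borrow() {
        if (!open) {
            throw new IllegalStateException("Pool is closed.");
        }
        return borrowed.incrementAndGet();
    }

    void shutdown() {
        open = false;
    }
}

/** Hypothetical store that borrows from the factory-owned pool instead of owning a connection. */
final class PooledCatalogStore {
    private final SharedPool pool;

    PooledCatalogStore(SharedPool pool) {
        this.pool = pool;
    }

    int useConnection() {
        return pool.borrow();
    }
}

/** Sketch of the factory lifecycle: open() creates the pool, close() destroys it. */
final class PooledCatalogStoreFactory {
    private SharedPool pool;

    void open() {
        pool = new SharedPool();
    }

    /** Every store created by this factory shares the same pool. */
    PooledCatalogStore createCatalogStore() {
        if (pool == null) {
            throw new IllegalStateException("Factory is not open.");
        }
        return new PooledCatalogStore(pool);
    }

    void close() {
        if (pool != null) {
            pool.shutdown();
            pool = null;
        }
    }
}
```

With this shape, each session obtains its own store object while the expensive resource is created once per factory and released when the factory is closed.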

CatalogStore Registration Using Configuration

By default, there are two built-in CatalogStores available: the In-MemoryCatalogStore and the FileCatalogStore. Only one CatalogStore can be specified at a time. The In-MemoryCatalogStore will be the default CatalogStore.

In-MemoryCatalogStore

table.catalog-store.kind: in-memory

FileCatalogStore

FileCatalogStore will save the catalog configuration to the specified directory.

table.catalog-store.kind: file
table.catalog-store.file.path: file:///xxxx/xxx

Custom CatalogStore

If you have implemented a custom CatalogStore, you can configure the following parameters to make it effective.

table.catalog-store.kind: {identifier}
table.catalog-store.{identifier}.{option1}: xxx1
table.catalog-store.{identifier}.{option2}: xxx2
table.catalog-store.{identifier}.{option3}: xxx3
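
As an illustration of the key layout above, the following sketch shows how the options scoped to the selected store could be extracted from a flat configuration map. It is not the actual Flink lookup logic: the class CatalogStoreOptionsSketch and its method are hypothetical, and a plain Map stands in for Flink's Configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; a plain Map stands in for Flink's Configuration. */
final class CatalogStoreOptionsSketch {
    static final String KIND_KEY = "table.catalog-store.kind";
    static final String PREFIX = "table.catalog-store.";

    /** Returns the options of the selected store kind with the common prefix stripped. */
    static Map<String, String> storeOptions(Map<String, String> conf) {
        // Fall back to the in-memory store when no kind is configured, as the proposal states.
        String kind = conf.getOrDefault(KIND_KEY, "in-memory");
        String scopedPrefix = PREFIX + kind + ".";

        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(scopedPrefix)) {
                result.put(entry.getKey().substring(scopedPrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Options that belong to a different identifier are ignored, so only the selected store sees its own settings.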


CatalogStore Registration Using Table API

Add a new method to EnvironmentSettings::Builder for specifying the CatalogStore. In EnvironmentSettings, InMemoryCatalogStore will be the default CatalogStore.

@PublicEvolving
public class EnvironmentSettings {

    @PublicEvolving
    public static class Builder {

        private final Configuration configuration = new Configuration();
        private ClassLoader classLoader;
        private CatalogStore catalogStore;

        public Builder() {}

        ....

        /** Sets a catalogStore for retrieving the configuration of the Catalog. */
        public Builder withCatalogStore(CatalogStore catalogStore) {
            this.catalogStore = catalogStore;
            return this;
        }

        /** Returns an immutable instance of {@link EnvironmentSettings}. */
        public EnvironmentSettings build() {
            if (classLoader == null) {
                classLoader = Thread.currentThread().getContextClassLoader();
            }
            if (catalogStore == null) {
                catalogStore = new InMemoryCatalogStore();
            }
            return new EnvironmentSettings(configuration, classLoader, catalogStore);
        }

        private CatalogStore lookupCatalogStore() {
            // Construct CatalogStore
        }
    }
}


For Table API Users

public class CatalogStoreExample {
    public static void main(String[] args) throws Exception {
        // Initialize a catalog Store
        CatalogStore catalogStore = new FileCatalogStore("");

        // set up the Table API
        final EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode()
                        .withCatalogStore(catalogStore)
                        .build();

        final TableEnvironment tableEnv = TableEnvironment.create(settings);
    }
}


Proposed Changes

CatalogStore Initialization and Close

To initialize the CatalogStore, the CatalogManager needs to have additional methods for opening and closing.

@Internal
public final class CatalogManager {

    /**
     * Initializes the CatalogStore.
     */
    public void open() {
        catalogStore.open();
    }

    /**
     * Closes all initialized catalogs and then the CatalogStore.
     */
    public void close() {
        // catalogs is a Map<String, Catalog>, so iterate over its values.
        catalogs.values().forEach(Catalog::close);
        catalogStore.close();
    }
}

Catalog Configuration Persistence

1.  Add createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) to register the catalog and store its CatalogDescriptor in the CatalogStore.

2.  Deprecate the registerCatalog(String catalogName, Catalog catalog) method.


public final class CatalogManager {
    private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);

	// Store the initialized catalogs.
	private final Map<String, Catalog> catalogs;	


	// CatalogStore for storing the CatalogDescriptor. 
	private final CatalogStore catalogStore;	
	.....          
	 
    /**
     * Creates a catalog under the given name. The catalog name must be unique.
     * Verifies that no catalog with the same name exists in the CatalogStore or among the
     * initialized catalogs, then initializes the catalog and adds its descriptor to the
     * CatalogStore.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalogDescriptor descriptor of the catalog to register
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(catalogName),
                "Catalog name cannot be null or empty.");
        checkNotNull(catalogDescriptor, "Catalog descriptor cannot be null");

        if (catalogStore.contains(catalogName)) {
            throw new CatalogException(String.format("Catalog %s already exists.", catalogName));
        }
        if (catalogs.containsKey(catalogName)) {
            throw new CatalogException(
                    String.format("Catalog %s already exists in initialized catalogs.", catalogName));
        }

        Catalog catalog = initCatalog(catalogDescriptor);
        catalog.open();
        catalogs.put(catalogName, catalog);
        catalogStore.storeCatalog(catalogName, catalogDescriptor);
    }
    
	/**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    public Optional<Catalog> getCatalog(String catalogName) {
        // Return the catalog if it has already been initialized.
        if (catalogs.containsKey(catalogName)) {
            return Optional.of(catalogs.get(catalogName));
        }

        // Otherwise, lazily initialize it from the descriptor in the CatalogStore.
        Optional<CatalogDescriptor> optionalDescriptor = catalogStore.getCatalog(catalogName);
        if (optionalDescriptor.isPresent()) {
            Catalog catalog = initCatalog(optionalDescriptor.get());
            catalog.open();
            catalogs.put(catalogName, catalog);
            return Optional.of(catalog);
        }

        return Optional.empty();
    }
	
	// Init the catalog instance by CatalogDescriptor
	private Catalog initCatalog(CatalogDescriptor descriptor) {
		// Discover the catalog and init
	}
}
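
The lazy-initialization path of getCatalog can be illustrated with a self-contained sketch. The class LazyCatalogManagerSketch is hypothetical: a plain map stands in for the CatalogStore, generic type parameters stand in for CatalogDescriptor and Catalog, and a Function stands in for catalog discovery, so this shows only the caching pattern and not the real CatalogManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch: D plays the role of CatalogDescriptor, C the role of Catalog.
 * Descriptors are cheap to store; instances are created only on first access.
 */
final class LazyCatalogManagerSketch<D, C> {
    private final Map<String, D> store = new HashMap<>();        // stands in for CatalogStore
    private final Map<String, C> initialized = new HashMap<>();  // cache of initialized catalogs
    private final Function<D, C> initializer;                    // stands in for initCatalog
    private int initCount;

    LazyCatalogManagerSketch(Function<D, C> initializer) {
        this.initializer = initializer;
    }

    /** Records a descriptor without creating the catalog instance. */
    void storeDescriptor(String name, D descriptor) {
        store.put(name, descriptor);
    }

    /** Returns the cached instance, or initializes it from the stored descriptor on first use. */
    Optional<C> getCatalog(String name) {
        C cached = initialized.get(name);
        if (cached != null) {
            return Optional.of(cached);
        }
        D descriptor = store.get(name);
        if (descriptor == null) {
            return Optional.empty();
        }
        C catalog = initializer.apply(descriptor);
        initCount++;
        initialized.put(name, catalog);
        return Optional.of(catalog);
    }

    /** Number of times the initializer actually ran. */
    int initCount() {
        return initCount;
    }
}
```

Storing a descriptor costs nothing beyond a map entry, and repeated lookups of the same name trigger the initializer only once.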

Behavior When Executing the CREATE CATALOG Statement

When the user executes the "CREATE CATALOG" statement, the method createCatalog(String catalogName, CatalogDescriptor) is utilized to save the catalog configuration and initialize the catalog instance.

public class CreateCatalogOperation implements CreateOperation {

    @Override
    public TableResultInternal execute(Context ctx) {
        try {

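            // Assumes that properties is a Map<String, String> of the catalog options.
            CatalogDescriptor catalogDescriptor =
                    CatalogDescriptor.of(catalogName, Configuration.fromMap(properties));
            ctx.getCatalogManager().createCatalog(catalogName, catalogDescriptor);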

            return TableResultImpl.TABLE_RESULT_OK;
        } catch (CatalogException e) {
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        }
    }
}


Add built-in CatalogStores

There are two built-in CatalogStores available; the In-MemoryCatalogStore will be the default CatalogStore.

In-MemoryCatalogStore

The In-MemoryCatalogStore will be the default CatalogStore. By default, catalog configurations are not shared between sessions, so the In-MemoryCatalogStore will not persist any catalog configuration.

FileCatalogStore

FileCatalogStore will store all catalog configurations in a directory. Each catalog is associated with a file that stores its configuration, and deleting the catalog will remove this file.

FileCatalogStore saves the configuration in YAML, the same format used by flink-conf.yaml.

➜  catalog_path tree
.
├── hive_catalog1.yaml
├── hive_catalog2.yaml
├── jdbc_catalog1.yaml
└── jdbc_catalog2.yaml


Example of the file content

type: jdbc
default-database: test-database
username: test-user
password: test-pass
base-url: mysql://127.0.0.1:3306/test-database
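
The one-file-per-catalog layout can be sketched with java.nio as follows. This is a simplified illustration and not the proposed FileCatalogStore: the class FileStoreSketch is hypothetical, it writes flat "key: value" lines without any YAML escaping, and it takes a plain Map instead of a CatalogDescriptor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Stream;

/** Hypothetical sketch of a directory-backed store with one file per catalog. */
final class FileStoreSketch {
    private static final String EXTENSION = ".yaml";
    private final Path directory;

    FileStoreSketch(Path directory) {
        this.directory = directory;
    }

    /** Writes the options as flat "key: value" lines to <directory>/<name>.yaml. */
    void store(String name, Map<String, String> options) {
        List<String> lines = new ArrayList<>();
        // Sort the keys so that the file content is deterministic.
        for (Map.Entry<String, String> entry : new TreeMap<>(options).entrySet()) {
            lines.add(entry.getKey() + ": " + entry.getValue());
        }
        try {
            Files.createDirectories(directory);
            Files.write(directory.resolve(name + EXTENSION), lines);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes the catalog's file; returns false if no such file existed. */
    boolean remove(String name) {
        try {
            return Files.deleteIfExists(directory.resolve(name + EXTENSION));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Lists catalog names by stripping the extension from the file names. */
    Set<String> list() {
        try (Stream<Path> files = Files.list(directory)) {
            Set<String> names = new TreeSet<>();
            files.map(path -> path.getFileName().toString())
                    .filter(fileName -> fileName.endsWith(EXTENSION))
                    .forEach(fileName ->
                            names.add(fileName.substring(0, fileName.length() - EXTENSION.length())));
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the file name is the catalog name, listing catalogs only requires a directory scan, and removing a catalog is a single file deletion.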


Note: The CatalogStore must be initialized together with the CatalogManager, and once the CatalogManager has been created, its CatalogStore cannot be changed.


Conflict Scenarios

Currently, CatalogManager will cache the initialized Catalog by default. Therefore, in a multi-session scenario, if one session deletes a Catalog, other sessions may not be able to synchronize the deletion.

To address this, the caching logic should be made configurable in the future.

Three caching strategies are possible:

  • caching all initialized Catalogs;
  • using an LRU cache that keeps only the most recently used Catalogs, to avoid occupying too much memory;
  • not caching any Catalog instances.
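
The LRU option could be sketched with the JDK's LinkedHashMap in access order. The class LruCatalogCache is hypothetical and only demonstrates the eviction policy; a real cache of Catalog instances would presumably also need to close an evicted catalog, which this sketch does not do.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded cache that evicts the least recently used entry. */
final class LruCatalogCache<V> extends LinkedHashMap<String, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    LruCatalogCache(int maxEntries) {
        // accessOrder = true orders entries from least to most recently accessed.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    /** Called by LinkedHashMap after each insertion; returning true drops the eldest entry. */
    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        return size() > maxEntries;
    }
}
```

Setting maxEntries very high approximates the "cache everything" strategy, while skipping the cache entirely corresponds to the "no caching" strategy.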

Compatibility, Deprecation, and Migration Plan

Compatibility:

InMemoryCatalogStore will be the default CatalogStore.

The current implementation will not cause compatibility issues. The original Catalog registration process can still be used, including CatalogManager::registerCatalog(String catalogName, Catalog catalog) and TableEnvironment::registerCatalog(String catalogName, Catalog catalog).

When using the CREATE CATALOG statement, the CatalogManager::createCatalog(String catalogName, CatalogDescriptor) method will be used by default. This method will initialize the Catalog instance by default and is consistent with the previous logic.

When using a different CatalogStore, such as FileCatalogStore, the configuration of the Catalog will persist to an external system when executing the CREATE CATALOG statement. Even after a Flink session is restarted, the previously registered Catalog can still be used in the TableEnvironment.

Deprecation: 

TableEnvironment 

TableEnvironment::registerCatalog(String catalogName, Catalog catalog) will be deprecated. TableEnvironment::createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) will be used when executing the CREATE CATALOG statement.

We also recommend that Table API users create Catalogs using the new method.

@PublicEvolving
public interface TableEnvironment {
    /**
     * Creates a {@link Catalog} from the given {@link CatalogDescriptor} and registers it under a
     * unique name. All tables registered in the {@link Catalog} can be accessed. The catalog is
     * initialized immediately and its descriptor is then saved to the CatalogStore.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalogDescriptor The catalogDescriptor of the Catalog.
     */
    void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor);

    /**
     * Registers a {@link Catalog} under a unique name. All tables registered in the {@link Catalog}
     * can be accessed.
     *
     * @param catalogName The name under which the catalog will be registered.
     * @param catalog The catalog to register.
     */
    @Deprecated
    void registerCatalog(String catalogName, Catalog catalog); 
}


Test Plan

Unit tests (UT).

Rejected Alternatives

Make Catalog serializable and move the creation and caching of Catalog into CatalogStore

Here is a brief description of the design:

  • Add a toProperties method to Catalog to make catalogs serializable.
  • Remove Map<String, Catalog> catalogs from the CatalogManager.
  • The registerCatalog(String catalogName, Catalog catalog) method will store the catalog in the CatalogStore.
  • The getCatalog(String catalogName) method will get a Catalog instance from the CatalogStore.

The CatalogStore Interface

/** Interfaces defining catalog-related behaviors */
public interface CatalogStore {

    /**
     * Stores a catalog under the given name. The catalog name must be unique.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalog catalog instance to store
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    void storeCatalog(String catalogName, Catalog catalog) throws CatalogException;

    /**
     * Unregisters the catalog with the given name. The catalog must exist.
     *
     * @param catalogName name of the catalog to unregister
     * @param ignoreIfNotExists if false, an exception is thrown when the catalog to be removed
     *     does not exist
     * @throws CatalogException if the unregistration of the catalog under the given name failed
     */
     void removeCatalog(String catalogName, boolean ignoreIfNotExists)
            throws CatalogException;

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    Optional<Catalog> getCatalog(String catalogName);

    /**
     * Retrieves names of all registered catalogs.
     *
     * @return a set of names of registered catalogs
     */
    Set<String> listCatalogs();
}

Here is the implementation logic of registerCatalog:

    /**
     * Registers a catalog under the given name. The catalog name must be unique.
     *
     * @param catalogName name under which to register the given catalog
     * @param catalog catalog to register
     * @throws CatalogException if the registration of the catalog under the given name failed
     */
    public void registerCatalog(String catalogName, Catalog catalog) {
        checkArgument(
                !StringUtils.isNullOrWhitespaceOnly(catalogName),
                "Catalog name cannot be null or empty.");
        checkNotNull(catalog, "Catalog cannot be null");

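        // The CatalogStore interface of this alternative has no contains method,
        // so existence is checked through getCatalog.
        if (catalogStore.getCatalog(catalogName).isPresent()) {
            throw new CatalogException(format("Catalog %s already exists.", catalogName));
        }

        catalogStore.storeCatalog(catalogName, catalog);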
    }

Here is the implementation logic of getCatalog:

    /**
     * Gets a catalog by name.
     * Gets the Catalog instance from the CatalogStore.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog or empty if it does not exist
     */
    public Optional<Catalog> getCatalog(String catalogName) {
        return catalogStore.getCatalog(catalogName);
    }