...
The ZK listener can hold a Properties object parsed from the properties file that was used to start the KafkaServer. Whenever a change notification is received, the new configs need to be parsed into a Properties object. We should do the following:
- Parse received notification into a properties object.
- Within KafkaConfig, for each newly changed property, verify that it can be changed dynamically. For this, ConfigDef needs to expose a getConfigKey() method that can be used to check the isDynamic flag.
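The two steps above can be sketched as follows. This is a minimal, self-contained illustration rather than the broker's actual code: `ConfigKey`, `SimpleConfigDef`, and `DynamicConfigCheck` are hypothetical stand-ins, with `SimpleConfigDef.getConfigKey()` playing the role of the method this proposal asks ConfigDef to expose. Treating an unknown property as not dynamically changeable is also an assumption of the sketch.

```scala
import java.util.Properties

// Hypothetical stand-in for a ConfigDef entry; only the fields needed here.
final case class ConfigKey(name: String, isDynamic: Boolean)

// Hypothetical stand-in for ConfigDef exposing the proposed getConfigKey().
final class SimpleConfigDef(keys: Seq[ConfigKey]) {
  private val byName: Map[String, ConfigKey] = keys.map(k => k.name -> k).toMap

  def getConfigKey(name: String): Option[ConfigKey] = byName.get(name)
}

object DynamicConfigCheck {
  // Names of properties in `updated` whose value differs from `current`
  // (including properties that are absent from `current`).
  def changedKeys(current: Properties, updated: Properties): Set[String] = {
    // toArray avoids depending on a Scala-version-specific collection converter.
    val names = updated.stringPropertyNames().toArray(new Array[String](0)).toSet
    names.filter(name => updated.getProperty(name) != current.getProperty(name))
  }

  // Splits the changed properties into (accepted, rejected).
  // A change is accepted only if its key is known and flagged isDynamic;
  // unknown keys are rejected (an assumption of this sketch).
  def partitionChanges(
      current: Properties,
      updated: Properties,
      configDef: SimpleConfigDef): (Set[String], Set[String]) =
    changedKeys(current, updated).partition { name =>
      configDef.getConfigKey(name).exists(_.isDynamic)
    }
}
```

A caller would apply only the accepted set to KafkaConfig and log or reject the rest. Which real broker properties end up flagged as dynamic is outside the scope of this sketch.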
All configs should always be accessed via a reference to KafkaConfig. For this, all subsystems within the broker need to be configured with a config object and not individual config values.
```scala
// Sample code only
class BrokerConfigManager {
  // Properties initially configured into the server
  val kafkaConfig: KafkaConfig = ...
  .....

  // Called when a config change notification is received
  def processConfigChanges(notifications: Seq[String]): Unit = {
    val updatedProperties = AdminUtils.fetchConfig(zkClient, configPath)
    kafkaConfig.updateProperties(updatedProperties)
  }
}

// Declared as an object because fetchConfig is called statically above
object AdminUtils {
  // Read the znode and return a properties object
  def fetchConfig(zkClient: ZkClient, path: String): Properties = {
    val str: String = zkClient.readData(path, true)
    val props = new Properties()
    if (str != null) {
      Json.parseFull(str) match {
        case None => // there are no config overrides
        case Some(map: Map[_, _]) =>
          // Parse the map and insert valid properties. Each parsed property
          // should verify that it is dynamically changeable.
      }
    }
    props
  }
}
```
...
// Read the znode and return a properties object
props
} |
Config Change Notification
...