...

The ZK listener can hold a Properties object parsed from the properties file that was used to start the KafkaServer. Whenever a change notification is received, the new configs need to be parsed into a Properties object. We should do the following:

  1. Parse the received notification into a Properties object.
  2. Within KafkaConfig, for each newly changed property, verify that it can be changed dynamically. For this, ConfigDef needs to expose a getConfigKey() method that can be used to check the isDynamic flag.
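
The two steps above can be sketched roughly as follows. This is an illustration only: `ConfigKey`, `ConfigDef` and `DynamicConfigCheck` are simplified, hypothetical stand-ins rather than the real Kafka classes, and the choice of which properties are marked dynamic in the usage note is an assumption made for the example.

```scala
import java.util.Properties

// Hypothetical stand-in for one ConfigDef entry; only the isDynamic flag matters here.
final case class ConfigKey(name: String, isDynamic: Boolean)

// Hypothetical stand-in for ConfigDef exposing the proposed getConfigKey() lookup.
final class ConfigDef(keys: Seq[ConfigKey]) {
  private val byName: Map[String, ConfigKey] = keys.map(k => k.name -> k).toMap
  def getConfigKey(name: String): Option[ConfigKey] = byName.get(name)
}

object DynamicConfigCheck {
  // Compares the updated properties against the current ones and splits the
  // changed names into (accepted, rejected). A change is accepted only when
  // the property is known to the ConfigDef and flagged as dynamic.
  def validateChanges(configDef: ConfigDef,
                      current: Properties,
                      updated: Properties): (Set[String], Set[String]) = {
    val names = updated.stringPropertyNames().toArray(new Array[String](0)).toSet
    val changed = names.filter(n => updated.getProperty(n) != current.getProperty(n))
    changed.partition(n => configDef.getConfigKey(n).exists(_.isDynamic))
  }
}
```

With `log.retention.ms` registered as dynamic and `broker.id` as static (both assumed here), a notification that changes both would yield `log.retention.ms` in the accepted set and `broker.id` in the rejected set. Unknown property names are rejected as well, since no ConfigKey is found for them.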

All configs should always be accessed via a reference to KafkaConfig. For this, all subsystems within the broker need to be configured with a config object and not individual config values.
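
To illustrate why subsystems should hold the config object rather than individual values, here is a minimal sketch. `SketchConfig` and `LogCleanerSketch` are hypothetical names invented for this example and are not the real KafkaConfig or any broker subsystem. The point is only that a value read through the shared reference reflects later updates, whereas a value copied at construction time would not.

```scala
// Hypothetical, simplified config holder; not the real KafkaConfig.
final class SketchConfig(initial: Map[String, String]) {
  @volatile private var values: Map[String, String] = initial

  // Reads always go through the current snapshot of values.
  def getLong(name: String): Long = values(name).toLong

  // Applies overrides on top of the existing values.
  def updateProperties(changes: Map[String, String]): Unit = synchronized {
    values = values ++ changes
  }
}

// Hypothetical subsystem that keeps the config reference instead of a copied value.
final class LogCleanerSketch(config: SketchConfig) {
  // Looked up on every call, so a dynamic change is visible without a restart.
  def retentionMs: Long = config.getLong("log.retention.ms")
}
```

Had `LogCleanerSketch` taken a `Long` in its constructor instead, a later `updateProperties` call would have no effect on it.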

Code Block
// Sample code only
class BrokerConfigManager {
	// Properties initially configured into the server
	val kafkaConfig: KafkaConfig = ...

	.....
	def processConfigChanges(notifications: Seq[String]): Unit = {
		// Re-read the config overrides from ZooKeeper and hand them to KafkaConfig
		val updatedProperties = AdminUtils.fetchConfig(zkClient, configPath)
		kafkaConfig.updateProperties(updatedProperties)
	}
}

// An object rather than a class, since fetchConfig is called as AdminUtils.fetchConfig(...)
object AdminUtils {
	def fetchConfig(zkClient: ZkClient, path: String): Properties = {
		// readData returns null when the znode does not exist
		val str: String = zkClient.readData(path, true)
		val props = new Properties()
		if (str != null) {
			Json.parseFull(str) match {
				case None => // there are no config overrides
				case Some(map: Map[_, _]) =>
					// parse the map and insert valid properties. Each parsed property should verify that it is dynamically changeable
			}
		}
		props
	}
}

...

Config Change Notification

...