...

We will need to add a new property to ConfigKey that indicates whether a config can be updated dynamically. This property can also be used when generating documentation, which makes it easy to discover which properties can be changed dynamically. If a broker property is set in ZooKeeper but its config is not marked "isDynamic", that property can simply be ignored. This is similar to configuring Kafka with a property it does not understand.

Code Block
private static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
    public final Validator validator;
    public final Importance importance;
    public final boolean required;
    // New: true if this config can be updated dynamically
    public final boolean isDynamic;

    public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required, boolean isDynamic) {
        super();
        this.name = name;
        this.type = type;
        this.defaultValue = defaultValue;
        this.validator = validator;
        this.importance = importance;
        if (this.validator != null)
            this.validator.ensureValid(name, defaultValue);
        this.documentation = documentation;
        this.required = required;
        this.isDynamic = isDynamic;
    }
}
 
// The isDynamic property can be included in the documentation for all configs
public String toHtmlTable() {
    StringBuilder b = new StringBuilder();
    b.append("<table>\n");
    b.append("<tr>\n");
    b.append("<th>Name</th>\n");
    b.append("<th>Type</th>\n");
    b.append("<th>Default</th>\n");
    b.append("<th>Importance</th>\n");
    b.append("<th>Description</th>\n");
    b.append("<th>IsDynamic</th>\n");
    b.append("</tr>\n");
    // ... one row per config, including its isDynamic value, omitted here ...
    b.append("</table>");
    return b.toString();
}
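
The rule described above, that an override is ignored unless its config is marked "isDynamic", could be sketched roughly as follows. This is only an illustration: the DynamicConfigFilter class, its filter method, and the idea of passing the dynamic config names as a Set are assumptions made for the example and are not part of the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not part of the proposal): keeps only overrides
// whose config name is marked as dynamically updatable.
final class DynamicConfigFilter {
    private DynamicConfigFilter() {}

    // overrides: name/value pairs read from ZooKeeper.
    // dynamicKeys: names of configs whose ConfigKey has isDynamic == true (assumed input).
    static Map<String, String> filter(Map<String, String> overrides, Set<String> dynamicKeys) {
        Map<String, String> accepted = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : overrides.entrySet()) {
            if (dynamicKeys.contains(entry.getKey())) {
                accepted.put(entry.getKey(), entry.getValue());
            }
            // Static or unknown configs are skipped, in the same way an
            // unrecognized property would be ignored.
        }
        return accepted;
    }
}
```

Skipping the entry rather than throwing mirrors the text: an override for a non-dynamic config is treated like a property Kafka does not understand.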

Applying Configs within Broker

The ZK listener can hold a Properties object parsed from the properties file that was used to start the KafkaServer. Whenever a change notification is received, the new configs need to be parsed into a Properties object.

Code Block
// Sample code only
class BrokerConfigManager {
    // Properties initially configured into the server
    val properties = KafkaConfig.getProperties()
    .....
    def processConfigChanges(notifications: Seq[String]): Unit = {
        // Read the current config overrides from ZooKeeper
        val newProperties = AdminUtils.fetchConfig(zkClient, configPath)
    }
}
 
object AdminUtils {
    def fetchConfig(zkClient: ZkClient, path: String): Properties = {
        val str: String = zkClient.readData(path, true)
        val props = new Properties()
        if (str != null) {
            Json.parseFull(str) match {
                case None => // there are no config overrides
                case Some(map: Map[_, _]) =>
                    // parse the map and insert valid properties. Each parsed property should verify that it is dynamically changeable
            }
        }
        props
    }
}
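
Once the overrides have been parsed into a Properties object, the listener still has to combine them with the Properties the server was started with. A minimal sketch of one way to do that is shown below. The EffectiveConfig class and its methods are hypothetical, and the precedence rule (an override wins over the startup value, and the startup Properties are never mutated) is an assumption rather than something the proposal specifies.

```java
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of the proposal).
final class EffectiveConfig {
    private EffectiveConfig() {}

    // Returns a new Properties object: startup values overlaid with overrides.
    // Assumption: overrides take precedence; the inputs are left unchanged.
    static Properties merge(Properties startup, Properties overrides) {
        Properties effective = new Properties();
        effective.putAll(startup);
        effective.putAll(overrides);
        return effective;
    }

    // Names whose value differs between two Properties objects, so that only
    // the affected components need to be told about a change.
    static Set<String> changedKeys(Properties before, Properties after) {
        Set<String> names = new TreeSet<>(before.stringPropertyNames());
        names.addAll(after.stringPropertyNames());
        Set<String> changed = new TreeSet<>();
        for (String name : names) {
            String oldValue = before.getProperty(name);
            String newValue = after.getProperty(name);
            boolean same = (oldValue == null) ? (newValue == null) : oldValue.equals(newValue);
            if (!same) {
                changed.add(name);
            }
        }
        return changed;
    }
}
```

Keeping the startup Properties untouched means that an override which is later deleted from ZooKeeper falls back to the value from the properties file on the next merge.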

 

Config Change Notification

...