...
We will need to add a new property to the ConfigKey to indicate if a config is updatable or not. This property can be used to generate documentation so it becomes very easy to discover which properties can be dynamically changed. If a broker property is set in zookeeper and it is not a config marked "isDynamic", that property can simply be ignored. This is similar to configuring Kafka with a property it does not understand.
Code Block |
---|
private static class ConfigKey {
public final String name;
public final Type type;
public final String documentation;
public final Object defaultValue;
public final Validator validator;
public final Importance importance;
public final boolean required;
public final boolean isDynamic;
public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required, boolean isDynamic) {
super();
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
this.validator = validator;
this.importance = importance;
if (this.validator != null)
this.validator.ensureValid(name, defaultValue);
this.documentation = documentation;
this.required = required;
this.isDynamic = isDynamic;
}
}
// The isDynamic property can be included in the documentation for all configs
public String toHtmlTable() {
StringBuilder b = new StringBuilder();
b.append("<table>\n");
b.append("<tr>\n");
b.append("<th>Name</th>\n");
b.append("<th>Type</th>\n");
b.append("<th>Default</th>\n");
b.append("<th>Importance</th>\n");
b.append("<th>Description</th>\n");
b.append("<th>IsDynamic</th>\n");
} |
Applying Configs within Broker
The ZK listener can have a Properties object parsed from the properties file which was used to start the KafkaServer. Anytime a change notification is received, the new configs need to be parsed into a properties object.
Code Block |
---|
// Sample code only class BrokerConfigManager { // Properties initially configured into the server val properties = KafkaConfig.getProperties() ..... def processConfigChanges(notifications: Seq[String]) { val newProperties = AdminUtils.fetchConfig(zkClient, configPath) } } class ConfigKey { AdminUtils { def fetchConfig(zkClient: ZkClient, path: String): Properties = { val str: String = zkClient.readData(path, true) val props = new Properties() if(str != null) { Json.parseFull(str) match { case None => // there are no config overrides case Some(map: Map[_, _]) => // parse the map and insert valid properties. Each parsed property should verify that it is dynamically changeable } } props } } |
Config Change Notification
...