...
Code Block |
---|
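import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {
    // Constructor taking the producer properties; unused here.
    public SimplePartitioner(VerifiableProperties props) {
    }
    public int partition(String key, int a_numPartitions) {
        // Default to partition 0 when the key contains no '.' separator.
        int partition = 0;
        // The key is expected to be an IP address; locate the start of its last octet.
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            // Map the last octet onto the available partitions.
            partition = Integer.parseInt(key.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}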
|
The logic takes the key, which we expect to be the IP address, extracts the last octet, and takes it modulo the number of partitions defined within Kafka for the topic. The benefit of this partitioning logic is that all web visits from the same source IP end up in the same partition. Visits from other IPs will land in that partition too, so your consumer logic needs to know how to handle that.
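To make the arithmetic concrete, here is a minimal standalone sketch of the same last-octet calculation. It does not depend on Kafka. The class name `LastOctetPartitionDemo`, the method `partitionFor`, and the sample IP addresses are hypothetical and chosen only for illustration; the count of 5 partitions is an assumption for the example.

```java
// Standalone illustration of the last-octet modulo logic (no Kafka dependency).
// Class name, method name, and sample IPs are hypothetical, for illustration only.
final class LastOctetPartitionDemo {

    // Mirrors the partitioner logic: last octet of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        int numPartitions = 5; // assumed partition count for this example
        // The same IP always maps to the same partition.
        System.out.println(partitionFor("192.168.2.77", numPartitions)); // 77 % 5 = 2
        // A different IP can share that partition.
        System.out.println(partitionFor("10.0.0.12", numPartitions));    // 12 % 5 = 2
        System.out.println(partitionFor("192.168.2.10", numPartitions)); // 10 % 5 = 0
    }
}
```

Note that `192.168.2.77` and `10.0.0.12` both map to partition 2, which is why a consumer cannot assume that a partition holds visits from a single IP.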
...
Code Block |
---|
bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5
|
Make sure you include the --partition option so the topic is created with more than one partition.
...