...

  1. Use the Kafka producer and consumer APIs with customized processing logic. For example:

    // create a producer and a consumer
    KafkaProducer producer = new KafkaProducer(configs);
    KafkaConsumer consumer = new KafkaConsumer(configs);

    // start a thread with a producer and consumer client
    // for data IO and execute processing logic
    new Thread(new Runnable() {
      @Override
      public void run() {
        while (isRunning) {
          // read some data from up-stream Kafka
          List<Message> inputMessages = consumer.poll();

          // do some processing..

          // send the output to the down-stream Kafka
          producer.send(outputMessages);
        }
      }
    }).start();
  2. Use a full-fledged stream processing system such as Storm, Samza, Spark Streaming, or Flink, with Kafka as its source / sink stream data storage.
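
The poll / process / send loop from option 1 can be tried out without a broker. The following is a minimal sketch, not the Kafka client API: two in-memory queues stand in for the up-stream and down-stream topics, and `PollLoopSketch`, `pollUpstream`, `process`, and `runOnce` are hypothetical names introduced only for illustration. The processing step (upper-casing each message) is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the consumer -> processing -> producer loop.
// Nothing here comes from the Kafka client library.
class PollLoopSketch {
  // Queues playing the role of the up-stream and down-stream topics.
  private final BlockingQueue<String> upstream = new LinkedBlockingQueue<>();
  private final BlockingQueue<String> downstream = new LinkedBlockingQueue<>();
  private volatile boolean isRunning = true;

  // Stand-in for a consumer poll: wait briefly for one message,
  // then drain whatever else is already available.
  private List<String> pollUpstream() throws InterruptedException {
    List<String> batch = new ArrayList<>();
    String first = upstream.poll(10, TimeUnit.MILLISECONDS);
    if (first != null) {
      batch.add(first);
      upstream.drainTo(batch);
    }
    return batch;
  }

  // Hypothetical processing logic: upper-case every message.
  private static List<String> process(List<String> inputMessages) {
    List<String> outputMessages = new ArrayList<>();
    for (String message : inputMessages) {
      outputMessages.add(message.toUpperCase());
    }
    return outputMessages;
  }

  // Start the worker thread that runs the loop until isRunning is cleared.
  Thread start() {
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          while (isRunning) {
            List<String> inputMessages = pollUpstream();
            List<String> outputMessages = process(inputMessages);
            downstream.addAll(outputMessages); // stand-in for a producer send
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    thread.start();
    return thread;
  }

  // Feed the input through the loop, collect the output, then stop the thread.
  static List<String> runOnce(List<String> input) {
    PollLoopSketch sketch = new PollLoopSketch();
    sketch.upstream.addAll(input);
    Thread thread = sketch.start();
    List<String> output = new ArrayList<>();
    try {
      while (output.size() < input.size()) {
        String message = sketch.downstream.poll(1, TimeUnit.SECONDS);
        if (message == null) {
          break; // give up rather than block forever
        }
        output.add(message);
      }
      sketch.isRunning = false;
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(runOnce(List.of("a", "b", "c")));
  }
}
```

Even in this reduced form, the user code owns the thread, the running flag, and the shutdown handshake, which is the boilerplate that option 1 leaves to every application.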

...

class MyKafkaProcessor implements KafkaProcessor {

  @Override
  public void process(String topic, K key, V value, Context context) { .. }
}

// starting a new thread running the user defined processing logic
KafkaProcessor processor = new MyKafkaProcessor(configs);
KStreamThread thread = new KStreamThread(processor);
thread.start();

// stopping the stream thread
thread.shutdown();
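
To make the callback style above concrete, here is a self-contained sketch under stated assumptions. It does not reproduce the proposed `KafkaProcessor` or `KStreamThread` classes: `SketchRecord`, `SketchProcessor`, `SketchContext`, and `SketchStreamThread` are hypothetical stand-ins, the input is an in-memory queue rather than a Kafka topic, and the choice to drain pending records on `shutdown()` is an assumption of this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical record type standing in for a message read from a topic.
final class SketchRecord<K, V> {
  final String topic;
  final K key;
  final V value;

  SketchRecord(String topic, K key, V value) {
    this.topic = topic;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical context handed to the processor so it can emit output.
interface SketchContext<K, V> {
  void send(String topic, K key, V value);
}

// Hypothetical per-record callback, shaped like process(topic, key, value, context).
interface SketchProcessor<K, V> {
  void process(String topic, K key, V value, SketchContext<K, V> context);
}

// Hypothetical driver thread: it owns the loop, so user code only
// supplies the processor.
class SketchStreamThread<K, V> extends Thread {
  private final SketchProcessor<K, V> processor;
  private final BlockingQueue<SketchRecord<K, V>> input;
  private final List<SketchRecord<K, V>> output = new CopyOnWriteArrayList<>();
  private volatile boolean running = true;

  SketchStreamThread(SketchProcessor<K, V> processor,
                     BlockingQueue<SketchRecord<K, V>> input) {
    this.processor = processor;
    this.input = input;
  }

  @Override
  public void run() {
    // Records sent through the context are collected in memory.
    SketchContext<K, V> context =
        (topic, key, value) -> output.add(new SketchRecord<>(topic, key, value));
    try {
      // Keep going until shutdown is requested and the input is drained.
      while (running || !input.isEmpty()) {
        SketchRecord<K, V> record = input.poll(10, TimeUnit.MILLISECONDS);
        if (record != null) {
          processor.process(record.topic, record.key, record.value, context);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Request a stop and wait for the loop to finish pending records.
  void shutdown() {
    running = false;
    try {
      join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  List<SketchRecord<K, V>> output() {
    return output;
  }
}

class ProcessorSketchDemo {
  static List<String> demo() {
    BlockingQueue<SketchRecord<String, String>> input = new LinkedBlockingQueue<>();
    input.add(new SketchRecord<>("in", "k1", "hello"));
    input.add(new SketchRecord<>("in", "k2", "world"));

    // User-defined logic: upper-case the value and forward it.
    SketchProcessor<String, String> processor =
        (topic, key, value, context) -> context.send("out", key, value.toUpperCase());

    SketchStreamThread<String, String> thread = new SketchStreamThread<>(processor, input);
    thread.start();
    thread.shutdown();

    List<String> result = new ArrayList<>();
    for (SketchRecord<String, String> record : thread.output()) {
      result.add(record.key + "=" + record.value);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Compared with a hand-written poll loop, the user-visible surface shrinks to one `process` callback plus `start()` and `shutdown()`; threading and the stop condition live inside the driver.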

...