...

Current state: Draft

Discussion thread:

JIRA: KAFKA-10339 (https://issues.apache.org/jira/browse/KAFKA-10339)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
titleMirrorSinkTask
    private boolean isTransactional = config.getTransactionalProducer();
    private boolean transactionInProgress = false;
    protected Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
    private Set<TopicPartition> taskTopicPartitions;
    private KafkaProducer<byte[], byte[]> producer;
    private String connectConsumerGroup;

    @Override
    public void start(Map<String, String> props) {
        config = new MirrorTaskConfig(props);
        taskTopicPartitions = config.taskTopicPartitions();
        isTransactional = config.transactionalProducer();
        producer = initProducer(isTransactional);
        connectConsumerGroup = getSourceConsumerGroupId();
        if (isTransactional) {
            // To rewind the consumer to the correct offset per <topic, partition> assigned to this task, there is more than one case:
            // (1) offset exists on target, use target offset as the starting point of consumption
            // (2) offset DOES NOT exist on target, presume a new topic, so set offset to 0
            Map<TopicPartition, OffsetAndMetadata> initOffsetsOnTarget = listTargetConsumerGroupOffsets(connectConsumerGroup);
            Map<TopicPartition, Long> taskOffsets = taskTopicPartitions.stream().collect(Collectors.toMap(
                    x -> x, x -> initOffsetsOnTarget.containsKey(x)
                            ? initOffsetsOnTarget.get(x).offset() + 1
                            : 0L));
            context.offset(taskOffsets);
        }
    }

    protected KafkaProducer<byte[], byte[]> initProducer(boolean isTransactional) {
        Map<String, Object> producerConfig = config.targetProducerConfig();
        if (isTransactional) {
            log.info("use transactional producer");
            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, genTransactionId());
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        } else {
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "non-transactional-producer");
        }
        return MirrorUtils.newProducer(producerConfig);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        log.info("receive {} messages from consumer", records.size());
        if (records.size() == 0) {
            return;
        }
        try {
            sendBatch(records, producer);
        } catch (RebalanceException e) {
            producer.close();
            producer = initProducer(isTransactional);
        } catch (ResendRecordsException e) {
            abortTransaction(producer);
            //TODO: add limited retry
            sendBatch(e.getRemainingRecords(), producer);
        } catch (Throwable e) {
            log.error(getHostName() + " terminating on exception: {}", e);
            return;
        }
    }

    private void sendBatch(Collection<SinkRecord> records, KafkaProducer<byte[], byte[]> producer) {
        Map<TopicPartition, List<SinkRecord>> remainingRecordsMap = new HashMap<>();
        // track in-flight sends so a failed future can be mapped back to its record
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        Map<Future<RecordMetadata>, SinkRecord> futureMap = new HashMap<>();
        try {
            offsetsMap.clear();
            beginTransaction(producer);
            // records is treated as a queue (peek/poll) in this sketch
            while (records.peek() != null) {
                SinkRecord record = records.peek();
                ProducerRecord<byte[], byte[]> producerRecord = convertToProducerRecord(record);
                offsetsMap.compute(new TopicPartition(record.topic(), record.kafkaPartition()),
                        (tp, curOffsetMetadata) ->
                                (curOffsetMetadata == null || record.kafkaOffset() > curOffsetMetadata.offset())
                                        ? new OffsetAndMetadata(record.kafkaOffset())
                                        : curOffsetMetadata);
                Future<RecordMetadata> future = producer.send(producerRecord, (recordMetadata, e) -> {
                    if (e != null) {
                        log.error("{} failed to send record to {}: ", MirrorSinkTask.this, producerRecord.topic(), e);
                        log.debug("{} Failed record: {}", MirrorSinkTask.this, producerRecord);
                        throw new KafkaException(e);
                    } else {
                        log.info("{} Wrote record successfully: topic {} partition {} offset {}", //log.trace
                                MirrorSinkTask.this,
                                recordMetadata.topic(), recordMetadata.partition(),
                                recordMetadata.offset());
                        commitRecord(record, recordMetadata);
                    }
                });
                futures.add(future);
                futureMap.put(future, record);
                records.poll();
            }
        } catch (KafkaException e) {
            // Any unsent messages are added to the remainingRecordsMap for re-send
            for (SinkRecord record = records.poll(); record != null; record = records.poll()) {
                addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
            }
        } finally {  //TODO: may add more exception handling case
            for (Future<RecordMetadata> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    SinkRecord record = futureMap.get(future);
                    // Any record that failed to send, add to the remainingRecordsMap
                    addConsumerRecordToTopicPartitionRecordsMap(record, remainingRecordsMap);
                }
            }
        }

        if (isTransactional && remainingRecordsMap.size() == 0) {
            producer.sendOffsetsToTransaction(offsetsMap, connectConsumerGroup);
            commitTransaction(producer);
        }

        if (remainingRecordsMap.size() > 0) {
            // For the transactional case, all records should be put into remainingRecords, as the whole transaction should be redone
            Collection<SinkRecord> recordsToReSend;
            if (isTransactional) {
                // transactional: retry all records, as the transaction will cancel all successful and failed records.
                recordsToReSend = records;
            } else {
                // non-transactional: only retry failed records, others were finished and sent.
                recordsToReSend = remainingRecordsMap.values().stream()
                        .flatMap(List::stream).collect(Collectors.toList());
            }
            throw new ResendRecordsException(recordsToReSend);
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // if transactional, return an empty Map intentionally to disable the offset commit by commitOffsets() in WorkerSinkTask.java,
        // so that the transactional producer is able to commit the consumer offsets to the target cluster in a transaction at its own pace
        if (isTransactional)
            return new HashMap<TopicPartition, OffsetAndMetadata>();
        else // otherwise, return offsetsMap to let commitOffsets() in WorkerSinkTask.java commit offsets
            return offsetsMap;
    }

    // This commitRecord() follows the same logic as commitRecord() in MirrorSourceTask: update metrics and sync offsets
    public void commitRecord(SinkRecord record, RecordMetadata metadata) {
        try {
            if (stopping) {
                return;
            }
            if (!metadata.hasOffset()) {
                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                return;
            }
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
            long latency = System.currentTimeMillis() - record.timestamp();
            metrics.countRecord(topicPartition);
            metrics.replicationLatency(topicPartition, latency);
            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
            long downstreamOffset = metadata.offset();
            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
        } catch (Throwable e) {
            log.warn("Failure committing record.", e);
        }
    }

    private void beginTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.beginTransaction();
            transactionInProgress = true;
        }
    }

    private void initTransactions(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.initTransactions();
        }
    }

    private void commitTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional) {
            producer.commitTransaction();
            transactionInProgress = false;
        }
    }

    private void abortTransaction(KafkaProducer<byte[], byte[]> producer) {
        if (isTransactional && transactionInProgress) {
            producer.abortTransaction();
            transactionInProgress = false;
        }
    }

    public static class ResendRecordsException extends Exception {
        private Collection<SinkRecord> remainingRecords;

        public ResendRecordsException(Collection<SinkRecord> remainingRecords) {
            super("some records were not sent and need to be re-sent");
            this.remainingRecords = remainingRecords;
        }

        public Collection<SinkRecord> getRemainingRecords() {
            return remainingRecords;
        }
    }

MirrorSinkConnector

As a SinkTask can only be created by a SinkConnector, MirrorSinkConnector will be implemented and follow mostly the same logic as the current MirrorSourceConnector. To minimize duplicate code, a new class, e.g. "MirrorCommonConnector", may be proposed to host the common code as a separate code change merged before this KIP. A rough sketch is shown below.
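For illustration only, the following is a minimal sketch of what MirrorSinkConnector could look like against the Connect SinkConnector API. MirrorConnectorConfig usage, knownTargetTopicPartitions() and taskConfigsForPartitions() are placeholders for the partition-assignment logic that would be shared with MirrorSourceConnector (or the proposed MirrorCommonConnector); they are not the final implementation.

Code Block
titleMirrorSinkConnector (sketch)
    // Hypothetical sketch -- helper names below are placeholders, not the final implementation.
    public class MirrorSinkConnector extends SinkConnector {

        private MirrorConnectorConfig config;

        @Override
        public void start(Map<String, String> props) {
            config = new MirrorConnectorConfig(props);
        }

        @Override
        public void stop() {
        }

        @Override
        public Class<? extends Task> taskClass() {
            return MirrorSinkTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            // Like MirrorSourceConnector, split the replicated topic-partitions into
            // at most maxTasks groups, one group per MirrorSinkTask.
            return config.taskConfigsForPartitions(knownTargetTopicPartitions(), maxTasks);
        }

        @Override
        public ConfigDef config() {
            return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
        }

        @Override
        public String version() {
            return "1";
        }

        private List<TopicPartition> knownTargetTopicPartitions() {
            // placeholder: discover the topic-partitions this connector replicates,
            // reusing the same topic-filtering logic as MirrorSourceConnector
            return Collections.emptyList();
        }
    }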

WorkerSinkTask

Because the group offsets on the target cluster are the single source of truth, each time MirrorSinkTask restarts it loads the group offsets from the target cluster and supplies them to the consumer in WorkerSinkTask, so that the consumer rewinds to the last committed offsets without duplicating or missing data. However, in our tests we ran into KAFKA-10370 (https://issues.apache.org/jira/browse/KAFKA-10370), so resolving it is a prerequisite of this KIP.
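To make the rewind concrete, below is a rough sketch of how listTargetConsumerGroupOffsets() (called from MirrorSinkTask.start() above) could read the committed group offsets from the target cluster. The config.targetAdminConfig() helper is an assumption for this sketch; Admin#listConsumerGroupOffsets is the standard Kafka Admin API.

Code Block
titlelistTargetConsumerGroupOffsets (sketch)
    // Sketch only: config.targetAdminConfig() is a placeholder for the target-cluster admin properties.
    protected Map<TopicPartition, OffsetAndMetadata> listTargetConsumerGroupOffsets(String consumerGroup) {
        try (Admin targetAdmin = Admin.create(config.targetAdminConfig())) {
            // committed offsets of the Connect consumer group on the target cluster
            return targetAdmin.listConsumerGroupOffsets(consumerGroup)
                    .partitionsToOffsetAndMetadata()
                    .get();
        } catch (InterruptedException | ExecutionException e) {
            throw new KafkaException("failed to list consumer group offsets on target cluster", e);
        }
    }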

Migration from MirrorSourceConnector to MirrorSinkConnector w/ EOS

...