Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSystemConsumer
/**
 * Visitor that a system consumer implements in order to act on each kind of {@link Startpoint}.
 *
 * <p>Only the {@link StartpointSpecific} overload is abstract. Every other overload ships with a
 * default body that rejects the call by throwing {@link UnsupportedOperationException}, so an
 * implementation overrides just the startpoint kinds it can honor.
 */
public interface StartpointVisitor {

  /**
   * Seeks to the specific offset carried by a {@link StartpointSpecific}.
   *
   * @param systemStreamPartition the {@link SystemStreamPartition} whose offset is being sought
   * @param startpointSpecific the {@link Startpoint} holding the specific offset
   */
  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);

  /**
   * Seeks to the timestamp offset carried by a {@link StartpointTimestamp}.
   *
   * @param systemStreamPartition the {@link SystemStreamPartition} whose offset is being sought
   * @param startpointTimestamp the {@link Startpoint} holding the timestamp offset
   * @throws UnsupportedOperationException always, unless this method is overridden
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    final String message = "StartpointTimestamp is not supported.";
    throw new UnsupportedOperationException(message);
  }

  /**
   * Seeks to the earliest offset, as requested by a {@link StartpointOldest}.
   *
   * @param systemStreamPartition the {@link SystemStreamPartition} whose offset is being sought
   * @param startpointOldest the {@link Startpoint} standing for the earliest offset
   * @throws UnsupportedOperationException always, unless this method is overridden
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
    final String message = "StartpointOldest is not supported.";
    throw new UnsupportedOperationException(message);
  }

  /**
   * Seeks to the latest offset, as requested by a {@link StartpointUpcoming}.
   *
   * @param systemStreamPartition the {@link SystemStreamPartition} whose offset is being sought
   * @param startpointUpcoming the {@link Startpoint} standing for the latest offset
   * @throws UnsupportedOperationException always, unless this method is overridden
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
    final String message = "StartpointUpcoming is not supported.";
    throw new UnsupportedOperationException(message);
  }

  /**
   * Handles the bootstrap signal carried by a {@link StartpointCustom}.
   *
   * <p>The default rejection message names the runtime class of the argument rather than a fixed
   * type name, so subclasses of {@link StartpointCustom} are reported by their own simple name.
   *
   * @param systemStreamPartition the {@link SystemStreamPartition} the startpoint applies to
   * @param startpointCustom the {@link Startpoint} standing for the bootstrap signal
   * @throws UnsupportedOperationException always, unless this method is overridden
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
    final String typeName = startpointCustom.getClass().getSimpleName();
    throw new UnsupportedOperationException(typeName + " is not supported.");
  }
}
 
// Below is example pseudocode for system-specific implementations showing how to handle the timestamp startpoint type.
// Other visitor interface methods (StartpointSpecific, StartpointOldest, StartpointUpcoming) are left out for brevity.
 
public KafkaSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // Call underlying Kafka Consumer#offsetsForTimes().
      // Call underlying Kafka Consumer#seek() with offsets returned from offsetsForTimes()
    } else if...
  }
}
 
public EventHubSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMPStartpointTimestamp startpoint) {
      // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.position()))
      // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStramPartition.getPartitionId(), eventPosition)
    } else if...
  }
}

Storing Requested Startpoint

...