THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Visitor interface for system consumers to implement to support {@link Startpoint}s. */ public interface StartpointVisitor { /** * Seek to specific offset represented by {@link StartpointSpecific} * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to. * @param startpointSpecific The {@link Startpoint} that represents the specific offset. */ void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific); /** * Seek to timestamp offset represented by {@link StartpointTimestamp} * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to. * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset. */ default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { throw new UnsupportedOperationException("StartpointTimestamp is not supported."); } /** * Seek to earliest offset represented by {@link StartpointOldest} * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to. * @param startpointOldest The {@link Startpoint} that represents the earliest offset. */ default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) { throw new UnsupportedOperationException("StartpointOldest is not supported."); } /** * Seek to latest offset represented by {@link StartpointUpcoming} * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to. * @param startpointUpcoming The {@link Startpoint} that represents the latest offset. */ default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) { throw new UnsupportedOperationException("StartpointUpcoming is not supported."); } /** * Bootstrap signal represented by {@link StartpointCustom} * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to. * @param startpointCustom The {@link Startpoint} that represents the bootstrap signal. */ default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) { throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName())); } } // Below is example pseudocode for system specific implementations on how to handle timestamp position types. Other positionvisitor interface typesmethods, // (StartpointSpecific, StartpointOldest, StartpointUpcoming) left out for brevity. public KafkaSystemConsumer implements SystemConsumer, StartpointVisitor { @Override public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpoint) { if (startpoint.getPositionType() == PositionType.TIMESTAMP) { // Call underlying Kafka Consumer#offsetsForTimes(). // Call underlying Kafka Consumer#seek() with offsets returned from offsetsForTimes() } else if... } } public EventHubSystemConsumer implements SystemConsumer, StartpointVisitor { @Override public void visit(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { if (startpoint.getPositionType() == PositionType.TIMESTAMPStartpointTimestamp startpoint) { // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.position())) // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStramPartition.getPartitionId(), eventPosition) } else if... } } |
Storing Requested Startpoint
...