diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java index 2654306a02..591a0a3008 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java @@ -18,6 +18,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.junit.categories.WanTest; @@ -522,4 +523,82 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase { vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); } + + /** + * Test loop back issue between 2 WAN sites (LN & NY). LN -> NY -> LN. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderShouldEventuallyDrainQueues() + throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ln, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + // create PR on site-ln + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ny + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000)); + + // start sender on site-ny + startSenderInVMsAsync("ny", vm2, vm4); + + inv.join(); + + // verify tmpDroppedEvents is 0 now at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + // tmpDroppedEvents is to make sure all senders' queues are drained + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + + // stop sender on site-ny + vm2.invoke(() -> stopSender("ny")); + vm4.invoke(() -> stopSender("ny")); + + inv = vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000)); + + // start sender on site-ny + startSenderInVMsAsync("ny", vm4, vm2); + + inv.join(); + + // verify tmpDroppedEvents is 0 now at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + // tmpDroppedEvents is to make sure all senders' queues are drained + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } }