Add subscription cancelling
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
index a93c229..c716ed5 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -2002,6 +2003,10 @@ public class DefaultFcpClientTest {
        private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
        throws InterruptedException, ExecutionException, IOException {
                connectNode();
+               readMessage(requestMatcher);
+       }
+
+       private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
                lines = fcpServer.collectUntil(is("EndMessage"));
                identifier = extractIdentifier(lines);
                assertThat(lines, requestMatcher.get());
@@ -2290,19 +2295,31 @@ public class DefaultFcpClientTest {
 
        public class UskSubscriptionCommands {
 
-               private static final String URI = "SSK@some,uri/file.txt";
+               private static final String URI = "USK@some,uri/file.txt";
 
                @Test
                public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
                        connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
-                       fcpServer.writeLine(
-                               "SubscribedUSK",
-                               "Identifier=" + identifier,
-                               "URI=" + URI,
-                               "DontPoll=false",
-                               "EndMessage"
-                       );
+                       replyWithSubscribed();
+                       assertThat(uskSubscription.get().get().getUri(), is(URI));
+                       AtomicInteger edition = new AtomicInteger();
+                       CountDownLatch updated = new CountDownLatch(2);
+                       uskSubscription.get().get().onUpdate(e -> {
+                               edition.set(e);
+                               updated.countDown();
+                       });
+                       sendUpdateNotification(23);
+                       sendUpdateNotification(24);
+                       assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
+                       assertThat(edition.get(), is(24));
+               }
+
+               @Test
+               public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       replyWithSubscribed();
                        assertThat(uskSubscription.get().get().getUri(), is(URI));
                        AtomicInteger edition = new AtomicInteger();
                        CountDownLatch updated = new CountDownLatch(2);
@@ -2310,22 +2327,45 @@ public class DefaultFcpClientTest {
                                edition.set(e);
                                updated.countDown();
                        });
+                       uskSubscription.get().get().onUpdate(e -> updated.countDown());
+                       sendUpdateNotification(23);
+                       assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
+                       assertThat(edition.get(), is(23));
+               }
+
+               @Test
+               public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       replyWithSubscribed();
+                       assertThat(uskSubscription.get().get().getUri(), is(URI));
+                       AtomicBoolean updated = new AtomicBoolean();
+                       uskSubscription.get().get().onUpdate(e -> updated.set(true));
+                       uskSubscription.get().get().cancel();
+                       readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage"));
+                       sendUpdateNotification(23);
+                       assertThat(updated.get(), is(false));
+               }
+
+               private void replyWithSubscribed() throws IOException {
                        fcpServer.writeLine(
-                               "SubscribedUSKUpdate",
+                               "SubscribedUSK",
                                "Identifier=" + identifier,
                                "URI=" + URI,
-                               "Edition=23",
+                               "DontPoll=false",
                                "EndMessage"
                        );
+               }
+
+               private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
                        fcpServer.writeLine(
                                "SubscribedUSKUpdate",
                                "Identifier=" + identifier,
                                "URI=" + URI,
-                               "Edition=24",
-                               "EndMessage"
+                               "Edition=" + edition
                        );
-                       assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
-                       assertThat(edition.get(), is(24));
+                       fcpServer.writeLine(additionalLines);
+                       fcpServer.writeLine("EndMessage");
                }
 
        }