Add subscription cancelling
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 22 Aug 2015 17:03:49 +0000 (19:03 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 22 Aug 2015 17:03:49 +0000 (19:03 +0200)
src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java
src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java
src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

diff --git a/src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java b/src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java
new file mode 100644 (file)
index 0000000..baadefb
--- /dev/null
@@ -0,0 +1,15 @@
+package net.pterodactylus.fcp;
+
+/**
+ * The “UnsubscribeUSK” message is used to cancel a {@link SubscribeUSK USK subscription}.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class UnsubscribeUSK extends FcpMessage {
+
+       public UnsubscribeUSK(String identifier) {
+               super("UnsubscribeUSK");
+               setField("Identifier", identifier);
+       }
+
+}
index 241253d..43317d0 100644 (file)
@@ -23,8 +23,13 @@ import net.pterodactylus.fcp.SubscribedUSKUpdate;
  */
 public class ActiveSubscriptions {
 
+       private final Supplier<UnsubscribeUskCommand> unsubscribeUskCommandSupplier;
        private final Map<String, RemoteUskSubscription> subscriptions = Collections.synchronizedMap(new HashMap<>());
 
+       public ActiveSubscriptions(Supplier<UnsubscribeUskCommand> unsubscribeUskCommandSupplier) {
+               this.unsubscribeUskCommandSupplier = unsubscribeUskCommandSupplier;
+       }
+
        public void renew(Consumer<FcpListener> fcpEventSender, Supplier<SubscribeUskCommand> subscribeUskCommandSupplier)
        throws ExecutionException, InterruptedException {
                fcpEventSender.accept(createFcpListener());
@@ -58,7 +63,7 @@ public class ActiveSubscriptions {
                return remoteUskSubscription;
        }
 
-       private static class RemoteUskSubscription implements UskSubscription {
+       private class RemoteUskSubscription implements UskSubscription {
 
                private final String identifier;
                private final String uri;
@@ -99,6 +104,11 @@ public class ActiveSubscriptions {
                        }
                }
 
+               public void cancel() throws ExecutionException, InterruptedException {
+                       unsubscribeUskCommandSupplier.get().identifier(identifier).execute().get();
+                       subscriptions.remove(identifier);
+               }
+
        }
 
 }
index 53e3640..d0bf14a 100644 (file)
@@ -23,7 +23,7 @@ public class DefaultFcpClient implements FcpClient {
        private final int port;
        private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
        private final Supplier<String> clientName;
-       private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions();
+       private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions(this::unsubscribeUsk);
 
        public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
                this.threadPool = MoreExecutors.listeningDecorator(threadPool);
@@ -145,5 +145,9 @@ public class DefaultFcpClient implements FcpClient {
                return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions);
        }
 
+       private UnsubscribeUskCommand unsubscribeUsk() {
+               return new UnsubscribeUskCommandImpl(threadPool, this::connect);
+       }
+
 }
 
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java
new file mode 100644 (file)
index 0000000..44450b5
--- /dev/null
@@ -0,0 +1,58 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import net.pterodactylus.fcp.UnsubscribeUSK;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Default {@link UnsubscribeUskCommand} implementation based on {@link FcpDialog}.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class UnsubscribeUskCommandImpl implements UnsubscribeUskCommand {
+
+       private static final RandomIdentifierGenerator IDENTIFIER = new RandomIdentifierGenerator();
+       private final ListeningExecutorService threadPool;
+       private final ConnectionSupplier connectionSupplier;
+       private final UnsubscribeUSK unsubscribeUSK = new UnsubscribeUSK(IDENTIFIER.generate());
+
+       public UnsubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+               this.threadPool = MoreExecutors.listeningDecorator(threadPool);
+               this.connectionSupplier = connectionSupplier;
+       }
+
+       @Override
+       public Executable<Void> identifier(String identifier) {
+               return this::execute;
+       }
+
+       private ListenableFuture<Void> execute() {
+               return threadPool.submit(this::executeDialog);
+       }
+
+       private Void executeDialog() throws IOException, ExecutionException, InterruptedException {
+               try (UnsubscribeUskDialog unsubscribeUskDialog = new UnsubscribeUskDialog()) {
+                       return unsubscribeUskDialog.send(unsubscribeUSK).get();
+               }
+       }
+
+       private class UnsubscribeUskDialog extends FcpDialog<Void> {
+
+               public UnsubscribeUskDialog() throws IOException {
+                       super(threadPool, connectionSupplier.get());
+               }
+
+               @Override
+               protected boolean isFinished() {
+                       return true;
+               }
+
+       }
+
+}
index 7fb8ad4..b8e4526 100644 (file)
@@ -1,5 +1,7 @@
 package net.pterodactylus.fcp.quelaton;
 
+import java.util.concurrent.ExecutionException;
+
 /**
  * USK subscription object that is returned to the client application.
  *
@@ -9,6 +11,7 @@ public interface UskSubscription {
 
        String getUri();
        void onUpdate(UskUpdater uskUpdater);
+       void cancel() throws ExecutionException, InterruptedException;
 
        interface UskUpdater {
 
index 6f9e52d..c716ed5 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -2332,6 +2333,20 @@ public class DefaultFcpClientTest {
                        assertThat(edition.get(), is(23));
                }
 
+               @Test
+               public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       replyWithSubscribed();
+                       assertThat(uskSubscription.get().get().getUri(), is(URI));
+                       AtomicBoolean updated = new AtomicBoolean();
+                       uskSubscription.get().get().onUpdate(e -> updated.set(true));
+                       uskSubscription.get().get().cancel();
+                       readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage"));
+                       sendUpdateNotification(23);
+                       assertThat(updated.get(), is(false));
+               }
+
                private void replyWithSubscribed() throws IOException {
                        fcpServer.writeLine(
                                "SubscribedUSK",