From 979b46ce2bec0482076a1d340419e5ba60acc30f Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Sat, 22 Aug 2015 19:03:49 +0200 Subject: [PATCH] Add subscription cancelling --- .../java/net/pterodactylus/fcp/UnsubscribeUSK.java | 15 ++++++ .../fcp/quelaton/ActiveSubscriptions.java | 12 ++++- .../fcp/quelaton/DefaultFcpClient.java | 6 ++- .../fcp/quelaton/UnsubscribeUskCommandImpl.java | 58 ++++++++++++++++++++++ .../fcp/quelaton/UskSubscription.java | 3 ++ .../fcp/quelaton/DefaultFcpClientTest.java | 15 ++++++ 6 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java diff --git a/src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java b/src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java new file mode 100644 index 0000000..baadefb --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/UnsubscribeUSK.java @@ -0,0 +1,15 @@ +package net.pterodactylus.fcp; + +/** + * The “UnsubscribeUSK” message is used to cancel a {@link SubscribeUSK USK subscription}. + * + * @author David ‘Bombe’ Roden + */ +public class UnsubscribeUSK extends FcpMessage { + + public UnsubscribeUSK(String identifier) { + super("UnsubscribeUSK"); + setField("Identifier", identifier); + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java b/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java index 241253d..43317d0 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java @@ -23,8 +23,13 @@ import net.pterodactylus.fcp.SubscribedUSKUpdate; */ public class ActiveSubscriptions { + private final Supplier unsubscribeUskCommandSupplier; private final Map subscriptions = Collections.synchronizedMap(new HashMap<>()); + public ActiveSubscriptions(Supplier unsubscribeUskCommandSupplier) { + this.unsubscribeUskCommandSupplier = unsubscribeUskCommandSupplier; + } + public void renew(Consumer fcpEventSender, Supplier subscribeUskCommandSupplier) throws ExecutionException, InterruptedException { fcpEventSender.accept(createFcpListener()); @@ -58,7 +63,7 @@ public class ActiveSubscriptions { return remoteUskSubscription; } - private static class RemoteUskSubscription implements UskSubscription { + private class RemoteUskSubscription implements UskSubscription { private final String identifier; private final String uri; @@ -99,6 +104,11 @@ public class ActiveSubscriptions { } } + public void cancel() throws ExecutionException, InterruptedException { + unsubscribeUskCommandSupplier.get().identifier(identifier).execute().get(); + subscriptions.remove(identifier); + } + } } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 53e3640..d0bf14a 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -23,7 +23,7 @@ public class DefaultFcpClient implements FcpClient { private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); private final Supplier clientName; - private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions(); + private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions(this::unsubscribeUsk); public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName) { this.threadPool = MoreExecutors.listeningDecorator(threadPool); @@ -145,5 +145,9 @@ public class DefaultFcpClient implements FcpClient { return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions); } + private UnsubscribeUskCommand unsubscribeUsk() { + return new UnsubscribeUskCommandImpl(threadPool, this::connect); + } + } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java new file mode 100644 index 0000000..44450b5 --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/UnsubscribeUskCommandImpl.java @@ -0,0 +1,58 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +import net.pterodactylus.fcp.UnsubscribeUSK; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * Default {@link UnsubscribeUskCommand} implementation based on {@link FcpDialog}. + * + * @author David ‘Bombe’ Roden + */ +public class UnsubscribeUskCommandImpl implements UnsubscribeUskCommand { + + private static final RandomIdentifierGenerator IDENTIFIER = new RandomIdentifierGenerator(); + private final ListeningExecutorService threadPool; + private final ConnectionSupplier connectionSupplier; + private final UnsubscribeUSK unsubscribeUSK = new UnsubscribeUSK(IDENTIFIER.generate()); + + public UnsubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { + this.threadPool = MoreExecutors.listeningDecorator(threadPool); + this.connectionSupplier = connectionSupplier; + } + + @Override + public Executable identifier(String identifier) { + return this::execute; + } + + private ListenableFuture execute() { + return threadPool.submit(this::executeDialog); + } + + private Void executeDialog() throws IOException, ExecutionException, InterruptedException { + try (UnsubscribeUskDialog unsubscribeUskDialog = new UnsubscribeUskDialog()) { + return unsubscribeUskDialog.send(unsubscribeUSK).get(); + } + } + + private class UnsubscribeUskDialog extends FcpDialog { + + public UnsubscribeUskDialog() throws IOException { + super(threadPool, connectionSupplier.get()); + } + + @Override + protected boolean isFinished() { + return true; + } + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java b/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java index 7fb8ad4..b8e4526 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java @@ -1,5 +1,7 @@ package net.pterodactylus.fcp.quelaton; +import java.util.concurrent.ExecutionException; + /** * USK subscription object that is returned to the client application. * @@ -9,6 +11,7 @@ public interface UskSubscription { String getUri(); void onUpdate(UskUpdater uskUpdater); + void cancel() throws ExecutionException, InterruptedException; interface UskUpdater { diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index 6f9e52d..c716ed5 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -2332,6 +2333,20 @@ public class DefaultFcpClientTest { assertThat(edition.get(), is(23)); } + @Test + public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException { + Future> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute(); + connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage")); + replyWithSubscribed(); + assertThat(uskSubscription.get().get().getUri(), is(URI)); + AtomicBoolean updated = new AtomicBoolean(); + uskSubscription.get().get().onUpdate(e -> updated.set(true)); + uskSubscription.get().get().cancel(); + readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage")); + sendUpdateNotification(23); + assertThat(updated.get(), is(false)); + } + private void replyWithSubscribed() throws IOException { fcpServer.writeLine( "SubscribedUSK", -- 2.7.4