--- /dev/null
+package net.pterodactylus.fcp;
+
+/**
+ * The “UnsubscribeUSK” message is used to cancel a {@link SubscribeUSK USK subscription}.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class UnsubscribeUSK extends FcpMessage {
+
+ public UnsubscribeUSK(String identifier) {
+ super("UnsubscribeUSK");
+ setField("Identifier", identifier);
+ }
+
+}
*/
public class ActiveSubscriptions {
+ private final Supplier<UnsubscribeUskCommand> unsubscribeUskCommandSupplier;
private final Map<String, RemoteUskSubscription> subscriptions = Collections.synchronizedMap(new HashMap<>());
+ public ActiveSubscriptions(Supplier<UnsubscribeUskCommand> unsubscribeUskCommandSupplier) {
+ this.unsubscribeUskCommandSupplier = unsubscribeUskCommandSupplier;
+ }
+
public void renew(Consumer<FcpListener> fcpEventSender, Supplier<SubscribeUskCommand> subscribeUskCommandSupplier)
throws ExecutionException, InterruptedException {
fcpEventSender.accept(createFcpListener());
return remoteUskSubscription;
}
- private static class RemoteUskSubscription implements UskSubscription {
+ private class RemoteUskSubscription implements UskSubscription {
private final String identifier;
private final String uri;
}
}
+ public void cancel() throws ExecutionException, InterruptedException {
+ unsubscribeUskCommandSupplier.get().identifier(identifier).execute().get();
+ subscriptions.remove(identifier);
+ }
+
}
}
private final int port;
private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
private final Supplier<String> clientName;
- private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions();
+ private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions(this::unsubscribeUsk);
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions);
}
+ private UnsubscribeUskCommand unsubscribeUsk() {
+ return new UnsubscribeUskCommandImpl(threadPool, this::connect);
+ }
+
}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import net.pterodactylus.fcp.UnsubscribeUSK;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Default {@link UnsubscribeUskCommand} implementation based on {@link FcpDialog}.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class UnsubscribeUskCommandImpl implements UnsubscribeUskCommand {
+
+ private static final RandomIdentifierGenerator IDENTIFIER = new RandomIdentifierGenerator();
+ private final ListeningExecutorService threadPool;
+ private final ConnectionSupplier connectionSupplier;
+ private final UnsubscribeUSK unsubscribeUSK = new UnsubscribeUSK(IDENTIFIER.generate());
+
+ public UnsubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+ this.threadPool = MoreExecutors.listeningDecorator(threadPool);
+ this.connectionSupplier = connectionSupplier;
+ }
+
+ @Override
+ public Executable<Void> identifier(String identifier) {
+ return this::execute;
+ }
+
+ private ListenableFuture<Void> execute() {
+ return threadPool.submit(this::executeDialog);
+ }
+
+ private Void executeDialog() throws IOException, ExecutionException, InterruptedException {
+ try (UnsubscribeUskDialog unsubscribeUskDialog = new UnsubscribeUskDialog()) {
+ return unsubscribeUskDialog.send(unsubscribeUSK).get();
+ }
+ }
+
+ private class UnsubscribeUskDialog extends FcpDialog<Void> {
+
+ public UnsubscribeUskDialog() throws IOException {
+ super(threadPool, connectionSupplier.get());
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return true;
+ }
+
+ }
+
+}
package net.pterodactylus.fcp.quelaton;
+import java.util.concurrent.ExecutionException;
+
/**
* USK subscription object that is returned to the client application.
*
String getUri();
void onUpdate(UskUpdater uskUpdater);
+ void cancel() throws ExecutionException, InterruptedException;
interface UskUpdater {
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
assertThat(edition.get(), is(23));
}
+ @Test
+ public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
+ Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
+ connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+ replyWithSubscribed();
+ assertThat(uskSubscription.get().get().getUri(), is(URI));
+ AtomicBoolean updated = new AtomicBoolean();
+ uskSubscription.get().get().onUpdate(e -> updated.set(true));
+ uskSubscription.get().get().cancel();
+ readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage"));
+ sendUpdateNotification(23);
+ assertThat(updated.get(), is(false));
+ }
+
private void replyWithSubscribed() throws IOException {
fcpServer.writeLine(
"SubscribedUSK",