Implement notification on USK update
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 22 Aug 2015 16:23:17 +0000 (18:23 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Sat, 22 Aug 2015 16:23:17 +0000 (18:23 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java [new file with mode: 0644]
src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java
src/main/java/net/pterodactylus/fcp/quelaton/SubscribeUskCommandImpl.java
src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java b/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java
new file mode 100644 (file)
index 0000000..241253d
--- /dev/null
@@ -0,0 +1,104 @@
+package net.pterodactylus.fcp.quelaton;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.FcpAdapter;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.Priority;
+import net.pterodactylus.fcp.SubscribeUSK;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+
+/**
+ * Maintains a record of active subscriptions.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class ActiveSubscriptions {
+
+       private final Map<String, RemoteUskSubscription> subscriptions = Collections.synchronizedMap(new HashMap<>());
+
+       public void renew(Consumer<FcpListener> fcpEventSender, Supplier<SubscribeUskCommand> subscribeUskCommandSupplier)
+       throws ExecutionException, InterruptedException {
+               fcpEventSender.accept(createFcpListener());
+               for (UskSubscription uskSubscription : subscriptions.values()) {
+                       subscribeUskCommandSupplier.get().uri(uskSubscription.getUri()).execute().get();
+               }
+       }
+
+       private FcpListener createFcpListener() {
+               return new FcpAdapter() {
+                       @Override
+                       public void receivedSubscribedUSKUpdate(
+                               FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+                               String identifier = subscribedUSKUpdate.getIdentifier();
+                               RemoteUskSubscription uskSubscription = subscriptions.get(identifier);
+                               if (uskSubscription == null) {
+                                       /* TODO - log warning? */
+                                       return;
+                               }
+                               uskSubscription.foundUpdate(subscribedUSKUpdate.getEdition());
+                       }
+               };
+       }
+
+       public UskSubscription createUskSubscription(SubscribeUSK subscribeUSK) {
+               RemoteUskSubscription remoteUskSubscription =
+                       new RemoteUskSubscription(subscribeUSK.getIdentifier(), subscribeUSK.getUri(), subscribeUSK.isActive(),
+                               subscribeUSK.isSparse(), subscribeUSK.getPriority(), subscribeUSK.getActivePriority(),
+                               subscribeUSK.isRealTime(), subscribeUSK.isIgnoreDateHints());
+               subscriptions.put(subscribeUSK.getIdentifier(), remoteUskSubscription);
+               return remoteUskSubscription;
+       }
+
+       private static class RemoteUskSubscription implements UskSubscription {
+
+               private final String identifier;
+               private final String uri;
+               private final boolean active;
+               private final boolean sparse;
+               private final Priority priority;
+               private final Priority activePriority;
+               private final boolean realTime;
+               private final boolean ignoreDateHints;
+               private final List<UskUpdater> uskUpdaters = Collections.synchronizedList(new ArrayList<>());
+
+               private RemoteUskSubscription(
+                       String identifier, String uri, boolean active, boolean sparse, Priority priority, Priority activePriority,
+                       boolean realTime, boolean ignoreDateHints) {
+                       this.identifier = identifier;
+                       this.uri = uri;
+                       this.active = active;
+                       this.sparse = sparse;
+                       this.priority = priority;
+                       this.activePriority = activePriority;
+                       this.realTime = realTime;
+                       this.ignoreDateHints = ignoreDateHints;
+               }
+
+               @Override
+               public String getUri() {
+                       return uri;
+               }
+
+               @Override
+               public void onUpdate(UskUpdater uskUpdater) {
+                       uskUpdaters.add(uskUpdater);
+               }
+
+               private void foundUpdate(int edition) {
+                       for (UskUpdater uskUpdater : uskUpdaters) {
+                               uskUpdater.uskUpdated(edition);
+                       }
+               }
+
+       }
+
+}
index 255549c..53e3640 100644 (file)
@@ -23,6 +23,7 @@ public class DefaultFcpClient implements FcpClient {
        private final int port;
        private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
        private final Supplier<String> clientName;
+       private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions();
 
        public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
                this.threadPool = MoreExecutors.listeningDecorator(threadPool);
@@ -38,6 +39,11 @@ public class DefaultFcpClient implements FcpClient {
                }
                fcpConnection = createConnection();
                this.fcpConnection.set(fcpConnection);
+               try {
+                       activeSubscriptions.renew(fcpConnection::addFcpListener, this::subscribeUsk);
+               } catch (InterruptedException | ExecutionException e) {
+                       throw new IOException(e);
+               }
                return fcpConnection;
        }
 
@@ -136,7 +142,7 @@ public class DefaultFcpClient implements FcpClient {
 
        @Override
        public SubscribeUskCommand subscribeUsk() {
-               return new SubscribeUskCommandImpl(threadPool, this::connect);
+               return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions);
        }
 
 }
index cf9d10b..598aeec 100644 (file)
@@ -5,7 +5,6 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import net.pterodactylus.fcp.IdentifierCollision;
 import net.pterodactylus.fcp.SubscribeUSK;
@@ -26,8 +25,12 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand {
        private final ListeningExecutorService threadPool;
        private final ConnectionSupplier connectionSupplier;
        private final SubscribeUSK subscribeUSK = new SubscribeUSK(IDENTIFIER.generate());
+       private final ActiveSubscriptions activeSubscriptions;
 
-       public SubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+       public SubscribeUskCommandImpl(
+               ExecutorService threadPool, ConnectionSupplier connectionSupplier,
+               ActiveSubscriptions activeSubscriptions) {
+               this.activeSubscriptions = activeSubscriptions;
                this.threadPool = MoreExecutors.listeningDecorator(threadPool);
                this.connectionSupplier = connectionSupplier;
        }
@@ -44,14 +47,18 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand {
 
        private Optional<UskSubscription> executeDialog() throws IOException, ExecutionException, InterruptedException {
                try (SubscribeUskDialog subscribeUskDialog = new SubscribeUskDialog()) {
-                       return subscribeUskDialog.send(subscribeUSK).get();
+                       if (subscribeUskDialog.send(subscribeUSK).get()) {
+                               UskSubscription uskSubscription = activeSubscriptions.createUskSubscription(subscribeUSK);
+                               return Optional.of(uskSubscription);
+                       }
+                       return Optional.empty();
                }
        }
 
-       private class SubscribeUskDialog extends FcpDialog<Optional<UskSubscription>> {
+       private class SubscribeUskDialog extends FcpDialog<Boolean> {
 
                private final AtomicBoolean finished = new AtomicBoolean();
-               private final AtomicReference<SubscribedUSK> subscribedUSK = new AtomicReference<>();
+               private final AtomicBoolean success = new AtomicBoolean();
 
                public SubscribeUskDialog() throws IOException {
                        super(threadPool, connectionSupplier.get());
@@ -63,18 +70,13 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand {
                }
 
                @Override
-               protected Optional<UskSubscription> getResult() {
-                       return Optional.ofNullable(subscribedUSK.get()).map(subscribedUSK -> new UskSubscription() {
-                               @Override
-                               public String getUri() {
-                                       return subscribedUSK.getURI();
-                               }
-                       });
+               protected Boolean getResult() {
+                       return success.get();
                }
 
                @Override
                protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) {
-                       this.subscribedUSK.set(subscribedUSK);
+                       success.set(true);
                        finished.set(true);
                }
 
index 3ab2863..7fb8ad4 100644 (file)
@@ -8,5 +8,12 @@ package net.pterodactylus.fcp.quelaton;
 public interface UskSubscription {
 
        String getUri();
+       void onUpdate(UskUpdater uskUpdater);
+
+       interface UskUpdater {
+
+               void uskUpdated(int edition);
+
+       }
 
 }
index 4fad0ea..a93c229 100644 (file)
@@ -20,10 +20,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -2301,6 +2304,28 @@ public class DefaultFcpClientTest {
                                "EndMessage"
                        );
                        assertThat(uskSubscription.get().get().getUri(), is(URI));
+                       AtomicInteger edition = new AtomicInteger();
+                       CountDownLatch updated = new CountDownLatch(2);
+                       uskSubscription.get().get().onUpdate(e -> {
+                               edition.set(e);
+                               updated.countDown();
+                       });
+                       fcpServer.writeLine(
+                               "SubscribedUSKUpdate",
+                               "Identifier=" + identifier,
+                               "URI=" + URI,
+                               "Edition=23",
+                               "EndMessage"
+                       );
+                       fcpServer.writeLine(
+                               "SubscribedUSKUpdate",
+                               "Identifier=" + identifier,
+                               "URI=" + URI,
+                               "Edition=24",
+                               "EndMessage"
+                       );
+                       assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
+                       assertThat(edition.get(), is(24));
                }
 
        }