--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.FcpAdapter;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.Priority;
+import net.pterodactylus.fcp.SubscribeUSK;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+
+/**
+ * Maintains a record of active subscriptions.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class ActiveSubscriptions {
+
+ private final Map<String, RemoteUskSubscription> subscriptions = Collections.synchronizedMap(new HashMap<>());
+
+ public void renew(Consumer<FcpListener> fcpEventSender, Supplier<SubscribeUskCommand> subscribeUskCommandSupplier)
+ throws ExecutionException, InterruptedException {
+ fcpEventSender.accept(createFcpListener());
+ for (UskSubscription uskSubscription : subscriptions.values()) {
+ subscribeUskCommandSupplier.get().uri(uskSubscription.getUri()).execute().get();
+ }
+ }
+
+ private FcpListener createFcpListener() {
+ return new FcpAdapter() {
+ @Override
+ public void receivedSubscribedUSKUpdate(
+ FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+ String identifier = subscribedUSKUpdate.getIdentifier();
+ RemoteUskSubscription uskSubscription = subscriptions.get(identifier);
+ if (uskSubscription == null) {
+ /* TODO - log warning? */
+ return;
+ }
+ uskSubscription.foundUpdate(subscribedUSKUpdate.getEdition());
+ }
+ };
+ }
+
+ public UskSubscription createUskSubscription(SubscribeUSK subscribeUSK) {
+ RemoteUskSubscription remoteUskSubscription =
+ new RemoteUskSubscription(subscribeUSK.getIdentifier(), subscribeUSK.getUri(), subscribeUSK.isActive(),
+ subscribeUSK.isSparse(), subscribeUSK.getPriority(), subscribeUSK.getActivePriority(),
+ subscribeUSK.isRealTime(), subscribeUSK.isIgnoreDateHints());
+ subscriptions.put(subscribeUSK.getIdentifier(), remoteUskSubscription);
+ return remoteUskSubscription;
+ }
+
+ private static class RemoteUskSubscription implements UskSubscription {
+
+ private final String identifier;
+ private final String uri;
+ private final boolean active;
+ private final boolean sparse;
+ private final Priority priority;
+ private final Priority activePriority;
+ private final boolean realTime;
+ private final boolean ignoreDateHints;
+ private final List<UskUpdater> uskUpdaters = Collections.synchronizedList(new ArrayList<>());
+
+ private RemoteUskSubscription(
+ String identifier, String uri, boolean active, boolean sparse, Priority priority, Priority activePriority,
+ boolean realTime, boolean ignoreDateHints) {
+ this.identifier = identifier;
+ this.uri = uri;
+ this.active = active;
+ this.sparse = sparse;
+ this.priority = priority;
+ this.activePriority = activePriority;
+ this.realTime = realTime;
+ this.ignoreDateHints = ignoreDateHints;
+ }
+
+ @Override
+ public String getUri() {
+ return uri;
+ }
+
+ @Override
+ public void onUpdate(UskUpdater uskUpdater) {
+ uskUpdaters.add(uskUpdater);
+ }
+
+ private void foundUpdate(int edition) {
+ for (UskUpdater uskUpdater : uskUpdaters) {
+ uskUpdater.uskUpdated(edition);
+ }
+ }
+
+ }
+
+}
private final int port;
private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
private final Supplier<String> clientName;
+ private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions();
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
}
fcpConnection = createConnection();
this.fcpConnection.set(fcpConnection);
+ try {
+ activeSubscriptions.renew(fcpConnection::addFcpListener, this::subscribeUsk);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IOException(e);
+ }
return fcpConnection;
}
@Override
public SubscribeUskCommand subscribeUsk() {
- return new SubscribeUskCommandImpl(threadPool, this::connect);
+ return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions);
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import net.pterodactylus.fcp.IdentifierCollision;
import net.pterodactylus.fcp.SubscribeUSK;
private final ListeningExecutorService threadPool;
private final ConnectionSupplier connectionSupplier;
private final SubscribeUSK subscribeUSK = new SubscribeUSK(IDENTIFIER.generate());
+ private final ActiveSubscriptions activeSubscriptions;
- public SubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+ public SubscribeUskCommandImpl(
+ ExecutorService threadPool, ConnectionSupplier connectionSupplier,
+ ActiveSubscriptions activeSubscriptions) {
+ this.activeSubscriptions = activeSubscriptions;
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
this.connectionSupplier = connectionSupplier;
}
private Optional<UskSubscription> executeDialog() throws IOException, ExecutionException, InterruptedException {
try (SubscribeUskDialog subscribeUskDialog = new SubscribeUskDialog()) {
- return subscribeUskDialog.send(subscribeUSK).get();
+ if (subscribeUskDialog.send(subscribeUSK).get()) {
+ UskSubscription uskSubscription = activeSubscriptions.createUskSubscription(subscribeUSK);
+ return Optional.of(uskSubscription);
+ }
+ return Optional.empty();
}
}
- private class SubscribeUskDialog extends FcpDialog<Optional<UskSubscription>> {
+ private class SubscribeUskDialog extends FcpDialog<Boolean> {
private final AtomicBoolean finished = new AtomicBoolean();
- private final AtomicReference<SubscribedUSK> subscribedUSK = new AtomicReference<>();
+ private final AtomicBoolean success = new AtomicBoolean();
public SubscribeUskDialog() throws IOException {
super(threadPool, connectionSupplier.get());
}
@Override
- protected Optional<UskSubscription> getResult() {
- return Optional.ofNullable(subscribedUSK.get()).map(subscribedUSK -> new UskSubscription() {
- @Override
- public String getUri() {
- return subscribedUSK.getURI();
- }
- });
+ protected Boolean getResult() {
+ return success.get();
}
@Override
protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) {
- this.subscribedUSK.set(subscribedUSK);
+ success.set(true);
finished.set(true);
}
public interface UskSubscription {
String getUri();
+ void onUpdate(UskUpdater uskUpdater);
+
+ interface UskUpdater {
+
+ void uskUpdated(int edition);
+
+ }
}
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
"EndMessage"
);
assertThat(uskSubscription.get().get().getUri(), is(URI));
+ AtomicInteger edition = new AtomicInteger();
+ CountDownLatch updated = new CountDownLatch(2);
+ uskSubscription.get().get().onUpdate(e -> {
+ edition.set(e);
+ updated.countDown();
+ });
+ fcpServer.writeLine(
+ "SubscribedUSKUpdate",
+ "Identifier=" + identifier,
+ "URI=" + URI,
+ "Edition=23",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "SubscribedUSKUpdate",
+ "Identifier=" + identifier,
+ "URI=" + URI,
+ "Edition=24",
+ "EndMessage"
+ );
+ assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
+ assertThat(edition.get(), is(24));
}
}