From db815b5a0f70dcc602fc33b27c39e05a4e2fe57c Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Sat, 22 Aug 2015 18:23:17 +0200 Subject: [PATCH] Implement notification on USK update --- .../fcp/quelaton/ActiveSubscriptions.java | 104 +++++++++++++++++++++ .../fcp/quelaton/DefaultFcpClient.java | 8 +- .../fcp/quelaton/SubscribeUskCommandImpl.java | 28 +++--- .../fcp/quelaton/UskSubscription.java | 7 ++ .../fcp/quelaton/DefaultFcpClientTest.java | 25 +++++ 5 files changed, 158 insertions(+), 14 deletions(-) create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java b/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java new file mode 100644 index 0000000..241253d --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ActiveSubscriptions.java @@ -0,0 +1,104 @@ +package net.pterodactylus.fcp.quelaton; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import net.pterodactylus.fcp.FcpAdapter; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpListener; +import net.pterodactylus.fcp.Priority; +import net.pterodactylus.fcp.SubscribeUSK; +import net.pterodactylus.fcp.SubscribedUSKUpdate; + +/** + * Maintains a record of active subscriptions. + * + * @author David ‘Bombe’ Roden + */ +public class ActiveSubscriptions { + + private final Map subscriptions = Collections.synchronizedMap(new HashMap<>()); + + public void renew(Consumer fcpEventSender, Supplier subscribeUskCommandSupplier) + throws ExecutionException, InterruptedException { + fcpEventSender.accept(createFcpListener()); + for (UskSubscription uskSubscription : subscriptions.values()) { + subscribeUskCommandSupplier.get().uri(uskSubscription.getUri()).execute().get(); + } + } + + private FcpListener createFcpListener() { + return new FcpAdapter() { + @Override + public void receivedSubscribedUSKUpdate( + FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { + String identifier = subscribedUSKUpdate.getIdentifier(); + RemoteUskSubscription uskSubscription = subscriptions.get(identifier); + if (uskSubscription == null) { + /* TODO - log warning? */ + return; + } + uskSubscription.foundUpdate(subscribedUSKUpdate.getEdition()); + } + }; + } + + public UskSubscription createUskSubscription(SubscribeUSK subscribeUSK) { + RemoteUskSubscription remoteUskSubscription = + new RemoteUskSubscription(subscribeUSK.getIdentifier(), subscribeUSK.getUri(), subscribeUSK.isActive(), + subscribeUSK.isSparse(), subscribeUSK.getPriority(), subscribeUSK.getActivePriority(), + subscribeUSK.isRealTime(), subscribeUSK.isIgnoreDateHints()); + subscriptions.put(subscribeUSK.getIdentifier(), remoteUskSubscription); + return remoteUskSubscription; + } + + private static class RemoteUskSubscription implements UskSubscription { + + private final String identifier; + private final String uri; + private final boolean active; + private final boolean sparse; + private final Priority priority; + private final Priority activePriority; + private final boolean realTime; + private final boolean ignoreDateHints; + private final List uskUpdaters = Collections.synchronizedList(new ArrayList<>()); + + private RemoteUskSubscription( + String identifier, String uri, boolean active, boolean sparse, Priority priority, Priority activePriority, + boolean realTime, boolean ignoreDateHints) { + this.identifier = identifier; + this.uri = uri; + this.active = active; + this.sparse = sparse; + this.priority = priority; + this.activePriority = activePriority; + this.realTime = realTime; + this.ignoreDateHints = ignoreDateHints; + } + + @Override + public String getUri() { + return uri; + } + + @Override + public void onUpdate(UskUpdater uskUpdater) { + uskUpdaters.add(uskUpdater); + } + + private void foundUpdate(int edition) { + for (UskUpdater uskUpdater : uskUpdaters) { + uskUpdater.uskUpdated(edition); + } + } + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java index 255549c..53e3640 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -23,6 +23,7 @@ public class DefaultFcpClient implements FcpClient { private final int port; private final AtomicReference fcpConnection = new AtomicReference<>(); private final Supplier clientName; + private final ActiveSubscriptions activeSubscriptions = new ActiveSubscriptions(); public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName) { this.threadPool = MoreExecutors.listeningDecorator(threadPool); @@ -38,6 +39,11 @@ public class DefaultFcpClient implements FcpClient { } fcpConnection = createConnection(); this.fcpConnection.set(fcpConnection); + try { + activeSubscriptions.renew(fcpConnection::addFcpListener, this::subscribeUsk); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } return fcpConnection; } @@ -136,7 +142,7 @@ public class DefaultFcpClient implements FcpClient { @Override public SubscribeUskCommand subscribeUsk() { - return new SubscribeUskCommandImpl(threadPool, this::connect); + return new SubscribeUskCommandImpl(threadPool, this::connect, activeSubscriptions); } } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/SubscribeUskCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/SubscribeUskCommandImpl.java index cf9d10b..598aeec 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/SubscribeUskCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/SubscribeUskCommandImpl.java @@ -5,7 +5,6 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import net.pterodactylus.fcp.IdentifierCollision; import net.pterodactylus.fcp.SubscribeUSK; @@ -26,8 +25,12 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand { private final ListeningExecutorService threadPool; private final ConnectionSupplier connectionSupplier; private final SubscribeUSK subscribeUSK = new SubscribeUSK(IDENTIFIER.generate()); + private final ActiveSubscriptions activeSubscriptions; - public SubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { + public SubscribeUskCommandImpl( + ExecutorService threadPool, ConnectionSupplier connectionSupplier, + ActiveSubscriptions activeSubscriptions) { + this.activeSubscriptions = activeSubscriptions; this.threadPool = MoreExecutors.listeningDecorator(threadPool); this.connectionSupplier = connectionSupplier; } @@ -44,14 +47,18 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand { private Optional executeDialog() throws IOException, ExecutionException, InterruptedException { try (SubscribeUskDialog subscribeUskDialog = new SubscribeUskDialog()) { - return subscribeUskDialog.send(subscribeUSK).get(); + if (subscribeUskDialog.send(subscribeUSK).get()) { + UskSubscription uskSubscription = activeSubscriptions.createUskSubscription(subscribeUSK); + return Optional.of(uskSubscription); + } + return Optional.empty(); } } - private class SubscribeUskDialog extends FcpDialog> { + private class SubscribeUskDialog extends FcpDialog { private final AtomicBoolean finished = new AtomicBoolean(); - private final AtomicReference subscribedUSK = new AtomicReference<>(); + private final AtomicBoolean success = new AtomicBoolean(); public SubscribeUskDialog() throws IOException { super(threadPool, connectionSupplier.get()); @@ -63,18 +70,13 @@ public class SubscribeUskCommandImpl implements SubscribeUskCommand { } @Override - protected Optional getResult() { - return Optional.ofNullable(subscribedUSK.get()).map(subscribedUSK -> new UskSubscription() { - @Override - public String getUri() { - return subscribedUSK.getURI(); - } - }); + protected Boolean getResult() { + return success.get(); } @Override protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) { - this.subscribedUSK.set(subscribedUSK); + success.set(true); finished.set(true); } diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java b/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java index 3ab2863..7fb8ad4 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/UskSubscription.java @@ -8,5 +8,12 @@ package net.pterodactylus.fcp.quelaton; public interface UskSubscription { String getUri(); + void onUpdate(UskUpdater uskUpdater); + + interface UskUpdater { + + void uskUpdated(int edition); + + } } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index 4fad0ea..a93c229 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -20,10 +20,13 @@ import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -2301,6 +2304,28 @@ public class DefaultFcpClientTest { "EndMessage" ); assertThat(uskSubscription.get().get().getUri(), is(URI)); + AtomicInteger edition = new AtomicInteger(); + CountDownLatch updated = new CountDownLatch(2); + uskSubscription.get().get().onUpdate(e -> { + edition.set(e); + updated.countDown(); + }); + fcpServer.writeLine( + "SubscribedUSKUpdate", + "Identifier=" + identifier, + "URI=" + URI, + "Edition=23", + "EndMessage" + ); + fcpServer.writeLine( + "SubscribedUSKUpdate", + "Identifier=" + identifier, + "URI=" + URI, + "Edition=24", + "EndMessage" + ); + assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true)); + assertThat(edition.get(), is(24)); } } -- 2.7.4