/* empty. */
}
+ @Override
+ public void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK) {
+ /* empty. */
+ }
+
/**
* {@inheritDoc}
*/
fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage));
} else if ("SubscribedUSKUpdate".equals(messageName)) {
fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
+ } else if ("SubscribedUSK".equals(messageName)) {
+ fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage));
} else if ("IdentifierCollision".equals(messageName)) {
fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
} else if ("AllData".equals(messageName)) {
*/
public void receivedPersistentRequestRemoved(FcpConnection fcpConnection, PersistentRequestRemoved persistentRequestRemoved);
+ void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK);
+
/**
* Notifies a listener that a “SubscribedUSKUpdate” message was received.
*
}
}
+ public void fireReceivedSubscribedUSK(SubscribedUSK subscribedUSK) {
+ for (FcpListener fcpListener : getListeners()) {
+ fcpListener.receivedSubscribedUSK(getSource(), subscribedUSK);
+ }
+ }
+
/**
* Notifies all listeners that a “SubscribedUSKUpdate” message was
* received.
*/
public class SubscribeUSK extends FcpMessage {
- /**
- * Creates a new “SubscribeUSK” message.
- *
- * @param uri
- * The URI to watch for changes
- * @param identifier
- * The identifier of the request
- */
- public SubscribeUSK(String uri, String identifier) {
+ public SubscribeUSK(String identifier) {
super("SubscribeUSK");
- setField("URI", uri);
setField("Identifier", identifier);
}
+ public SubscribeUSK(String uri, String identifier) {
+ this(identifier);
+ setField("URI", uri);
+ }
+
+ public void setUri(String uri) {
+ setField("URI", uri);
+ }
+
/**
* Sets whether updates for the USK are actively searched.
*
--- /dev/null
+/*
+ * jFCPlib - SubscribedUSKUpdate.java - Copyright © 2008 David Roden
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ */
+
+package net.pterodactylus.fcp;
+
+/**
+ * A “SubscribedUSK” message is sent when a {@link SubscribeUSK} was succesfully processed.
+ *
+ * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
+ */
+public class SubscribedUSK extends BaseMessage implements Identifiable {
+
+ public SubscribedUSK(FcpMessage receivedMessage) {
+ super(receivedMessage);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getField("Identifier");
+ }
+
+ public String getURI() {
+ return getField("URI");
+ }
+
+ public boolean isDontPoll() {
+ return Boolean.parseBoolean(getField("DontPoll"));
+ }
+
+}
return new GetPluginInfoCommandImpl(threadPool, this::connect);
}
+ @Override
+ public SubscribeUskCommand subscribeUsk() {
+ return new SubscribeUskCommandImpl(threadPool, this::connect);
+ }
+
}
RemovePluginCommand removePlugin();
GetPluginInfoCommand getPluginInfo();
+ SubscribeUskCommand subscribeUsk();
+
}
import net.pterodactylus.fcp.SentFeed;
import net.pterodactylus.fcp.SimpleProgress;
import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSK;
import net.pterodactylus.fcp.SubscribedUSKUpdate;
import net.pterodactylus.fcp.TestDDAComplete;
import net.pterodactylus.fcp.TestDDAReply;
protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
@Override
+ public final void receivedSubscribedUSK(FcpConnection fcpConnection, SubscribedUSK subscribedUSK) {
+ consume(this::consumeSubscribedUSK, subscribedUSK);
+ }
+
+ protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) { }
+
+ @Override
public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.util.Optional;
+
+/**
+ * Subscribes to a USK.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface SubscribeUskCommand {
+
+ Executable<Optional<UskSubscription>> uri(String uri);
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.SubscribeUSK;
+import net.pterodactylus.fcp.SubscribedUSK;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Default {@link SubscribeUskCommand} implementation based on {@link FcpDialog}.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class SubscribeUskCommandImpl implements SubscribeUskCommand {
+
+ private static final RandomIdentifierGenerator IDENTIFIER = new RandomIdentifierGenerator();
+ private final ListeningExecutorService threadPool;
+ private final ConnectionSupplier connectionSupplier;
+ private final SubscribeUSK subscribeUSK = new SubscribeUSK(IDENTIFIER.generate());
+
+ public SubscribeUskCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
+ this.threadPool = MoreExecutors.listeningDecorator(threadPool);
+ this.connectionSupplier = connectionSupplier;
+ }
+
+ @Override
+ public Executable<Optional<UskSubscription>> uri(String uri) {
+ subscribeUSK.setUri(uri);
+ return this::execute;
+ }
+
+ private ListenableFuture<Optional<UskSubscription>> execute() {
+ return threadPool.submit(this::executeDialog);
+ }
+
+ private Optional<UskSubscription> executeDialog() throws IOException, ExecutionException, InterruptedException {
+ try (SubscribeUskDialog subscribeUskDialog = new SubscribeUskDialog()) {
+ return subscribeUskDialog.send(subscribeUSK).get();
+ }
+ }
+
+ private class SubscribeUskDialog extends FcpDialog<Optional<UskSubscription>> {
+
+ private final AtomicBoolean finished = new AtomicBoolean();
+ private final AtomicReference<SubscribedUSK> subscribedUSK = new AtomicReference<>();
+
+ public SubscribeUskDialog() throws IOException {
+ super(threadPool, connectionSupplier.get());
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return finished.get();
+ }
+
+ @Override
+ protected Optional<UskSubscription> getResult() {
+ return Optional.ofNullable(subscribedUSK.get()).map(subscribedUSK -> new UskSubscription() {
+ @Override
+ public String getUri() {
+ return subscribedUSK.getURI();
+ }
+ });
+ }
+
+ @Override
+ protected void consumeSubscribedUSK(SubscribedUSK subscribedUSK) {
+ this.subscribedUSK.set(subscribedUSK);
+ finished.set(true);
+ }
+
+ @Override
+ protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) {
+ finished.set(true);
+ }
+
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+/**
+ * USK subscription object that is returned to the client application.
+ *
+ * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface UskSubscription {
+
+ String getUri();
+
+}
assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
}
- public class PluginCommands {
+ private List<String> lines;
+ private String identifier;
- private static final String CLASS_NAME = "foo.plugin.Plugin";
+ private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
+ throws InterruptedException, ExecutionException, IOException {
+ connectNode();
+ lines = fcpServer.collectUntil(is("EndMessage"));
+ identifier = extractIdentifier(lines);
+ assertThat(lines, requestMatcher.get());
+ }
- private List<String> lines;
- private String identifier;
+ public class PluginCommands {
- private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
- throws InterruptedException, ExecutionException, IOException {
- connectNode();
- lines = fcpServer.collectUntil(is("EndMessage"));
- identifier = extractIdentifier(lines);
- assertThat(lines, requestMatcher.get());
- }
+ private static final String CLASS_NAME = "foo.plugin.Plugin";
private void replyWithPluginInfo() throws IOException {
fcpServer.writeLine(
}
+ public class UskSubscriptionCommands {
+
+ private static final String URI = "SSK@some,uri/file.txt";
+
+ @Test
+ public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
+ Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
+ connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+ fcpServer.writeLine(
+ "SubscribedUSK",
+ "Identifier=" + identifier,
+ "URI=" + URI,
+ "DontPoll=false",
+ "EndMessage"
+ );
+ assertThat(uskSubscription.get().get().getUri(), is(URI));
+ }
+
+ }
+
}