import java.io.File;
import java.io.InputStream;
import java.util.Optional;
+import java.util.function.Consumer;
import net.pterodactylus.fcp.Key;
*/
public interface ClientPutCommand {
+ ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated);
ClientPutCommand named(String targetFilename);
WithUri<Executable<Optional<Key>>> redirectTo(String uri);
WithUri<Executable<Optional<Key>>> from(File file);
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import net.pterodactylus.fcp.ClientPut;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.TestDDAReply;
import net.pterodactylus.fcp.TestDDARequest;
import net.pterodactylus.fcp.TestDDAResponse;
+import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UploadFrom;
import com.google.common.util.concurrent.ListenableFuture;
private final AtomicReference<InputStream> payload = new AtomicReference<>();
private final AtomicLong length = new AtomicLong();
private final AtomicReference<String> targetFilename = new AtomicReference<>();
+ private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
}
@Override
+ public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
+ keyGenerateds.add(keyGenerated);
+ return this;
+ }
+
+ @Override
public ClientPutCommand named(String targetFilename) {
this.targetFilename.set(targetFilename);
return this;
}
@Override
+ protected void consumeURIGenerated(URIGenerated uriGenerated) {
+ for (Consumer<String> keyGenerated : keyGenerateds) {
+ keyGenerated.accept(uriGenerated.getURI());
+ }
+ }
+
+ @Override
protected void consumePutSuccessful(PutSuccessful putSuccessful) {
finalKey.set(new Key(putSuccessful.getURI()));
putFinished.set(true);
package net.pterodactylus.fcp.quelaton;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
}
@Test
+ public void clientPutSendsNotificationsForGeneratedKeys()
+ throws InterruptedException, ExecutionException, IOException {
+ List<String> generatedKeys = new CopyOnWriteArrayList<>();
+ Future<Optional<Key>> key = fcpClient.clientPut()
+ .onKeyGenerated(generatedKeys::add)
+ .from(new ByteArrayInputStream("Hello\n".getBytes()))
+ .length(6)
+ .uri("KSK@foo.txt")
+ .execute();
+ connectNode();
+ List<String> lines = fcpServer.collectUntil(is("Hello"));
+ String identifier = extractIdentifier(lines);
+ fcpServer.writeLine(
+ "URIGenerated",
+ "Identifier="+identifier,
+ "URI=KSK@foo.txt",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "PutSuccessful",
+ "URI=KSK@foo.txt",
+ "Identifier=" + identifier,
+ "EndMessage"
+ );
+ assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+ assertThat(generatedKeys, contains("KSK@foo.txt"));
+ }
+
+ @Test
public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
connectNode();