X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FClientPutCommandImpl.java;h=06d54e0b4ef2f4811eca52b3cc3b85bc445f5642;hb=60bf94d38d9c17dfba81e20564e98337714ae332;hp=43d6c99e858532ebb01c92d5f38ed6e24d05df03;hpb=1ce220842b96db2ea57e456c70977620943cb5de;p=jFCPlib.git
diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
index 43d6c99..06d54e0 100644
--- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
+++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java
@@ -3,18 +3,29 @@ package net.pterodactylus.fcp.quelaton;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import net.pterodactylus.fcp.ClientPut;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.Key;
+import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.TestDDARequest;
+import net.pterodactylus.fcp.TestDDAResponse;
+import net.pterodactylus.fcp.URIGenerated;
import net.pterodactylus.fcp.UploadFrom;
import com.google.common.util.concurrent.ListenableFuture;
@@ -22,7 +33,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
- * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
+ * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
*
* @author David âBombeâ Roden
*/
@@ -35,6 +46,7 @@ class ClientPutCommandImpl implements ClientPutCommand {
private final AtomicReference payload = new AtomicReference<>();
private final AtomicLong length = new AtomicLong();
private final AtomicReference targetFilename = new AtomicReference<>();
+ private final List> keyGenerateds = new CopyOnWriteArrayList<>();
public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
@@ -42,38 +54,50 @@ class ClientPutCommandImpl implements ClientPutCommand {
}
@Override
+ public ClientPutCommand onKeyGenerated(Consumer keyGenerated) {
+ keyGenerateds.add(keyGenerated);
+ return this;
+ }
+
+ @Override
public ClientPutCommand named(String targetFilename) {
this.targetFilename.set(targetFilename);
return this;
}
@Override
- public Keyed> redirectTo(Key key) {
- this.redirectUri.set(Objects.requireNonNull(key, "key must not be null").getKey());
+ public WithUri redirectTo(String uri) {
+ this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
return this::key;
}
@Override
- public Keyed> from(File file) {
+ public WithUri from(File file) {
this.file.set(Objects.requireNonNull(file, "file must not be null"));
return this::key;
}
@Override
- public Lengthed>> from(InputStream inputStream) {
+ public WithLength from(InputStream inputStream) {
payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
return this::length;
}
- private Keyed> length(long length) {
+ private WithUri length(long length) {
this.length.set(length);
return this::key;
}
- private ListenableFuture> key(Key key) {
+ private Executable> key(String uri) {
+ return () -> threadPool.submit(() -> execute(uri));
+ }
+
+ private Optional execute(String uri) throws InterruptedException, ExecutionException, IOException {
String identifier = new RandomIdentifierGenerator().generate();
- ClientPut clientPut = createClientPutCommand(key.getKey(), identifier);
- return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get());
+ ClientPut clientPut = createClientPutCommand(uri, identifier);
+ try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
+ return clientPutDialog.send(clientPut).get();
+ }
}
private ClientPut createClientPutCommand(String uri, String identifier) {
@@ -110,13 +134,14 @@ class ClientPutCommandImpl implements ClientPutCommand {
return clientPut;
}
- private class ClientPutReplySequence extends FcpReplySequence> {
+ private class ClientPutDialog extends FcpDialog> {
- private final AtomicReference identifier = new AtomicReference<>();
+ private final AtomicReference originalClientPut = new AtomicReference<>();
+ private final AtomicReference directory = new AtomicReference<>();
private final AtomicReference finalKey = new AtomicReference<>();
private final AtomicBoolean putFinished = new AtomicBoolean();
- public ClientPutReplySequence() throws IOException {
+ public ClientPutDialog() throws IOException {
super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
}
@@ -132,29 +157,58 @@ class ClientPutCommandImpl implements ClientPutCommand {
@Override
public ListenableFuture> send(FcpMessage fcpMessage) throws IOException {
- identifier.set(fcpMessage.getField("Identifier"));
+ originalClientPut.set(fcpMessage);
+ String filename = fcpMessage.getField("Filename");
+ if (filename != null) {
+ directory.set(new File(filename).getParent());
+ }
return super.send(fcpMessage);
}
@Override
- protected void consumePutSuccessful(PutSuccessful putSuccessful) {
- if (putSuccessful.getIdentifier().equals(identifier.get())) {
- finalKey.set(new Key(putSuccessful.getURI()));
- putFinished.set(true);
+ protected void consumeURIGenerated(URIGenerated uriGenerated) {
+ for (Consumer keyGenerated : keyGenerateds) {
+ keyGenerated.accept(uriGenerated.getURI());
}
}
@Override
+ protected void consumePutSuccessful(PutSuccessful putSuccessful) {
+ finalKey.set(new Key(putSuccessful.getURI()));
+ putFinished.set(true);
+ }
+
+ @Override
protected void consumePutFailed(PutFailed putFailed) {
- if (putFailed.getIdentifier().equals(identifier.get())) {
+ putFinished.set(true);
+ }
+
+ @Override
+ protected void consumeProtocolError(ProtocolError protocolError) {
+ if (protocolError.getCode() == 25) {
+ setIdentifier(directory.get());
+ sendMessage(new TestDDARequest(directory.get(), true, false));
+ } else {
putFinished.set(true);
}
}
@Override
- protected void consumeConnectionClosed(Throwable throwable) {
- putFinished.set(true);
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+ try {
+ String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
+ sendMessage(new TestDDAResponse(directory.get(), readContent));
+ } catch (IOException e) {
+ sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
+ }
+ }
+
+ @Override
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+ setIdentifier(originalClientPut.get().getField("Identifier"));
+ sendMessage(originalClientPut.get());
}
+
}
}