import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import net.pterodactylus.fcp.ClientPut;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.Key;
+import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.TestDDARequest;
+import net.pterodactylus.fcp.TestDDAResponse;
import net.pterodactylus.fcp.UploadFrom;
import com.google.common.util.concurrent.ListenableFuture;
private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
+ private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
private final AtomicReference<String> identifier = new AtomicReference<>();
+ private final AtomicReference<String> directory = new AtomicReference<>();
private final AtomicReference<Key> finalKey = new AtomicReference<>();
private final AtomicBoolean putFinished = new AtomicBoolean();
@Override
public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
+ originalClientPut.set(fcpMessage);
identifier.set(fcpMessage.getField("Identifier"));
+ String filename = fcpMessage.getField("Filename");
+ if (filename != null) {
+ directory.set(new File(filename).getParent());
+ }
return super.send(fcpMessage);
}
}
@Override
+ protected void consumeProtocolError(ProtocolError protocolError) {
+ if (protocolError.getIdentifier().equals(identifier.get()) && (protocolError.getCode() == 25)) {
+ sendMessage(new TestDDARequest(directory.get(), true, false));
+ }
+ }
+
+ @Override
+ protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
+ if (testDDAReply.getDirectory().equals(directory.get())) {
+ try {
+ String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
+ sendMessage(new TestDDAResponse(directory.get(), readContent));
+ } catch (IOException e) {
+ e.printStackTrace();
+ sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
+ }
+ }
+ }
+
+ @Override
+ protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
+ if (testDDAComplete.getDirectory().equals(directory.get())) {
+ sendMessage(originalClientPut.get());
+ }
+ }
+
+ @Override
protected void consumeConnectionClosed(Throwable throwable) {
putFinished.set(true);
}
+
}
}
import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
}
+ @Test
+ public void clientPutWithFileCanCompleteTestDdaSequence()
+ throws IOException, ExecutionException, InterruptedException {
+ File tempFile = createTempFile();
+ fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).key(new Key("KSK@foo.txt"));
+ connectNode();
+ List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+ String identifier = extractIdentifier(lines);
+ fcpServer.writeLine(
+ "ProtocolError",
+ "Identifier=" + identifier,
+ "Code=25",
+ "EndMessage"
+ );
+ lines = fcpServer.collectUntil(is("EndMessage"));
+ assertThat(lines, matchesFcpMessage(
+ "TestDDARequest",
+ "Directory=" + tempFile.getParent(),
+ "WantReadDirectory=true",
+ "WantWriteDirectory=false",
+ "EndMessage"
+ ));
+ fcpServer.writeLine(
+ "TestDDAReply",
+ "Directory=" + tempFile.getParent(),
+ "ReadFilename=" + tempFile,
+ "EndMessage"
+ );
+ lines = fcpServer.collectUntil(is("EndMessage"));
+ assertThat(lines, matchesFcpMessage(
+ "TestDDAResponse",
+ "Directory=" + tempFile.getParent(),
+ "ReadContent=test-content",
+ "EndMessage"
+ ));
+ fcpServer.writeLine(
+ "TestDDAComplete",
+ "Directory=" + tempFile.getParent(),
+ "ReadDirectoryAllowed=true",
+ "EndMessage"
+ );
+ lines = fcpServer.collectUntil(is("EndMessage"));
+ assertThat(lines,
+ matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
+ "Filename=" + new File(tempFile.getParent(), "test.dat")));
+ }
+
+ private File createTempFile() throws IOException {
+ File tempFile = File.createTempFile("test-dda-", ".dat");
+ tempFile.deleteOnExit();
+ Files.write("test-content", tempFile, StandardCharsets.UTF_8);
+ return tempFile;
+ }
+
}