X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FClientPutCommandImpl.java;h=06d54e0b4ef2f4811eca52b3cc3b85bc445f5642;hb=54b77863a4a3d63a0298157a87afb09007b03fc4;hp=f0a670616f244fa3c82a41129b2f3a305d4dd29a;hpb=12eb8641be8a71957856ec76c9522d80394e5cd6;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java index f0a6706..06d54e0 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutCommandImpl.java @@ -4,12 +4,16 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import net.pterodactylus.fcp.ClientPut; import net.pterodactylus.fcp.FcpMessage; @@ -21,6 +25,7 @@ import net.pterodactylus.fcp.TestDDAComplete; import net.pterodactylus.fcp.TestDDAReply; import net.pterodactylus.fcp.TestDDARequest; import net.pterodactylus.fcp.TestDDAResponse; +import net.pterodactylus.fcp.URIGenerated; import net.pterodactylus.fcp.UploadFrom; import com.google.common.util.concurrent.ListenableFuture; @@ -28,7 +33,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** - * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}. + * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}. * * @author David ‘Bombe’ Roden */ @@ -41,6 +46,7 @@ class ClientPutCommandImpl implements ClientPutCommand { private final AtomicReference payload = new AtomicReference<>(); private final AtomicLong length = new AtomicLong(); private final AtomicReference targetFilename = new AtomicReference<>(); + private final List> keyGenerateds = new CopyOnWriteArrayList<>(); public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) { this.threadPool = MoreExecutors.listeningDecorator(threadPool); @@ -48,38 +54,50 @@ class ClientPutCommandImpl implements ClientPutCommand { } @Override + public ClientPutCommand onKeyGenerated(Consumer keyGenerated) { + keyGenerateds.add(keyGenerated); + return this; + } + + @Override public ClientPutCommand named(String targetFilename) { this.targetFilename.set(targetFilename); return this; } @Override - public Keyed> redirectTo(Key key) { - this.redirectUri.set(Objects.requireNonNull(key, "key must not be null").getKey()); + public WithUri redirectTo(String uri) { + this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null")); return this::key; } @Override - public Keyed> from(File file) { + public WithUri from(File file) { this.file.set(Objects.requireNonNull(file, "file must not be null")); return this::key; } @Override - public Lengthed>> from(InputStream inputStream) { + public WithLength from(InputStream inputStream) { payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null")); return this::length; } - private Keyed> length(long length) { + private WithUri length(long length) { this.length.set(length); return this::key; } - private ListenableFuture> key(Key key) { + private Executable> key(String uri) { + return () -> threadPool.submit(() -> execute(uri)); + } + + private Optional execute(String uri) throws InterruptedException, ExecutionException, IOException { String identifier = new RandomIdentifierGenerator().generate(); - ClientPut clientPut = createClientPutCommand(key.getKey(), identifier); - return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get()); + ClientPut clientPut = createClientPutCommand(uri, identifier); + try (ClientPutDialog clientPutDialog = new ClientPutDialog()) { + return clientPutDialog.send(clientPut).get(); + } } private ClientPut createClientPutCommand(String uri, String identifier) { @@ -116,15 +134,14 @@ class ClientPutCommandImpl implements ClientPutCommand { return clientPut; } - private class ClientPutReplySequence extends FcpReplySequence> { + private class ClientPutDialog extends FcpDialog> { private final AtomicReference originalClientPut = new AtomicReference<>(); - private final AtomicReference identifier = new AtomicReference<>(); private final AtomicReference directory = new AtomicReference<>(); private final AtomicReference finalKey = new AtomicReference<>(); private final AtomicBoolean putFinished = new AtomicBoolean(); - public ClientPutReplySequence() throws IOException { + public ClientPutDialog() throws IOException { super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get()); } @@ -141,7 +158,6 @@ class ClientPutCommandImpl implements ClientPutCommand { @Override public ListenableFuture> send(FcpMessage fcpMessage) throws IOException { originalClientPut.set(fcpMessage); - identifier.set(fcpMessage.getField("Identifier")); String filename = fcpMessage.getField("Filename"); if (filename != null) { directory.set(new File(filename).getParent()); @@ -150,50 +166,47 @@ class ClientPutCommandImpl implements ClientPutCommand { } @Override - protected void consumePutSuccessful(PutSuccessful putSuccessful) { - if (putSuccessful.getIdentifier().equals(identifier.get())) { - finalKey.set(new Key(putSuccessful.getURI())); - putFinished.set(true); + protected void consumeURIGenerated(URIGenerated uriGenerated) { + for (Consumer keyGenerated : keyGenerateds) { + keyGenerated.accept(uriGenerated.getURI()); } } @Override + protected void consumePutSuccessful(PutSuccessful putSuccessful) { + finalKey.set(new Key(putSuccessful.getURI())); + putFinished.set(true); + } + + @Override protected void consumePutFailed(PutFailed putFailed) { - if (putFailed.getIdentifier().equals(identifier.get())) { - putFinished.set(true); - } + putFinished.set(true); } @Override protected void consumeProtocolError(ProtocolError protocolError) { - if (protocolError.getIdentifier().equals(identifier.get()) && (protocolError.getCode() == 25)) { + if (protocolError.getCode() == 25) { + setIdentifier(directory.get()); sendMessage(new TestDDARequest(directory.get(), true, false)); + } else { + putFinished.set(true); } } @Override protected void consumeTestDDAReply(TestDDAReply testDDAReply) { - if (testDDAReply.getDirectory().equals(directory.get())) { - try { - String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0); - sendMessage(new TestDDAResponse(directory.get(), readContent)); - } catch (IOException e) { - e.printStackTrace(); - sendMessage(new TestDDAResponse(directory.get(), "failed-to-read")); - } + try { + String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0); + sendMessage(new TestDDAResponse(directory.get(), readContent)); + } catch (IOException e) { + sendMessage(new TestDDAResponse(directory.get(), "failed-to-read")); } } @Override protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { - if (testDDAComplete.getDirectory().equals(directory.get())) { - sendMessage(originalClientPut.get()); - } - } - - @Override - protected void consumeConnectionClosed(Throwable throwable) { - putFinished.set(true); + setIdentifier(originalClientPut.get().getField("Identifier")); + sendMessage(originalClientPut.get()); } }