1 package net.pterodactylus.fcp.quelaton;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import net.pterodactylus.fcp.ClientPut;
import net.pterodactylus.fcp.FcpMessage;
import net.pterodactylus.fcp.Key;
import net.pterodactylus.fcp.ProtocolError;
import net.pterodactylus.fcp.PutFailed;
import net.pterodactylus.fcp.PutSuccessful;
import net.pterodactylus.fcp.TestDDAComplete;
import net.pterodactylus.fcp.TestDDAReply;
import net.pterodactylus.fcp.TestDDARequest;
import net.pterodactylus.fcp.TestDDAResponse;
import net.pterodactylus.fcp.UploadFrom;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
 * Default {@link ClientPutCommand} implementation based on {@link FcpReplySequence}.
 *
 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
 */
35 class ClientPutCommandImpl implements ClientPutCommand {
37 private final ListeningExecutorService threadPool;
38 private final ConnectionSupplier connectionSupplier;
39 private final AtomicReference<String> redirectUri = new AtomicReference<>();
40 private final AtomicReference<File> file = new AtomicReference<>();
41 private final AtomicReference<InputStream> payload = new AtomicReference<>();
42 private final AtomicLong length = new AtomicLong();
43 private final AtomicReference<String> targetFilename = new AtomicReference<>();
/**
 * Creates a new put command.
 *
 * @param threadPool
 *         the executor service the put is submitted to
 * @param connectionSupplier
 *         the supplier of the connection the put is sent over
 */
public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
    this.threadPool = MoreExecutors.listeningDecorator(threadPool);
    this.connectionSupplier = connectionSupplier;
}
51 public ClientPutCommand named(String targetFilename) {
52 this.targetFilename.set(targetFilename);
57 public WithUri<Optional<Key>> redirectTo(String uri) {
58 this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
63 public WithUri<Optional<Key>> from(File file) {
64 this.file.set(Objects.requireNonNull(file, "file must not be null"));
69 public WithLength<WithUri<Optional<Key>>> from(InputStream inputStream) {
70 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
74 private WithUri<Optional<Key>> length(long length) {
75 this.length.set(length);
79 private ListenableFuture<Optional<Key>> key(String uri) {
80 String identifier = new RandomIdentifierGenerator().generate();
81 ClientPut clientPut = createClientPutCommand(uri, identifier);
82 return threadPool.submit(() -> new ClientPutReplySequence().send(clientPut).get());
85 private ClientPut createClientPutCommand(String uri, String identifier) {
87 if (file.get() != null) {
88 clientPut = createClientPutFromDisk(uri, identifier, file.get());
89 } else if (redirectUri.get() != null) {
90 clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
92 clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
94 if (targetFilename.get() != null) {
95 clientPut.setTargetFilename(targetFilename.get());
100 private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
101 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
102 clientPut.setFilename(file.getAbsolutePath());
106 private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
107 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
108 clientPut.setTargetURI(redirectUri);
112 private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
113 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
114 clientPut.setDataLength(length);
115 clientPut.setPayloadInputStream(payload);
119 private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
121 private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
122 private final AtomicReference<String> identifier = new AtomicReference<>();
123 private final AtomicReference<String> directory = new AtomicReference<>();
124 private final AtomicReference<Key> finalKey = new AtomicReference<>();
125 private final AtomicBoolean putFinished = new AtomicBoolean();
/**
 * Creates a reply sequence on the enclosing command's thread pool, using a
 * connection obtained from the enclosing command's connection supplier.
 *
 * @throws IOException
 *         if the connection cannot be obtained
 */
public ClientPutReplySequence() throws IOException {
    super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
}
132 protected boolean isFinished() {
133 return putFinished.get();
137 protected Optional<Key> getResult() {
138 return Optional.ofNullable(finalKey.get());
142 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
143 originalClientPut.set(fcpMessage);
144 identifier.set(fcpMessage.getField("Identifier"));
145 String filename = fcpMessage.getField("Filename");
146 if (filename != null) {
147 directory.set(new File(filename).getParent());
149 return super.send(fcpMessage);
153 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
154 if (putSuccessful.getIdentifier().equals(identifier.get())) {
155 finalKey.set(new Key(putSuccessful.getURI()));
156 putFinished.set(true);
161 protected void consumePutFailed(PutFailed putFailed) {
162 if (putFailed.getIdentifier().equals(identifier.get())) {
163 putFinished.set(true);
168 protected void consumeProtocolError(ProtocolError protocolError) {
169 if (protocolError.getIdentifier().equals(identifier.get())) {
170 if (protocolError.getCode() == 25) {
171 setIdentifier(directory.get());
172 sendMessage(new TestDDARequest(directory.get(), true, false));
174 putFinished.set(true);
180 protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
181 if (testDDAReply.getDirectory().equals(directory.get())) {
183 String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
184 sendMessage(new TestDDAResponse(directory.get(), readContent));
185 } catch (IOException e) {
186 sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
192 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
193 if (testDDAComplete.getDirectory().equals(directory.get())) {
194 setIdentifier(originalClientPut.get().getField("Identifier"));
195 sendMessage(originalClientPut.get());
200 protected void consumeConnectionClosed(Throwable throwable) {
201 putFinished.set(true);