Close FCP reply sequences after use
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.util.Objects;
8 import java.util.Optional;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.atomic.AtomicBoolean;
12 import java.util.concurrent.atomic.AtomicLong;
13 import java.util.concurrent.atomic.AtomicReference;
14
15 import net.pterodactylus.fcp.ClientPut;
16 import net.pterodactylus.fcp.FcpMessage;
17 import net.pterodactylus.fcp.Key;
18 import net.pterodactylus.fcp.ProtocolError;
19 import net.pterodactylus.fcp.PutFailed;
20 import net.pterodactylus.fcp.PutSuccessful;
21 import net.pterodactylus.fcp.TestDDAComplete;
22 import net.pterodactylus.fcp.TestDDAReply;
23 import net.pterodactylus.fcp.TestDDARequest;
24 import net.pterodactylus.fcp.TestDDAResponse;
25 import net.pterodactylus.fcp.UploadFrom;
26
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.ListeningExecutorService;
29 import com.google.common.util.concurrent.MoreExecutors;
30
31 /**
32  * Default {@link ClientPutCommand} implemented based on {@link FcpReplySequence}.
33  *
34  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
35  */
36 class ClientPutCommandImpl implements ClientPutCommand {
37
38         private final ListeningExecutorService threadPool;
39         private final ConnectionSupplier connectionSupplier;
40         private final AtomicReference<String> redirectUri = new AtomicReference<>();
41         private final AtomicReference<File> file = new AtomicReference<>();
42         private final AtomicReference<InputStream> payload = new AtomicReference<>();
43         private final AtomicLong length = new AtomicLong();
44         private final AtomicReference<String> targetFilename = new AtomicReference<>();
45
46         public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
47                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
48                 this.connectionSupplier = connectionSupplier;
49         }
50
51         @Override
52         public ClientPutCommand named(String targetFilename) {
53                 this.targetFilename.set(targetFilename);
54                 return this;
55         }
56
57         @Override
58         public WithUri<Executable<Optional<Key>>> redirectTo(String uri) {
59                 this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
60                 return this::key;
61         }
62
63         @Override
64         public WithUri<Executable<Optional<Key>>> from(File file) {
65                 this.file.set(Objects.requireNonNull(file, "file must not be null"));
66                 return this::key;
67         }
68
69         @Override
70         public WithLength<WithUri<Executable<Optional<Key>>>> from(InputStream inputStream) {
71                 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
72                 return this::length;
73         }
74
75         private WithUri<Executable<Optional<Key>>> length(long length) {
76                 this.length.set(length);
77                 return this::key;
78         }
79
80         private Executable<Optional<Key>> key(String uri) {
81                 return () -> threadPool.submit(() -> execute(uri));
82         }
83
84         private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
85                 String identifier = new RandomIdentifierGenerator().generate();
86                 ClientPut clientPut = createClientPutCommand(uri, identifier);
87                 try (ClientPutReplySequence clientPutReplySequence = new ClientPutReplySequence()) {
88                         return clientPutReplySequence.send(clientPut).get();
89                 }
90         }
91
92         private ClientPut createClientPutCommand(String uri, String identifier) {
93                 ClientPut clientPut;
94                 if (file.get() != null) {
95                         clientPut = createClientPutFromDisk(uri, identifier, file.get());
96                 } else if (redirectUri.get() != null) {
97                         clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
98                 } else {
99                         clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
100                 }
101                 if (targetFilename.get() != null) {
102                         clientPut.setTargetFilename(targetFilename.get());
103                 }
104                 return clientPut;
105         }
106
107         private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
108                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
109                 clientPut.setFilename(file.getAbsolutePath());
110                 return clientPut;
111         }
112
113         private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
114                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
115                 clientPut.setTargetURI(redirectUri);
116                 return clientPut;
117         }
118
119         private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
120                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
121                 clientPut.setDataLength(length);
122                 clientPut.setPayloadInputStream(payload);
123                 return clientPut;
124         }
125
126         private class ClientPutReplySequence extends FcpReplySequence<Optional<Key>> {
127
128                 private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
129                 private final AtomicReference<String> directory = new AtomicReference<>();
130                 private final AtomicReference<Key> finalKey = new AtomicReference<>();
131                 private final AtomicBoolean putFinished = new AtomicBoolean();
132
133                 public ClientPutReplySequence() throws IOException {
134                         super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get());
135                 }
136
137                 @Override
138                 protected boolean isFinished() {
139                         return putFinished.get();
140                 }
141
142                 @Override
143                 protected Optional<Key> getResult() {
144                         return Optional.ofNullable(finalKey.get());
145                 }
146
147                 @Override
148                 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
149                         originalClientPut.set(fcpMessage);
150                         String filename = fcpMessage.getField("Filename");
151                         if (filename != null) {
152                                 directory.set(new File(filename).getParent());
153                         }
154                         return super.send(fcpMessage);
155                 }
156
157                 @Override
158                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
159                         finalKey.set(new Key(putSuccessful.getURI()));
160                         putFinished.set(true);
161                 }
162
163                 @Override
164                 protected void consumePutFailed(PutFailed putFailed) {
165                         putFinished.set(true);
166                 }
167
168                 @Override
169                 protected void consumeProtocolError(ProtocolError protocolError) {
170                         if (protocolError.getCode() == 25) {
171                                 setIdentifier(directory.get());
172                                 sendMessage(new TestDDARequest(directory.get(), true, false));
173                         } else {
174                                 putFinished.set(true);
175                         }
176                 }
177
178                 @Override
179                 protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
180                         try {
181                                 String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
182                                 sendMessage(new TestDDAResponse(directory.get(), readContent));
183                         } catch (IOException e) {
184                                 sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
185                         }
186                 }
187
188                 @Override
189                 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
190                         setIdentifier(originalClientPut.get().getField("Identifier"));
191                         sendMessage(originalClientPut.get());
192                 }
193
194         }
195
196 }