Add progress consumer to ClientPut command
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.nio.file.Files;
7 import java.util.List;
8 import java.util.Objects;
9 import java.util.Optional;
10 import java.util.concurrent.CopyOnWriteArrayList;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.atomic.AtomicLong;
14 import java.util.concurrent.atomic.AtomicReference;
15 import java.util.function.Consumer;
16 import java.util.function.Supplier;
17
18 import net.pterodactylus.fcp.ClientPut;
19 import net.pterodactylus.fcp.FcpMessage;
20 import net.pterodactylus.fcp.Key;
21 import net.pterodactylus.fcp.ProtocolError;
22 import net.pterodactylus.fcp.PutFailed;
23 import net.pterodactylus.fcp.PutSuccessful;
24 import net.pterodactylus.fcp.RequestProgress;
25 import net.pterodactylus.fcp.SimpleProgress;
26 import net.pterodactylus.fcp.TestDDAComplete;
27 import net.pterodactylus.fcp.TestDDAReply;
28 import net.pterodactylus.fcp.TestDDARequest;
29 import net.pterodactylus.fcp.TestDDAResponse;
30 import net.pterodactylus.fcp.URIGenerated;
31 import net.pterodactylus.fcp.UploadFrom;
32 import net.pterodactylus.fcp.Verbosity;
33
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.ListeningExecutorService;
36 import com.google.common.util.concurrent.MoreExecutors;
37
38 /**
39  * Default {@link ClientPutCommand} implemented based on {@link FcpDialog}.
40  *
41  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
42  */
43 class ClientPutCommandImpl implements ClientPutCommand {
44
45         private final ListeningExecutorService threadPool;
46         private final ConnectionSupplier connectionSupplier;
47         private final Supplier<String> identifierGenerator;
48         private final AtomicReference<String> redirectUri = new AtomicReference<>();
49         private final AtomicReference<File> file = new AtomicReference<>();
50         private final AtomicReference<InputStream> payload = new AtomicReference<>();
51         private final AtomicLong length = new AtomicLong();
52         private final AtomicReference<String> targetFilename = new AtomicReference<>();
53         private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
54         private final List<Consumer<String>> keyGenerateds = new CopyOnWriteArrayList<>();
55
56         public ClientPutCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
57                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
58                 this.connectionSupplier = connectionSupplier;
59                 this.identifierGenerator = identifierGenerator;
60         }
61
62         @Override
63         public ClientPutCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
64                 requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
65                 return this;
66         }
67
68         @Override
69         public ClientPutCommand onKeyGenerated(Consumer<String> keyGenerated) {
70                 keyGenerateds.add(keyGenerated);
71                 return this;
72         }
73
74         @Override
75         public ClientPutCommand named(String targetFilename) {
76                 this.targetFilename.set(targetFilename);
77                 return this;
78         }
79
80         @Override
81         public WithUri redirectTo(String uri) {
82                 this.redirectUri.set(Objects.requireNonNull(uri, "uri must not be null"));
83                 return this::key;
84         }
85
86         @Override
87         public WithUri from(File file) {
88                 this.file.set(Objects.requireNonNull(file, "file must not be null"));
89                 return this::key;
90         }
91
92         @Override
93         public WithLength from(InputStream inputStream) {
94                 payload.set(Objects.requireNonNull(inputStream, "inputStream must not be null"));
95                 return this::length;
96         }
97
98         private WithUri length(long length) {
99                 this.length.set(length);
100                 return this::key;
101         }
102
103         private Executable<Optional<Key>> key(String uri) {
104                 return () -> threadPool.submit(() -> execute(uri));
105         }
106
107         private Optional<Key> execute(String uri) throws InterruptedException, ExecutionException, IOException {
108                 ClientPut clientPut = createClientPutCommand(uri, identifierGenerator.get());
109                 try (ClientPutDialog clientPutDialog = new ClientPutDialog()) {
110                         return clientPutDialog.send(clientPut).get();
111                 }
112         }
113
114         private ClientPut createClientPutCommand(String uri, String identifier) {
115                 ClientPut clientPut;
116                 if (file.get() != null) {
117                         clientPut = createClientPutFromDisk(uri, identifier, file.get());
118                 } else if (redirectUri.get() != null) {
119                         clientPut = createClientPutRedirect(uri, identifier, redirectUri.get());
120                 } else {
121                         clientPut = createClientPutDirect(uri, identifier, length.get(), payload.get());
122                 }
123                 if (targetFilename.get() != null) {
124                         clientPut.setTargetFilename(targetFilename.get());
125                 }
126                 if (!requestProgressConsumers.isEmpty()) {
127                         clientPut.setVerbosity(Verbosity.PROGRESS);
128                 }
129                 return clientPut;
130         }
131
132         private ClientPut createClientPutFromDisk(String uri, String identifier, File file) {
133                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.disk);
134                 clientPut.setFilename(file.getAbsolutePath());
135                 return clientPut;
136         }
137
138         private ClientPut createClientPutRedirect(String uri, String identifier, String redirectUri) {
139                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.redirect);
140                 clientPut.setTargetURI(redirectUri);
141                 return clientPut;
142         }
143
144         private ClientPut createClientPutDirect(String uri, String identifier, long length, InputStream payload) {
145                 ClientPut clientPut = new ClientPut(uri, identifier, UploadFrom.direct);
146                 clientPut.setDataLength(length);
147                 clientPut.setPayloadInputStream(payload);
148                 return clientPut;
149         }
150
151         private class ClientPutDialog extends FcpDialog<Optional<Key>> {
152
153                 private final AtomicReference<FcpMessage> originalClientPut = new AtomicReference<>();
154                 private final AtomicReference<String> directory = new AtomicReference<>();
155
156                 public ClientPutDialog() throws IOException {
157                         super(ClientPutCommandImpl.this.threadPool, ClientPutCommandImpl.this.connectionSupplier.get(), Optional.<Key>empty());
158                 }
159
160                 @Override
161                 public ListenableFuture<Optional<Key>> send(FcpMessage fcpMessage) throws IOException {
162                         originalClientPut.set(fcpMessage);
163                         String filename = fcpMessage.getField("Filename");
164                         if (filename != null) {
165                                 directory.set(new File(filename).getParent());
166                         }
167                         return super.send(fcpMessage);
168                 }
169
170                 @Override
171                 protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
172                         RequestProgress requestProgress = new RequestProgress(
173                                 simpleProgress.getTotal(),
174                                 simpleProgress.getRequired(),
175                                 simpleProgress.getFailed(),
176                                 simpleProgress.getFatallyFailed(),
177                                 simpleProgress.getLastProgress(),
178                                 simpleProgress.getSucceeded(),
179                                 simpleProgress.isFinalizedTotal(),
180                                 simpleProgress.getMinSuccessFetchBlocks()
181                         );
182                         requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
183                 }
184
185                 @Override
186                 protected void consumeURIGenerated(URIGenerated uriGenerated) {
187                         for (Consumer<String> keyGenerated : keyGenerateds) {
188                                 keyGenerated.accept(uriGenerated.getURI());
189                         }
190                 }
191
192                 @Override
193                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
194                         setResult(Optional.of(new Key(putSuccessful.getURI())));
195                 }
196
197                 @Override
198                 protected void consumePutFailed(PutFailed putFailed) {
199                         finish();
200                 }
201
202                 @Override
203                 protected void consumeProtocolError(ProtocolError protocolError) {
204                         if (protocolError.getCode() == 25) {
205                                 setIdentifier(directory.get());
206                                 sendMessage(new TestDDARequest(directory.get(), true, false));
207                         } else {
208                                 finish();
209                         }
210                 }
211
212                 @Override
213                 protected void consumeTestDDAReply(TestDDAReply testDDAReply) {
214                         try {
215                                 String readContent = Files.readAllLines(new File(testDDAReply.getReadFilename()).toPath()).get(0);
216                                 sendMessage(new TestDDAResponse(directory.get(), readContent));
217                         } catch (IOException e) {
218                                 sendMessage(new TestDDAResponse(directory.get(), "failed-to-read"));
219                         }
220                 }
221
222                 @Override
223                 protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) {
224                         setIdentifier(originalClientPut.get().getField("Identifier"));
225                         sendMessage(originalClientPut.get());
226                 }
227
228         }
229
230 }