Send generated URIs on ClientPutDiskDir to registered consumers
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutDiskDirCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.util.List;
6 import java.util.Objects;
7 import java.util.Optional;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.atomic.AtomicReference;
12 import java.util.function.Consumer;
13 import java.util.function.Supplier;
14
15 import net.pterodactylus.fcp.ClientPutDiskDir;
16 import net.pterodactylus.fcp.Key;
17 import net.pterodactylus.fcp.ProtocolError;
18 import net.pterodactylus.fcp.PutSuccessful;
19 import net.pterodactylus.fcp.RequestProgress;
20 import net.pterodactylus.fcp.SimpleProgress;
21 import net.pterodactylus.fcp.URIGenerated;
22 import net.pterodactylus.fcp.Verbosity;
23
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26
27 /**
28  * Default {@link ClientPutDiskDirCommand} implemented based on {@link FcpDialog}.
29  *
30  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
31  */
32 public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
33
34         private final ListeningExecutorService threadPool;
35         private final ConnectionSupplier connectionSupplier;
36         private final Supplier<String> identifierGenerator;
37         private final AtomicReference<String> directory = new AtomicReference<>();
38         private final AtomicReference<String> uri = new AtomicReference<>();
39         private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
40         private final List<Consumer<String>> keyGeneratedConsumers = new CopyOnWriteArrayList<>();
41
42         public ClientPutDiskDirCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
43                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
44                 this.connectionSupplier = connectionSupplier;
45                 this.identifierGenerator = identifierGenerator;
46         }
47
48         @Override
49         public ClientPutDiskDirCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
50                 requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
51                 return this;
52         }
53
54         @Override
55         public ClientPutDiskDirCommand onKeyGenerated(Consumer<String> keyGenerated) {
56                 keyGeneratedConsumers.add(Objects.requireNonNull(keyGenerated));
57                 return this;
58         }
59
60         @Override
61         public WithUri fromDirectory(File directory) {
62                 this.directory.set(Objects.requireNonNull(directory).getPath());
63                 return this::uri;
64         }
65
66         public Executable<Optional<Key>> uri(String uri) {
67                 this.uri.set(Objects.requireNonNull(uri));
68                 return () -> threadPool.submit(this::execute);
69         }
70
71         private Optional<Key> execute() throws IOException, ExecutionException, InterruptedException {
72                 ClientPutDiskDir clientPutDiskDir = new ClientPutDiskDir(uri.get(), identifierGenerator.get(), directory.get());
73                 if (!requestProgressConsumers.isEmpty()) {
74                         clientPutDiskDir.setVerbosity(Verbosity.PROGRESS);
75                 }
76                 try (ClientPutDiskDirDialog clientPutDiskDirDialog = new ClientPutDiskDirDialog()) {
77                         return clientPutDiskDirDialog.send(clientPutDiskDir).get();
78                 }
79         }
80
81         private class ClientPutDiskDirDialog extends FcpDialog<Optional<Key>> {
82
83                 public ClientPutDiskDirDialog() throws IOException {
84                         super(threadPool, connectionSupplier.get(), Optional.<Key>empty());
85                 }
86
87                 @Override
88                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
89                         setResult(Optional.of(new Key(putSuccessful.getURI())));
90                 }
91
92                 @Override
93                 protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
94                         RequestProgress requestProgress = new RequestProgress(
95                                 simpleProgress.getTotal(),
96                                 simpleProgress.getRequired(),
97                                 simpleProgress.getFailed(),
98                                 simpleProgress.getFatallyFailed(),
99                                 simpleProgress.getLastProgress(),
100                                 simpleProgress.getSucceeded(),
101                                 simpleProgress.isFinalizedTotal(),
102                                 simpleProgress.getMinSuccessFetchBlocks()
103                         );
104                         requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
105                 }
106
107                 @Override
108                 protected void consumeURIGenerated(URIGenerated uriGenerated) {
109                         keyGeneratedConsumers.forEach(consumer -> consumer.accept(uriGenerated.getURI()));
110                 }
111
112                 @Override
113                 protected void consumeProtocolError(ProtocolError protocolError) {
114                         finish();
115                 }
116
117         }
118
119 }