Send progress updates from ClientPutDiskDir
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientPutDiskDirCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.util.List;
6 import java.util.Objects;
7 import java.util.Optional;
8 import java.util.concurrent.CopyOnWriteArrayList;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.atomic.AtomicReference;
12 import java.util.function.Consumer;
13 import java.util.function.Supplier;
14
15 import net.pterodactylus.fcp.ClientPutDiskDir;
16 import net.pterodactylus.fcp.Key;
17 import net.pterodactylus.fcp.ProtocolError;
18 import net.pterodactylus.fcp.PutSuccessful;
19 import net.pterodactylus.fcp.RequestProgress;
20 import net.pterodactylus.fcp.SimpleProgress;
21 import net.pterodactylus.fcp.Verbosity;
22
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.MoreExecutors;
25
26 /**
27  * Default {@link ClientPutDiskDirCommand} implemented based on {@link FcpDialog}.
28  *
29  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
30  */
31 public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
32
33         private final ListeningExecutorService threadPool;
34         private final ConnectionSupplier connectionSupplier;
35         private final Supplier<String> identifierGenerator;
36         private final AtomicReference<String> directory = new AtomicReference<>();
37         private final AtomicReference<String> uri = new AtomicReference<>();
38         private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
39
40         public ClientPutDiskDirCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
41                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
42                 this.connectionSupplier = connectionSupplier;
43                 this.identifierGenerator = identifierGenerator;
44         }
45
46         @Override
47         public ClientPutDiskDirCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
48                 requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
49                 return this;
50         }
51
52         @Override
53         public WithUri fromDirectory(File directory) {
54                 this.directory.set(Objects.requireNonNull(directory).getPath());
55                 return this::uri;
56         }
57
58         public Executable<Optional<Key>> uri(String uri) {
59                 this.uri.set(Objects.requireNonNull(uri));
60                 return () -> threadPool.submit(this::execute);
61         }
62
63         private Optional<Key> execute() throws IOException, ExecutionException, InterruptedException {
64                 ClientPutDiskDir clientPutDiskDir = new ClientPutDiskDir(uri.get(), identifierGenerator.get(), directory.get());
65                 if (!requestProgressConsumers.isEmpty()) {
66                         clientPutDiskDir.setVerbosity(Verbosity.PROGRESS);
67                 }
68                 try (ClientPutDiskDirDialog clientPutDiskDirDialog = new ClientPutDiskDirDialog()) {
69                         return clientPutDiskDirDialog.send(clientPutDiskDir).get();
70                 }
71         }
72
73         private class ClientPutDiskDirDialog extends FcpDialog<Optional<Key>> {
74
75                 public ClientPutDiskDirDialog() throws IOException {
76                         super(threadPool, connectionSupplier.get(), Optional.<Key>empty());
77                 }
78
79                 @Override
80                 protected void consumePutSuccessful(PutSuccessful putSuccessful) {
81                         setResult(Optional.of(new Key(putSuccessful.getURI())));
82                 }
83
84                 @Override
85                 protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
86                         RequestProgress requestProgress = new RequestProgress(
87                                 simpleProgress.getTotal(),
88                                 simpleProgress.getRequired(),
89                                 simpleProgress.getFailed(),
90                                 simpleProgress.getFatallyFailed(),
91                                 simpleProgress.getLastProgress(),
92                                 simpleProgress.getSucceeded(),
93                                 simpleProgress.isFinalizedTotal(),
94                                 simpleProgress.getMinSuccessFetchBlocks()
95                         );
96                         requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
97                 }
98
99                 @Override
100                 protected void consumeProtocolError(ProtocolError protocolError) {
101                         finish();
102                 }
103
104         }
105
106 }