Use listenable future instead of plain ones
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / ClientGetCommandImpl.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutorService;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 import java.util.concurrent.atomic.AtomicReference;
9
10 import net.pterodactylus.fcp.AllData;
11 import net.pterodactylus.fcp.ClientGet;
12 import net.pterodactylus.fcp.FcpMessage;
13 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
14 import net.pterodactylus.fcp.GetFailed;
15 import net.pterodactylus.fcp.Priority;
16 import net.pterodactylus.fcp.ReturnType;
17
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21
22 /**
23  * Implementation of the {@link ClientGetCommand}.
24  *
25  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
26  */
27 class ClientGetCommandImpl implements ClientGetCommand {
28
29         private final ListeningExecutorService threadPool;
30         private final ConnectionSupplier connectionSupplier;
31
32         private boolean ignoreDataStore;
33         private boolean dataStoreOnly;
34         private Long maxSize;
35         private Priority priority;
36         private boolean realTime;
37         private boolean global;
38
39         public ClientGetCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
40                 this.threadPool = MoreExecutors.listeningDecorator(threadPool);
41                 this.connectionSupplier = connectionSupplier;
42         }
43
44         @Override
45         public ClientGetCommand ignoreDataStore() {
46                 ignoreDataStore = true;
47                 return this;
48         }
49
50         @Override
51         public ClientGetCommand dataStoreOnly() {
52                 dataStoreOnly = true;
53                 return this;
54         }
55
56         @Override
57         public ClientGetCommand maxSize(long maxSize) {
58                 this.maxSize = maxSize;
59                 return this;
60         }
61
62         @Override
63         public ClientGetCommand priority(Priority priority) {
64                 this.priority = priority;
65                 return this;
66         }
67
68         @Override
69         public ClientGetCommand realTime() {
70                 realTime = true;
71                 return this;
72         }
73
74         @Override
75         public ClientGetCommand global() {
76                 global = true;
77                 return this;
78         }
79
80         @Override
81         public ListenableFuture<Optional<Data>> uri(String uri) {
82                 ClientGet clientGet = createClientGetCommand(uri);
83                 return threadPool.submit(() -> new ClientGetReplySequence().send(clientGet).get());
84         }
85
86         private ClientGet createClientGetCommand(String uri) {
87                 String identifier = new RandomIdentifierGenerator().generate();
88                 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
89                 if (ignoreDataStore) {
90                         clientGet.setIgnoreDataStore(true);
91                 }
92                 if (dataStoreOnly) {
93                         clientGet.setDataStoreOnly(true);
94                 }
95                 if (maxSize != null) {
96                         clientGet.setMaxSize(maxSize);
97                 }
98                 if (priority != null) {
99                         clientGet.setPriority(priority);
100                 }
101                 if (realTime) {
102                         clientGet.setRealTimeFlag(true);
103                 }
104                 if (global) {
105                         clientGet.setGlobal(true);
106                 }
107                 return clientGet;
108         }
109
110         private class ClientGetReplySequence extends FcpReplySequence<Optional<Data>> {
111
112                 private final AtomicReference<String> identifier = new AtomicReference<>();
113                 private final AtomicBoolean finished = new AtomicBoolean();
114                 private final AtomicBoolean failed = new AtomicBoolean();
115
116                 private String contentType;
117                 private long dataLength;
118                 private InputStream payload;
119
120                 public ClientGetReplySequence() throws IOException {
121                         super(ClientGetCommandImpl.this.threadPool, ClientGetCommandImpl.this.connectionSupplier.get());
122                 }
123
124                 @Override
125                 protected boolean isFinished() {
126                         return finished.get() || failed.get();
127                 }
128
129                 @Override
130                 protected Optional<Data> getResult() {
131                         return failed.get() ? Optional.empty() : Optional.of(new Data() {
132                                 @Override
133                                 public String getMimeType() {
134                                         return contentType;
135                                 }
136
137                                 @Override
138                                 public long size() {
139                                         return dataLength;
140                                 }
141
142                                 @Override
143                                 public InputStream getInputStream() {
144                                         return payload;
145                                 }
146                         });
147                 }
148
149                 @Override
150                 protected void consumeAllData(AllData allData) {
151                         if (allData.getIdentifier().equals(identifier.get())) {
152                                 synchronized (this) {
153                                         contentType = allData.getContentType();
154                                         dataLength = allData.getDataLength();
155                                         try {
156                                                 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
157                                                 finished.set(true);
158                                         } catch (IOException e) {
159                                                 // TODO – logging
160                                                 failed.set(true);
161                                         }
162                                 }
163                         }
164                 }
165
166                 @Override
167                 protected void consumeGetFailed(GetFailed getFailed) {
168                         if (getFailed.getIdentifier().equals(identifier.get())) {
169                                 failed.set(true);
170                         }
171                 }
172
173                 @Override
174                 protected void consumeConnectionClosed(Throwable throwable) {
175                         failed.set(true);
176                 }
177
178                 @Override
179                 public ListenableFuture<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
180                         identifier.set(fcpMessage.getField("Identifier"));
181                         return super.send(fcpMessage);
182                 }
183
184         }
185
186 }