Move anonymous class to its own method and use a lambda to call it
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Future;
10 import java.util.concurrent.atomic.AtomicBoolean;
11 import java.util.concurrent.atomic.AtomicReference;
12 import java.util.function.Supplier;
13
14 import net.pterodactylus.fcp.AllData;
15 import net.pterodactylus.fcp.ClientGet;
16 import net.pterodactylus.fcp.ClientHello;
17 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
18 import net.pterodactylus.fcp.FcpConnection;
19 import net.pterodactylus.fcp.FcpKeyPair;
20 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
21 import net.pterodactylus.fcp.GenerateSSK;
22 import net.pterodactylus.fcp.GetFailed;
23 import net.pterodactylus.fcp.NodeHello;
24 import net.pterodactylus.fcp.Priority;
25 import net.pterodactylus.fcp.ReturnType;
26 import net.pterodactylus.fcp.SSKKeypair;
27
28 import com.google.common.io.ByteStreams;
29
30 /**
31  * Default {@link FcpClient} implementation.
32  *
33  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
34  */
35 public class DefaultFcpClient implements FcpClient {
36
37         private final ExecutorService threadPool;
38         private final String hostname;
39         private final int port;
40         private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
41         private final Supplier<String> clientName;
42         private final Supplier<String> expectedVersion;
43
44         public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
45                         Supplier<String> expectedVersion) {
46                 this.threadPool = threadPool;
47                 this.hostname = hostname;
48                 this.port = port;
49                 this.clientName = clientName;
50                 this.expectedVersion = expectedVersion;
51         }
52
53         private void connect() throws IOException {
54                 if (fcpConnection.get() != null) {
55                         return;
56                 }
57                 fcpConnection.compareAndSet(null, createConnection());
58         }
59
60         private FcpConnection createConnection() throws IOException {
61                 FcpConnection connection = new FcpConnection(hostname, port);
62                 connection.connect();
63                 AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
64                 AtomicBoolean receivedClosed = new AtomicBoolean();
65                 FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
66                 nodeHelloSequence
67                                 .handle(NodeHello.class)
68                                 .with((nodeHello) -> receivedNodeHello.set(nodeHello));
69                 nodeHelloSequence
70                                 .handle(CloseConnectionDuplicateClientName.class)
71                                 .with((closeConnection) -> receivedClosed.set(true));
72                 nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
73                 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
74                 try {
75                         nodeHelloSequence.send(clientHello).get();
76                 } catch (InterruptedException | ExecutionException e) {
77                         connection.close();
78                         throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
79                 }
80                 return connection;
81         }
82
83         @Override
84         public GenerateKeypairCommand generateKeypair() {
85                 return new GenerateKeypairCommandImpl();
86         }
87
88         private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
89
90                 @Override
91                 public Future<FcpKeyPair> execute() {
92                         return threadPool.submit(() -> {
93                                 connect();
94                                 Sequence sequence = new Sequence();
95                                 FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
96                                 replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
97                                 replySequence.waitFor(sequence::isFinished);
98                                 replySequence.send(new GenerateSSK()).get();
99                                 return sequence.getKeyPair();
100                         });
101                 }
102
103                 private class Sequence {
104
105                         private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
106
107                         public void handleSSKKeypair(SSKKeypair sskKeypair) {
108                                 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
109                         }
110
111                         public boolean isFinished() {
112                                 return keyPair.get() != null;
113                         }
114
115                         public FcpKeyPair getKeyPair() {
116                                 return keyPair.get();
117                         }
118
119                 }
120
121         }
122
123         @Override
124         public ClientGetCommand clientGet() {
125                 return new ClientGetCommandImpl();
126         }
127
128         private class ClientGetCommandImpl implements ClientGetCommand {
129
130                 private String identifier;
131                 private boolean ignoreDataStore;
132                 private boolean dataStoreOnly;
133                 private Long maxSize;
134                 private Priority priority;
135                 private boolean realTime;
136                 private boolean global;
137
138                 @Override
139                 public ClientGetCommand identifier(String identifier) {
140                         this.identifier = identifier;
141                         return this;
142                 }
143
144                 @Override
145                 public ClientGetCommand ignoreDataStore() {
146                         ignoreDataStore = true;
147                         return this;
148                 }
149
150                 @Override
151                 public ClientGetCommand dataStoreOnly() {
152                         dataStoreOnly = true;
153                         return this;
154                 }
155
156                 @Override
157                 public ClientGetCommand maxSize(long maxSize) {
158                         this.maxSize = maxSize;
159                         return this;
160                 }
161
162                 @Override
163                 public ClientGetCommand priority(Priority priority) {
164                         this.priority = priority;
165                         return this;
166                 }
167
168                 @Override
169                 public ClientGetCommand realTime() {
170                         realTime = true;
171                         return this;
172                 }
173
174                 @Override
175                 public ClientGetCommand global() {
176                         global = true;
177                         return this;
178                 }
179
180                 @Override
181                 public Future<Optional<Data>> uri(String uri) {
182                         return threadPool.submit(() -> execute(uri));
183                 }
184
185                 private Optional<Data> execute(String uri) throws IOException, ExecutionException, InterruptedException {
186                         DefaultFcpClient.this.connect();
187                         ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
188                         if (ignoreDataStore) {
189                                 clientGet.setIgnoreDataStore(true);
190                         }
191                         if (dataStoreOnly) {
192                                 clientGet.setDataStoreOnly(true);
193                         }
194                         if (maxSize != null) {
195                                 clientGet.setMaxSize(maxSize);
196                         }
197                         if (priority != null) {
198                                 clientGet.setPriority(priority);
199                         }
200                         if (realTime) {
201                                 clientGet.setRealTimeFlag(true);
202                         }
203                         if (global) {
204                                 clientGet.setGlobal(true);
205                         }
206                         try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
207                                 Sequence sequence = new Sequence(identifier);
208                                 replySequence.handle(AllData.class).with(sequence::allData);
209                                 replySequence.handle(GetFailed.class).with(sequence::getFailed);
210                                 replySequence.handleClose().with(sequence::disconnect);
211                                 replySequence.waitFor(sequence::isFinished);
212                                 replySequence.send(clientGet).get();
213                                 return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
214                         }
215                 }
216
217                 private class Sequence {
218
219                         private final AtomicBoolean finished = new AtomicBoolean();
220                         private final AtomicBoolean failed = new AtomicBoolean();
221
222                         private final String identifier;
223
224                         private String contentType;
225                         private long dataLength;
226                         private InputStream payload;
227
228                         private Sequence(String identifier) {
229                                 this.identifier = identifier;
230                         }
231
232                         public boolean isFinished() {
233                                 return finished.get() || failed.get();
234                         }
235
236                         public boolean isSuccessful() {
237                                 return !failed.get();
238                         }
239
240                         public Data getData() {
241                                 return new Data() {
242                                         @Override
243                                         public String getMimeType() {
244                                                 synchronized (Sequence.this) {
245                                                         return contentType;
246                                                 }
247                                         }
248
249                                         @Override
250                                         public long size() {
251                                                 synchronized (Sequence.this) {
252                                                         return dataLength;
253                                                 }
254                                         }
255
256                                         @Override
257                                         public InputStream getInputStream() {
258                                                 synchronized (Sequence.this) {
259                                                         return payload;
260                                                 }
261                                         }
262                                 };
263                         }
264
265                         public void allData(AllData allData) {
266                                 if (allData.getIdentifier().equals(identifier)) {
267                                         synchronized (this) {
268                                                 contentType = allData.getContentType();
269                                                 dataLength = allData.getDataLength();
270                                                 try {
271                                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
272                                                         finished.set(true);
273                                                 } catch (IOException e) {
274                                                         // TODO – logging
275                                                         failed.set(true);
276                                                 }
277                                         }
278                                 }
279                         }
280
281                         public void getFailed(GetFailed getFailed) {
282                                 if (getFailed.getIdentifier().equals(identifier)) {
283                                         failed.set(true);
284                                 }
285                         }
286
287                         public void disconnect(Throwable t) {
288                                 failed.set(true);
289                         }
290
291                 }
292
293         }
294
295 }
296