Add command that retrieves data from Freenet
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Future;
10 import java.util.concurrent.atomic.AtomicBoolean;
11 import java.util.concurrent.atomic.AtomicReference;
12 import java.util.function.Supplier;
13
14 import net.pterodactylus.fcp.AllData;
15 import net.pterodactylus.fcp.ClientGet;
16 import net.pterodactylus.fcp.ClientHello;
17 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
18 import net.pterodactylus.fcp.FcpConnection;
19 import net.pterodactylus.fcp.FcpKeyPair;
20 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
21 import net.pterodactylus.fcp.GenerateSSK;
22 import net.pterodactylus.fcp.GetFailed;
23 import net.pterodactylus.fcp.NodeHello;
24 import net.pterodactylus.fcp.Priority;
25 import net.pterodactylus.fcp.ReturnType;
26 import net.pterodactylus.fcp.SSKKeypair;
27
28 import com.google.common.io.ByteStreams;
29
30 /**
31  * Default {@link FcpClient} implementation.
32  *
33  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
34  */
35 public class DefaultFcpClient implements FcpClient {
36
37         private final ExecutorService threadPool;
38         private final String hostname;
39         private final int port;
40         private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
41         private final Supplier<String> clientName;
42         private final Supplier<String> expectedVersion;
43
44         public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
45                         Supplier<String> expectedVersion) {
46                 this.threadPool = threadPool;
47                 this.hostname = hostname;
48                 this.port = port;
49                 this.clientName = clientName;
50                 this.expectedVersion = expectedVersion;
51         }
52
53         private void connect() throws IOException {
54                 if (fcpConnection.get() != null) {
55                         return;
56                 }
57                 fcpConnection.compareAndSet(null, createConnection());
58         }
59
60         private FcpConnection createConnection() throws IOException {
61                 FcpConnection connection = new FcpConnection(hostname, port);
62                 connection.connect();
63                 AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
64                 AtomicBoolean receivedClosed = new AtomicBoolean();
65                 FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
66                 nodeHelloSequence
67                                 .handle(NodeHello.class)
68                                 .with((nodeHello) -> receivedNodeHello.set(nodeHello));
69                 nodeHelloSequence
70                                 .handle(CloseConnectionDuplicateClientName.class)
71                                 .with((closeConnection) -> receivedClosed.set(true));
72                 nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
73                 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
74                 try {
75                         nodeHelloSequence.send(clientHello).get();
76                 } catch (InterruptedException | ExecutionException e) {
77                         connection.close();
78                         throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
79                 }
80                 return connection;
81         }
82
83         @Override
84         public GenerateKeypairCommand generateKeypair() {
85                 return new GenerateKeypairCommandImpl();
86         }
87
88         private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
89
90                 @Override
91                 public Future<FcpKeyPair> execute() {
92                         return threadPool.submit(() -> {
93                                 connect();
94                                 Sequence sequence = new Sequence();
95                                 FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
96                                 replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
97                                 replySequence.waitFor(sequence::isFinished);
98                                 replySequence.send(new GenerateSSK()).get();
99                                 return sequence.getKeyPair();
100                         });
101                 }
102
103                 private class Sequence {
104
105                         private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
106
107                         public void handleSSKKeypair(SSKKeypair sskKeypair) {
108                                 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
109                         }
110
111                         public boolean isFinished() {
112                                 return keyPair.get() != null;
113                         }
114
115                         public FcpKeyPair getKeyPair() {
116                                 return keyPair.get();
117                         }
118
119                 }
120
121         }
122
123         @Override
124         public ClientGetCommand clientGet() {
125                 return new ClientGetCommandImpl();
126         }
127
128         private class ClientGetCommandImpl implements ClientGetCommand {
129
130                 private String identifier;
131                 private boolean ignoreDataStore;
132                 private boolean dataStoreOnly;
133                 private Long maxSize;
134                 private Priority priority;
135                 private boolean realTime;
136                 private boolean global;
137
138                 @Override
139                 public ClientGetCommand identifier(String identifier) {
140                         this.identifier = identifier;
141                         return this;
142                 }
143
144                 @Override
145                 public ClientGetCommand ignoreDataStore() {
146                         ignoreDataStore = true;
147                         return this;
148                 }
149
150                 @Override
151                 public ClientGetCommand dataStoreOnly() {
152                         dataStoreOnly = true;
153                         return this;
154                 }
155
156                 @Override
157                 public ClientGetCommand maxSize(long maxSize) {
158                         this.maxSize = maxSize;
159                         return this;
160                 }
161
162                 @Override
163                 public ClientGetCommand priority(Priority priority) {
164                         this.priority = priority;
165                         return this;
166                 }
167
168                 @Override
169                 public ClientGetCommand realTime() {
170                         realTime = true;
171                         return this;
172                 }
173
174                 @Override
175                 public ClientGetCommand global() {
176                         global = true;
177                         return this;
178                 }
179
180                 @Override
181                 public Future<Optional<Data>> uri(String uri) {
182                         return threadPool.submit(new Callable<Optional<Data>>() {
183                                 @Override
184                                 public Optional<Data> call() throws Exception {
185                                         DefaultFcpClient.this.connect();
186                                         ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
187                                         if (ignoreDataStore) {
188                                                 clientGet.setIgnoreDataStore(true);
189                                         }
190                                         if (dataStoreOnly) {
191                                                 clientGet.setDataStoreOnly(true);
192                                         }
193                                         if (maxSize != null) {
194                                                 clientGet.setMaxSize(maxSize);
195                                         }
196                                         if (priority != null) {
197                                                 clientGet.setPriority(priority);
198                                         }
199                                         if (realTime) {
200                                                 clientGet.setRealTimeFlag(true);
201                                         }
202                                         if (global) {
203                                                 clientGet.setGlobal(true);
204                                         }
205                                         try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
206                                                 Sequence sequence = new Sequence(identifier);
207                                                 replySequence.handle(AllData.class).with(sequence::allData);
208                                                 replySequence.handle(GetFailed.class).with(sequence::getFailed);
209                                                 replySequence.handleClose().with(sequence::disconnect);
210                                                 replySequence.waitFor(sequence::isFinished);
211                                                 replySequence.send(clientGet).get();
212                                                 return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
213                                         }
214                                 }
215                         });
216                 }
217
218                 private class Sequence {
219
220                         private final AtomicBoolean finished = new AtomicBoolean();
221                         private final AtomicBoolean failed = new AtomicBoolean();
222
223                         private final String identifier;
224
225                         private String contentType;
226                         private long dataLength;
227                         private InputStream payload;
228
229                         private Sequence(String identifier) {
230                                 this.identifier = identifier;
231                         }
232
233                         public boolean isFinished() {
234                                 return finished.get() || failed.get();
235                         }
236
237                         public boolean isSuccessful() {
238                                 return !failed.get();
239                         }
240
241                         public Data getData() {
242                                 return new Data() {
243                                         @Override
244                                         public String getMimeType() {
245                                                 synchronized (Sequence.this) {
246                                                         return contentType;
247                                                 }
248                                         }
249
250                                         @Override
251                                         public long size() {
252                                                 synchronized (Sequence.this) {
253                                                         return dataLength;
254                                                 }
255                                         }
256
257                                         @Override
258                                         public InputStream getInputStream() {
259                                                 synchronized (Sequence.this) {
260                                                         return payload;
261                                                 }
262                                         }
263                                 };
264                         }
265
266                         public void allData(AllData allData) {
267                                 if (allData.getIdentifier().equals(identifier)) {
268                                         synchronized (this) {
269                                                 contentType = allData.getContentType();
270                                                 dataLength = allData.getDataLength();
271                                                 try {
272                                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
273                                                         finished.set(true);
274                                                 } catch (IOException e) {
275                                                         // TODO – logging
276                                                         failed.set(true);
277                                                 }
278                                         }
279                                 }
280                         }
281
282                         public void getFailed(GetFailed getFailed) {
283                                 if (getFailed.getIdentifier().equals(identifier)) {
284                                         failed.set(true);
285                                 }
286                         }
287
288                         public void disconnect(Throwable t) {
289                                 failed.set(true);
290                         }
291
292                 }
293
294         }
295
296 }
297