3a3373015384299e44c4d1034baa5fc430425dde
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpKeyPair;
19 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
20 import net.pterodactylus.fcp.GenerateSSK;
21 import net.pterodactylus.fcp.GetFailed;
22 import net.pterodactylus.fcp.NodeHello;
23 import net.pterodactylus.fcp.Priority;
24 import net.pterodactylus.fcp.ReturnType;
25 import net.pterodactylus.fcp.SSKKeypair;
26
27 /**
28  * Default {@link FcpClient} implementation.
29  *
30  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
31  */
32 public class DefaultFcpClient implements FcpClient {
33
34         private final ExecutorService threadPool;
35         private final String hostname;
36         private final int port;
37         private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
38         private final Supplier<String> clientName;
39         private final Supplier<String> expectedVersion;
40
41         public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
42                         Supplier<String> expectedVersion) {
43                 this.threadPool = threadPool;
44                 this.hostname = hostname;
45                 this.port = port;
46                 this.clientName = clientName;
47                 this.expectedVersion = expectedVersion;
48         }
49
50         private void connect() throws IOException {
51                 if (fcpConnection.get() != null) {
52                         return;
53                 }
54                 fcpConnection.compareAndSet(null, createConnection());
55         }
56
57         private FcpConnection createConnection() throws IOException {
58                 FcpConnection connection = new FcpConnection(hostname, port);
59                 connection.connect();
60                 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
61                         private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
62                         private final AtomicBoolean receivedClosed = new AtomicBoolean();
63                         @Override
64                         protected boolean isFinished() {
65                                 return receivedNodeHello.get() != null || receivedClosed.get();
66                         }
67
68                         @Override
69                         protected void consumeNodeHello(NodeHello nodeHello) {
70                                 receivedNodeHello.set(nodeHello);
71                         }
72
73                         @Override
74                         protected void consumeCloseConnectionDuplicateClientName(
75                                 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
76                                 receivedClosed.set(true);
77                         }
78                 };
79                 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
80                 try {
81                         nodeHelloSequence.send(clientHello).get();
82                 } catch (InterruptedException | ExecutionException e) {
83                         connection.close();
84                         throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
85                 }
86                 return connection;
87         }
88
89         @Override
90         public GenerateKeypairCommand generateKeypair() {
91                 return new GenerateKeypairCommandImpl();
92         }
93
94         private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
95
96                 @Override
97                 public Future<FcpKeyPair> execute() {
98                         return threadPool.submit(() -> {
99                                 connect();
100                                 return new FcpReplySequence<FcpKeyPair>(threadPool, fcpConnection.get()) {
101                                         private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
102
103                                         @Override
104                                         protected boolean isFinished() {
105                                                 return keyPair.get() != null;
106                                         }
107
108                                         @Override
109                                         protected FcpKeyPair getResult() {
110                                                 return keyPair.get();
111                                         }
112
113                                         @Override
114                                         protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
115                                                 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
116                                         }
117                                 }.send(new GenerateSSK()).get();
118                         });
119                 }
120
121         }
122
123         @Override
124         public ClientGetCommand clientGet() {
125                 return new ClientGetCommandImpl();
126         }
127
128         private class ClientGetCommandImpl implements ClientGetCommand {
129
130                 private String identifier;
131                 private boolean ignoreDataStore;
132                 private boolean dataStoreOnly;
133                 private Long maxSize;
134                 private Priority priority;
135                 private boolean realTime;
136                 private boolean global;
137
138                 @Override
139                 public ClientGetCommand identifier(String identifier) {
140                         this.identifier = identifier;
141                         return this;
142                 }
143
144                 @Override
145                 public ClientGetCommand ignoreDataStore() {
146                         ignoreDataStore = true;
147                         return this;
148                 }
149
150                 @Override
151                 public ClientGetCommand dataStoreOnly() {
152                         dataStoreOnly = true;
153                         return this;
154                 }
155
156                 @Override
157                 public ClientGetCommand maxSize(long maxSize) {
158                         this.maxSize = maxSize;
159                         return this;
160                 }
161
162                 @Override
163                 public ClientGetCommand priority(Priority priority) {
164                         this.priority = priority;
165                         return this;
166                 }
167
168                 @Override
169                 public ClientGetCommand realTime() {
170                         realTime = true;
171                         return this;
172                 }
173
174                 @Override
175                 public ClientGetCommand global() {
176                         global = true;
177                         return this;
178                 }
179
180                 @Override
181                 public Future<Optional<Data>> uri(String uri) {
182                         ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
183                         if (ignoreDataStore) {
184                                 clientGet.setIgnoreDataStore(true);
185                         }
186                         if (dataStoreOnly) {
187                                 clientGet.setDataStoreOnly(true);
188                         }
189                         if (maxSize != null) {
190                                 clientGet.setMaxSize(maxSize);
191                         }
192                         if (priority != null) {
193                                 clientGet.setPriority(priority);
194                         }
195                         if (realTime) {
196                                 clientGet.setRealTimeFlag(true);
197                         }
198                         if (global) {
199                                 clientGet.setGlobal(true);
200                         }
201                         return threadPool.submit(() -> {
202                                 connect();
203                                 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, fcpConnection.get()) {
204                                         private final AtomicBoolean finished = new AtomicBoolean();
205                                         private final AtomicBoolean failed = new AtomicBoolean();
206
207                                         private final String identifier = ClientGetCommandImpl.this.identifier;
208
209                                         private String contentType;
210                                         private long dataLength;
211                                         private InputStream payload;
212
213                                         @Override
214                                         protected boolean isFinished() {
215                                                 return finished.get() || failed.get();
216                                         }
217
218                                         @Override
219                                         protected Optional<Data> getResult() {
220                                                 return failed.get() ? Optional.empty() : Optional.of(new Data() {
221                                                         @Override
222                                                         public String getMimeType() {
223                                                                 return contentType;
224                                                         }
225
226                                                         @Override
227                                                         public long size() {
228                                                                 return dataLength;
229                                                         }
230
231                                                         @Override
232                                                         public InputStream getInputStream() {
233                                                                 return payload;
234                                                         }
235                                                 });
236                                         }
237
238                                         @Override
239                                         protected void consumeAllData(AllData allData) {
240                                                 if (allData.getIdentifier().equals(identifier)) {
241                                                         synchronized (this) {
242                                                                 contentType = allData.getContentType();
243                                                                 dataLength = allData.getDataLength();
244                                                                 try {
245                                                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
246                                                                         finished.set(true);
247                                                                 } catch (IOException e) {
248                                                                         // TODO – logging
249                                                                         failed.set(true);
250                                                                 }
251                                                         }
252                                                 }
253                                         }
254
255                                         @Override
256                                         protected void consumeGetFailed(GetFailed getFailed) {
257                                                 if (getFailed.getIdentifier().equals(identifier)) {
258                                                         failed.set(true);
259                                                 }
260                                         }
261
262                                         @Override
263                                         protected void consumeConnectionClosed(Throwable throwable) {
264                                                 failed.set(true);
265                                         }
266                                 };
267                                 return replySequence.send(clientGet).get();
268                         });
269                 }
270
271         }
272
273 }
274