Don’t access fcpConnection container directly
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpKeyPair;
19 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
20 import net.pterodactylus.fcp.GenerateSSK;
21 import net.pterodactylus.fcp.GetFailed;
22 import net.pterodactylus.fcp.NodeHello;
23 import net.pterodactylus.fcp.Priority;
24 import net.pterodactylus.fcp.ReturnType;
25 import net.pterodactylus.fcp.SSKKeypair;
26
27 /**
28  * Default {@link FcpClient} implementation.
29  *
30  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
31  */
32 public class DefaultFcpClient implements FcpClient {
33
34         private final ExecutorService threadPool;
35         private final String hostname;
36         private final int port;
37         private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
38         private final Supplier<String> clientName;
39         private final Supplier<String> expectedVersion;
40
41         public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
42                         Supplier<String> expectedVersion) {
43                 this.threadPool = threadPool;
44                 this.hostname = hostname;
45                 this.port = port;
46                 this.clientName = clientName;
47                 this.expectedVersion = expectedVersion;
48         }
49
50         private FcpConnection connect() throws IOException {
51                 FcpConnection fcpConnection = this.fcpConnection.get();
52                 if (fcpConnection != null) {
53                         return fcpConnection;
54                 }
55                 fcpConnection = createConnection();
56                 this.fcpConnection.compareAndSet(null, fcpConnection);
57                 return fcpConnection;
58         }
59
60         private FcpConnection createConnection() throws IOException {
61                 FcpConnection connection = new FcpConnection(hostname, port);
62                 connection.connect();
63                 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
64                         private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
65                         private final AtomicBoolean receivedClosed = new AtomicBoolean();
66                         @Override
67                         protected boolean isFinished() {
68                                 return receivedNodeHello.get() != null || receivedClosed.get();
69                         }
70
71                         @Override
72                         protected void consumeNodeHello(NodeHello nodeHello) {
73                                 receivedNodeHello.set(nodeHello);
74                         }
75
76                         @Override
77                         protected void consumeCloseConnectionDuplicateClientName(
78                                 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
79                                 receivedClosed.set(true);
80                         }
81                 };
82                 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
83                 try {
84                         nodeHelloSequence.send(clientHello).get();
85                 } catch (InterruptedException | ExecutionException e) {
86                         connection.close();
87                         throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
88                 }
89                 return connection;
90         }
91
92         @Override
93         public GenerateKeypairCommand generateKeypair() {
94                 return new GenerateKeypairCommandImpl();
95         }
96
97         private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
98
99                 @Override
100                 public Future<FcpKeyPair> execute() {
101                         return threadPool.submit(() -> {
102                                 connect();
103                                 return new FcpReplySequence<FcpKeyPair>(threadPool, connect()) {
104                                         private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
105
106                                         @Override
107                                         protected boolean isFinished() {
108                                                 return keyPair.get() != null;
109                                         }
110
111                                         @Override
112                                         protected FcpKeyPair getResult() {
113                                                 return keyPair.get();
114                                         }
115
116                                         @Override
117                                         protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
118                                                 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
119                                         }
120                                 }.send(new GenerateSSK()).get();
121                         });
122                 }
123
124         }
125
126         @Override
127         public ClientGetCommand clientGet() {
128                 return new ClientGetCommandImpl();
129         }
130
131         private class ClientGetCommandImpl implements ClientGetCommand {
132
133                 private String identifier;
134                 private boolean ignoreDataStore;
135                 private boolean dataStoreOnly;
136                 private Long maxSize;
137                 private Priority priority;
138                 private boolean realTime;
139                 private boolean global;
140
141                 @Override
142                 public ClientGetCommand identifier(String identifier) {
143                         this.identifier = identifier;
144                         return this;
145                 }
146
147                 @Override
148                 public ClientGetCommand ignoreDataStore() {
149                         ignoreDataStore = true;
150                         return this;
151                 }
152
153                 @Override
154                 public ClientGetCommand dataStoreOnly() {
155                         dataStoreOnly = true;
156                         return this;
157                 }
158
159                 @Override
160                 public ClientGetCommand maxSize(long maxSize) {
161                         this.maxSize = maxSize;
162                         return this;
163                 }
164
165                 @Override
166                 public ClientGetCommand priority(Priority priority) {
167                         this.priority = priority;
168                         return this;
169                 }
170
171                 @Override
172                 public ClientGetCommand realTime() {
173                         realTime = true;
174                         return this;
175                 }
176
177                 @Override
178                 public ClientGetCommand global() {
179                         global = true;
180                         return this;
181                 }
182
183                 @Override
184                 public Future<Optional<Data>> uri(String uri) {
185                         ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
186                         if (ignoreDataStore) {
187                                 clientGet.setIgnoreDataStore(true);
188                         }
189                         if (dataStoreOnly) {
190                                 clientGet.setDataStoreOnly(true);
191                         }
192                         if (maxSize != null) {
193                                 clientGet.setMaxSize(maxSize);
194                         }
195                         if (priority != null) {
196                                 clientGet.setPriority(priority);
197                         }
198                         if (realTime) {
199                                 clientGet.setRealTimeFlag(true);
200                         }
201                         if (global) {
202                                 clientGet.setGlobal(true);
203                         }
204                         return threadPool.submit(() -> {
205                                 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
206                                         private final AtomicBoolean finished = new AtomicBoolean();
207                                         private final AtomicBoolean failed = new AtomicBoolean();
208
209                                         private final String identifier = ClientGetCommandImpl.this.identifier;
210
211                                         private String contentType;
212                                         private long dataLength;
213                                         private InputStream payload;
214
215                                         @Override
216                                         protected boolean isFinished() {
217                                                 return finished.get() || failed.get();
218                                         }
219
220                                         @Override
221                                         protected Optional<Data> getResult() {
222                                                 return failed.get() ? Optional.empty() : Optional.of(new Data() {
223                                                         @Override
224                                                         public String getMimeType() {
225                                                                 return contentType;
226                                                         }
227
228                                                         @Override
229                                                         public long size() {
230                                                                 return dataLength;
231                                                         }
232
233                                                         @Override
234                                                         public InputStream getInputStream() {
235                                                                 return payload;
236                                                         }
237                                                 });
238                                         }
239
240                                         @Override
241                                         protected void consumeAllData(AllData allData) {
242                                                 if (allData.getIdentifier().equals(identifier)) {
243                                                         synchronized (this) {
244                                                                 contentType = allData.getContentType();
245                                                                 dataLength = allData.getDataLength();
246                                                                 try {
247                                                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
248                                                                         finished.set(true);
249                                                                 } catch (IOException e) {
250                                                                         // TODO – logging
251                                                                         failed.set(true);
252                                                                 }
253                                                         }
254                                                 }
255                                         }
256
257                                         @Override
258                                         protected void consumeGetFailed(GetFailed getFailed) {
259                                                 if (getFailed.getIdentifier().equals(identifier)) {
260                                                         failed.set(true);
261                                                 }
262                                         }
263
264                                         @Override
265                                         protected void consumeConnectionClosed(Throwable throwable) {
266                                                 failed.set(true);
267                                         }
268                                 };
269                                 return replySequence.send(clientGet).get();
270                         });
271                 }
272
273         }
274
275 }
276