Move key pair generation command to its own class
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
12
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
19 import net.pterodactylus.fcp.GetFailed;
20 import net.pterodactylus.fcp.NodeHello;
21 import net.pterodactylus.fcp.Priority;
22 import net.pterodactylus.fcp.ReturnType;
23
24 /**
25  * Default {@link FcpClient} implementation.
26  *
27  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
28  */
29 public class DefaultFcpClient implements FcpClient {
30
31         private final ExecutorService threadPool;
32         private final String hostname;
33         private final int port;
34         private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
35         private final Supplier<String> clientName;
36         private final Supplier<String> expectedVersion;
37
38         public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
39                         Supplier<String> expectedVersion) {
40                 this.threadPool = threadPool;
41                 this.hostname = hostname;
42                 this.port = port;
43                 this.clientName = clientName;
44                 this.expectedVersion = expectedVersion;
45         }
46
47         private FcpConnection connect() throws IOException {
48                 FcpConnection fcpConnection = this.fcpConnection.get();
49                 if (fcpConnection != null) {
50                         return fcpConnection;
51                 }
52                 fcpConnection = createConnection();
53                 this.fcpConnection.compareAndSet(null, fcpConnection);
54                 return fcpConnection;
55         }
56
57         private FcpConnection createConnection() throws IOException {
58                 FcpConnection connection = new FcpConnection(hostname, port);
59                 connection.connect();
60                 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
61                         private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
62                         private final AtomicBoolean receivedClosed = new AtomicBoolean();
63                         @Override
64                         protected boolean isFinished() {
65                                 return receivedNodeHello.get() != null || receivedClosed.get();
66                         }
67
68                         @Override
69                         protected void consumeNodeHello(NodeHello nodeHello) {
70                                 receivedNodeHello.set(nodeHello);
71                         }
72
73                         @Override
74                         protected void consumeCloseConnectionDuplicateClientName(
75                                 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
76                                 receivedClosed.set(true);
77                         }
78                 };
79                 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
80                 try {
81                         nodeHelloSequence.send(clientHello).get();
82                 } catch (InterruptedException | ExecutionException e) {
83                         connection.close();
84                         throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
85                 }
86                 return connection;
87         }
88
89         @Override
90         public GenerateKeypairCommand generateKeypair() {
91                 return new GenerateKeypairCommandImpl(threadPool, this::connect);
92         }
93
94         @Override
95         public ClientGetCommand clientGet() {
96                 return new ClientGetCommandImpl();
97         }
98
99         private class ClientGetCommandImpl implements ClientGetCommand {
100
101                 private String identifier;
102                 private boolean ignoreDataStore;
103                 private boolean dataStoreOnly;
104                 private Long maxSize;
105                 private Priority priority;
106                 private boolean realTime;
107                 private boolean global;
108
109                 @Override
110                 public ClientGetCommand identifier(String identifier) {
111                         this.identifier = identifier;
112                         return this;
113                 }
114
115                 @Override
116                 public ClientGetCommand ignoreDataStore() {
117                         ignoreDataStore = true;
118                         return this;
119                 }
120
121                 @Override
122                 public ClientGetCommand dataStoreOnly() {
123                         dataStoreOnly = true;
124                         return this;
125                 }
126
127                 @Override
128                 public ClientGetCommand maxSize(long maxSize) {
129                         this.maxSize = maxSize;
130                         return this;
131                 }
132
133                 @Override
134                 public ClientGetCommand priority(Priority priority) {
135                         this.priority = priority;
136                         return this;
137                 }
138
139                 @Override
140                 public ClientGetCommand realTime() {
141                         realTime = true;
142                         return this;
143                 }
144
145                 @Override
146                 public ClientGetCommand global() {
147                         global = true;
148                         return this;
149                 }
150
151                 @Override
152                 public Future<Optional<Data>> uri(String uri) {
153                         ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
154                         if (ignoreDataStore) {
155                                 clientGet.setIgnoreDataStore(true);
156                         }
157                         if (dataStoreOnly) {
158                                 clientGet.setDataStoreOnly(true);
159                         }
160                         if (maxSize != null) {
161                                 clientGet.setMaxSize(maxSize);
162                         }
163                         if (priority != null) {
164                                 clientGet.setPriority(priority);
165                         }
166                         if (realTime) {
167                                 clientGet.setRealTimeFlag(true);
168                         }
169                         if (global) {
170                                 clientGet.setGlobal(true);
171                         }
172                         return threadPool.submit(() -> {
173                                 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
174                                         private final AtomicBoolean finished = new AtomicBoolean();
175                                         private final AtomicBoolean failed = new AtomicBoolean();
176
177                                         private final String identifier = ClientGetCommandImpl.this.identifier;
178
179                                         private String contentType;
180                                         private long dataLength;
181                                         private InputStream payload;
182
183                                         @Override
184                                         protected boolean isFinished() {
185                                                 return finished.get() || failed.get();
186                                         }
187
188                                         @Override
189                                         protected Optional<Data> getResult() {
190                                                 return failed.get() ? Optional.empty() : Optional.of(new Data() {
191                                                         @Override
192                                                         public String getMimeType() {
193                                                                 return contentType;
194                                                         }
195
196                                                         @Override
197                                                         public long size() {
198                                                                 return dataLength;
199                                                         }
200
201                                                         @Override
202                                                         public InputStream getInputStream() {
203                                                                 return payload;
204                                                         }
205                                                 });
206                                         }
207
208                                         @Override
209                                         protected void consumeAllData(AllData allData) {
210                                                 if (allData.getIdentifier().equals(identifier)) {
211                                                         synchronized (this) {
212                                                                 contentType = allData.getContentType();
213                                                                 dataLength = allData.getDataLength();
214                                                                 try {
215                                                                         payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
216                                                                         finished.set(true);
217                                                                 } catch (IOException e) {
218                                                                         // TODO – logging
219                                                                         failed.set(true);
220                                                                 }
221                                                         }
222                                                 }
223                                         }
224
225                                         @Override
226                                         protected void consumeGetFailed(GetFailed getFailed) {
227                                                 if (getFailed.getIdentifier().equals(identifier)) {
228                                                         failed.set(true);
229                                                 }
230                                         }
231
232                                         @Override
233                                         protected void consumeConnectionClosed(Throwable throwable) {
234                                                 failed.set(true);
235                                         }
236                                 };
237                                 return replySequence.send(clientGet).get();
238                         });
239                 }
240
241         }
242
243 }
244