1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpKeyPair;
19 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
20 import net.pterodactylus.fcp.GenerateSSK;
21 import net.pterodactylus.fcp.GetFailed;
22 import net.pterodactylus.fcp.NodeHello;
23 import net.pterodactylus.fcp.Priority;
24 import net.pterodactylus.fcp.ReturnType;
25 import net.pterodactylus.fcp.SSKKeypair;
28 * Default {@link FcpClient} implementation.
30 * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
32 public class DefaultFcpClient implements FcpClient {
34 private final ExecutorService threadPool;
35 private final String hostname;
36 private final int port;
37 private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
38 private final Supplier<String> clientName;
39 private final Supplier<String> expectedVersion;
41 public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
42 Supplier<String> expectedVersion) {
43 this.threadPool = threadPool;
44 this.hostname = hostname;
46 this.clientName = clientName;
47 this.expectedVersion = expectedVersion;
50 private void connect() throws IOException {
51 if (fcpConnection.get() != null) {
54 fcpConnection.compareAndSet(null, createConnection());
57 private FcpConnection createConnection() throws IOException {
58 FcpConnection connection = new FcpConnection(hostname, port);
60 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
61 private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
62 private final AtomicBoolean receivedClosed = new AtomicBoolean();
64 protected boolean isFinished() {
65 return receivedNodeHello.get() != null || receivedClosed.get();
69 protected void consumeNodeHello(NodeHello nodeHello) {
70 receivedNodeHello.set(nodeHello);
74 protected void consumeCloseConnectionDuplicateClientName(
75 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
76 receivedClosed.set(true);
79 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
81 nodeHelloSequence.send(clientHello).get();
82 } catch (InterruptedException | ExecutionException e) {
84 throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
90 public GenerateKeypairCommand generateKeypair() {
91 return new GenerateKeypairCommandImpl();
94 private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
97 public Future<FcpKeyPair> execute() {
98 return threadPool.submit(() -> {
100 return new FcpReplySequence<FcpKeyPair>(threadPool, fcpConnection.get()) {
101 private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
104 protected boolean isFinished() {
105 return keyPair.get() != null;
109 protected FcpKeyPair getResult() {
110 return keyPair.get();
114 protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
115 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
117 }.send(new GenerateSSK()).get();
124 public ClientGetCommand clientGet() {
125 return new ClientGetCommandImpl();
128 private class ClientGetCommandImpl implements ClientGetCommand {
130 private String identifier;
131 private boolean ignoreDataStore;
132 private boolean dataStoreOnly;
133 private Long maxSize;
134 private Priority priority;
135 private boolean realTime;
136 private boolean global;
139 public ClientGetCommand identifier(String identifier) {
140 this.identifier = identifier;
145 public ClientGetCommand ignoreDataStore() {
146 ignoreDataStore = true;
151 public ClientGetCommand dataStoreOnly() {
152 dataStoreOnly = true;
157 public ClientGetCommand maxSize(long maxSize) {
158 this.maxSize = maxSize;
163 public ClientGetCommand priority(Priority priority) {
164 this.priority = priority;
169 public ClientGetCommand realTime() {
175 public ClientGetCommand global() {
181 public Future<Optional<Data>> uri(String uri) {
182 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
183 if (ignoreDataStore) {
184 clientGet.setIgnoreDataStore(true);
187 clientGet.setDataStoreOnly(true);
189 if (maxSize != null) {
190 clientGet.setMaxSize(maxSize);
192 if (priority != null) {
193 clientGet.setPriority(priority);
196 clientGet.setRealTimeFlag(true);
199 clientGet.setGlobal(true);
201 return threadPool.submit(() -> {
203 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, fcpConnection.get()) {
204 private final AtomicBoolean finished = new AtomicBoolean();
205 private final AtomicBoolean failed = new AtomicBoolean();
207 private final String identifier = ClientGetCommandImpl.this.identifier;
209 private String contentType;
210 private long dataLength;
211 private InputStream payload;
214 protected boolean isFinished() {
215 return finished.get() || failed.get();
219 protected Optional<Data> getResult() {
220 return failed.get() ? Optional.empty() : Optional.of(new Data() {
222 public String getMimeType() {
232 public InputStream getInputStream() {
239 protected void consumeAllData(AllData allData) {
240 if (allData.getIdentifier().equals(identifier)) {
241 synchronized (this) {
242 contentType = allData.getContentType();
243 dataLength = allData.getDataLength();
245 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
247 } catch (IOException e) {
256 protected void consumeGetFailed(GetFailed getFailed) {
257 if (getFailed.getIdentifier().equals(identifier)) {
263 protected void consumeConnectionClosed(Throwable throwable) {
267 return replySequence.send(clientGet).get();