1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
19 import net.pterodactylus.fcp.GetFailed;
20 import net.pterodactylus.fcp.NodeHello;
21 import net.pterodactylus.fcp.Priority;
22 import net.pterodactylus.fcp.ReturnType;
/**
 * Default {@link FcpClient} implementation.
 *
 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
 */
29 public class DefaultFcpClient implements FcpClient {
31 private final ExecutorService threadPool;
32 private final String hostname;
33 private final int port;
34 private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
35 private final Supplier<String> clientName;
36 private final Supplier<String> expectedVersion;
/**
 * Creates a new FCP client for the node at the given address.
 *
 * @param threadPool
 *            the executor used for this client's asynchronous work
 * @param hostname
 *            the hostname used when the connection to the node is created
 * @param port
 *            the port used when the connection to the node is created
 * @param clientName
 *            supplies the client name sent in the {@code ClientHello} message
 * @param expectedVersion
 *            supplies the expected version sent in the {@code ClientHello} message
 */
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
		Supplier<String> expectedVersion) {
	this.threadPool = threadPool;
	this.hostname = hostname;
	// The port is a final field and is read when the connection is created, so it must be stored here too.
	this.port = port;
	this.clientName = clientName;
	this.expectedVersion = expectedVersion;
}
47 private FcpConnection connect() throws IOException {
48 FcpConnection fcpConnection = this.fcpConnection.get();
49 if (fcpConnection != null) {
52 fcpConnection = createConnection();
53 this.fcpConnection.compareAndSet(null, fcpConnection);
57 private FcpConnection createConnection() throws IOException {
58 FcpConnection connection = new FcpConnection(hostname, port);
60 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
61 private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
62 private final AtomicBoolean receivedClosed = new AtomicBoolean();
64 protected boolean isFinished() {
65 return receivedNodeHello.get() != null || receivedClosed.get();
69 protected void consumeNodeHello(NodeHello nodeHello) {
70 receivedNodeHello.set(nodeHello);
74 protected void consumeCloseConnectionDuplicateClientName(
75 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
76 receivedClosed.set(true);
79 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
81 nodeHelloSequence.send(clientHello).get();
82 } catch (InterruptedException | ExecutionException e) {
84 throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
90 public GenerateKeypairCommand generateKeypair() {
91 return new GenerateKeypairCommandImpl(threadPool, this::connect);
95 public ClientGetCommand clientGet() {
96 return new ClientGetCommandImpl();
99 private class ClientGetCommandImpl implements ClientGetCommand {
101 private String identifier;
102 private boolean ignoreDataStore;
103 private boolean dataStoreOnly;
104 private Long maxSize;
105 private Priority priority;
106 private boolean realTime;
107 private boolean global;
110 public ClientGetCommand identifier(String identifier) {
111 this.identifier = identifier;
116 public ClientGetCommand ignoreDataStore() {
117 ignoreDataStore = true;
122 public ClientGetCommand dataStoreOnly() {
123 dataStoreOnly = true;
128 public ClientGetCommand maxSize(long maxSize) {
129 this.maxSize = maxSize;
134 public ClientGetCommand priority(Priority priority) {
135 this.priority = priority;
140 public ClientGetCommand realTime() {
146 public ClientGetCommand global() {
/**
 * Creates a {@link ClientGet} request for the given URI from the options set on this command and
 * submits a task to the thread pool that sends the request and waits for its reply sequence.
 * <p>
 * NOTE(review): parts of this method are not visible in this excerpt (among them the guards around
 * some option setters and the bodies of getMimeType, getInputStream, consumeGetFailed and
 * consumeConnectionClosed); the comments below describe only the statements that are shown.
 *
 * @param uri
 *            the URI handed to the {@link ClientGet} message
 * @return the future of the task submitted to the thread pool
 */
152 public Future<Optional<Data>> uri(String uri) {
// The request is always created with ReturnType.direct.
153 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
154 if (ignoreDataStore) {
155 clientGet.setIgnoreDataStore(true);
158 clientGet.setDataStoreOnly(true);
// maxSize and priority are nullable; null means the option was never set and nothing is passed on.
160 if (maxSize != null) {
161 clientGet.setMaxSize(maxSize);
163 if (priority != null) {
164 clientGet.setPriority(priority);
167 clientGet.setRealTimeFlag(true);
170 clientGet.setGlobal(true);
// connect() is called inside the submitted task, i.e. when the task runs rather than in uri() itself.
// NOTE(review): the task waits on get() below while the reply sequence was handed the same thread
// pool - confirm that this cannot starve a small fixed-size pool.
172 return threadPool.submit(() -> {
173 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
174 private final AtomicBoolean finished = new AtomicBoolean();
175 private final AtomicBoolean failed = new AtomicBoolean();
// Copy of the command's identifier, compared against the identifier of incoming messages.
177 private final String identifier = ClientGetCommandImpl.this.identifier;
// Written in consumeAllData() inside a synchronized block.
179 private String contentType;
180 private long dataLength;
181 private InputStream payload;
// The sequence is finished once either flag has been set.
184 protected boolean isFinished() {
185 return finished.get() || failed.get();
// An empty Optional is returned when the failed flag is set, a Data object otherwise.
189 protected Optional<Data> getResult() {
190 return failed.get() ? Optional.empty() : Optional.of(new Data() {
192 public String getMimeType() {
202 public InputStream getInputStream() {
// Only AllData messages carrying this request's identifier are processed.
209 protected void consumeAllData(AllData allData) {
210 if (allData.getIdentifier().equals(identifier)) {
211 synchronized (this) {
212 contentType = allData.getContentType();
213 dataLength = allData.getDataLength();
// The payload stream is wrapped in a TempInputStream constructed with the announced data length.
215 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
217 } catch (IOException e) {
// Only GetFailed messages carrying this request's identifier are processed.
226 protected void consumeGetFailed(GetFailed getFailed) {
227 if (getFailed.getIdentifier().equals(identifier)) {
233 protected void consumeConnectionClosed(Throwable throwable) {
// Sends the request and waits for the result of the reply sequence.
237 return replySequence.send(clientGet).get();