1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.ExecutionException;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.atomic.AtomicBoolean;
10 import java.util.concurrent.atomic.AtomicReference;
11 import java.util.function.Supplier;
13 import net.pterodactylus.fcp.AllData;
14 import net.pterodactylus.fcp.ClientGet;
15 import net.pterodactylus.fcp.ClientHello;
16 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
17 import net.pterodactylus.fcp.FcpConnection;
18 import net.pterodactylus.fcp.FcpKeyPair;
19 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
20 import net.pterodactylus.fcp.GenerateSSK;
21 import net.pterodactylus.fcp.GetFailed;
22 import net.pterodactylus.fcp.NodeHello;
23 import net.pterodactylus.fcp.Priority;
24 import net.pterodactylus.fcp.ReturnType;
25 import net.pterodactylus.fcp.SSKKeypair;
/**
 * Default {@link FcpClient} implementation.
 *
 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
 */
32 public class DefaultFcpClient implements FcpClient {
// Executor that command tasks are submitted to; it is also handed to every FcpReplySequence.
34 private final ExecutorService threadPool;
// Host name passed to each new FcpConnection.
35 private final String hostname;
// Port passed to each new FcpConnection.
36 private final int port;
// Holds the connection used by the commands; empty until connect() stores one.
37 private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
// Supplies the client name for the ClientHello; queried whenever a connection is created.
38 private final Supplier<String> clientName;
// Supplies the expected version for the ClientHello; queried whenever a connection is created.
39 private final Supplier<String> expectedVersion;
/**
 * Creates a new FCP client. The constructor only stores its arguments; no
 * connection is opened here.
 *
 * @param threadPool
 *            executor that command tasks are submitted to and that is handed
 *            to every reply sequence
 * @param hostname
 *            host name passed to each new {@link FcpConnection}
 * @param port
 *            port passed to each new {@link FcpConnection}
 * @param clientName
 *            supplies the client name sent in the {@link ClientHello}
 * @param expectedVersion
 *            supplies the expected version sent in the {@link ClientHello}
 */
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
		Supplier<String> expectedVersion) {
	this.threadPool = threadPool;
	this.hostname = hostname;
	// The final field is read when a connection is created, so it must be set here.
	this.port = port;
	this.clientName = clientName;
	this.expectedVersion = expectedVersion;
}
50 private FcpConnection connect() throws IOException {
51 FcpConnection fcpConnection = this.fcpConnection.get();
52 if (fcpConnection != null) {
55 fcpConnection = createConnection();
56 this.fcpConnection.compareAndSet(null, fcpConnection);
// Creates a new FcpConnection for hostname:port, sends a ClientHello built from the
// clientName and expectedVersion suppliers, and blocks until the reply sequence below finishes.
// Interruption or a failed send is rethrown as an IOException that carries the cause.
// NOTE(review): the embedded numbering skips lines 62, 83, 86 and 88-89, so some statements
// (presumably the try opener, handling inside the catch and the final return) are not visible
// here; the comments describe only the lines shown.
60 private FcpConnection createConnection() throws IOException {
61 FcpConnection connection = new FcpConnection(hostname, port);
// Reply sequence without a result value (Void); it only tracks which reply arrived.
63 FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
64 private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
65 private final AtomicBoolean receivedClosed = new AtomicBoolean();
// Finished as soon as either a NodeHello or a duplicate-client-name close has been received.
67 protected boolean isFinished() {
68 return receivedNodeHello.get() != null || receivedClosed.get();
// Remembers the NodeHello, which ends the sequence.
72 protected void consumeNodeHello(NodeHello nodeHello) {
73 receivedNodeHello.set(nodeHello);
// Records the duplicate-client-name close, which also ends the sequence.
// NOTE(review): nothing visible here turns this case into a failure for the caller — confirm.
77 protected void consumeCloseConnectionDuplicateClientName(
78 CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
79 receivedClosed.set(true);
82 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
// Blocks until isFinished() reports completion.
84 nodeHelloSequence.send(clientHello).get();
// NOTE(review): the visible lines do not restore the interrupt flag after catching
// InterruptedException — confirm against the missing line 86.
85 } catch (InterruptedException | ExecutionException e) {
87 throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
93 public GenerateKeypairCommand generateKeypair() {
94 return new GenerateKeypairCommandImpl();
// Command that sends a GenerateSSK message and turns the SSKKeypair reply into an FcpKeyPair.
// NOTE(review): the embedded numbering skips line 102 (inside the submitted task) and the
// closing lines 121-125; the comments describe only what is shown.
97 private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
// Submits a task to threadPool that obtains the connection via connect(), sends the
// GenerateSSK and blocks until the reply sequence has a result.
100 public Future<FcpKeyPair> execute() {
101 return threadPool.submit(() -> {
103 return new FcpReplySequence<FcpKeyPair>(threadPool, connect()) {
// Holds the key pair once the reply has arrived; null until then.
104 private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
// The sequence is finished as soon as a key pair has been stored.
107 protected boolean isFinished() {
108 return keyPair.get() != null;
112 protected FcpKeyPair getResult() {
113 return keyPair.get();
// Builds the result from the request URI and insert URI of the reply.
117 protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
118 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
120 }.send(new GenerateSSK()).get();
127 public ClientGetCommand clientGet() {
128 return new ClientGetCommandImpl();
131 private class ClientGetCommandImpl implements ClientGetCommand {
// Request identifier; passed to the ClientGet and used in uri() to match incoming replies.
133 private String identifier;
// Options collected by the fluent setters and applied to the ClientGet in uri().
134 private boolean ignoreDataStore;
135 private boolean dataStoreOnly;
// null means that no maximum size is set on the request.
136 private Long maxSize;
// null means that no priority is set on the request.
137 private Priority priority;
138 private boolean realTime;
139 private boolean global;
142 public ClientGetCommand identifier(String identifier) {
143 this.identifier = identifier;
148 public ClientGetCommand ignoreDataStore() {
149 ignoreDataStore = true;
154 public ClientGetCommand dataStoreOnly() {
155 dataStoreOnly = true;
160 public ClientGetCommand maxSize(long maxSize) {
161 this.maxSize = maxSize;
166 public ClientGetCommand priority(Priority priority) {
167 this.priority = priority;
172 public ClientGetCommand realTime() {
178 public ClientGetCommand global() {
// Builds a ClientGet for the given URI (using the current identifier and ReturnType.direct),
// applies the options collected on this command, and submits a task to threadPool that sends
// the request and blocks until the reply sequence is finished.
// The future yields Optional.empty() when the sequence was marked as failed, otherwise a Data object.
// NOTE(review): the embedded numbering has gaps throughout this method (e.g. 189, 198, 201,
// 225-233, 235-240, 250-257, 260-264, 266-268), so several statements are not visible here;
// the comments describe only the lines shown.
184 public Future<Optional<Data>> uri(String uri) {
185 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
186 if (ignoreDataStore) {
187 clientGet.setIgnoreDataStore(true);
// presumably guarded by the dataStoreOnly flag; the condition line is not visible — confirm
190 clientGet.setDataStoreOnly(true);
// A null maxSize or priority means the corresponding setter is not called.
192 if (maxSize != null) {
193 clientGet.setMaxSize(maxSize);
195 if (priority != null) {
196 clientGet.setPriority(priority);
// presumably guarded by the realTime flag; the condition line is not visible — confirm
199 clientGet.setRealTimeFlag(true);
// presumably guarded by the global flag; the condition line is not visible — confirm
202 clientGet.setGlobal(true);
// connect() is called inside the task, so obtaining the connection happens on the pool thread.
204 return threadPool.submit(() -> {
205 FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
// Either flag ends the sequence (see isFinished()); the lines that set them are not visible here.
206 private final AtomicBoolean finished = new AtomicBoolean();
207 private final AtomicBoolean failed = new AtomicBoolean();
// Copy of the command's identifier taken when this sequence object is created.
209 private final String identifier = ClientGetCommandImpl.this.identifier;
// Filled from the AllData reply inside synchronized (this) in consumeAllData().
211 private String contentType;
212 private long dataLength;
213 private InputStream payload;
216 protected boolean isFinished() {
217 return finished.get() || failed.get();
// A failed sequence yields an empty Optional; otherwise the result is wrapped in a Data object.
221 protected Optional<Data> getResult() {
222 return failed.get() ? Optional.empty() : Optional.of(new Data() {
224 public String getMimeType() {
234 public InputStream getInputStream() {
// Only replies that carry this request's identifier are processed.
241 protected void consumeAllData(AllData allData) {
242 if (allData.getIdentifier().equals(identifier)) {
243 synchronized (this) {
244 contentType = allData.getContentType();
245 dataLength = allData.getDataLength();
// Wraps the payload stream together with its length; presumably TempInputStream copies the
// payload so it can be read after the message has been processed — confirm.
247 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
// NOTE(review): the handling of this IOException is not visible here.
249 } catch (IOException e) {
// Only failures that carry this request's identifier are processed.
258 protected void consumeGetFailed(GetFailed getFailed) {
259 if (getFailed.getIdentifier().equals(identifier)) {
// NOTE(review): the body of this handler is not visible here.
265 protected void consumeConnectionClosed(Throwable throwable) {
// Blocks the pool thread until isFinished() reports completion, then returns getResult().
269 return replySequence.send(clientGet).get();