1 package net.pterodactylus.fcp.quelaton;
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Optional;
6 import java.util.concurrent.Callable;
7 import java.util.concurrent.ExecutionException;
8 import java.util.concurrent.ExecutorService;
9 import java.util.concurrent.Future;
10 import java.util.concurrent.atomic.AtomicBoolean;
11 import java.util.concurrent.atomic.AtomicReference;
12 import java.util.function.Supplier;
14 import net.pterodactylus.fcp.AllData;
15 import net.pterodactylus.fcp.ClientGet;
16 import net.pterodactylus.fcp.ClientHello;
17 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
18 import net.pterodactylus.fcp.FcpConnection;
19 import net.pterodactylus.fcp.FcpKeyPair;
20 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
21 import net.pterodactylus.fcp.GenerateSSK;
22 import net.pterodactylus.fcp.GetFailed;
23 import net.pterodactylus.fcp.NodeHello;
24 import net.pterodactylus.fcp.Priority;
25 import net.pterodactylus.fcp.ReturnType;
26 import net.pterodactylus.fcp.SSKKeypair;
28 import com.google.common.io.ByteStreams;
31 * Default {@link FcpClient} implementation.
33 * @author <a href="mailto:bombe@freenetproject.org">David ‘Bombe’ Roden</a>
35 public class DefaultFcpClient implements FcpClient {
// Executor on which commands are run; it is also handed to every FcpReplySequence.
37 private final ExecutorService threadPool;
// Host name passed to the FcpConnection constructor.
38 private final String hostname;
// Port passed to the FcpConnection constructor.
39 private final int port;
// Connection used by all commands; starts out empty and is filled in by connect().
40 private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
// Evaluated when the ClientHello message is built in createConnection().
41 private final Supplier<String> clientName;
// Evaluated when the ClientHello message is built in createConnection().
42 private final Supplier<String> expectedVersion;
/**
 * Creates a new FCP client. The arguments are only stored; no connection
 * is opened by the constructor.
 *
 * @param threadPool
 *            the executor that runs the commands
 * @param hostname
 *            the host name of the node
 * @param port
 *            the port of the node
 * @param clientName
 *            supplies the client name used in the ClientHello message
 * @param expectedVersion
 *            supplies the expected version used in the ClientHello message
 */
public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
		Supplier<String> expectedVersion) {
	this.threadPool = threadPool;
	this.hostname = hostname;
	// port is a final field read by createConnection(), so it has to be assigned here as well
	this.port = port;
	this.clientName = clientName;
	this.expectedVersion = expectedVersion;
}
53 private void connect() throws IOException {
54 if (fcpConnection.get() != null) {
57 fcpConnection.compareAndSet(null, createConnection());
60 private FcpConnection createConnection() throws IOException {
61 FcpConnection connection = new FcpConnection(hostname, port);
63 AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
64 AtomicBoolean receivedClosed = new AtomicBoolean();
65 FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
67 .handle(NodeHello.class)
68 .with((nodeHello) -> receivedNodeHello.set(nodeHello));
70 .handle(CloseConnectionDuplicateClientName.class)
71 .with((closeConnection) -> receivedClosed.set(true));
72 nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
73 ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
75 nodeHelloSequence.send(clientHello).get();
76 } catch (InterruptedException | ExecutionException e) {
78 throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
84 public GenerateKeypairCommand generateKeypair() {
85 return new GenerateKeypairCommandImpl();
88 private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
91 public Future<FcpKeyPair> execute() {
92 return threadPool.submit(() -> {
94 Sequence sequence = new Sequence();
95 FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
96 replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
97 replySequence.waitFor(sequence::isFinished);
98 replySequence.send(new GenerateSSK()).get();
99 return sequence.getKeyPair();
103 private class Sequence {
105 private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
107 public void handleSSKKeypair(SSKKeypair sskKeypair) {
108 keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
111 public boolean isFinished() {
112 return keyPair.get() != null;
115 public FcpKeyPair getKeyPair() {
116 return keyPair.get();
124 public ClientGetCommand clientGet() {
125 return new ClientGetCommandImpl();
128 private class ClientGetCommandImpl implements ClientGetCommand {
// Identifier handed to the ClientGet message and to the reply Sequence.
130 private String identifier;
// Set by ignoreDataStore(); execute() forwards it with setIgnoreDataStore(true).
131 private boolean ignoreDataStore;
// Set by dataStoreOnly(); execute() forwards it with setDataStoreOnly(true).
132 private boolean dataStoreOnly;
// Optional size limit; execute() only forwards it when it is not null.
133 private Long maxSize;
// Optional priority; execute() only forwards it when it is not null.
134 private Priority priority;
// Option belonging to realTime(); execute() forwards it with setRealTimeFlag(true).
135 private boolean realTime;
// Option belonging to global(); execute() forwards it with setGlobal(true).
136 private boolean global;
139 public ClientGetCommand identifier(String identifier) {
140 this.identifier = identifier;
145 public ClientGetCommand ignoreDataStore() {
146 ignoreDataStore = true;
151 public ClientGetCommand dataStoreOnly() {
152 dataStoreOnly = true;
157 public ClientGetCommand maxSize(long maxSize) {
158 this.maxSize = maxSize;
163 public ClientGetCommand priority(Priority priority) {
164 this.priority = priority;
169 public ClientGetCommand realTime() {
175 public ClientGetCommand global() {
181 public Future<Optional<Data>> uri(String uri) {
182 return threadPool.submit(() -> execute(uri));
185 private Optional<Data> execute(String uri) throws IOException, ExecutionException, InterruptedException {
186 DefaultFcpClient.this.connect();
187 ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
188 if (ignoreDataStore) {
189 clientGet.setIgnoreDataStore(true);
192 clientGet.setDataStoreOnly(true);
194 if (maxSize != null) {
195 clientGet.setMaxSize(maxSize);
197 if (priority != null) {
198 clientGet.setPriority(priority);
201 clientGet.setRealTimeFlag(true);
204 clientGet.setGlobal(true);
206 try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
207 Sequence sequence = new Sequence(identifier);
208 replySequence.handle(AllData.class).with(sequence::allData);
209 replySequence.handle(GetFailed.class).with(sequence::getFailed);
210 replySequence.handleClose().with(sequence::disconnect);
211 replySequence.waitFor(sequence::isFinished);
212 replySequence.send(clientGet).get();
213 return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
217 private class Sequence {
// Completion flag; read by isFinished().
219 private final AtomicBoolean finished = new AtomicBoolean();
// Failure flag; read by isFinished() and isSuccessful().
220 private final AtomicBoolean failed = new AtomicBoolean();
// Identifier that incoming AllData and GetFailed messages are compared against.
222 private final String identifier;
// The three fields below are written by allData() while holding this Sequence's monitor.
224 private String contentType;
225 private long dataLength;
226 private InputStream payload;
/**
 * Creates a sequence for the request with the given identifier.
 *
 * @param identifier
 *            the identifier that replies are compared against
 */
private Sequence(String identifier) {
	Sequence.this.identifier = identifier;
}
232 public boolean isFinished() {
233 return finished.get() || failed.get();
236 public boolean isSuccessful() {
237 return !failed.get();
// Exposes the values collected by this sequence as a Data object.
// NOTE(review): several lines of this method are not visible here (the start of the
// returned object, the header of the accessor between getMimeType() and
// getInputStream(), and all return statements), so only the visible lines are described.
240 public Data getData() {
// Accessor for the MIME type.
243 public String getMimeType() {
// Sequence.this is the same monitor allData() holds while it writes the fields.
244 synchronized (Sequence.this) {
// NOTE(review): belongs to an accessor whose header is not visible — presumably the data length; confirm.
251 synchronized (Sequence.this) {
// Accessor for the payload stream; guarded by the same monitor as the accessors above.
257 public InputStream getInputStream() {
258 synchronized (Sequence.this) {
265 public void allData(AllData allData) {
266 if (allData.getIdentifier().equals(identifier)) {
267 synchronized (this) {
268 contentType = allData.getContentType();
269 dataLength = allData.getDataLength();
271 payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
273 } catch (IOException e) {
281 public void getFailed(GetFailed getFailed) {
282 if (getFailed.getIdentifier().equals(identifier)) {
287 public void disconnect(Throwable t) {