import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.pterodactylus.fcp.ClientHello;
import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpKeyPair;
-import net.pterodactylus.fcp.GenerateSSK;
import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.SSKKeypair;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Default {@link FcpClient} implementation.
*/
public class DefaultFcpClient implements FcpClient {
- private final ExecutorService threadPool;
+ private final ListeningExecutorService threadPool;
private final String hostname;
private final int port;
private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
private final Supplier<String> clientName;
- private final Supplier<String> expectedVersion;
- public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
- Supplier<String> expectedVersion) {
- this.threadPool = threadPool;
+ public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
+ this.threadPool = MoreExecutors.listeningDecorator(threadPool);
this.hostname = hostname;
this.port = port;
this.clientName = clientName;
- this.expectedVersion = expectedVersion;
}
- private void connect() throws IOException {
- if (fcpConnection.get() != null) {
- return;
+ private FcpConnection connect() throws IOException {
+ FcpConnection fcpConnection = this.fcpConnection.get();
+ if (fcpConnection != null) {
+ return fcpConnection;
}
- fcpConnection.compareAndSet(null, createConnection());
+ fcpConnection = createConnection();
+ this.fcpConnection.compareAndSet(null, fcpConnection);
+ return fcpConnection;
}
private FcpConnection createConnection() throws IOException {
FcpConnection connection = new FcpConnection(hostname, port);
connection.connect();
- AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- AtomicBoolean receivedClosed = new AtomicBoolean();
- FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
- nodeHelloSequence
- .handle(NodeHello.class)
- .with((nodeHello) -> receivedNodeHello.set(nodeHello));
- nodeHelloSequence
- .handle(CloseConnectionDuplicateClientName.class)
- .with((closeConnection) -> receivedClosed.set(true));
- nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
- ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
+ FcpReplySequence<?> nodeHelloSequence = new ClientHelloReplySequence(connection);
+ ClientHello clientHello = new ClientHello(clientName.get(), "2.0");
try {
nodeHelloSequence.send(clientHello).get();
} catch (InterruptedException | ExecutionException e) {
@Override
public GenerateKeypairCommand generateKeypair() {
- return new GenerateKeypairCommandImpl();
+ return new GenerateKeypairCommandImpl(threadPool, this::connect);
}
- private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
+ @Override
+ public ClientGetCommand clientGet() {
+ return new ClientGetCommandImpl(threadPool, this::connect);
+ }
- @Override
- public Future<FcpKeyPair> execute() {
- return threadPool.submit(() -> {
- connect();
- Sequence sequence = new Sequence();
- FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
- replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
- replySequence.waitFor(sequence::isFinished);
- replySequence.send(new GenerateSSK()).get();
- return sequence.getKeyPair();
- });
- }
+ @Override
+ public ClientPutCommand clientPut() {
+ return new ClientPutCommandImpl(threadPool, this::connect);
+ }
- private class Sequence {
+ @Override
+ public ListPeersCommand listPeers() {
+ return new ListPeersCommandImpl(threadPool, this::connect);
+ }
- private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+ private class ClientHelloReplySequence extends FcpReplySequence<Void> {
- public void handleSSKKeypair(SSKKeypair sskKeypair) {
- keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
- }
+ private final AtomicReference<NodeHello> receivedNodeHello;
+ private final AtomicBoolean receivedClosed;
- public boolean isFinished() {
- return keyPair.get() != null;
- }
+ public ClientHelloReplySequence(FcpConnection connection) {
+ super(DefaultFcpClient.this.threadPool, connection);
+ receivedNodeHello = new AtomicReference<>();
+ receivedClosed = new AtomicBoolean();
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return receivedNodeHello.get() != null || receivedClosed.get();
+ }
- public FcpKeyPair getKeyPair() {
- return keyPair.get();
- }
+ @Override
+ protected void consumeNodeHello(NodeHello nodeHello) {
+ receivedNodeHello.set(nodeHello);
+ }
+ @Override
+ protected void consumeCloseConnectionDuplicateClientName(
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ receivedClosed.set(true);
}
}
}
+