package net.pterodactylus.fcp.quelaton;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
-import net.pterodactylus.fcp.ClientHello;
-import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpKeyPair;
-import net.pterodactylus.fcp.GenerateSSK;
-import net.pterodactylus.fcp.NodeHello;
-import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.Peer;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Default {@link FcpClient} implementation.
*/
public class DefaultFcpClient implements FcpClient {
- private final ExecutorService threadPool;
+ private final ListeningExecutorService threadPool;
private final String hostname;
private final int port;
private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
private final Supplier<String> clientName;
- private final Supplier<String> expectedVersion;
- public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
- Supplier<String> expectedVersion) {
- this.threadPool = threadPool;
+ public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName) {
+ this.threadPool = MoreExecutors.listeningDecorator(threadPool);
this.hostname = hostname;
this.port = port;
this.clientName = clientName;
- this.expectedVersion = expectedVersion;
}
- private void connect() throws IOException {
- if (fcpConnection.get() != null) {
- return;
+ private FcpConnection connect() throws IOException {
+ FcpConnection fcpConnection = this.fcpConnection.get();
+ if ((fcpConnection != null) && !fcpConnection.isClosed()) {
+ return fcpConnection;
}
- fcpConnection.compareAndSet(null, createConnection());
+ fcpConnection = createConnection();
+ this.fcpConnection.set(fcpConnection);
+ return fcpConnection;
}
private FcpConnection createConnection() throws IOException {
- FcpConnection connection = new FcpConnection(hostname, port);
- connection.connect();
- AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- AtomicBoolean receivedClosed = new AtomicBoolean();
- FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
- nodeHelloSequence
- .handle(NodeHello.class)
- .with((nodeHello) -> receivedNodeHello.set(nodeHello));
- nodeHelloSequence
- .handle(CloseConnectionDuplicateClientName.class)
- .with((closeConnection) -> receivedClosed.set(true));
- nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
- ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
try {
- nodeHelloSequence.send(clientHello).get();
+ return new ClientHelloImpl(threadPool, hostname, port).withName(clientName.get()).execute().get();
} catch (InterruptedException | ExecutionException e) {
- connection.close();
- throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
+ throw new IOException(e);
}
- return connection;
+ }
+
+ @Override
+ public GetNodeCommand getNode() {
+ return new GetNodeCommandImpl(threadPool, this::connect);
}
@Override
public GenerateKeypairCommand generateKeypair() {
- return new GenerateKeypairCommandImpl();
+ return new GenerateKeypairCommandImpl(threadPool, this::connect);
}
- private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
-
- @Override
- public Future<FcpKeyPair> execute() {
- return threadPool.submit(() -> {
- connect();
- GenerateSSK generateSSK = new GenerateSSK();
- AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
- FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
- replySequence.handle(SSKKeypair.class)
- .with((message) -> keyPair.set(
- new FcpKeyPair(message.getRequestURI(), message.getInsertURI())));
- replySequence.waitFor(() -> keyPair.get() != null);
- replySequence.send(generateSSK).get();
- return keyPair.get();
- });
- }
+ @Override
+ public ClientGetCommand clientGet() {
+ return new ClientGetCommandImpl(threadPool, this::connect);
+ }
+ @Override
+ public ClientPutCommand clientPut() {
+ return new ClientPutCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public ListPeerCommand listPeer() {
+ return new ListPeerCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public ListPeersCommand listPeers() {
+ return new ListPeersCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public AddPeerCommand addPeer() {
+ return new AddPeerCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public ModifyPeerCommand modifyPeer() {
+ return new ModifyPeerCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public RemovePeerCommand removePeer() {
+ return new RemovePeerCommandImpl(threadPool, this::connect);
+ }
+
+ @Override
+ public ListPeerNotesCommand listPeerNotes() {
+ return new ListPeerNotesCommandImpl(threadPool, this::connect);
}
}
+