Move key pair generation command to its own class
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
index c37b40a..79c7f44 100644 (file)
@@ -3,7 +3,6 @@ package net.pterodactylus.fcp.quelaton;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -16,16 +15,11 @@ import net.pterodactylus.fcp.ClientGet;
 import net.pterodactylus.fcp.ClientHello;
 import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
 import net.pterodactylus.fcp.FcpConnection;
-import net.pterodactylus.fcp.FcpKeyPair;
 import net.pterodactylus.fcp.FcpUtils.TempInputStream;
-import net.pterodactylus.fcp.GenerateSSK;
 import net.pterodactylus.fcp.GetFailed;
 import net.pterodactylus.fcp.NodeHello;
 import net.pterodactylus.fcp.Priority;
 import net.pterodactylus.fcp.ReturnType;
-import net.pterodactylus.fcp.SSKKeypair;
-
-import com.google.common.io.ByteStreams;
 
 /**
  * Default {@link FcpClient} implementation.
@@ -50,26 +44,38 @@ public class DefaultFcpClient implements FcpClient {
                this.expectedVersion = expectedVersion;
        }
 
-       private void connect() throws IOException {
-               if (fcpConnection.get() != null) {
-                       return;
+       private FcpConnection connect() throws IOException {
+               FcpConnection fcpConnection = this.fcpConnection.get();
+               if (fcpConnection != null) {
+                       return fcpConnection;
                }
-               fcpConnection.compareAndSet(null, createConnection());
+               fcpConnection = createConnection();
+               this.fcpConnection.compareAndSet(null, fcpConnection);
+               return fcpConnection;
        }
 
        private FcpConnection createConnection() throws IOException {
                FcpConnection connection = new FcpConnection(hostname, port);
                connection.connect();
-               AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
-               AtomicBoolean receivedClosed = new AtomicBoolean();
-               FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
-               nodeHelloSequence
-                               .handle(NodeHello.class)
-                               .with((nodeHello) -> receivedNodeHello.set(nodeHello));
-               nodeHelloSequence
-                               .handle(CloseConnectionDuplicateClientName.class)
-                               .with((closeConnection) -> receivedClosed.set(true));
-               nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+               FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
+                       private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+                       private final AtomicBoolean receivedClosed = new AtomicBoolean();
+                       @Override
+                       protected boolean isFinished() {
+                               return receivedNodeHello.get() != null || receivedClosed.get();
+                       }
+
+                       @Override
+                       protected void consumeNodeHello(NodeHello nodeHello) {
+                               receivedNodeHello.set(nodeHello);
+                       }
+
+                       @Override
+                       protected void consumeCloseConnectionDuplicateClientName(
+                               CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+                               receivedClosed.set(true);
+                       }
+               };
                ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
                try {
                        nodeHelloSequence.send(clientHello).get();
@@ -82,42 +88,7 @@ public class DefaultFcpClient implements FcpClient {
 
        @Override
        public GenerateKeypairCommand generateKeypair() {
-               return new GenerateKeypairCommandImpl();
-       }
-
-       private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
-
-               @Override
-               public Future<FcpKeyPair> execute() {
-                       return threadPool.submit(() -> {
-                               connect();
-                               Sequence sequence = new Sequence();
-                               FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
-                               replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
-                               replySequence.waitFor(sequence::isFinished);
-                               replySequence.send(new GenerateSSK()).get();
-                               return sequence.getKeyPair();
-                       });
-               }
-
-               private class Sequence {
-
-                       private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
-
-                       public void handleSSKKeypair(SSKKeypair sskKeypair) {
-                               keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
-                       }
-
-                       public boolean isFinished() {
-                               return keyPair.get() != null;
-                       }
-
-                       public FcpKeyPair getKeyPair() {
-                               return keyPair.get();
-                       }
-
-               }
-
+               return new GenerateKeypairCommandImpl(threadPool, this::connect);
        }
 
        @Override
@@ -179,116 +150,92 @@ public class DefaultFcpClient implements FcpClient {
 
                @Override
                public Future<Optional<Data>> uri(String uri) {
-                       return threadPool.submit(new Callable<Optional<Data>>() {
-                               @Override
-                               public Optional<Data> call() throws Exception {
-                                       DefaultFcpClient.this.connect();
-                                       ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
-                                       if (ignoreDataStore) {
-                                               clientGet.setIgnoreDataStore(true);
-                                       }
-                                       if (dataStoreOnly) {
-                                               clientGet.setDataStoreOnly(true);
-                                       }
-                                       if (maxSize != null) {
-                                               clientGet.setMaxSize(maxSize);
-                                       }
-                                       if (priority != null) {
-                                               clientGet.setPriority(priority);
-                                       }
-                                       if (realTime) {
-                                               clientGet.setRealTimeFlag(true);
-                                       }
-                                       if (global) {
-                                               clientGet.setGlobal(true);
-                                       }
-                                       try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
-                                               Sequence sequence = new Sequence(identifier);
-                                               replySequence.handle(AllData.class).with(sequence::allData);
-                                               replySequence.handle(GetFailed.class).with(sequence::getFailed);
-                                               replySequence.handleClose().with(sequence::disconnect);
-                                               replySequence.waitFor(sequence::isFinished);
-                                               replySequence.send(clientGet).get();
-                                               return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
-                                       }
-                               }
-                       });
-               }
-
-               private class Sequence {
-
-                       private final AtomicBoolean finished = new AtomicBoolean();
-                       private final AtomicBoolean failed = new AtomicBoolean();
-
-                       private final String identifier;
-
-                       private String contentType;
-                       private long dataLength;
-                       private InputStream payload;
-
-                       private Sequence(String identifier) {
-                               this.identifier = identifier;
+                       ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
+                       if (ignoreDataStore) {
+                               clientGet.setIgnoreDataStore(true);
                        }
-
-                       public boolean isFinished() {
-                               return finished.get() || failed.get();
+                       if (dataStoreOnly) {
+                               clientGet.setDataStoreOnly(true);
                        }
-
-                       public boolean isSuccessful() {
-                               return !failed.get();
+                       if (maxSize != null) {
+                               clientGet.setMaxSize(maxSize);
+                       }
+                       if (priority != null) {
+                               clientGet.setPriority(priority);
+                       }
+                       if (realTime) {
+                               clientGet.setRealTimeFlag(true);
                        }
+                       if (global) {
+                               clientGet.setGlobal(true);
+                       }
+                       return threadPool.submit(() -> {
+                               FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, connect()) {
+                                       private final AtomicBoolean finished = new AtomicBoolean();
+                                       private final AtomicBoolean failed = new AtomicBoolean();
+
+                                       private final String identifier = ClientGetCommandImpl.this.identifier;
+
+                                       private String contentType;
+                                       private long dataLength;
+                                       private InputStream payload;
 
-                       public Data getData() {
-                               return new Data() {
                                        @Override
-                                       public String getMimeType() {
-                                               synchronized (Sequence.this) {
-                                                       return contentType;
-                                               }
+                                       protected boolean isFinished() {
+                                               return finished.get() || failed.get();
                                        }
 
                                        @Override
-                                       public long size() {
-                                               synchronized (Sequence.this) {
-                                                       return dataLength;
-                                               }
+                                       protected Optional<Data> getResult() {
+                                               return failed.get() ? Optional.empty() : Optional.of(new Data() {
+                                                       @Override
+                                                       public String getMimeType() {
+                                                               return contentType;
+                                                       }
+
+                                                       @Override
+                                                       public long size() {
+                                                               return dataLength;
+                                                       }
+
+                                                       @Override
+                                                       public InputStream getInputStream() {
+                                                               return payload;
+                                                       }
+                                               });
                                        }
 
                                        @Override
-                                       public InputStream getInputStream() {
-                                               synchronized (Sequence.this) {
-                                                       return payload;
+                                       protected void consumeAllData(AllData allData) {
+                                               if (allData.getIdentifier().equals(identifier)) {
+                                                       synchronized (this) {
+                                                               contentType = allData.getContentType();
+                                                               dataLength = allData.getDataLength();
+                                                               try {
+                                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+                                                                       finished.set(true);
+                                                               } catch (IOException e) {
+                                                                       // TODO – logging
+                                                                       failed.set(true);
+                                                               }
+                                                       }
                                                }
                                        }
-                               };
-                       }
 
-                       public void allData(AllData allData) {
-                               if (allData.getIdentifier().equals(identifier)) {
-                                       synchronized (this) {
-                                               contentType = allData.getContentType();
-                                               dataLength = allData.getDataLength();
-                                               try {
-                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
-                                                       finished.set(true);
-                                               } catch (IOException e) {
-                                                       // TODO – logging
+                                       @Override
+                                       protected void consumeGetFailed(GetFailed getFailed) {
+                                               if (getFailed.getIdentifier().equals(identifier)) {
                                                        failed.set(true);
                                                }
                                        }
-                               }
-                       }
-
-                       public void getFailed(GetFailed getFailed) {
-                               if (getFailed.getIdentifier().equals(identifier)) {
-                                       failed.set(true);
-                               }
-                       }
-
-                       public void disconnect(Throwable t) {
-                               failed.set(true);
-                       }
 
+                                       @Override
+                                       protected void consumeConnectionClosed(Throwable throwable) {
+                                               failed.set(true);
+                                       }
+                               };
+                               return replySequence.send(clientGet).get();
+                       });
                }
 
        }