Rework FcpReplySequence into an abstract base class
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / DefaultFcpClient.java
index 603b150..3a33730 100644 (file)
@@ -3,7 +3,6 @@ package net.pterodactylus.fcp.quelaton;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -25,8 +24,6 @@ import net.pterodactylus.fcp.Priority;
 import net.pterodactylus.fcp.ReturnType;
 import net.pterodactylus.fcp.SSKKeypair;
 
-import com.google.common.io.ByteStreams;
-
 /**
  * Default {@link FcpClient} implementation.
  *
@@ -60,16 +57,25 @@ public class DefaultFcpClient implements FcpClient {
        private FcpConnection createConnection() throws IOException {
                FcpConnection connection = new FcpConnection(hostname, port);
                connection.connect();
-               AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
-               AtomicBoolean receivedClosed = new AtomicBoolean();
-               FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
-               nodeHelloSequence
-                               .handle(NodeHello.class)
-                               .with((nodeHello) -> receivedNodeHello.set(nodeHello));
-               nodeHelloSequence
-                               .handle(CloseConnectionDuplicateClientName.class)
-                               .with((closeConnection) -> receivedClosed.set(true));
-               nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+               FcpReplySequence<?> nodeHelloSequence = new FcpReplySequence<Void>(threadPool, connection) {
+                       private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+                       private final AtomicBoolean receivedClosed = new AtomicBoolean();
+                       @Override
+                       protected boolean isFinished() {
+                               return receivedNodeHello.get() != null || receivedClosed.get();
+                       }
+
+                       @Override
+                       protected void consumeNodeHello(NodeHello nodeHello) {
+                               receivedNodeHello.set(nodeHello);
+                       }
+
+                       @Override
+                       protected void consumeCloseConnectionDuplicateClientName(
+                               CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+                               receivedClosed.set(true);
+                       }
+               };
                ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
                try {
                        nodeHelloSequence.send(clientHello).get();
@@ -91,31 +97,25 @@ public class DefaultFcpClient implements FcpClient {
                public Future<FcpKeyPair> execute() {
                        return threadPool.submit(() -> {
                                connect();
-                               Sequence sequence = new Sequence();
-                               FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
-                               replySequence.handle(SSKKeypair.class).with(sequence::handleSSKKeypair);
-                               replySequence.waitFor(sequence::isFinished);
-                               replySequence.send(new GenerateSSK()).get();
-                               return sequence.getKeyPair();
-                       });
-               }
-
-               private class Sequence {
-
-                       private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+                               return new FcpReplySequence<FcpKeyPair>(threadPool, fcpConnection.get()) {
+                                       private AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
 
-                       public void handleSSKKeypair(SSKKeypair sskKeypair) {
-                               keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
-                       }
-
-                       public boolean isFinished() {
-                               return keyPair.get() != null;
-                       }
+                                       @Override
+                                       protected boolean isFinished() {
+                                               return keyPair.get() != null;
+                                       }
 
-                       public FcpKeyPair getKeyPair() {
-                               return keyPair.get();
-                       }
+                                       @Override
+                                       protected FcpKeyPair getResult() {
+                                               return keyPair.get();
+                                       }
 
+                                       @Override
+                                       protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
+                                               keyPair.set(new FcpKeyPair(sskKeypair.getRequestURI(), sskKeypair.getInsertURI()));
+                                       }
+                               }.send(new GenerateSSK()).get();
+                       });
                }
 
        }
@@ -179,11 +179,6 @@ public class DefaultFcpClient implements FcpClient {
 
                @Override
                public Future<Optional<Data>> uri(String uri) {
-                       return threadPool.submit(() -> execute(uri));
-               }
-
-               private Optional<Data> execute(String uri) throws IOException, ExecutionException, InterruptedException {
-                       DefaultFcpClient.this.connect();
                        ClientGet clientGet = new ClientGet(uri, identifier, ReturnType.direct);
                        if (ignoreDataStore) {
                                clientGet.setIgnoreDataStore(true);
@@ -203,91 +198,74 @@ public class DefaultFcpClient implements FcpClient {
                        if (global) {
                                clientGet.setGlobal(true);
                        }
-                       try (FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get())) {
-                               Sequence sequence = new Sequence(identifier);
-                               replySequence.handle(AllData.class).with(sequence::allData);
-                               replySequence.handle(GetFailed.class).with(sequence::getFailed);
-                               replySequence.handleClose().with(sequence::disconnect);
-                               replySequence.waitFor(sequence::isFinished);
-                               replySequence.send(clientGet).get();
-                               return sequence.isSuccessful() ? Optional.of(sequence.getData()) : Optional.empty();
-                       }
-               }
-
-               private class Sequence {
-
-                       private final AtomicBoolean finished = new AtomicBoolean();
-                       private final AtomicBoolean failed = new AtomicBoolean();
-
-                       private final String identifier;
-
-                       private String contentType;
-                       private long dataLength;
-                       private InputStream payload;
-
-                       private Sequence(String identifier) {
-                               this.identifier = identifier;
-                       }
+                       return threadPool.submit(() -> {
+                               connect();
+                               FcpReplySequence<Optional<Data>> replySequence = new FcpReplySequence<Optional<Data>>(threadPool, fcpConnection.get()) {
+                                       private final AtomicBoolean finished = new AtomicBoolean();
+                                       private final AtomicBoolean failed = new AtomicBoolean();
 
-                       public boolean isFinished() {
-                               return finished.get() || failed.get();
-                       }
+                                       private final String identifier = ClientGetCommandImpl.this.identifier;
 
-                       public boolean isSuccessful() {
-                               return !failed.get();
-                       }
+                                       private String contentType;
+                                       private long dataLength;
+                                       private InputStream payload;
 
-                       public Data getData() {
-                               return new Data() {
                                        @Override
-                                       public String getMimeType() {
-                                               synchronized (Sequence.this) {
-                                                       return contentType;
-                                               }
+                                       protected boolean isFinished() {
+                                               return finished.get() || failed.get();
                                        }
 
                                        @Override
-                                       public long size() {
-                                               synchronized (Sequence.this) {
-                                                       return dataLength;
-                                               }
+                                       protected Optional<Data> getResult() {
+                                               return failed.get() ? Optional.empty() : Optional.of(new Data() {
+                                                       @Override
+                                                       public String getMimeType() {
+                                                               return contentType;
+                                                       }
+
+                                                       @Override
+                                                       public long size() {
+                                                               return dataLength;
+                                                       }
+
+                                                       @Override
+                                                       public InputStream getInputStream() {
+                                                               return payload;
+                                                       }
+                                               });
                                        }
 
                                        @Override
-                                       public InputStream getInputStream() {
-                                               synchronized (Sequence.this) {
-                                                       return payload;
+                                       protected void consumeAllData(AllData allData) {
+                                               if (allData.getIdentifier().equals(identifier)) {
+                                                       synchronized (this) {
+                                                               contentType = allData.getContentType();
+                                                               dataLength = allData.getDataLength();
+                                                               try {
+                                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
+                                                                       finished.set(true);
+                                                               } catch (IOException e) {
+                                                                       // TODO – logging
+                                                                       failed.set(true);
+                                                               }
+                                                       }
                                                }
                                        }
-                               };
-                       }
 
-                       public void allData(AllData allData) {
-                               if (allData.getIdentifier().equals(identifier)) {
-                                       synchronized (this) {
-                                               contentType = allData.getContentType();
-                                               dataLength = allData.getDataLength();
-                                               try {
-                                                       payload = new TempInputStream(allData.getPayloadInputStream(), dataLength);
-                                                       finished.set(true);
-                                               } catch (IOException e) {
-                                                       // TODO – logging
+                                       @Override
+                                       protected void consumeGetFailed(GetFailed getFailed) {
+                                               if (getFailed.getIdentifier().equals(identifier)) {
                                                        failed.set(true);
                                                }
                                        }
-                               }
-                       }
-
-                       public void getFailed(GetFailed getFailed) {
-                               if (getFailed.getIdentifier().equals(identifier)) {
-                                       failed.set(true);
-                               }
-                       }
-
-                       public void disconnect(Throwable t) {
-                               failed.set(true);
-                       }
 
+                                       @Override
+                                       protected void consumeConnectionClosed(Throwable throwable) {
+                                               failed.set(true);
+                                       }
+                               };
+                               return replySequence.send(clientGet).get();
+                       });
                }
 
        }