Ignore identifier when consuming unknown messages
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / quelaton / FcpReplySequence.java
index 0c54d7c..ea5d084 100644 (file)
@@ -1,14 +1,13 @@
 package net.pterodactylus.fcp.quelaton;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.AllData;
 import net.pterodactylus.fcp.BaseMessage;
@@ -52,85 +51,73 @@ import net.pterodactylus.fcp.URIGenerated;
 import net.pterodactylus.fcp.UnknownNodeIdentifier;
 import net.pterodactylus.fcp.UnknownPeerNoteType;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
 /**
  * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
  *
  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
  */
-public class FcpReplySequence implements AutoCloseable, FcpListener {
+public abstract class FcpReplySequence<R> implements AutoCloseable, FcpListener {
 
-       private final ExecutorService executorService;
+       private final Object syncObject = new Object();
+       private final ListeningExecutorService executorService;
        private final FcpConnection fcpConnection;
-       private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
-       private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
-       private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
-       private Supplier<Boolean> endPredicate;
+       private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
+       private final AtomicReference<String> identifier = new AtomicReference<>();
+       private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
 
        public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
-               this.executorService = executorService;
+               this.executorService = MoreExecutors.listeningDecorator(executorService);
                this.fcpConnection = fcpConnection;
        }
 
-       public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
-               return new $1<>(messageClass);
-       }
-
-       public class $1<M extends BaseMessage> {
-
-               private Class<M> messageClass;
-
-               private $1(Class<M> messageClass) {
-                       this.messageClass = messageClass;
-               }
-
-               public FcpReplySequence with(Consumer<M> action) {
-                       expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
-                       return FcpReplySequence.this;
-               }
-
+       protected void setIdentifier(String identifier) {
+               this.identifier.set(identifier);
        }
 
-       public $2 handleUnknown() {
-               return new $2();
-       }
-
-       public class $2 {
-
-               public FcpReplySequence with(Consumer<FcpMessage> consumer) {
-                       unknownMessageHandlers.add(consumer);
-                       return FcpReplySequence.this;
-               }
+       protected abstract boolean isFinished();
 
+       public ListenableFuture<R> send(FcpMessage fcpMessage) throws IOException {
+               setIdentifier(fcpMessage.getField("Identifier"));
+               fcpConnection.addFcpListener(this);
+               messages.add(fcpMessage);
+               return executorService.submit(() -> {
+                       synchronized (syncObject) {
+                               while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
+                                       while (messages.peek() != null) {
+                                               FcpMessage message = messages.poll();
+                                               fcpConnection.sendMessage(message);
+                                       }
+                                       if (isFinished() || (connectionFailureReason.get() != null)) {
+                                               continue;
+                                       }
+                                       syncObject.wait();
+                               }
+                       }
+                       Throwable throwable = connectionFailureReason.get();
+                       if (throwable != null) {
+                               throw new ExecutionException(throwable);
+                       }
+                       return getResult();
+               });
        }
 
-       public $3 handleClose() {
-               return new $3();
+       protected void sendMessage(FcpMessage fcpMessage) {
+               messages.add(fcpMessage);
+               notifySyncObject();
        }
 
-       public class $3 {
-
-               public FcpReplySequence with(Consumer<Throwable> consumer) {
-                       closeHandlers.add(consumer);
-                       return FcpReplySequence.this;
+       private void notifySyncObject() {
+               synchronized (syncObject) {
+                       syncObject.notifyAll();
                }
-
        }
 
-       public void waitFor(Supplier<Boolean> endPredicate) {
-               this.endPredicate = endPredicate;
-       }
-
-       public Future<?> send(FcpMessage fcpMessage) throws IOException {
-               fcpConnection.addFcpListener(this);
-               fcpConnection.sendMessage(fcpMessage);
-               return executorService.submit(() -> {
-                       synchronized (endPredicate) {
-                               while (!endPredicate.get()) {
-                                       endPredicate.wait();
-                               }
-                       }
-                       return null;
-               });
+       protected R getResult() {
+               return null;
        }
 
        @Override
@@ -138,230 +125,305 @@ public class FcpReplySequence implements AutoCloseable, FcpListener {
                fcpConnection.removeFcpListener(this);
        }
 
-       private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
-               if (expectedMessageActions.containsKey(fcpMessageClass)) {
-                       expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+               consume(consumer, message, "Identifier");
+       }
+
+       private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+                       String identifier) {
+               if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+                       consumeAlways(consumer, message);
                }
        }
 
+       private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {
+               consumer.accept(message);
+               notifySyncObject();
+       }
+
        private void consumeUnknown(FcpMessage fcpMessage) {
-               for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
-                       unknownMessageHandler.accept(fcpMessage);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
-               }
+               consumeUnknownMessage(fcpMessage);
+               notifySyncObject();
        }
 
        private void consumeClose(Throwable throwable) {
-               for (Consumer<Throwable> closeHandler : closeHandlers) {
-                       closeHandler.accept(throwable);
-               }
-               synchronized (endPredicate) {
-                       endPredicate.notifyAll();
-               }
+               connectionFailureReason.set(throwable);
+               notifySyncObject();
        }
 
        @Override
-       public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
-               consume(NodeHello.class, nodeHello);
+       public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+               consume(this::consumeNodeHello, nodeHello);
        }
 
+       protected void consumeNodeHello(NodeHello nodeHello) { }
+
        @Override
-       public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+       public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
                        CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
-               consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+               consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
        }
 
+       protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
+
        @Override
-       public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
-               consume(SSKKeypair.class, sskKeypair);
+       public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+               consume(this::consumeSSKKeypair, sskKeypair);
        }
 
+       protected void consumeSSKKeypair(SSKKeypair sskKeypair) { }
+
        @Override
-       public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
-               consume(Peer.class, peer);
+       public final void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+               consume(this::consumePeer, peer);
        }
 
+       protected void consumePeer(Peer peer) { }
+
        @Override
-       public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
-               consume(EndListPeers.class, endListPeers);
+       public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+               consume(this::consumeEndListPeers, endListPeers);
        }
 
+       protected void consumeEndListPeers(EndListPeers endListPeers) { }
+
        @Override
-       public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
-               consume(PeerNote.class, peerNote);
+       public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+               consume(this::consumePeerNote, peerNote);
        }
 
+       protected void consumePeerNote(PeerNote peerNote) { }
+
        @Override
-       public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
-               consume(EndListPeerNotes.class, endListPeerNotes);
+       public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+               consume(this::consumeEndListPeerNotes, endListPeerNotes);
        }
 
+       protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { }
+
        @Override
-       public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
-               consume(PeerRemoved.class, peerRemoved);
+       public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+               consume(this::consumePeerRemoved, peerRemoved);
        }
 
+       protected void consumePeerRemoved(PeerRemoved peerRemoved) { }
+
        @Override
-       public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
-               consume(NodeData.class, nodeData);
+       public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+               consume(this::consumeNodeData, nodeData);
        }
 
+       protected void consumeNodeData(NodeData nodeData) { }
+
        @Override
-       public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
-               consume(TestDDAReply.class, testDDAReply);
+       public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+               consume(this::consumeTestDDAReply, testDDAReply, "Directory");
        }
 
+       protected void consumeTestDDAReply(TestDDAReply testDDAReply) { }
+
        @Override
-       public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
-               consume(TestDDAComplete.class, testDDAComplete);
+       public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+               consume(this::consumeTestDDAComplete, testDDAComplete, "Directory");
        }
 
+       protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { }
+
        @Override
-       public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
-               consume(PersistentGet.class, persistentGet);
+       public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+               consume(this::consumePersistentGet, persistentGet);
        }
 
+       protected void consumePersistentGet(PersistentGet persistentGet) { }
+
        @Override
-       public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
-               consume(PersistentPut.class, persistentPut);
+       public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+               consume(this::consumePersistentPut, persistentPut);
        }
 
+       protected void consumePersistentPut(PersistentPut persistentPut) { }
+
        @Override
-       public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+       public final void receivedEndListPersistentRequests(FcpConnection fcpConnection,
                        EndListPersistentRequests endListPersistentRequests) {
-               consume(EndListPersistentRequests.class, endListPersistentRequests);
+               consume(this::consumeEndListPersistentRequests, endListPersistentRequests);
        }
 
+       protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { }
+
        @Override
-       public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
-               consume(URIGenerated.class, uriGenerated);
+       public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+               consume(this::consumeURIGenerated, uriGenerated);
        }
 
+       protected void consumeURIGenerated(URIGenerated uriGenerated) { }
+
        @Override
-       public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
-               consume(DataFound.class, dataFound);
+       public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+               consume(this::consumeDataFound, dataFound);
        }
 
+       protected void consumeDataFound(DataFound dataFound) { }
+
        @Override
-       public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
-               consume(AllData.class, allData);
+       public final void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+               consume(this::consumeAllData, allData);
        }
 
+       protected void consumeAllData(AllData allData) { }
+
        @Override
-       public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
-               consume(SimpleProgress.class, simpleProgress);
+       public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+               consume(this::consumeSimpleProgress, simpleProgress);
        }
 
+       protected void consumeSimpleProgress(SimpleProgress simpleProgress) { }
+
        @Override
-       public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
-               consume(StartedCompression.class, startedCompression);
+       public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+               consume(this::consumeStartedCompression, startedCompression);
        }
 
+       protected void consumeStartedCompression(StartedCompression startedCompression) { }
+
        @Override
-       public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
-               consume(FinishedCompression.class, finishedCompression);
+       public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+               consume(this::consumeFinishedCompression, finishedCompression);
        }
 
+       protected void consumeFinishedCompression(FinishedCompression finishedCompression) { }
+
        @Override
-       public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
-               consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+       public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+               consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType);
        }
 
+       protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { }
+
        @Override
-       public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+       public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
                        UnknownNodeIdentifier unknownNodeIdentifier) {
-               consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+               consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier);
        }
 
+       protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { }
+
        @Override
-       public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
-               consume(ConfigData.class, configData);
+       public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+               consume(this::consumeConfigData, configData);
        }
 
+       protected void consumeConfigData(ConfigData configData) { }
+
        @Override
-       public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
-               consume(GetFailed.class, getFailed);
+       public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+               consume(this::consumeGetFailed, getFailed);
        }
 
+       protected void consumeGetFailed(GetFailed getFailed) { }
+
        @Override
-       public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
-               consume(PutFailed.class, putFailed);
+       public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+               consume(this::consumePutFailed, putFailed);
        }
 
+       protected void consumePutFailed(PutFailed putFailed) { }
+
        @Override
-       public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
-               consume(IdentifierCollision.class, identifierCollision);
+       public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+               consume(this::consumeIdentifierCollision, identifierCollision);
        }
 
+       protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { }
+
        @Override
-       public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
-               consume(PersistentPutDir.class, persistentPutDir);
+       public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+               consume(this::consumePersistentPutDir, persistentPutDir);
        }
 
+       protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { }
+
        @Override
-       public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+       public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
                        PersistentRequestRemoved persistentRequestRemoved) {
-               consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+               consume(this::consumePersistentRequestRemoved, persistentRequestRemoved);
        }
 
+       protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { }
+
        @Override
-       public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
-               consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+       public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+               consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate);
        }
 
+       protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { }
+
        @Override
-       public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
-               consume(PluginInfo.class, pluginInfo);
+       public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+               consume(this::consumePluginInfo, pluginInfo);
        }
 
+       protected void consumePluginInfo(PluginInfo pluginInfo) { }
+
        @Override
-       public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
-               consume(FCPPluginReply.class, fcpPluginReply);
+       public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+               consume(this::consumeFCPPluginReply, fcpPluginReply);
        }
 
+       protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { }
+
        @Override
-       public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+       public final void receivedPersistentRequestModified(FcpConnection fcpConnection,
                        PersistentRequestModified persistentRequestModified) {
-               consume(PersistentRequestModified.class, persistentRequestModified);
+               consume(this::consumePersistentRequestModified, persistentRequestModified);
        }
 
+       protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { }
+
        @Override
-       public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
-               consume(PutSuccessful.class, putSuccessful);
+       public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+               consume(this::consumePutSuccessful, putSuccessful);
        }
 
+       protected void consumePutSuccessful(PutSuccessful putSuccessful) { }
+
        @Override
-       public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
-               consume(PutFetchable.class, putFetchable);
+       public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+               consume(this::consumePutFetchable, putFetchable);
        }
 
+       protected void consumePutFetchable(PutFetchable putFetchable) { }
+
        @Override
-       public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
-               consume(SentFeed.class, sentFeed);
+       public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+               consume(this::consumeSentFeed, sentFeed);
        }
 
+       protected void consumeSentFeed(SentFeed sentFeed) { }
+
        @Override
-       public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
-               consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+       public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+               consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed);
        }
 
+       protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { }
+
        @Override
-       public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
-               consume(ProtocolError.class, protocolError);
+       public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+               consume(this::consumeProtocolError, protocolError);
        }
 
+       protected void consumeProtocolError(ProtocolError protocolError) { }
+
        @Override
-       public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+       public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
                consumeUnknown(fcpMessage);
        }
 
+       protected void consumeUnknownMessage(FcpMessage fcpMessage) { }
+
        @Override
-       public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+       public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
                consumeClose(throwable);
        }