X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FFcpReplySequence.java;h=ea5d084eefa214b3fae58df7979db10985d146cf;hb=0c86b219616126f1a020e3b6a312156cad7c46ee;hp=0c54d7c6ef3edbd94679c49baf315307bdf8b4d8;hpb=075f351f114b8a58f82a8e2d2e295b987237d66e;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java index 0c54d7c..ea5d084 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -1,14 +1,13 @@ package net.pterodactylus.fcp.quelaton; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Supplier; import net.pterodactylus.fcp.AllData; import net.pterodactylus.fcp.BaseMessage; @@ -52,85 +51,73 @@ import net.pterodactylus.fcp.URIGenerated; import net.pterodactylus.fcp.UnknownNodeIdentifier; import net.pterodactylus.fcp.UnknownPeerNoteType; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + /** * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies. * * @author David ‘Bombe’ Roden */ -public class FcpReplySequence implements AutoCloseable, FcpListener { +public abstract class FcpReplySequence implements AutoCloseable, FcpListener { - private final ExecutorService executorService; + private final Object syncObject = new Object(); + private final ListeningExecutorService executorService; private final FcpConnection fcpConnection; - private final Map, Consumer> expectedMessageActions = new HashMap<>(); - private final List> unknownMessageHandlers = new ArrayList<>(); - private final List> closeHandlers = new ArrayList<>(); - private Supplier endPredicate; + private final Queue messages = new ConcurrentLinkedQueue<>(); + private final AtomicReference identifier = new AtomicReference<>(); + private final AtomicReference connectionFailureReason = new AtomicReference<>(); public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { - this.executorService = executorService; + this.executorService = MoreExecutors.listeningDecorator(executorService); this.fcpConnection = fcpConnection; } - public $1 handle(Class messageClass) { - return new $1<>(messageClass); - } - - public class $1 { - - private Class messageClass; - - private $1(Class messageClass) { - this.messageClass = messageClass; - } - - public FcpReplySequence with(Consumer action) { - expectedMessageActions.put(messageClass, (Consumer) action); - return FcpReplySequence.this; - } - + protected void setIdentifier(String identifier) { + this.identifier.set(identifier); } - public $2 handleUnknown() { - return new $2(); - } - - public class $2 { - - public FcpReplySequence with(Consumer consumer) { - unknownMessageHandlers.add(consumer); - return FcpReplySequence.this; - } + protected abstract boolean isFinished(); + public ListenableFuture send(FcpMessage fcpMessage) throws IOException { + setIdentifier(fcpMessage.getField("Identifier")); + fcpConnection.addFcpListener(this); + messages.add(fcpMessage); + return executorService.submit(() -> { + synchronized (syncObject) { + while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) { + while (messages.peek() != null) { + FcpMessage message = messages.poll(); + fcpConnection.sendMessage(message); + } + if (isFinished() || (connectionFailureReason.get() != null)) { + continue; + } + syncObject.wait(); + } + } + Throwable throwable = connectionFailureReason.get(); + if (throwable != null) { + throw new ExecutionException(throwable); + } + return getResult(); + }); } - public $3 handleClose() { - return new $3(); + protected void sendMessage(FcpMessage fcpMessage) { + messages.add(fcpMessage); + notifySyncObject(); } - public class $3 { - - public FcpReplySequence with(Consumer consumer) { - closeHandlers.add(consumer); - return FcpReplySequence.this; + private void notifySyncObject() { + synchronized (syncObject) { + syncObject.notifyAll(); } - } - public void waitFor(Supplier endPredicate) { - this.endPredicate = endPredicate; - } - - public Future send(FcpMessage fcpMessage) throws IOException { - fcpConnection.addFcpListener(this); - fcpConnection.sendMessage(fcpMessage); - return executorService.submit(() -> { - synchronized (endPredicate) { - while (!endPredicate.get()) { - endPredicate.wait(); - } - } - return null; - }); + protected R getResult() { + return null; } @Override @@ -138,230 +125,305 @@ public class FcpReplySequence implements AutoCloseable, FcpListener { fcpConnection.removeFcpListener(this); } - private void consume(Class fcpMessageClass, BaseMessage fcpMessage) { - if (expectedMessageActions.containsKey(fcpMessageClass)) { - expectedMessageActions.get(fcpMessageClass).accept(fcpMessage); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); + private void consume(Consumer consumer, M message) { + consume(consumer, message, "Identifier"); + } + + private void consume(Consumer consumer, M message, + String identifier) { + if (Objects.equals(message.getField(identifier), this.identifier.get())) { + consumeAlways(consumer, message); } } + private void consumeAlways(Consumer consumer, M message) { + consumer.accept(message); + notifySyncObject(); + } + private void consumeUnknown(FcpMessage fcpMessage) { - for (Consumer unknownMessageHandler : unknownMessageHandlers) { - unknownMessageHandler.accept(fcpMessage); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); - } + consumeUnknownMessage(fcpMessage); + notifySyncObject(); } private void consumeClose(Throwable throwable) { - for (Consumer closeHandler : closeHandlers) { - closeHandler.accept(throwable); - } - synchronized (endPredicate) { - endPredicate.notifyAll(); - } + connectionFailureReason.set(throwable); + notifySyncObject(); } @Override - public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { - consume(NodeHello.class, nodeHello); + public final void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { + consume(this::consumeNodeHello, nodeHello); } + protected void consumeNodeHello(NodeHello nodeHello) { } + @Override - public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, + public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { - consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName); + consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName); } + protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { } + @Override - public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { - consume(SSKKeypair.class, sskKeypair); + public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { + consume(this::consumeSSKKeypair, sskKeypair); } + protected void consumeSSKKeypair(SSKKeypair sskKeypair) { } + @Override - public void receivedPeer(FcpConnection fcpConnection, Peer peer) { - consume(Peer.class, peer); + public final void receivedPeer(FcpConnection fcpConnection, Peer peer) { + consume(this::consumePeer, peer); } + protected void consumePeer(Peer peer) { } + @Override - public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { - consume(EndListPeers.class, endListPeers); + public final void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { + consume(this::consumeEndListPeers, endListPeers); } + protected void consumeEndListPeers(EndListPeers endListPeers) { } + @Override - public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { - consume(PeerNote.class, peerNote); + public final void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { + consume(this::consumePeerNote, peerNote); } + protected void consumePeerNote(PeerNote peerNote) { } + @Override - public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { - consume(EndListPeerNotes.class, endListPeerNotes); + public final void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { + consume(this::consumeEndListPeerNotes, endListPeerNotes); } + protected void consumeEndListPeerNotes(EndListPeerNotes endListPeerNotes) { } + @Override - public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { - consume(PeerRemoved.class, peerRemoved); + public final void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { + consume(this::consumePeerRemoved, peerRemoved); } + protected void consumePeerRemoved(PeerRemoved peerRemoved) { } + @Override - public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { - consume(NodeData.class, nodeData); + public final void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { + consume(this::consumeNodeData, nodeData); } + protected void consumeNodeData(NodeData nodeData) { } + @Override - public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { - consume(TestDDAReply.class, testDDAReply); + public final void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { + consume(this::consumeTestDDAReply, testDDAReply, "Directory"); } + protected void consumeTestDDAReply(TestDDAReply testDDAReply) { } + @Override - public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { - consume(TestDDAComplete.class, testDDAComplete); + public final void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { + consume(this::consumeTestDDAComplete, testDDAComplete, "Directory"); } + protected void consumeTestDDAComplete(TestDDAComplete testDDAComplete) { } + @Override - public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { - consume(PersistentGet.class, persistentGet); + public final void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { + consume(this::consumePersistentGet, persistentGet); } + protected void consumePersistentGet(PersistentGet persistentGet) { } + @Override - public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { - consume(PersistentPut.class, persistentPut); + public final void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { + consume(this::consumePersistentPut, persistentPut); } + protected void consumePersistentPut(PersistentPut persistentPut) { } + @Override - public void receivedEndListPersistentRequests(FcpConnection fcpConnection, + public final void receivedEndListPersistentRequests(FcpConnection fcpConnection, EndListPersistentRequests endListPersistentRequests) { - consume(EndListPersistentRequests.class, endListPersistentRequests); + consume(this::consumeEndListPersistentRequests, endListPersistentRequests); } + protected void consumeEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { } + @Override - public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { - consume(URIGenerated.class, uriGenerated); + public final void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { + consume(this::consumeURIGenerated, uriGenerated); } + protected void consumeURIGenerated(URIGenerated uriGenerated) { } + @Override - public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { - consume(DataFound.class, dataFound); + public final void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { + consume(this::consumeDataFound, dataFound); } + protected void consumeDataFound(DataFound dataFound) { } + @Override - public void receivedAllData(FcpConnection fcpConnection, AllData allData) { - consume(AllData.class, allData); + public final void receivedAllData(FcpConnection fcpConnection, AllData allData) { + consume(this::consumeAllData, allData); } + protected void consumeAllData(AllData allData) { } + @Override - public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { - consume(SimpleProgress.class, simpleProgress); + public final void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { + consume(this::consumeSimpleProgress, simpleProgress); } + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { } + @Override - public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { - consume(StartedCompression.class, startedCompression); + public final void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { + consume(this::consumeStartedCompression, startedCompression); } + protected void consumeStartedCompression(StartedCompression startedCompression) { } + @Override - public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { - consume(FinishedCompression.class, finishedCompression); + public final void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { + consume(this::consumeFinishedCompression, finishedCompression); } + protected void consumeFinishedCompression(FinishedCompression finishedCompression) { } + @Override - public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { - consume(UnknownPeerNoteType.class, unknownPeerNoteType); + public final void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { + consume(this::consumeUnknownPeerNoteType, unknownPeerNoteType); } + protected void consumeUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { } + @Override - public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, + public final void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, UnknownNodeIdentifier unknownNodeIdentifier) { - consume(UnknownNodeIdentifier.class, unknownNodeIdentifier); + consume(this::consumeUnknownNodeIdentifier, unknownNodeIdentifier); } + protected void consumeUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { } + @Override - public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { - consume(ConfigData.class, configData); + public final void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { + consume(this::consumeConfigData, configData); } + protected void consumeConfigData(ConfigData configData) { } + @Override - public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { - consume(GetFailed.class, getFailed); + public final void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { + consume(this::consumeGetFailed, getFailed); } + protected void consumeGetFailed(GetFailed getFailed) { } + @Override - public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { - consume(PutFailed.class, putFailed); + public final void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { + consume(this::consumePutFailed, putFailed); } + protected void consumePutFailed(PutFailed putFailed) { } + @Override - public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { - consume(IdentifierCollision.class, identifierCollision); + public final void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { + consume(this::consumeIdentifierCollision, identifierCollision); } + protected void consumeIdentifierCollision(IdentifierCollision identifierCollision) { } + @Override - public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { - consume(PersistentPutDir.class, persistentPutDir); + public final void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { + consume(this::consumePersistentPutDir, persistentPutDir); } + protected void consumePersistentPutDir(PersistentPutDir persistentPutDir) { } + @Override - public void receivedPersistentRequestRemoved(FcpConnection fcpConnection, + public final void receivedPersistentRequestRemoved(FcpConnection fcpConnection, PersistentRequestRemoved persistentRequestRemoved) { - consume(PersistentRequestRemoved.class, persistentRequestRemoved); + consume(this::consumePersistentRequestRemoved, persistentRequestRemoved); } + protected void consumePersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { } + @Override - public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { - consume(SubscribedUSKUpdate.class, subscribedUSKUpdate); + public final void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { + consume(this::consumeSubscribedUSKUpdate, subscribedUSKUpdate); } + protected void consumeSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { } + @Override - public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { - consume(PluginInfo.class, pluginInfo); + public final void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { + consume(this::consumePluginInfo, pluginInfo); } + protected void consumePluginInfo(PluginInfo pluginInfo) { } + @Override - public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { - consume(FCPPluginReply.class, fcpPluginReply); + public final void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { + consume(this::consumeFCPPluginReply, fcpPluginReply); } + protected void consumeFCPPluginReply(FCPPluginReply fcpPluginReply) { } + @Override - public void receivedPersistentRequestModified(FcpConnection fcpConnection, + public final void receivedPersistentRequestModified(FcpConnection fcpConnection, PersistentRequestModified persistentRequestModified) { - consume(PersistentRequestModified.class, persistentRequestModified); + consume(this::consumePersistentRequestModified, persistentRequestModified); } + protected void consumePersistentRequestModified(PersistentRequestModified persistentRequestModified) { } + @Override - public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { - consume(PutSuccessful.class, putSuccessful); + public final void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { + consume(this::consumePutSuccessful, putSuccessful); } + protected void consumePutSuccessful(PutSuccessful putSuccessful) { } + @Override - public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { - consume(PutFetchable.class, putFetchable); + public final void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { + consume(this::consumePutFetchable, putFetchable); } + protected void consumePutFetchable(PutFetchable putFetchable) { } + @Override - public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { - consume(SentFeed.class, sentFeed); + public final void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { + consume(this::consumeSentFeed, sentFeed); } + protected void consumeSentFeed(SentFeed sentFeed) { } + @Override - public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { - consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed); + public final void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { + consume(this::consumeReceivedBookmarkFeed, receivedBookmarkFeed); } + protected void consumeReceivedBookmarkFeed(ReceivedBookmarkFeed receivedBookmarkFeed) { } + @Override - public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { - consume(ProtocolError.class, protocolError); + public final void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { + consume(this::consumeProtocolError, protocolError); } + protected void consumeProtocolError(ProtocolError protocolError) { } + @Override - public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { + public final void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { consumeUnknown(fcpMessage); } + protected void consumeUnknownMessage(FcpMessage fcpMessage) { } + @Override - public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { + public final void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { consumeClose(throwable); }