From 3ccb8ad5bde9b6dbdd88719ef52da13deb2eff59 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Tue, 7 Feb 2023 16:40:16 +0100 Subject: [PATCH] =?utf8?q?=E2=99=BB=EF=B8=8F=20Turn=20FcpConnection=20into?= =?utf8?q?=20an=20interface?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This will break all existing clients which is why this happens on a new branch that will eventually turn into version 0.2. --- .../pterodactylus/fcp/DefaultFcpConnection.java | 461 +++++++++++++++++++++ .../java/net/pterodactylus/fcp/FcpConnection.java | 334 +-------------- .../pterodactylus/fcp/FcpConnectionHandler.java | 167 -------- .../net/pterodactylus/fcp/highlevel/FcpClient.java | 7 +- 4 files changed, 483 insertions(+), 486 deletions(-) create mode 100644 src/main/java/net/pterodactylus/fcp/DefaultFcpConnection.java delete mode 100644 src/main/java/net/pterodactylus/fcp/FcpConnectionHandler.java diff --git a/src/main/java/net/pterodactylus/fcp/DefaultFcpConnection.java b/src/main/java/net/pterodactylus/fcp/DefaultFcpConnection.java new file mode 100644 index 0000000..6890df4 --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/DefaultFcpConnection.java @@ -0,0 +1,461 @@ +/* + * jFCPlib - FcpConnection.java - Copyright © 2008–2023 David Roden + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package net.pterodactylus.fcp; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.pterodactylus.fcp.FcpUtils.TempInputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Default {@link FcpConnection} implementation. + */ +public class DefaultFcpConnection implements FcpConnection { + + /** Logger. */ + private static final Logger logger = Logger.getLogger(DefaultFcpConnection.class.getName()); + + /** The default port for FCP v2. */ + public static final int DEFAULT_PORT = 9481; + + /** Listener management. */ + private final FcpListenerManager fcpListenerManager = new FcpListenerManager(this); + + /** The address of the node. */ + private final InetAddress address; + + /** The port number of the node’s FCP port. */ + private final int port; + + /** The remote socket. */ + private Socket remoteSocket; + + /** The input stream from the node. */ + private InputStream remoteInputStream; + + /** The output stream to the node. */ + private OutputStream remoteOutputStream; + + /** The connection handler. */ + private FcpConnectionHandler connectionHandler; + + /** Incoming message statistics. */ + private static final Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap<>()); + + /** + * Creates a new FCP connection to the freenet node running on localhost, + * using the default port. + * + * @throws UnknownHostException + * if the hostname can not be resolved + */ + public DefaultFcpConnection() throws UnknownHostException { + this(InetAddress.getLocalHost()); + } + + /** + * Creates a new FCP connection to the Freenet node running on the given + * host, listening on the default port. + * + * @param host + * The hostname of the Freenet node + * @throws UnknownHostException + * if host can not be resolved + */ + public DefaultFcpConnection(String host) throws UnknownHostException { + this(host, DEFAULT_PORT); + } + + /** + * Creates a new FCP connection to the Freenet node running on the given + * host, listening on the given port. + * + * @param host + * The hostname of the Freenet node + * @param port + * The port number of the node’s FCP port + * @throws UnknownHostException + * if host can not be resolved + */ + public DefaultFcpConnection(String host, int port) throws UnknownHostException { + this(InetAddress.getByName(host), port); + } + + /** + * Creates a new FCP connection to the Freenet node running at the given + * address, listening on the default port. + * + * @param address + * The address of the Freenet node + */ + public DefaultFcpConnection(InetAddress address) { + this(address, DEFAULT_PORT); + } + + /** + * Creates a new FCP connection to the Freenet node running at the given + * address, listening on the given port. + * + * @param address + * The address of the Freenet node + * @param port + * The port number of the node’s FCP port + */ + public DefaultFcpConnection(InetAddress address, int port) { + this.address = address; + this.port = port; + } + + // + // LISTENER MANAGEMENT + // + + @Override + public void addFcpListener(FcpListener fcpListener) { + fcpListenerManager.addListener(fcpListener); + } + + @Override + public void removeFcpListener(FcpListener fcpListener) { + fcpListenerManager.removeListener(fcpListener); + } + + @Override + public synchronized boolean isClosed() { + return connectionHandler == null; + } + + // + // ACTIONS + // + + @Override + public synchronized void connect() throws IOException, IllegalStateException { + if (connectionHandler != null) { + throw new IllegalStateException("already connected, disconnect first"); + } + logger.info("connecting to " + address + ":" + port + "…"); + remoteSocket = new Socket(address, port); + remoteInputStream = remoteSocket.getInputStream(); + remoteOutputStream = remoteSocket.getOutputStream(); + new Thread(connectionHandler = new FcpConnectionHandler(remoteInputStream)).start(); + } + + @Override + @Deprecated + public synchronized void disconnect() { + close(); + } + + @Override + public void close() { + handleDisconnect(null); + } + + @Override + public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException { + logger.fine("sending message: " + fcpMessage.getName()); + fcpMessage.write(remoteOutputStream); + } + + // + // PACKAGE-PRIVATE METHODS + // + + /** + * Handles the given message, notifying listeners. This message should only + * be called by {@link FcpConnectionHandler}. + * + * @param fcpMessage + * The received message + */ + private void handleMessage(FcpMessage fcpMessage) throws IOException{ + logger.fine("received message: " + fcpMessage.getName()); + String messageName = fcpMessage.getName(); + countMessage(messageName); + if ("SimpleProgress".equals(messageName)) { + fcpListenerManager.fireReceivedSimpleProgress(new SimpleProgress(fcpMessage)); + } else if ("ProtocolError".equals(messageName)) { + fcpListenerManager.fireReceivedProtocolError(new ProtocolError(fcpMessage)); + } else if ("PersistentGet".equals(messageName)) { + fcpListenerManager.fireReceivedPersistentGet(new PersistentGet(fcpMessage)); + } else if ("PersistentPut".equals(messageName)) { + fcpListenerManager.fireReceivedPersistentPut(new PersistentPut(fcpMessage)); + } else if ("PersistentPutDir".equals(messageName)) { + fcpListenerManager.fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage)); + } else if ("URIGenerated".equals(messageName)) { + fcpListenerManager.fireReceivedURIGenerated(new URIGenerated(fcpMessage)); + } else if ("EndListPersistentRequests".equals(messageName)) { + fcpListenerManager.fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage)); + } else if ("Peer".equals(messageName)) { + fcpListenerManager.fireReceivedPeer(new Peer(fcpMessage)); + } else if ("PeerNote".equals(messageName)) { + fcpListenerManager.fireReceivedPeerNote(new PeerNote(fcpMessage)); + } else if ("StartedCompression".equals(messageName)) { + fcpListenerManager.fireReceivedStartedCompression(new StartedCompression(fcpMessage)); + } else if ("FinishedCompression".equals(messageName)) { + fcpListenerManager.fireReceivedFinishedCompression(new FinishedCompression(fcpMessage)); + } else if ("GetFailed".equals(messageName)) { + fcpListenerManager.fireReceivedGetFailed(new GetFailed(fcpMessage)); + } else if ("PutFetchable".equals(messageName)) { + fcpListenerManager.fireReceivedPutFetchable(new PutFetchable(fcpMessage)); + } else if ("PutSuccessful".equals(messageName)) { + fcpListenerManager.fireReceivedPutSuccessful(new PutSuccessful(fcpMessage)); + } else if ("PutFailed".equals(messageName)) { + fcpListenerManager.fireReceivedPutFailed(new PutFailed(fcpMessage)); + } else if ("DataFound".equals(messageName)) { + fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage)); + } else if ("SubscribedUSKUpdate".equals(messageName)) { + fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("SubscribedUSK".equals(messageName)) { + fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage)); + } else if ("IdentifierCollision".equals(messageName)) { + fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); + } else if ("AllData".equals(messageName)) { + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); + } else if ("EndListPeerNotes".equals(messageName)) { + fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); + } else if ("EndListPeers".equals(messageName)) { + fcpListenerManager.fireReceivedEndListPeers(new EndListPeers(fcpMessage)); + } else if ("SSKKeypair".equals(messageName)) { + fcpListenerManager.fireReceivedSSKKeypair(new SSKKeypair(fcpMessage)); + } else if ("PeerRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPeerRemoved(new PeerRemoved(fcpMessage)); + } else if ("PersistentRequestModified".equals(messageName)) { + fcpListenerManager.fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage)); + } else if ("PersistentRequestRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage)); + } else if ("UnknownPeerNoteType".equals(messageName)) { + fcpListenerManager.fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage)); + } else if ("UnknownNodeIdentifier".equals(messageName)) { + fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); + } else if ("FCPPluginReply".equals(messageName)) { + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"), 0)); + fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); + } else if ("PluginInfo".equals(messageName)) { + fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); + } else if ("PluginRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage)); + } else if ("NodeData".equals(messageName)) { + fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage)); + } else if ("TestDDAReply".equals(messageName)) { + fcpListenerManager.fireReceivedTestDDAReply(new TestDDAReply(fcpMessage)); + } else if ("TestDDAComplete".equals(messageName)) { + fcpListenerManager.fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage)); + } else if ("ConfigData".equals(messageName)) { + fcpListenerManager.fireReceivedConfigData(new ConfigData(fcpMessage)); + } else if ("NodeHello".equals(messageName)) { + fcpListenerManager.fireReceivedNodeHello(new NodeHello(fcpMessage)); + } else if ("CloseConnectionDuplicateClientName".equals(messageName)) { + fcpListenerManager.fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage)); + } else if ("SentFeed".equals(messageName)) { + fcpListenerManager.fireSentFeed(new SentFeed(fcpMessage)); + } else if ("ReceivedBookmarkFeed".equals(messageName)) { + fcpListenerManager.fireReceivedBookmarkFeed(new ReceivedBookmarkFeed(fcpMessage)); + } else { + fcpListenerManager.fireMessageReceived(fcpMessage); + } + } + + /** + * Handles a disconnect from the node. + * + * @param throwable + * The exception that caused the disconnect, or + * null if there was no exception + */ + private synchronized void handleDisconnect(Throwable throwable) { + FcpUtils.close(remoteInputStream); + FcpUtils.close(remoteOutputStream); + FcpUtils.close(remoteSocket); + if (connectionHandler != null) { + connectionHandler.stop(); + connectionHandler = null; + fcpListenerManager.fireConnectionClosed(throwable); + } + } + + // + // PRIVATE METHODS + // + + /** + * Incremets the counter in {@link #incomingMessageStatistics} by + * 1 for the given message name. + * + * @param name + * The name of the message to count + */ + private void countMessage(String name) { + int oldValue = 0; + if (incomingMessageStatistics.containsKey(name)) { + oldValue = incomingMessageStatistics.get(name); + } + incomingMessageStatistics.put(name, oldValue + 1); + logger.finest("count for " + name + ": " + (oldValue + 1)); + } + + private synchronized InputStream getInputStream(long dataLength) throws IOException { + return new TempInputStream(remoteInputStream, dataLength); + } + + /** + * Handles an FCP connection to a node. + */ + class FcpConnectionHandler implements Runnable { + + /** The logger. */ + private final Logger logger = Logger.getLogger(FcpConnectionHandler.class.getName()); + + /** The input stream from the node. */ + private final InputStream remoteInputStream; + + /** Whether to stop the connection handler. */ + private boolean shouldStop; + + /** Whether the next read line feed should be ignored. */ + private boolean ignoreNextLinefeed; + + /** + * Creates a new connection handler that operates on the given input stream. + * + * @param remoteInputStream + * The input stream from the node + */ + public FcpConnectionHandler(InputStream remoteInputStream) { + this.remoteInputStream = remoteInputStream; + } + + /** + * {@inheritDoc} + */ + @Override + public void run() { + FcpMessage fcpMessage = null; + Throwable throwable = null; + while (true) { + synchronized (this) { + if (shouldStop) { + break; + } + } + try { + String line = readLine(); + logger.log(Level.FINEST, String.format("read line: %1$s", line)); + if (line == null) { + throwable = new EOFException(); + break; + } + if (line.length() == 0) { + continue; + } + line = line.trim(); + if (fcpMessage == null) { + fcpMessage = new FcpMessage(line); + continue; + } + if ("EndMessage".equalsIgnoreCase(line) || "Data".equalsIgnoreCase(line)) { + handleMessage(fcpMessage); + fcpMessage = null; + } + int equalSign = line.indexOf('='); + if (equalSign == -1) { + /* something's fishy! */ + continue; + } + String field = line.substring(0, equalSign); + String value = line.substring(equalSign + 1); + assert fcpMessage != null: "fcp message is null"; + fcpMessage.setField(field, value); + } catch (IOException ioe1) { + throwable = ioe1; + break; + } + } + handleDisconnect(throwable); + } + + /** + * Stops the connection handler. + */ + public void stop() { + synchronized (this) { + shouldStop = true; + } + } + + // + // PRIVATE METHODS + // + + /** + * Reads bytes from {@link #remoteInputStream} until ‘\r’ or ‘\n’ are + * encountered and decodes the read bytes using UTF-8. + * + * @return The decoded line + * @throws IOException + * if an I/O error occurs + */ + private String readLine() throws IOException { + byte[] readBytes = new byte[512]; + int readIndex = 0; + while (true) { + int nextByte = remoteInputStream.read(); + if (nextByte == -1) { + if (readIndex == 0) { + return null; + } + break; + } + if (nextByte == 10) { + if (!ignoreNextLinefeed) { + break; + } + } + ignoreNextLinefeed = false; + if (nextByte == 13) { + ignoreNextLinefeed = true; + break; + } + if (readIndex == readBytes.length) { + /* recopy & enlarge array */ + byte[] newReadBytes = new byte[readBytes.length * 2]; + System.arraycopy(readBytes, 0, newReadBytes, 0, readBytes.length); + readBytes = newReadBytes; + } + readBytes[readIndex++] = (byte) nextByte; + } + return new String(readBytes, UTF_8); + } + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java index 3331bdf..a26e3e8 100644 --- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java +++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java @@ -1,188 +1,41 @@ -/* - * jFCPlib - FcpConnection.java - Copyright © 2008–2016 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - package net.pterodactylus.fcp; import java.io.Closeable; -import java.io.FilterInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.Socket; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Logger; - -import net.pterodactylus.fcp.FcpUtils.TempInputStream; /** * An FCP connection to a Freenet node. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> */ -public class FcpConnection implements Closeable { - - /** Logger. */ - private static final Logger logger = Logger.getLogger(FcpConnection.class.getName()); - - /** The default port for FCP v2. */ - public static final int DEFAULT_PORT = 9481; - - /** Listener management. */ - private final FcpListenerManager fcpListenerManager = new FcpListenerManager(this); - - /** The address of the node. */ - private final InetAddress address; - - /** The port number of the node’s FCP port. */ - private final int port; - - /** The remote socket. */ - private Socket remoteSocket; - - /** The input stream from the node. */ - private InputStream remoteInputStream; - - /** The output stream to the node. */ - private OutputStream remoteOutputStream; - - /** The connection handler. */ - private FcpConnectionHandler connectionHandler; - - /** Incoming message statistics. */ - private static final Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap()); +public interface FcpConnection extends Closeable { /** - * Creates a new FCP connection to the freenet node running on localhost, - * using the default port. - * - * @throws UnknownHostException - * if the hostname can not be resolved - */ - public FcpConnection() throws UnknownHostException { - this(InetAddress.getLocalHost()); - } - - /** - * Creates a new FCP connection to the Freenet node running on the given - * host, listening on the default port. - * - * @param host - * The hostname of the Freenet node - * @throws UnknownHostException - * if host can not be resolved - */ - public FcpConnection(String host) throws UnknownHostException { - this(host, DEFAULT_PORT); - } - - /** - * Creates a new FCP connection to the Freenet node running on the given - * host, listening on the given port. - * - * @param host - * The hostname of the Freenet node - * @param port - * The port number of the node’s FCP port - * @throws UnknownHostException - * if host can not be resolved - */ - public FcpConnection(String host, int port) throws UnknownHostException { - this(InetAddress.getByName(host), port); - } - - /** - * Creates a new FCP connection to the Freenet node running at the given - * address, listening on the default port. - * - * @param address - * The address of the Freenet node - */ - public FcpConnection(InetAddress address) { - this(address, DEFAULT_PORT); - } - - /** - * Creates a new FCP connection to the Freenet node running at the given - * address, listening on the given port. + * Adds the given listener to the list of listeners. * - * @param address - * The address of the Freenet node - * @param port - * The port number of the node’s FCP port + * @param fcpListener The listener to add */ - public FcpConnection(InetAddress address, int port) { - this.address = address; - this.port = port; - } - - // - // LISTENER MANAGEMENT - // + void addFcpListener(FcpListener fcpListener); /** - * Adds the given listener to the list of listeners. + * Removes the given listener from the list of listeners. * - * @param fcpListener - * The listener to add + * @param fcpListener The listener to remove */ - public void addFcpListener(FcpListener fcpListener) { - fcpListenerManager.addListener(fcpListener); - } + void removeFcpListener(FcpListener fcpListener); /** - * Removes the given listener from the list of listeners. + * Returns whether this connection has been closed, either by {@link #close()} or by the remote side. * - * @param fcpListener - * The listener to remove + * @return {@code true} if this connection is closed, {@code false} otherwise */ - public void removeFcpListener(FcpListener fcpListener) { - fcpListenerManager.removeListener(fcpListener); - } - - public synchronized boolean isClosed() { - return connectionHandler == null; - } - - // - // ACTIONS - // + boolean isClosed(); /** * Connects to the node. * - * @throws IOException - * if an I/O error occurs - * @throws IllegalStateException - * if there is already a connection to the node + * @throws IOException if an I/O error occurs + * @throws IllegalStateException if there is already a connection to the node */ - public synchronized void connect() throws IOException, IllegalStateException { - if (connectionHandler != null) { - throw new IllegalStateException("already connected, disconnect first"); - } - logger.info("connecting to " + address + ":" + port + "…"); - remoteSocket = new Socket(address, port); - remoteInputStream = remoteSocket.getInputStream(); - remoteOutputStream = remoteSocket.getOutputStream(); - new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start(); - } + void connect() throws IOException, IllegalStateException; /** * Disconnects from the node. If there is no connection to the node, this @@ -191,172 +44,21 @@ public class FcpConnection implements Closeable { * @deprecated Use {@link #close()} instead */ @Deprecated - public synchronized void disconnect() { - close(); - } + void disconnect(); /** * Closes the connection. If there is no connection to the node, this * method does nothing. */ @Override - public void close() { - handleDisconnect(null); - } + void close(); /** * Sends the given FCP message. * - * @param fcpMessage - * The FCP message to send - * @throws IOException - * if an I/O error occurs + * @param fcpMessage The FCP message to send + * @throws IOException if an I/O error occurs */ - public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException { - logger.fine("sending message: " + fcpMessage.getName()); - fcpMessage.write(remoteOutputStream); - } - - // - // PACKAGE-PRIVATE METHODS - // - - /** - * Handles the given message, notifying listeners. This message should only - * be called by {@link FcpConnectionHandler}. - * - * @param fcpMessage - * The received message - */ - void handleMessage(FcpMessage fcpMessage) throws IOException{ - logger.fine("received message: " + fcpMessage.getName()); - String messageName = fcpMessage.getName(); - countMessage(messageName); - if ("SimpleProgress".equals(messageName)) { - fcpListenerManager.fireReceivedSimpleProgress(new SimpleProgress(fcpMessage)); - } else if ("ProtocolError".equals(messageName)) { - fcpListenerManager.fireReceivedProtocolError(new ProtocolError(fcpMessage)); - } else if ("PersistentGet".equals(messageName)) { - fcpListenerManager.fireReceivedPersistentGet(new PersistentGet(fcpMessage)); - } else if ("PersistentPut".equals(messageName)) { - fcpListenerManager.fireReceivedPersistentPut(new PersistentPut(fcpMessage)); - } else if ("PersistentPutDir".equals(messageName)) { - fcpListenerManager.fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage)); - } else if ("URIGenerated".equals(messageName)) { - fcpListenerManager.fireReceivedURIGenerated(new URIGenerated(fcpMessage)); - } else if ("EndListPersistentRequests".equals(messageName)) { - fcpListenerManager.fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage)); - } else if ("Peer".equals(messageName)) { - fcpListenerManager.fireReceivedPeer(new Peer(fcpMessage)); - } else if ("PeerNote".equals(messageName)) { - fcpListenerManager.fireReceivedPeerNote(new PeerNote(fcpMessage)); - } else if ("StartedCompression".equals(messageName)) { - fcpListenerManager.fireReceivedStartedCompression(new StartedCompression(fcpMessage)); - } else if ("FinishedCompression".equals(messageName)) { - fcpListenerManager.fireReceivedFinishedCompression(new FinishedCompression(fcpMessage)); - } else if ("GetFailed".equals(messageName)) { - fcpListenerManager.fireReceivedGetFailed(new GetFailed(fcpMessage)); - } else if ("PutFetchable".equals(messageName)) { - fcpListenerManager.fireReceivedPutFetchable(new PutFetchable(fcpMessage)); - } else if ("PutSuccessful".equals(messageName)) { - fcpListenerManager.fireReceivedPutSuccessful(new PutSuccessful(fcpMessage)); - } else if ("PutFailed".equals(messageName)) { - fcpListenerManager.fireReceivedPutFailed(new PutFailed(fcpMessage)); - } else if ("DataFound".equals(messageName)) { - fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage)); - } else if ("SubscribedUSKUpdate".equals(messageName)) { - fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); - } else if ("SubscribedUSK".equals(messageName)) { - fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage)); - } else if ("IdentifierCollision".equals(messageName)) { - fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); - } else if ("AllData".equals(messageName)) { - InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); - fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); - } else if ("EndListPeerNotes".equals(messageName)) { - fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); - } else if ("EndListPeers".equals(messageName)) { - fcpListenerManager.fireReceivedEndListPeers(new EndListPeers(fcpMessage)); - } else if ("SSKKeypair".equals(messageName)) { - fcpListenerManager.fireReceivedSSKKeypair(new SSKKeypair(fcpMessage)); - } else if ("PeerRemoved".equals(messageName)) { - fcpListenerManager.fireReceivedPeerRemoved(new PeerRemoved(fcpMessage)); - } else if ("PersistentRequestModified".equals(messageName)) { - fcpListenerManager.fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage)); - } else if ("PersistentRequestRemoved".equals(messageName)) { - fcpListenerManager.fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage)); - } else if ("UnknownPeerNoteType".equals(messageName)) { - fcpListenerManager.fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage)); - } else if ("UnknownNodeIdentifier".equals(messageName)) { - fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); - } else if ("FCPPluginReply".equals(messageName)) { - InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"), 0)); - fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); - } else if ("PluginInfo".equals(messageName)) { - fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); - } else if ("PluginRemoved".equals(messageName)) { - fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage)); - } else if ("NodeData".equals(messageName)) { - fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage)); - } else if ("TestDDAReply".equals(messageName)) { - fcpListenerManager.fireReceivedTestDDAReply(new TestDDAReply(fcpMessage)); - } else if ("TestDDAComplete".equals(messageName)) { - fcpListenerManager.fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage)); - } else if ("ConfigData".equals(messageName)) { - fcpListenerManager.fireReceivedConfigData(new ConfigData(fcpMessage)); - } else if ("NodeHello".equals(messageName)) { - fcpListenerManager.fireReceivedNodeHello(new NodeHello(fcpMessage)); - } else if ("CloseConnectionDuplicateClientName".equals(messageName)) { - fcpListenerManager.fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage)); - } else if ("SentFeed".equals(messageName)) { - fcpListenerManager.fireSentFeed(new SentFeed(fcpMessage)); - } else if ("ReceivedBookmarkFeed".equals(messageName)) { - fcpListenerManager.fireReceivedBookmarkFeed(new ReceivedBookmarkFeed(fcpMessage)); - } else { - fcpListenerManager.fireMessageReceived(fcpMessage); - } - } - - /** - * Handles a disconnect from the node. - * - * @param throwable - * The exception that caused the disconnect, or - * null if there was no exception - */ - synchronized void handleDisconnect(Throwable throwable) { - FcpUtils.close(remoteInputStream); - FcpUtils.close(remoteOutputStream); - FcpUtils.close(remoteSocket); - if (connectionHandler != null) { - connectionHandler.stop(); - connectionHandler = null; - fcpListenerManager.fireConnectionClosed(throwable); - } - } - - // - // PRIVATE METHODS - // - - /** - * Incremets the counter in {@link #incomingMessageStatistics} by - * 1 for the given message name. - * - * @param name - * The name of the message to count - */ - private void countMessage(String name) { - int oldValue = 0; - if (incomingMessageStatistics.containsKey(name)) { - oldValue = incomingMessageStatistics.get(name); - } - incomingMessageStatistics.put(name, oldValue + 1); - logger.finest("count for " + name + ": " + (oldValue + 1)); - } - - private synchronized InputStream getInputStream(long dataLength) throws IOException { - return new TempInputStream(remoteInputStream, dataLength); - } + void sendMessage(FcpMessage fcpMessage) throws IOException; } diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnectionHandler.java b/src/main/java/net/pterodactylus/fcp/FcpConnectionHandler.java deleted file mode 100644 index aa897e2..0000000 --- a/src/main/java/net/pterodactylus/fcp/FcpConnectionHandler.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * jFCPlib - FcpConnectionHandler.java - Copyright © 2008–2016 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.pterodactylus.fcp; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Handles an FCP connection to a node. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> - */ -class FcpConnectionHandler implements Runnable { - - /** The logger. */ - private static final Logger logger = Logger.getLogger(FcpConnectionHandler.class.getName()); - - /** The underlying connection. */ - private final FcpConnection fcpConnection; - - /** The input stream from the node. */ - private final InputStream remoteInputStream; - - /** Whether to stop the connection handler. */ - private boolean shouldStop; - - /** Whether the next read line feed should be ignored. */ - private boolean ignoreNextLinefeed; - - /** - * Creates a new connection handler that operates on the given connection - * and input stream. - * - * @param fcpConnection - * The underlying FCP connection - * @param remoteInputStream - * The input stream from the node - */ - public FcpConnectionHandler(FcpConnection fcpConnection, InputStream remoteInputStream) { - this.fcpConnection = fcpConnection; - this.remoteInputStream = remoteInputStream; - } - - /** - * {@inheritDoc} - */ - @Override - public void run() { - FcpMessage fcpMessage = null; - Throwable throwable = null; - while (true) { - synchronized (this) { - if (shouldStop) { - break; - } - } - try { - String line = readLine(); - logger.log(Level.FINEST, String.format("read line: %1$s", line)); - if (line == null) { - throwable = new EOFException(); - break; - } - if (line.length() == 0) { - continue; - } - line = line.trim(); - if (fcpMessage == null) { - fcpMessage = new FcpMessage(line); - continue; - } - if ("EndMessage".equalsIgnoreCase(line) || "Data".equalsIgnoreCase(line)) { - fcpConnection.handleMessage(fcpMessage); - fcpMessage = null; - } - int equalSign = line.indexOf('='); - if (equalSign == -1) { - /* something's fishy! */ - continue; - } - String field = line.substring(0, equalSign); - String value = line.substring(equalSign + 1); - assert fcpMessage != null: "fcp message is null"; - fcpMessage.setField(field, value); - } catch (IOException ioe1) { - throwable = ioe1; - break; - } - } - fcpConnection.handleDisconnect(throwable); - } - - /** - * Stops the connection handler. - */ - public void stop() { - synchronized (this) { - shouldStop = true; - } - } - - // - // PRIVATE METHODS - // - - /** - * Reads bytes from {@link #remoteInputStream} until ‘\r’ or ‘\n’ are - * encountered and decodes the read bytes using UTF-8. - * - * @return The decoded line - * @throws IOException - * if an I/O error occurs - */ - private String readLine() throws IOException { - byte[] readBytes = new byte[512]; - int readIndex = 0; - while (true) { - int nextByte = remoteInputStream.read(); - if (nextByte == -1) { - if (readIndex == 0) { - return null; - } - break; - } - if (nextByte == 10) { - if (!ignoreNextLinefeed) { - break; - } - } - ignoreNextLinefeed = false; - if (nextByte == 13) { - ignoreNextLinefeed = true; - break; - } - if (readIndex == readBytes.length) { - /* recopy & enlarge array */ - byte[] newReadBytes = new byte[readBytes.length * 2]; - System.arraycopy(readBytes, 0, newReadBytes, 0, readBytes.length); - readBytes = newReadBytes; - } - readBytes[readIndex++] = (byte) nextByte; - } - ByteBuffer byteBuffer = ByteBuffer.wrap(readBytes, 0, readIndex); - return Charset.forName("UTF-8").decode(byteBuffer).toString(); - } - -} diff --git a/src/main/java/net/pterodactylus/fcp/highlevel/FcpClient.java b/src/main/java/net/pterodactylus/fcp/highlevel/FcpClient.java index 473fe98..ced9995 100644 --- a/src/main/java/net/pterodactylus/fcp/highlevel/FcpClient.java +++ b/src/main/java/net/pterodactylus/fcp/highlevel/FcpClient.java @@ -43,6 +43,7 @@ import net.pterodactylus.fcp.ClientHello; import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; import net.pterodactylus.fcp.ConfigData; import net.pterodactylus.fcp.DataFound; +import net.pterodactylus.fcp.DefaultFcpConnection; import net.pterodactylus.fcp.EndListPeerNotes; import net.pterodactylus.fcp.EndListPeers; import net.pterodactylus.fcp.EndListPersistentRequests; @@ -123,7 +124,7 @@ public class FcpClient implements Closeable { * if the given hostname can not be resolved */ public FcpClient(String hostname) throws UnknownHostException { - this(hostname, FcpConnection.DEFAULT_PORT); + this(hostname, DefaultFcpConnection.DEFAULT_PORT); } /** @@ -147,7 +148,7 @@ public class FcpClient implements Closeable { * The host address of the Freenet node */ public FcpClient(InetAddress host) { - this(host, FcpConnection.DEFAULT_PORT); + this(host, DefaultFcpConnection.DEFAULT_PORT); } /** @@ -159,7 +160,7 @@ public class FcpClient implements Closeable { * The Freenet node’s FCP port */ public FcpClient(InetAddress host, int port) { - this(new FcpConnection(host, port), false); + this(new DefaultFcpConnection(host, port), false); } /** -- 2.7.4