/*
 * Copyright (C) 2006 David Roden
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
20 package de.todesbaum.util.freenet.fcp2;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import de.todesbaum.util.io.LineInputStream;
import de.todesbaum.util.io.StreamCopier;
import de.todesbaum.util.io.TempFileInputStream;
39 * A physical connection to a Freenet node.
41 * @author David Roden <droden@gmail.com>
44 public class Connection {
46 /** The listeners that receive events from this connection. */
47 private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
49 /** The node this connection is connected to. */
50 private final Node node;
52 /** The name of this connection. */
53 private final String name;
55 /** The network socket of this connection. */
56 private Socket nodeSocket;
58 /** The input stream that reads from the socket. */
59 private InputStream nodeInputStream;
61 /** The output stream that writes to the socket. */
62 private OutputStream nodeOutputStream;
64 /** The thread that reads from the socket. */
65 private NodeReader nodeReader;
67 /** A writer for the output stream. */
68 private Writer nodeWriter;
70 /** The NodeHello message sent by the node on connect. */
71 protected Message nodeHello;
74 * Creates a new connection to the specified node with the specified name.
77 * The node to connect to
79 * The name of this connection
81 public Connection(Node node, String name) {
87 * Adds a listener that gets notified on connection events.
89 * @param connectionListener
92 public void addConnectionListener(ConnectionListener connectionListener) {
93 connectionListeners.add(connectionListener);
97 * Removes a listener from the list of registered listeners. Only the first
98 * matching listener is removed.
100 * @param connectionListener
101 * The listener to remove
102 * @see List#remove(java.lang.Object)
104 public void removeConnectionListener(ConnectionListener connectionListener) {
105 connectionListeners.remove(connectionListener);
109 * Notifies listeners about a received message.
112 * The received message
114 protected void fireMessageReceived(Message message) {
115 for (ConnectionListener connectionListener: connectionListeners) {
116 connectionListener.messageReceived(this, message);
121 * Notifies listeners about the loss of the connection.
123 protected void fireConnectionTerminated() {
124 for (ConnectionListener connectionListener: connectionListeners) {
125 connectionListener.connectionTerminated(this);
130 * Returns the name of the connection.
132 * @return The name of the connection
134 public String getName() {
139 * Connects to the node.
141 * @return <code>true</code> if the connection succeeded and the node
142 * returned a NodeHello message
143 * @throws IOException
144 * if an I/O error occurs
145 * @see #getNodeHello()
147 public synchronized boolean connect() throws IOException {
149 nodeInputStream = null;
150 nodeOutputStream = null;
154 nodeSocket = new Socket(node.getHostname(), node.getPort());
155 nodeSocket.setReceiveBufferSize(65535);
156 nodeInputStream = nodeSocket.getInputStream();
157 nodeOutputStream = nodeSocket.getOutputStream();
158 nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
159 nodeReader = new NodeReader(nodeInputStream);
160 Thread nodeReaderThread = new Thread(nodeReader);
161 nodeReaderThread.setDaemon(true);
162 nodeReaderThread.start();
163 ClientHello clientHello = new ClientHello();
164 clientHello.setName(name);
165 clientHello.setExpectedVersion("2.0");
166 execute(clientHello);
167 synchronized (this) {
170 } catch (InterruptedException e) {
173 return nodeHello != null;
174 } catch (IOException ioe1) {
181 * Returns whether this connection is still connected to the node.
183 * @return <code>true</code> if this connection is still valid,
184 * <code>false</code> otherwise
186 public boolean isConnected() {
187 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
191 * Returns the NodeHello message the node sent on connection.
193 * @return The NodeHello message of the node
195 public Message getNodeHello() {
200 * Disconnects from the node.
202 public void disconnect() {
203 if (nodeWriter != null) {
206 } catch (IOException ioe1) {
210 if (nodeOutputStream != null) {
212 nodeOutputStream.close();
213 } catch (IOException ioe1) {
215 nodeOutputStream = null;
217 if (nodeInputStream != null) {
219 nodeInputStream.close();
220 } catch (IOException ioe1) {
222 nodeInputStream = null;
224 if (nodeSocket != null) {
227 } catch (IOException ioe1) {
231 synchronized (this) {
234 fireConnectionTerminated();
238 * Executes the specified command.
241 * The command to execute
242 * @throws IllegalStateException
243 * if the connection is not connected
244 * @throws IOException
245 * if an I/O error occurs
247 public synchronized void execute(Command command) throws IllegalStateException, IOException {
248 if (nodeSocket == null) {
249 throw new IllegalStateException("connection is not connected");
251 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
252 command.write(nodeWriter);
253 nodeWriter.write("EndMessage" + Command.LINEFEED);
255 if (command.hasPayload()) {
256 StreamCopier.copy(command.getPayload(), nodeOutputStream, command.getPayloadLength());
257 nodeOutputStream.flush();
262 * The reader thread for this connection. This is essentially a thread that
263 * reads lines from the node, creates messages from them and notifies
264 * listeners about the messages.
266 * @author David Roden <droden@gmail.com>
269 private class NodeReader implements Runnable {
271 /** The input stream to read from. */
272 @SuppressWarnings("hiding")
273 private InputStream nodeInputStream;
276 * Creates a new reader that reads from the specified input stream.
278 * @param nodeInputStream
279 * The input stream to read from
281 public NodeReader(InputStream nodeInputStream) {
282 this.nodeInputStream = nodeInputStream;
286 * Main loop of the reader. Lines are read and converted into
287 * {@link Message} objects.
290 LineInputStream nodeReader = null;
292 nodeReader = new LineInputStream(nodeInputStream);
294 Message message = null;
295 while (line != null) {
296 line = nodeReader.readLine();
297 // System.err.println("> " + line);
301 if (message == null) {
302 message = new Message(line);
305 if ("Data".equals(line)) {
306 /* need to read message from stream now */
307 File tempFile = null;
309 tempFile = File.createTempFile("fcpv2", "data");
310 tempFile.deleteOnExit();
311 FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
312 long dataLength = Long.parseLong(message.get("DataLength"));
313 StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
314 tempFileOutputStream.close();
315 message.setPayloadInputStream(new TempFileInputStream(tempFile));
316 } catch (IOException ioe1) {
317 ioe1.printStackTrace();
320 if ("Data".equals(line) || "EndMessage".equals(line)) {
321 if (message.getName().equals("NodeHello")) {
323 synchronized (Connection.this) {
324 Connection.this.notify();
327 fireMessageReceived(message);
332 int equalsPosition = line.indexOf('=');
333 if (equalsPosition > -1) {
334 String key = line.substring(0, equalsPosition).trim();
335 String value = line.substring(equalsPosition + 1).trim();
336 if (key.equals("Identifier")) {
337 message.setIdentifier(value);
339 message.put(key, value);
343 /* skip lines consisting of whitespace only */
344 if (line.trim().length() == 0) {
347 /* if we got here, some error occured! */
348 throw new IOException("Unexpected line: " + line);
350 } catch (IOException ioe1) {
351 // ioe1.printStackTrace();
353 if (nodeReader != null) {
356 } catch (IOException ioe1) {
359 if (nodeInputStream != null) {
361 nodeInputStream.close();
362 } catch (IOException ioe1) {
366 Connection.this.disconnect();