/*
 * Copyright (C) 2006 David Roden
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
20 package de.todesbaum.util.freenet.fcp2;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.Writer;
29 import java.net.Socket;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.List;
34 import de.todesbaum.util.io.Closer;
35 import de.todesbaum.util.io.LineInputStream;
36 import de.todesbaum.util.io.StreamCopier;
37 import de.todesbaum.util.io.TempFileInputStream;
/**
 * A physical connection to a Freenet node.
 *
 * @author David Roden <droden@gmail.com>
 */
45 public class Connection {
47 /** The listeners that receive events from this connection. */
48 private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
50 /** The node this connection is connected to. */
51 private final Node node;
53 /** The name of this connection. */
54 private final String name;
56 /** The network socket of this connection. */
57 private Socket nodeSocket;
59 /** The input stream that reads from the socket. */
60 private InputStream nodeInputStream;
62 /** The output stream that writes to the socket. */
63 private OutputStream nodeOutputStream;
65 /** The thread that reads from the socket. */
66 private NodeReader nodeReader;
68 /** A writer for the output stream. */
69 private Writer nodeWriter;
71 /** The NodeHello message sent by the node on connect. */
72 protected Message nodeHello;
/**
 * Creates a new connection to the specified node with the specified name.
 * The connection is not opened here; call {@link #connect()} to do so.
 *
 * @param node
 *            The node to connect to
 * @param name
 *            The name of this connection
 */
public Connection(Node node, String name) {
    this.node = node;
    this.name = name;
}
88 * Adds a listener that gets notified on connection events.
90 * @param connectionListener
93 public void addConnectionListener(ConnectionListener connectionListener) {
94 connectionListeners.add(connectionListener);
98 * Removes a listener from the list of registered listeners. Only the first
99 * matching listener is removed.
101 * @param connectionListener
102 * The listener to remove
103 * @see List#remove(java.lang.Object)
105 public void removeConnectionListener(ConnectionListener connectionListener) {
106 connectionListeners.remove(connectionListener);
110 * Notifies listeners about a received message.
113 * The received message
115 protected void fireMessageReceived(Message message) {
116 for (ConnectionListener connectionListener: connectionListeners) {
117 connectionListener.messageReceived(this, message);
122 * Notifies listeners about the loss of the connection.
124 protected void fireConnectionTerminated() {
125 for (ConnectionListener connectionListener: connectionListeners) {
126 connectionListener.connectionTerminated(this);
131 * Returns the name of the connection.
133 * @return The name of the connection
135 public String getName() {
140 * Connects to the node.
142 * @return <code>true</code> if the connection succeeded and the node
143 * returned a NodeHello message
144 * @throws IOException
145 * if an I/O error occurs
146 * @see #getNodeHello()
148 public synchronized boolean connect() throws IOException {
150 nodeInputStream = null;
151 nodeOutputStream = null;
155 nodeSocket = new Socket(node.getHostname(), node.getPort());
156 nodeSocket.setReceiveBufferSize(65535);
157 nodeInputStream = nodeSocket.getInputStream();
158 nodeOutputStream = nodeSocket.getOutputStream();
159 nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
160 nodeReader = new NodeReader(nodeInputStream);
161 Thread nodeReaderThread = new Thread(nodeReader);
162 nodeReaderThread.setDaemon(true);
163 nodeReaderThread.start();
164 ClientHello clientHello = new ClientHello();
165 clientHello.setName(name);
166 clientHello.setExpectedVersion("2.0");
167 execute(clientHello);
168 synchronized (this) {
171 } catch (InterruptedException e) {
174 return nodeHello != null;
175 } catch (IOException ioe1) {
182 * Returns whether this connection is still connected to the node.
184 * @return <code>true</code> if this connection is still valid,
185 * <code>false</code> otherwise
187 public boolean isConnected() {
188 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
192 * Returns the NodeHello message the node sent on connection.
194 * @return The NodeHello message of the node
196 public Message getNodeHello() {
201 * Disconnects from the node.
203 public void disconnect() {
204 if (nodeWriter != null) {
207 } catch (IOException ioe1) {
211 if (nodeOutputStream != null) {
213 nodeOutputStream.close();
214 } catch (IOException ioe1) {
216 nodeOutputStream = null;
218 if (nodeInputStream != null) {
220 nodeInputStream.close();
221 } catch (IOException ioe1) {
223 nodeInputStream = null;
225 if (nodeSocket != null) {
228 } catch (IOException ioe1) {
232 synchronized (this) {
235 fireConnectionTerminated();
239 * Executes the specified command.
242 * The command to execute
243 * @throws IllegalStateException
244 * if the connection is not connected
245 * @throws IOException
246 * if an I/O error occurs
248 public synchronized void execute(Command command) throws IllegalStateException, IOException {
249 if (nodeSocket == null) {
250 throw new IllegalStateException("connection is not connected");
252 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
253 command.write(nodeWriter);
254 nodeWriter.write("EndMessage" + Command.LINEFEED);
256 if (command.hasPayload()) {
257 InputStream payloadInputStream = null;
259 payloadInputStream = command.getPayload();
260 StreamCopier.copy(payloadInputStream, nodeOutputStream, command.getPayloadLength());
262 Closer.close(payloadInputStream);
264 nodeOutputStream.flush();
269 * The reader thread for this connection. This is essentially a thread that
270 * reads lines from the node, creates messages from them and notifies
271 * listeners about the messages.
273 * @author David Roden <droden@gmail.com>
276 private class NodeReader implements Runnable {
278 /** The input stream to read from. */
279 @SuppressWarnings("hiding")
280 private InputStream nodeInputStream;
283 * Creates a new reader that reads from the specified input stream.
285 * @param nodeInputStream
286 * The input stream to read from
288 public NodeReader(InputStream nodeInputStream) {
289 this.nodeInputStream = nodeInputStream;
293 * Main loop of the reader. Lines are read and converted into
294 * {@link Message} objects.
297 LineInputStream nodeReader = null;
299 nodeReader = new LineInputStream(nodeInputStream);
301 Message message = null;
302 while (line != null) {
303 line = nodeReader.readLine();
304 // System.err.println("> " + line);
308 if (message == null) {
309 message = new Message(line);
312 if ("Data".equals(line)) {
313 /* need to read message from stream now */
314 File tempFile = null;
316 tempFile = File.createTempFile("fcpv2", "data");
317 tempFile.deleteOnExit();
318 FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
319 long dataLength = Long.parseLong(message.get("DataLength"));
320 StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
321 tempFileOutputStream.close();
322 message.setPayloadInputStream(new TempFileInputStream(tempFile));
323 } catch (IOException ioe1) {
324 ioe1.printStackTrace();
327 if ("Data".equals(line) || "EndMessage".equals(line)) {
328 if (message.getName().equals("NodeHello")) {
330 synchronized (Connection.this) {
331 Connection.this.notify();
334 fireMessageReceived(message);
339 int equalsPosition = line.indexOf('=');
340 if (equalsPosition > -1) {
341 String key = line.substring(0, equalsPosition).trim();
342 String value = line.substring(equalsPosition + 1).trim();
343 if (key.equals("Identifier")) {
344 message.setIdentifier(value);
346 message.put(key, value);
350 /* skip lines consisting of whitespace only */
351 if (line.trim().length() == 0) {
354 /* if we got here, some error occured! */
355 throw new IOException("Unexpected line: " + line);
357 } catch (IOException ioe1) {
358 // ioe1.printStackTrace();
360 if (nodeReader != null) {
363 } catch (IOException ioe1) {
366 if (nodeInputStream != null) {
368 nodeInputStream.close();
369 } catch (IOException ioe1) {
373 Connection.this.disconnect();