/*
 * Copyright (C) 2006 David Roden
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
20 package de.todesbaum.util.freenet.fcp2;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.Writer;
29 import java.net.Socket;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.List;
34 import de.todesbaum.util.io.Closer;
35 import de.todesbaum.util.io.LineInputStream;
36 import de.todesbaum.util.io.StreamCopier;
37 import de.todesbaum.util.io.StreamCopier.ProgressListener;
38 import de.todesbaum.util.io.TempFileInputStream;
/**
 * A physical connection to a Freenet node.
 *
 * @author David Roden <droden@gmail.com>
 */
46 public class Connection {
48 /** The listeners that receive events from this connection. */
49 private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
51 /** The node this connection is connected to. */
52 private final Node node;
54 /** The name of this connection. */
55 private final String name;
57 /** The network socket of this connection. */
58 private Socket nodeSocket;
60 /** The input stream that reads from the socket. */
61 private InputStream nodeInputStream;
63 /** The output stream that writes to the socket. */
64 private OutputStream nodeOutputStream;
66 /** The thread that reads from the socket. */
67 private NodeReader nodeReader;
69 /** A writer for the output stream. */
70 private Writer nodeWriter;
72 /** The NodeHello message sent by the node on connect. */
73 protected Message nodeHello;
75 /** The temp directory to use. */
76 private String tempDirectory;
/**
 * Creates a new connection to the specified node with the specified name.
 * The connection is not opened by the constructor.
 *
 * @param node
 *            The node to connect to
 * @param name
 *            The name of this connection
 */
public Connection(Node node, String name) {
    /* both fields are final, so this is the only place they are assigned */
    this.name = name;
    this.node = node;
}
92 * Adds a listener that gets notified on connection events.
94 * @param connectionListener
97 public void addConnectionListener(ConnectionListener connectionListener) {
98 connectionListeners.add(connectionListener);
102 * Removes a listener from the list of registered listeners. Only the first
103 * matching listener is removed.
105 * @param connectionListener
106 * The listener to remove
107 * @see List#remove(java.lang.Object)
109 public void removeConnectionListener(ConnectionListener connectionListener) {
110 connectionListeners.remove(connectionListener);
114 * Notifies listeners about a received message.
117 * The received message
119 protected void fireMessageReceived(Message message) {
120 for (ConnectionListener connectionListener : connectionListeners) {
121 connectionListener.messageReceived(this, message);
126 * Notifies listeners about the loss of the connection.
128 protected void fireConnectionTerminated() {
129 for (ConnectionListener connectionListener : connectionListeners) {
130 connectionListener.connectionTerminated(this);
135 * Returns the name of the connection.
137 * @return The name of the connection
139 public String getName() {
144 * Sets the temp directory to use for creation of temporary files.
146 * @param tempDirectory
147 * The temp directory to use, or {@code null} to use the default
150 public void setTempDirectory(String tempDirectory) {
151 this.tempDirectory = tempDirectory;
/**
 * Connects to the node.
 *
 * @return <code>true</code> if the connection succeeded and the node
 *         returned a NodeHello message
 * @throws IOException
 *             if an I/O error occurs
 * @see #getNodeHello()
 */
// Opens a socket to the configured node, starts the reader thread, sends a
// ClientHello and reports whether nodeHello is non-null afterwards.
// NOTE(review): several short lines of this method (the try openers, the body
// of the synchronized block and the bodies of both catch clauses) are not
// present in this copy of the file; confirm against the complete source
// before relying on the notes below.
163 public synchronized boolean connect() throws IOException {
// reset the stream references before a new socket is opened
165 nodeInputStream = null;
166 nodeOutputStream = null;
// connect to the node's host and port and request a 65535 byte receive buffer
170 nodeSocket = new Socket(node.getHostname(), node.getPort());
171 nodeSocket.setReceiveBufferSize(65535);
172 nodeInputStream = nodeSocket.getInputStream();
173 nodeOutputStream = nodeSocket.getOutputStream();
// text sent to the node is encoded as UTF-8
174 nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
// the reader runs on its own daemon thread, so it does not keep the JVM alive
175 nodeReader = new NodeReader(nodeInputStream);
176 Thread nodeReaderThread = new Thread(nodeReader);
177 nodeReaderThread.setDaemon(true);
178 nodeReaderThread.start();
// introduce this client to the node under the connection's name
179 ClientHello clientHello = new ClientHello();
180 clientHello.setName(name);
181 clientHello.setExpectedVersion("2.0");
182 execute(clientHello);
// The method is already synchronized on this object, so this block re-enters
// the same monitor.
// NOTE(review): presumably this waits until the reader thread calls
// Connection.this.notify() after a NodeHello arrived - the waiting statement
// is not visible here; confirm, and check how a missed or spurious wake-up
// is handled.
183 synchronized (this) {
// NOTE(review): the handling of the interrupt is not visible; confirm that
// the thread's interrupt status is not silently dropped.
186 } catch (InterruptedException e) {
// nodeHello is presumably assigned by the reader thread - confirm
189 return nodeHello != null;
// NOTE(review): the clean-up performed on an I/O failure is not visible here
190 } catch (IOException ioe1) {
197 * Returns whether this connection is still connected to the node.
199 * @return <code>true</code> if this connection is still valid,
200 * <code>false</code> otherwise
202 public boolean isConnected() {
203 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
207 * Returns the NodeHello message the node sent on connection.
209 * @return The NodeHello message of the node
211 public Message getNodeHello() {
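/**
 * Disconnects from the node. The writer, both streams and the socket are
 * closed, and the registered listeners are notified that the connection
 * was terminated.
 */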
// Closes the writer, the streams and the socket, then notifies the listeners.
// Unlike connect() and execute(), this method is not declared synchronized.
// NOTE(review): some short lines of this method are not present in this copy
// of the file (e.g. whatever follows the close of nodeWriter and nodeSocket,
// and the body of the synchronized block); confirm against the complete
// source.
218 public void disconnect() {
219 Closer.close(nodeWriter);
221 Closer.close(nodeOutputStream);
222 nodeOutputStream = null;
223 Closer.close(nodeInputStream);
224 nodeInputStream = null;
// NOTE(review): this repeats the assignment on the previous line and has no
// additional effect; candidate for removal.
225 nodeInputStream = null;
226 Closer.close(nodeSocket);
// NOTE(review): presumably wakes a thread that is waiting on this object in
// connect() - the statement inside the block is not visible; confirm.
228 synchronized (this) {
// listeners are notified unconditionally, whether or not a connection had
// been established before
231 fireConnectionTerminated();
235 * Executes the specified command.
238 * The command to execute
239 * @throws IllegalStateException
240 * if the connection is not connected
241 * @throws IOException
242 * if an I/O error occurs
244 public synchronized void execute(Command command) throws IllegalStateException, IOException {
245 execute(command, null);
249 * Executes the specified command.
252 * The command to execute
253 * @param progressListener
254 * A progress listener for a payload transfer
255 * @throws IllegalStateException
256 * if the connection is not connected
257 * @throws IOException
258 * if an I/O error occurs
260 public synchronized void execute(Command command, ProgressListener progressListener) throws IllegalStateException, IOException {
261 if (nodeSocket == null) {
262 throw new IllegalStateException("connection is not connected");
264 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
265 command.write(nodeWriter);
266 nodeWriter.write("EndMessage" + Command.LINEFEED);
268 if (command.hasPayload()) {
269 InputStream payloadInputStream = null;
271 payloadInputStream = command.getPayload();
272 StreamCopier.copy(payloadInputStream, nodeOutputStream, command.getPayloadLength(), progressListener);
274 Closer.close(payloadInputStream);
276 nodeOutputStream.flush();
/**
 * The reader thread for this connection. This is essentially a thread that
 * reads lines from the node, creates messages from them and notifies
 * listeners about the messages.
 *
 * @author David Roden <droden@gmail.com>
 */
288 private class NodeReader implements Runnable {
290 /** The input stream to read from. */
291 @SuppressWarnings("hiding")
292 private InputStream nodeInputStream;
295 * Creates a new reader that reads from the specified input stream.
297 * @param nodeInputStream
298 * The input stream to read from
300 public NodeReader(InputStream nodeInputStream) {
301 this.nodeInputStream = nodeInputStream;
/**
 * Main loop of the reader. Lines are read and converted into
 * {@link Message} objects. Completed messages are passed to the
 * registered listeners.
 */
// NOTE(review): the method header and many short lines of this method (try
// openers, closing braces, break/continue statements, the declaration of
// "line") are not present in this copy of the file. The notes below describe
// only what the visible lines do; confirm control flow against the complete
// source.
308 @SuppressWarnings("synthetic-access")
310 LineInputStream nodeReader = null;
// wrap the socket input stream so that it can be read line by line
312 nodeReader = new LineInputStream(nodeInputStream);
// the message that is currently being assembled, if any
314 Message message = null;
315 while (line != null) {
316 line = nodeReader.readLine();
317 // System.err.println("> " + line);
// no message in progress: this line starts a new one
// (presumably the line is the message name - confirm against Message)
321 if (message == null) {
322 message = new Message(line);
// "Data" announces a payload: it is spooled into a temp file and attached to
// the message as an input stream
325 if ("Data".equals(line)) {
326 /* need to read message from stream now */
327 File tempFile = null;
// a null directory makes createTempFile use the default temp directory
329 tempFile = File.createTempFile("fcpv2", "data", (tempDirectory != null) ? new File(tempDirectory) : null);
330 tempFile.deleteOnExit();
331 FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
// NOTE(review): parseLong throws NumberFormatException when the DataLength
// field is missing or not a number; that is not an IOException and is not
// handled by the catch clause below.
332 long dataLength = Long.parseLong(message.get("DataLength"));
// NOTE(review): the payload is read from the raw input stream, not through
// the LineInputStream wrapper - correct only if the wrapper does not read
// ahead; confirm.
333 StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
// NOTE(review): in the visible code the stream is closed only when the copy
// succeeds; if the copy throws, the file stream appears to stay open.
334 tempFileOutputStream.close();
335 message.setPayloadInputStream(new TempFileInputStream(tempFile));
// a failed payload transfer is only printed; the message is still processed
// by the visible code below
336 } catch (IOException ioe1) {
337 ioe1.printStackTrace();
// "Data" and "EndMessage" both end the message header: the message is complete
340 if ("Data".equals(line) || "EndMessage".equals(line)) {
341 if (message.getName().equals("NodeHello")) {
// wake a thread waiting on the Connection's monitor
// (presumably connect() - confirm)
343 synchronized (Connection.this) {
344 Connection.this.notify();
347 fireMessageReceived(message);
// header lines have the form key=value; both parts are trimmed and the line
// is split at the first '='
352 int equalsPosition = line.indexOf('=');
353 if (equalsPosition > -1) {
354 String key = line.substring(0, equalsPosition).trim();
355 String value = line.substring(equalsPosition + 1).trim();
// the Identifier field is stored through its own setter
// NOTE(review): whether put() below also runs for Identifier depends on a
// branch line that is not visible here; confirm.
356 if (key.equals("Identifier")) {
357 message.setIdentifier(value);
359 message.put(key, value);
363 /* skip lines consisting of whitespace only */
364 if (line.trim().length() == 0) {
367 /* if we got here, some error occurred! */
368 throw new IOException("Unexpected line: " + line);
// I/O errors are not reported from here (the stack trace print is commented out)
370 } catch (IOException ioe1) {
371 // ioe1.printStackTrace();
// clean-up of the line reader and of the underlying input stream
373 if (nodeReader != null) {
376 } catch (IOException ioe1) {
379 if (nodeInputStream != null) {
381 nodeInputStream.close();
382 } catch (IOException ioe1) {
// the reader tears down the whole connection at this point; disconnect()
// in turn notifies the listeners
386 Connection.this.disconnect();