2 * jSite - Connection.java - Copyright © 2006–2012 David Roden
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 package de.todesbaum.util.freenet.fcp2;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.io.OutputStreamWriter;
27 import java.io.Writer;
28 import java.net.Socket;
29 import java.nio.charset.Charset;
30 import java.util.ArrayList;
31 import java.util.List;
33 import de.todesbaum.util.io.Closer;
34 import de.todesbaum.util.io.LineInputStream;
35 import de.todesbaum.util.io.StreamCopier;
36 import de.todesbaum.util.io.StreamCopier.ProgressListener;
37 import de.todesbaum.util.io.TempFileInputStream;
40 * A physical connection to a Freenet node.
42 * @author David Roden <droden@gmail.com>
45 public class Connection {
47 /** The listeners that receive events from this connection. */
48 private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
50 /** The node this connection is connected to. */
51 private final Node node;
53 /** The name of this connection. */
54 private final String name;
56 /** The network socket of this connection. */
57 private Socket nodeSocket;
59 /** The input stream that reads from the socket. */
60 private InputStream nodeInputStream;
62 /** The output stream that writes to the socket. */
63 private OutputStream nodeOutputStream;
65 /** The thread that reads from the socket. */
66 private NodeReader nodeReader;
68 /** A writer for the output stream. */
69 private Writer nodeWriter;
71 /** The NodeHello message sent by the node on connect. */
72 protected Message nodeHello;
74 /** The temp directory to use. */
75 private String tempDirectory;
/**
 * Creates a new connection to the specified node with the specified name.
 * The connection is not opened until {@link #connect()} is called.
 *
 * @param node
 *            The node to connect to
 * @param name
 *            The name of this connection
 */
public Connection(Node node, String name) {
	/*
	 * NOTE(review): the constructor body is missing from the extracted
	 * source. Both fields are final and these parameters are the only
	 * possible values, so the assignments are restored here.
	 */
	this.node = node;
	this.name = name;
}
91 * Adds a listener that gets notified on connection events.
93 * @param connectionListener
96 public void addConnectionListener(ConnectionListener connectionListener) {
97 connectionListeners.add(connectionListener);
101 * Removes a listener from the list of registered listeners. Only the first
102 * matching listener is removed.
104 * @param connectionListener
105 * The listener to remove
106 * @see List#remove(java.lang.Object)
108 public void removeConnectionListener(ConnectionListener connectionListener) {
109 connectionListeners.remove(connectionListener);
113 * Notifies listeners about a received message.
116 * The received message
118 protected void fireMessageReceived(Message message) {
119 for (ConnectionListener connectionListener : connectionListeners) {
120 connectionListener.messageReceived(this, message);
125 * Notifies listeners about the loss of the connection.
127 protected void fireConnectionTerminated() {
128 for (ConnectionListener connectionListener : connectionListeners) {
129 connectionListener.connectionTerminated(this);
134 * Returns the name of the connection.
136 * @return The name of the connection
138 public String getName() {
143 * Sets the temp directory to use for creation of temporary files.
145 * @param tempDirectory
146 * The temp directory to use, or {@code null} to use the default
149 public void setTempDirectory(String tempDirectory) {
150 this.tempDirectory = tempDirectory;
154 * Connects to the node.
156 * @return <code>true</code> if the connection succeeded and the node
157 * returned a NodeHello message
158 * @throws IOException
159 * if an I/O error occurs
160 * @see #getNodeHello()
162 public synchronized boolean connect() throws IOException {
164 nodeInputStream = null;
165 nodeOutputStream = null;
169 nodeSocket = new Socket(node.getHostname(), node.getPort());
170 nodeSocket.setReceiveBufferSize(65535);
171 nodeInputStream = nodeSocket.getInputStream();
172 nodeOutputStream = nodeSocket.getOutputStream();
173 nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
174 nodeReader = new NodeReader(nodeInputStream);
175 Thread nodeReaderThread = new Thread(nodeReader);
176 nodeReaderThread.setDaemon(true);
177 nodeReaderThread.start();
178 ClientHello clientHello = new ClientHello();
179 clientHello.setName(name);
180 clientHello.setExpectedVersion("2.0");
181 execute(clientHello);
182 synchronized (this) {
185 } catch (InterruptedException e) {
188 return nodeHello != null;
189 } catch (IOException ioe1) {
196 * Returns whether this connection is still connected to the node.
198 * @return <code>true</code> if this connection is still valid,
199 * <code>false</code> otherwise
201 public boolean isConnected() {
202 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
206 * Returns the NodeHello message the node sent on connection.
208 * @return The NodeHello message of the node
210 public Message getNodeHello() {
215 * Disconnects from the node.
217 public void disconnect() {
218 Closer.close(nodeWriter);
220 Closer.close(nodeOutputStream);
221 nodeOutputStream = null;
222 Closer.close(nodeInputStream);
223 nodeInputStream = null;
224 nodeInputStream = null;
225 Closer.close(nodeSocket);
227 synchronized (this) {
230 fireConnectionTerminated();
234 * Executes the specified command.
237 * The command to execute
238 * @throws IllegalStateException
239 * if the connection is not connected
240 * @throws IOException
241 * if an I/O error occurs
243 public synchronized void execute(Command command) throws IllegalStateException, IOException {
244 execute(command, null);
248 * Executes the specified command.
251 * The command to execute
252 * @param progressListener
253 * A progress listener for a payload transfer
254 * @throws IllegalStateException
255 * if the connection is not connected
256 * @throws IOException
257 * if an I/O error occurs
259 public synchronized void execute(Command command, ProgressListener progressListener) throws IllegalStateException, IOException {
260 if (nodeSocket == null) {
261 throw new IllegalStateException("connection is not connected");
263 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
264 command.write(nodeWriter);
265 nodeWriter.write("EndMessage" + Command.LINEFEED);
267 if (command.hasPayload()) {
268 InputStream payloadInputStream = null;
270 payloadInputStream = command.getPayload();
271 StreamCopier.copy(payloadInputStream, nodeOutputStream, command.getPayloadLength(), progressListener);
273 Closer.close(payloadInputStream);
275 nodeOutputStream.flush();
280 * The reader thread for this connection. This is essentially a thread that
281 * reads lines from the node, creates messages from them and notifies
282 * listeners about the messages.
284 * @author David Roden <droden@gmail.com>
287 private class NodeReader implements Runnable {
289 /** The input stream to read from. */
290 @SuppressWarnings("hiding")
291 private InputStream nodeInputStream;
294 * Creates a new reader that reads from the specified input stream.
296 * @param nodeInputStream
297 * The input stream to read from
299 public NodeReader(InputStream nodeInputStream) {
300 this.nodeInputStream = nodeInputStream;
304 * Main loop of the reader. Lines are read and converted into
305 * {@link Message} objects.
307 @SuppressWarnings("synthetic-access")
309 LineInputStream nodeReader = null;
311 nodeReader = new LineInputStream(nodeInputStream);
313 Message message = null;
314 while (line != null) {
315 line = nodeReader.readLine();
316 // System.err.println("> " + line);
320 if (message == null) {
321 message = new Message(line);
324 if ("Data".equals(line)) {
325 /* need to read message from stream now */
326 File tempFile = null;
328 tempFile = File.createTempFile("fcpv2", "data", (tempDirectory != null) ? new File(tempDirectory) : null);
329 tempFile.deleteOnExit();
330 FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
331 long dataLength = Long.parseLong(message.get("DataLength"));
332 StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
333 tempFileOutputStream.close();
334 message.setPayloadInputStream(new TempFileInputStream(tempFile));
335 } catch (IOException ioe1) {
336 ioe1.printStackTrace();
339 if ("Data".equals(line) || "EndMessage".equals(line)) {
340 if (message.getName().equals("NodeHello")) {
342 synchronized (Connection.this) {
343 Connection.this.notify();
346 fireMessageReceived(message);
351 int equalsPosition = line.indexOf('=');
352 if (equalsPosition > -1) {
353 String key = line.substring(0, equalsPosition).trim();
354 String value = line.substring(equalsPosition + 1).trim();
355 if (key.equals("Identifier")) {
356 message.setIdentifier(value);
358 message.put(key, value);
362 /* skip lines consisting of whitespace only */
363 if (line.trim().length() == 0) {
366 /* if we got here, some error occured! */
367 throw new IOException("Unexpected line: " + line);
369 } catch (IOException ioe1) {
370 // ioe1.printStackTrace();
372 if (nodeReader != null) {
375 } catch (IOException ioe1) {
378 if (nodeInputStream != null) {
380 nodeInputStream.close();
381 } catch (IOException ioe1) {
385 Connection.this.disconnect();