try to close all streams as soon as possible to prevent leaks (inserting large sites...
[jSite.git] / src / de / todesbaum / util / freenet / fcp2 / Connection.java
1 /*
2  * todesbaum-lib - 
3  * Copyright (C) 2006 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package de.todesbaum.util.freenet.fcp2;
21
22 import java.io.File;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.Writer;
29 import java.net.Socket;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import de.todesbaum.util.io.Closer;
35 import de.todesbaum.util.io.LineInputStream;
36 import de.todesbaum.util.io.StreamCopier;
37 import de.todesbaum.util.io.TempFileInputStream;
38
39 /**
40  * A physical connection to a Freenet node.
41  * 
42  * @author David Roden <droden@gmail.com>
43  * @version $Id$
44  */
45 public class Connection {
46
47         /** The listeners that receive events from this connection. */
48         private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
49
50         /** The node this connection is connected to. */
51         private final Node node;
52
53         /** The name of this connection. */
54         private final String name;
55
56         /** The network socket of this connection. */
57         private Socket nodeSocket;
58
59         /** The input stream that reads from the socket. */
60         private InputStream nodeInputStream;
61
62         /** The output stream that writes to the socket. */
63         private OutputStream nodeOutputStream;
64
65         /** The thread that reads from the socket. */
66         private NodeReader nodeReader;
67
68         /** A writer for the output stream. */
69         private Writer nodeWriter;
70
71         /** The NodeHello message sent by the node on connect. */
72         protected Message nodeHello;
73
74         /**
75          * Creates a new connection to the specified node with the specified name.
76          * 
77          * @param node
78          *            The node to connect to
79          * @param name
80          *            The name of this connection
81          */
82         public Connection(Node node, String name) {
83                 this.node = node;
84                 this.name = name;
85         }
86
87         /**
88          * Adds a listener that gets notified on connection events.
89          * 
90          * @param connectionListener
91          *            The listener to add
92          */
93         public void addConnectionListener(ConnectionListener connectionListener) {
94                 connectionListeners.add(connectionListener);
95         }
96
97         /**
98          * Removes a listener from the list of registered listeners. Only the first
99          * matching listener is removed.
100          * 
101          * @param connectionListener
102          *            The listener to remove
103          * @see List#remove(java.lang.Object)
104          */
105         public void removeConnectionListener(ConnectionListener connectionListener) {
106                 connectionListeners.remove(connectionListener);
107         }
108
109         /**
110          * Notifies listeners about a received message.
111          * 
112          * @param message
113          *            The received message
114          */
115         protected void fireMessageReceived(Message message) {
116                 for (ConnectionListener connectionListener: connectionListeners) {
117                         connectionListener.messageReceived(this, message);
118                 }
119         }
120
121         /**
122          * Notifies listeners about the loss of the connection.
123          */
124         protected void fireConnectionTerminated() {
125                 for (ConnectionListener connectionListener: connectionListeners) {
126                         connectionListener.connectionTerminated(this);
127                 }
128         }
129
130         /**
131          * Returns the name of the connection.
132          * 
133          * @return The name of the connection
134          */
135         public String getName() {
136                 return name;
137         }
138
139         /**
140          * Connects to the node.
141          * 
142          * @return <code>true</code> if the connection succeeded and the node
143          *         returned a NodeHello message
144          * @throws IOException
145          *             if an I/O error occurs
146          * @see #getNodeHello()
147          */
148         public synchronized boolean connect() throws IOException {
149                 nodeSocket = null;
150                 nodeInputStream = null;
151                 nodeOutputStream = null;
152                 nodeWriter = null;
153                 nodeReader = null;
154                 try {
155                         nodeSocket = new Socket(node.getHostname(), node.getPort());
156                         nodeSocket.setReceiveBufferSize(65535);
157                         nodeInputStream = nodeSocket.getInputStream();
158                         nodeOutputStream = nodeSocket.getOutputStream();
159                         nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
160                         nodeReader = new NodeReader(nodeInputStream);
161                         Thread nodeReaderThread = new Thread(nodeReader);
162                         nodeReaderThread.setDaemon(true);
163                         nodeReaderThread.start();
164                         ClientHello clientHello = new ClientHello();
165                         clientHello.setName(name);
166                         clientHello.setExpectedVersion("2.0");
167                         execute(clientHello);
168                         synchronized (this) {
169                                 try {
170                                         wait();
171                                 } catch (InterruptedException e) {
172                                 }
173                         }
174                         return nodeHello != null;
175                 } catch (IOException ioe1) {
176                         disconnect();
177                         throw ioe1;
178                 }
179         }
180
181         /**
182          * Returns whether this connection is still connected to the node.
183          * 
184          * @return <code>true</code> if this connection is still valid,
185          *         <code>false</code> otherwise
186          */
187         public boolean isConnected() {
188                 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
189         }
190
191         /**
192          * Returns the NodeHello message the node sent on connection.
193          * 
194          * @return The NodeHello message of the node
195          */
196         public Message getNodeHello() {
197                 return nodeHello;
198         }
199
200         /**
201          * Disconnects from the node.
202          */
203         public void disconnect() {
204                 if (nodeWriter != null) {
205                         try {
206                                 nodeWriter.close();
207                         } catch (IOException ioe1) {
208                         }
209                         nodeWriter = null;
210                 }
211                 if (nodeOutputStream != null) {
212                         try {
213                                 nodeOutputStream.close();
214                         } catch (IOException ioe1) {
215                         }
216                         nodeOutputStream = null;
217                 }
218                 if (nodeInputStream != null) {
219                         try {
220                                 nodeInputStream.close();
221                         } catch (IOException ioe1) {
222                         }
223                         nodeInputStream = null;
224                 }
225                 if (nodeSocket != null) {
226                         try {
227                                 nodeSocket.close();
228                         } catch (IOException ioe1) {
229                         }
230                         nodeSocket = null;
231                 }
232                 synchronized (this) {
233                         notify();
234                 }
235                 fireConnectionTerminated();
236         }
237
238         /**
239          * Executes the specified command.
240          * 
241          * @param command
242          *            The command to execute
243          * @throws IllegalStateException
244          *             if the connection is not connected
245          * @throws IOException
246          *             if an I/O error occurs
247          */
248         public synchronized void execute(Command command) throws IllegalStateException, IOException {
249                 if (nodeSocket == null) {
250                         throw new IllegalStateException("connection is not connected");
251                 }
252                 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
253                 command.write(nodeWriter);
254                 nodeWriter.write("EndMessage" + Command.LINEFEED);
255                 nodeWriter.flush();
256                 if (command.hasPayload()) {
257                         InputStream payloadInputStream = null;
258                         try {
259                                 payloadInputStream = command.getPayload();
260                                 StreamCopier.copy(payloadInputStream, nodeOutputStream, command.getPayloadLength());
261                         } finally {
262                                 Closer.close(payloadInputStream);
263                         }
264                         nodeOutputStream.flush();
265                 }
266         }
267
268         /**
269          * The reader thread for this connection. This is essentially a thread that
270          * reads lines from the node, creates messages from them and notifies
271          * listeners about the messages.
272          * 
273          * @author David Roden &lt;droden@gmail.com&gt;
274          * @version $Id$
275          */
276         private class NodeReader implements Runnable {
277
278                 /** The input stream to read from. */
279                 @SuppressWarnings("hiding")
280                 private InputStream nodeInputStream;
281
282                 /**
283                  * Creates a new reader that reads from the specified input stream.
284                  * 
285                  * @param nodeInputStream
286                  *            The input stream to read from
287                  */
288                 public NodeReader(InputStream nodeInputStream) {
289                         this.nodeInputStream = nodeInputStream;
290                 }
291
292                 /**
293                  * Main loop of the reader. Lines are read and converted into
294                  * {@link Message} objects.
295                  */
296                 public void run() {
297                         LineInputStream nodeReader = null;
298                         try {
299                                 nodeReader = new LineInputStream(nodeInputStream);
300                                 String line = "";
301                                 Message message = null;
302                                 while (line != null) {
303                                         line = nodeReader.readLine();
304                                         // System.err.println("> " + line);
305                                         if (line == null) {
306                                                 break;
307                                         }
308                                         if (message == null) {
309                                                 message = new Message(line);
310                                                 continue;
311                                         }
312                                         if ("Data".equals(line)) {
313                                                 /* need to read message from stream now */
314                                                 File tempFile = null;
315                                                 try {
316                                                         tempFile = File.createTempFile("fcpv2", "data");
317                                                         tempFile.deleteOnExit();
318                                                         FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
319                                                         long dataLength = Long.parseLong(message.get("DataLength"));
320                                                         StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
321                                                         tempFileOutputStream.close();
322                                                         message.setPayloadInputStream(new TempFileInputStream(tempFile));
323                                                 } catch (IOException ioe1) {
324                                                         ioe1.printStackTrace();
325                                                 }
326                                         }
327                                         if ("Data".equals(line) || "EndMessage".equals(line)) {
328                                                 if (message.getName().equals("NodeHello")) {
329                                                         nodeHello = message;
330                                                         synchronized (Connection.this) {
331                                                                 Connection.this.notify();
332                                                         }
333                                                 } else {
334                                                         fireMessageReceived(message);
335                                                 }
336                                                 message = null;
337                                                 continue;
338                                         }
339                                         int equalsPosition = line.indexOf('=');
340                                         if (equalsPosition > -1) {
341                                                 String key = line.substring(0, equalsPosition).trim();
342                                                 String value = line.substring(equalsPosition + 1).trim();
343                                                 if (key.equals("Identifier")) {
344                                                         message.setIdentifier(value);
345                                                 } else {
346                                                         message.put(key, value);
347                                                 }
348                                                 continue;
349                                         }
350                                         /* skip lines consisting of whitespace only */
351                                         if (line.trim().length() == 0) {
352                                                 continue;
353                                         }
354                                         /* if we got here, some error occured! */
355                                         throw new IOException("Unexpected line: " + line);
356                                 }
357                         } catch (IOException ioe1) {
358                                 // ioe1.printStackTrace();
359                         } finally {
360                                 if (nodeReader != null) {
361                                         try {
362                                                 nodeReader.close();
363                                         } catch (IOException ioe1) {
364                                         }
365                                 }
366                                 if (nodeInputStream != null) {
367                                         try {
368                                                 nodeInputStream.close();
369                                         } catch (IOException ioe1) {
370                                         }
371                                 }
372                         }
373                         Connection.this.disconnect();
374                 }
375
376         }
377
378 }