import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
-import net.pterodactylus.fcp.highlevel.ConnectResult;
-import net.pterodactylus.fcp.highlevel.HighLevelCallback;
import net.pterodactylus.fcp.highlevel.HighLevelClient;
+import net.pterodactylus.fcp.highlevel.HighLevelClientListener;
import net.pterodactylus.util.io.Closer;
+import net.pterodactylus.util.logging.Logging;
/**
* TODO
- *
+ *
* @author David ‘Bombe’ Roden <bombe@freenetproject.org>
* @version $Id$
*/
-public class NodeManager {
+public class NodeManager implements Iterable<Node>, HighLevelClientListener {
/** Logger. */
- private static final Logger logger = Logger.getLogger(NodeManager.class.getName());
+ private static final Logger logger = Logging.getLogger(NodeManager.class.getName());
/** The FCP client name. */
private final String clientName;
/** Object used for synchronization. */
private final Object syncObject = new Object();
+ /** Node listeners. */
+ private List<NodeListener> nodeListeners = Collections.synchronizedList(new ArrayList<NodeListener>());
+
/** All nodes. */
private List<Node> nodes = Collections.synchronizedList(new ArrayList<Node>());
/** All FCP connections. */
- private Map<Node, HighLevelClient> nodeConnections = Collections.synchronizedMap(new HashMap<Node, HighLevelClient>());
+ private Map<Node, HighLevelClient> nodeClients = Collections.synchronizedMap(new HashMap<Node, HighLevelClient>());
/** Keeps track of which connection is in use right now. */
private Set<HighLevelClient> usedConnections = Collections.synchronizedSet(new HashSet<HighLevelClient>());
/**
* Creates a new FCP collector.
- *
+ *
* @param clientName
* The name of the FCP client
* @param directory
}
//
+ // EVENT MANAGEMENT
+ //
+
+ /**
+ * Adds the given listener to the list of listeners.
+ *
+ * @param nodeListener
+ * The listener to add
+ */
+ public void addNodeListener(NodeListener nodeListener) {
+ nodeListeners.add(nodeListener);
+ }
+
+ /**
+ * Removes the given listener from the list of listeners.
+ *
+ * @param nodeListener
+ * The listener to remove
+ */
+ public void removeNodeListener(NodeListener nodeListener) {
+ nodeListeners.remove(nodeListener);
+ }
+
+ /**
+ * Notifies all listeners that a node was added.
+ *
+ * @param node
+ * The node that was added.
+ */
+ private void fireNodeAdded(Node node) {
+ for (NodeListener nodeListener: nodeListeners) {
+ nodeListener.nodeAdded(node);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a node was removed.
+ *
+ * @param node
+ * The node that was removed
+ */
+ private void fireNodeRemoved(Node node) {
+ for (NodeListener nodeListener: nodeListeners) {
+ nodeListener.nodeRemoved(node);
+ }
+ }
+
+ /**
+ * Notifies all listeners that the given node was connected.
+ *
+ * @param node
+ * The node that is now connected
+ */
+ private void fireNodeConnected(Node node) {
+ for (NodeListener nodeListener: nodeListeners) {
+ nodeListener.nodeConnected(node);
+ }
+ }
+
+ /**
+ * Notifies all listeners that the given node was disconnected.
+ *
+ * @param node
+ * The node that is now disconnected
+ * @param throwable
+ * The exception that caused the disconnect, or <code>null</code>
+ * if there was no exception
+ */
+ private void fireNodeDisconnected(Node node, Throwable throwable) {
+ for (NodeListener nodeListener: nodeListeners) {
+ nodeListener.nodeDisconnected(node, throwable);
+ }
+ }
+
+ //
// ACCESSORS
//
/**
* Returns the directory in which the nodes are stored.
- *
+ *
* @return The directory the nodes are stored in
*/
public String getDirectory() {
/**
* Checks whether the given node is already connected.
- *
+ *
* @param node
* The node to check
* @return <code>true</code> if the node is already connected,
return nodes.contains(node);
}
+ /**
+ * {@inheritDoc}
+ */
+ public Iterator<Node> iterator() {
+ return nodes.iterator();
+ }
+
//
// ACTIONS
//
/**
* Loads nodes.
- *
+ *
* @throws IOException
* if an I/O error occurs loading the nodes
*/
newNode.setPort(nodePort);
loadedNodes.add(newNode);
}
+ logger.fine("loaded " + loadedNodes.size() + " nodes from config");
synchronized (syncObject) {
nodes.clear();
nodes.addAll(loadedNodes);
}
+ for (Node node: nodes) {
+ fireNodeAdded(node);
+ }
}
/**
* Saves all configured nodes.
- *
+ *
* @throws IOException
* if an I/O error occurs saving the nodes
*/
}
/**
- * Adds a connection to the given node. The connection is made instantly so
- * this method may block. If the node can not be connected, it will not be
- * added to the list of nodes.
- *
+ * Adds the given node to this manager.
+ *
+ * @see #connect(Node)
* @param node
* The node to connect to
- * @return <code>true</code> if the connection to the node could be
- * established
- * @throws UnknownHostException
- * if the hostname of the node can not be resolved
- * @throws IOException
- * if an I/O error occurs connecting to the node
*/
- public boolean addNode(Node node) throws UnknownHostException, IOException {
- if (nodes.contains(node)) {
- return true;
+ public void addNode(Node node) {
+ synchronized (syncObject) {
+ if (!nodes.contains(node)) {
+ nodes.add(node);
+ fireNodeAdded(node);
+ }
}
- HighLevelClient highLevelClient = new HighLevelClient(clientName, node.getHostname(), node.getPort());
- HighLevelCallback<ConnectResult> connectCallback = highLevelClient.connect();
- ConnectResult connectResult = null;
- while (connectResult == null) {
- try {
- connectResult = connectCallback.getResult();
- } catch (InterruptedException e) {
- /* ignore. */
+ }
+
+ /**
+ * Removes the given node from the node manager, disconnecting it if it is
+ * currently connected.
+ *
+ * @param node
+ * The node to remove
+ */
+ public void removeNode(Node node) {
+ synchronized (syncObject) {
+ if (!nodes.contains(node)) {
+ return;
+ }
+ if (nodeClients.containsKey(node)) {
+ disconnect(node);
}
+ fireNodeRemoved(node);
}
- if (connectResult.isConnected()) {
+ }
+
+ /**
+ * Tries to establish a connection with the given node.
+ *
+ * @param node
+ * The node to connect to
+ */
+ public void connect(Node node) {
+ try {
+ HighLevelClient highLevelClient = new HighLevelClient(clientName, node.getHostname(), node.getPort());
synchronized (syncObject) {
- nodes.add(node);
- nodeConnections.put(node, highLevelClient);
clientNodes.put(highLevelClient, node);
+ nodeClients.put(node, highLevelClient);
}
+ highLevelClient.addHighLevelClientListener(this);
+ highLevelClient.connect();
+ } catch (IOException ioe1) {
+ fireNodeDisconnected(node, ioe1);
+ }
+ }
+
+ /**
+ * Disconnects the given node without removing it.
+ *
+ * @param node
+ * The node to disconnect
+ */
+ public void disconnect(Node node) {
+ synchronized (syncObject) {
+ if (!nodes.contains(node)) {
+ return;
+ }
+ HighLevelClient highLevelClient = nodeClients.get(node);
+ highLevelClient.disconnect();
}
- return connectResult.isConnected();
}
/**
* Returns a list of all nodes.
- *
+ *
* @return A list of all nodes
*/
public List<Node> getNodes() {
- return new ArrayList<Node>(clientNodes.values());
+ return Collections.unmodifiableList(nodes);
+ }
+
+ /**
+ * “Borrows” a high-level client for the given node. A borrowed client
+ * <strong>has</strong> to be returned to the node manager using
+ * {@link #returnHighLevelClient(HighLevelClient)} when it is no longer in
+ * use, i.e. after a message has been sent! This method will block until a
+ * high-level client for the given node is available.
+ *
+ * @param node
+ * The node to get a high-level client for
+ * @return The high-level client for a node, or <code>null</code> if the
+ * node was disconnected or removed
+ */
+ public HighLevelClient borrowHighLevelClient(Node node) {
+ synchronized (syncObject) {
+ if (!nodeClients.containsKey(node)) {
+ return null;
+ }
+ HighLevelClient highLevelClient = nodeClients.get(node);
+ while (nodeClients.containsKey(node) && usedConnections.contains(highLevelClient)) {
+ try {
+ syncObject.wait();
+ } catch (InterruptedException ie1) {
+ /* ignore. TODO - check. */
+ }
+ }
+ if (!nodeClients.containsKey(node)) {
+ return null;
+ }
+ usedConnections.add(highLevelClient);
+ return highLevelClient;
+ }
+ }
+
+ /**
+ * Returns a borrowed high-level client.
+ *
+ * @see #borrowHighLevelClient(Node)
+ * @param highLevelClient
+ * The high-level client to return
+ */
+ public void returnHighLevelClient(HighLevelClient highLevelClient) {
+ synchronized (syncObject) {
+ if (!clientNodes.containsKey(highLevelClient)) {
+ return;
+ }
+ usedConnections.remove(highLevelClient);
+ syncObject.notifyAll();
+ }
}
//
/**
* Finds a currently unused high-level client, optionally waiting until a
* client is free and marking it used.
- *
+ *
* @param wait
* <code>true</code> to wait for a free connection,
* <code>false</code> to return <code>null</code>
synchronized (syncObject) {
HighLevelClient freeHighLevelClient = null;
while (freeHighLevelClient == null) {
- for (HighLevelClient highLevelClient: nodeConnections.values()) {
+ for (HighLevelClient highLevelClient: nodeClients.values()) {
if (!usedConnections.contains(highLevelClient)) {
freeHighLevelClient = highLevelClient;
break;
if (!wait) {
return null;
}
+ try {
+ syncObject.wait();
+ } catch (InterruptedException e) {
+ /* ignore, just re-check. */
+ }
}
/* we never get here, but the compiler doesn't realize. */
return null;
}
}
+ //
+ // INTERFACE HighLevelClientListener
+ //
+
+ /**
+ * {@inheritDoc}
+ */
+ public void clientConnected(HighLevelClient highLevelClient) {
+ logger.log(Level.FINER, "clientConnected(c=" + highLevelClient + ")");
+ Node node = clientNodes.get(highLevelClient);
+ if (node == null) {
+ logger.log(Level.WARNING, "got event for unknown client");
+ return;
+ }
+ fireNodeConnected(node);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void clientDisconnected(HighLevelClient highLevelClient, Throwable throwable) {
+ logger.log(Level.FINER, "clientDisconnected(c=" + highLevelClient + ",t=" + throwable + ")");
+ synchronized (syncObject) {
+ Node node = clientNodes.remove(highLevelClient);
+ if (node == null) {
+ logger.log(Level.WARNING, "got event for unknown client");
+ return;
+ }
+ nodeClients.remove(node);
+ usedConnections.remove(highLevelClient);
+ fireNodeDisconnected(node, throwable);
+ }
+ }
+
}