Improve logging.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index 975c21c..ab6500e 100644 (file)
@@ -34,8 +34,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -46,7 +46,7 @@ import com.google.common.util.concurrent.MoreExecutors;
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline implements Iterable<Controlled> {
+public class Pipeline implements Iterable<ControlledComponent> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -71,6 +71,21 @@ public class Pipeline implements Iterable<Controlled> {
        private Pipeline(Source source, Multimap<Source, Sink> sinks) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+               for (ControlledComponent component : Lists.reverse(components())) {
+                       logger.finest(String.format("Adding Listener to %s.", component.name()));
+                       component.addMetadataListener(new MetadataListener() {
+                               @Override
+                               public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+                                       if (!(component instanceof Source)) {
+                                               return;
+                                       }
+                                       for (ControlledComponent controlledComponent : sinks((Source) component)) {
+                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
+                                               controlledComponent.metadataUpdated(metadata);
+                                       }
+                               }
+                       });
+               }
        }
 
        //
@@ -102,18 +117,18 @@ public class Pipeline implements Iterable<Controlled> {
        /**
         * Returns the traffic counters of the given controlled component.
         *
-        * @param controlled
+        * @param controlledComponent
         *              The controlled component to get the traffic counters for
         * @return The traffic counters for the given controlled component
         */
-       public TrafficCounter trafficCounter(Controlled controlled) {
+       public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
                long input = -1;
                long output = -1;
                for (Connection connection : connections) {
                        /* the connection where the source matches knows the output. */
-                       if (connection.source.equals(controlled)) {
+                       if (connection.source.equals(controlledComponent)) {
                                output = connection.counter();
-                       } else if (connection.sinks.contains(controlled)) {
+                       } else if (connection.sinks.contains(controlledComponent)) {
                                input = connection.counter();
                        }
                }
@@ -144,6 +159,7 @@ public class Pipeline implements Iterable<Controlled> {
                        Collection<Sink> sinks = this.sinks.get(source);
                        connections.add(new Connection(source, sinks));
                        for (Sink sink : sinks) {
+                               logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
                                sink.open(source.metadata());
                                if (sink instanceof Filter) {
                                        sources.add((Source) sink);
@@ -151,8 +167,15 @@ public class Pipeline implements Iterable<Controlled> {
                        }
                }
                for (Connection connection : connections) {
-                       logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
-                       new Thread(connection).start();
+                       String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+
+                               @Override
+                               public String apply(Sink sink) {
+                                       return sink.name();
+                               }
+                       }));
+                       logger.info(String.format("Starting Thread: %s", threadName));
+                       new Thread(connection, threadName).start();
                }
        }
 
@@ -171,8 +194,35 @@ public class Pipeline implements Iterable<Controlled> {
        //
 
        @Override
-       public Iterator<Controlled> iterator() {
-               return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
+       public Iterator<ControlledComponent> iterator() {
+               return components().iterator();
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       /**
+        * Returns all components of this pipeline, listed breadth-first, starting with
+        * the source.
+        *
+        * @return All components of this pipeline
+        */
+       public List<ControlledComponent> components() {
+               ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
+               List<ControlledComponent> currentComponents = Lists.newArrayList();
+               components.add(source);
+               currentComponents.add(source);
+               while (!currentComponents.isEmpty()) {
+                       Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
+                       for (Sink sink : sinks) {
+                               components.add(sink);
+                               if (sink instanceof Source) {
+                                       currentComponents.add(sink);
+                               }
+                       }
+               }
+               return components.build();
        }
 
        //
@@ -282,6 +332,9 @@ public class Pipeline implements Iterable<Controlled> {
                /** The executor service. */
                private final ExecutorService executorService;
 
+               /** The time the connection was started. */
+               private long startTime;
+
                /** The number of copied bytes. */
                private long counter;
 
@@ -296,7 +349,7 @@ public class Pipeline implements Iterable<Controlled> {
                public Connection(Source source, Collection<Sink> sinks) {
                        this.source = source;
                        this.sinks = sinks;
-                       if (sinks.size() == 1) {
+                       if (sinks.size() < 2) {
                                executorService = MoreExecutors.sameThreadExecutor();
                        } else {
                                executorService = Executors.newCachedThreadPool();
@@ -308,6 +361,16 @@ public class Pipeline implements Iterable<Controlled> {
                //
 
                /**
+                * Returns the time this connection was started.
+                *
+                * @return The time this connection was started (in milliseconds since Jan 1,
+                *         1970 UTC)
+                */
+               public long startTime() {
+                       return startTime;
+               }
+
+               /**
                 * Returns the number of bytes that this connection has received from its
                 * source during its lifetime.
                 *
@@ -332,18 +395,16 @@ public class Pipeline implements Iterable<Controlled> {
 
                @Override
                public void run() {
-                       Metadata firstMetadata = null;
+                       startTime = System.currentTimeMillis();
                        while (!stopped.get()) {
                                try {
-                                       final Metadata lastMetadata = firstMetadata;
-                                       final Metadata metadata = firstMetadata = source.metadata();
                                        final byte[] buffer;
                                        try {
-                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
                                                buffer = source.get(4096);
-                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
                                        } catch (IOException ioe1) {
-                                               throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+                                               throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
                                        }
                                        List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
 
@@ -353,15 +414,12 @@ public class Pipeline implements Iterable<Controlled> {
 
                                                                @Override
                                                                public Void call() throws Exception {
-                                                                       if (!metadata.equals(lastMetadata)) {
-                                                                               sink.metadataUpdated(metadata);
-                                                                       }
                                                                        try {
-                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
                                                                                sink.process(buffer);
-                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
                                                                        } catch (IOException ioe1) {
-                                                                               throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+                                                                               throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
                                                                        }
                                                                        return null;
                                                                }