Implement iterable.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index 93648bc..9b6d160 100644 (file)
@@ -19,6 +19,7 @@ package net.pterodactylus.sonitus.data;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -43,7 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors;
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Pipeline {
+public class Pipeline implements Iterable<Controlled> {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
@@ -71,6 +73,32 @@ public class Pipeline {
        }
 
        //
+       // ACCESSORS
+       //
+
+       /**
+        * Expose this pipeline’s source.
+        *
+        * @return This pipeline’s source
+        */
+       public Source source() {
+               return source;
+       }
+
+       /**
+        * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+        * the given source.
+        *
+        * @param source
+        *              The source to get the sinks for
+        * @return The sinks connected to the given source, or an empty list if the
+        *         source does not exist in this pipeline
+        */
+       public Collection<Sink> sinks(Source source) {
+               return sinks.get(source);
+       }
+
+       //
        // ACTIONS
        //
 
@@ -117,6 +145,15 @@ public class Pipeline {
        }
 
        //
+       // ITERABLE METHODS
+       //
+
+       @Override
+       public Iterator<Controlled> iterator() {
+               return ImmutableSet.<Controlled>builder().add(source).addAll(sinks.values()).build().iterator();
+       }
+
+       //
        // STATIC METHODS
        //
 
@@ -261,7 +298,14 @@ public class Pipeline {
                                try {
                                        final Metadata lastMetadata = firstMetadata;
                                        final Metadata metadata = firstMetadata = source.metadata();
-                                       final byte[] buffer = source.get(4096);
+                                       final byte[] buffer;
+                                       try {
+                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+                                               buffer = source.get(4096);
+                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+                                       } catch (IOException ioe1) {
+                                               throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+                                       }
                                        List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
 
                                                @Override
@@ -273,7 +317,13 @@ public class Pipeline {
                                                                        if (!metadata.equals(lastMetadata)) {
                                                                                sink.metadataUpdated(metadata);
                                                                        }
-                                                                       sink.process(buffer);
+                                                                       try {
+                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+                                                                               sink.process(buffer);
+                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+                                                                       } catch (IOException ioe1) {
+                                                                               throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+                                                                       }
                                                                        return null;
                                                                }
                                                        };