Fix formatting.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index ae4924b..5ba0518 100644 (file)
@@ -72,19 +72,6 @@ public class Pipeline implements Iterable<Filter> {
        private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
-               for (Filter filter : Lists.reverse(filters())) {
-                       logger.finest(String.format("Adding Listener to %s.", filter.name()));
-                       filter.addMetadataListener(new MetadataListener() {
-
-                               @Override
-                               public void metadataUpdated(Filter filter, Metadata metadata) {
-                                       for (Filter sinks : filters(filter)) {
-                                               logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
-                                               sinks.metadataUpdated(metadata);
-                                       }
-                               }
-                       });
-               }
        }
 
        //
@@ -106,7 +93,8 @@ public class Pipeline implements Iterable<Filter> {
         * @param filter
         *              The filter to get the connected filters for
         * @return The filters connected to the given filter, or an empty list if the
-        *         filter does not exist in this pipeline, or is not connected to any filters
+        *         filter does not exist in this pipeline, or is not connected to any
+        *         filters
         */
        public List<Filter> filters(Filter filter) {
                return filters.get(filter);
@@ -151,15 +139,16 @@ public class Pipeline implements Iterable<Filter> {
                }
                List<Filter> filters = Lists.newArrayList();
                filters.add(source);
-               source.open(Metadata.UNKNOWN);
+               Metadata currentMetadata = Metadata.UNKNOWN;
                /* collect all source->sink pairs. */
                while (!filters.isEmpty()) {
                        Filter filter = filters.remove(0);
+                       logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
+                       filter.open(currentMetadata);
+                       currentMetadata = filter.metadata();
                        Collection<Filter> sinks = this.filters.get(filter);
                        connections.add(new Connection(filter, sinks));
                        for (Filter sink : sinks) {
-                               logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
-                               sink.open(filter.metadata());
                                filters.add(sink);
                        }
                }
@@ -309,7 +298,10 @@ public class Pipeline implements Iterable<Filter> {
         *
         * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
         */
-       public class Connection implements Runnable {
+       public static class Connection implements Runnable {
+
+               /** The logger. */
+               private static final Logger logger = Logger.getLogger(Connection.class.getName());
 
                /** The source. */
                private final Filter source;
@@ -329,6 +321,9 @@ public class Pipeline implements Iterable<Filter> {
                /** The number of copied bytes. */
                private long counter;
 
+               /** The exception that was encountered, if any. */
+               private Optional<IOException> ioException = Optional.absent();
+
                /**
                 * Creates a new connection.
                 *
@@ -352,6 +347,24 @@ public class Pipeline implements Iterable<Filter> {
                //
 
                /**
+                * Returns the source of this connection.
+                *
+                * @return The source of this connection
+                */
+               public Filter source() {
+                       return source;
+               }
+
+               /**
+                * Returns the sinks of this connection.
+                *
+                * @return The sinks of this connection
+                */
+               public Collection<Filter> sinks() {
+                       return sinks;
+               }
+
+               /**
                 * Returns the time this connection was started.
                 *
                 * @return The time this connection was started (in milliseconds since Jan 1,
@@ -371,6 +384,17 @@ public class Pipeline implements Iterable<Filter> {
                        return counter;
                }
 
+               /**
+                * Returns the I/O exception that was encountered while processing this
+                * connection.
+                *
+                * @return The I/O exception that occured, or {@link Optional#absent()} if no
+                *         exception occured
+                */
+               public Optional<IOException> ioException() {
+                       return ioException;
+               }
+
                //
                // ACTIONS
                //
@@ -389,14 +413,10 @@ public class Pipeline implements Iterable<Filter> {
                        startTime = System.currentTimeMillis();
                        while (!stopped.get()) {
                                try {
-                                       final byte[] buffer;
-                                       try {
-                                               logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
-                                               buffer = source.get(4096);
-                                               logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
-                                       } catch (IOException ioe1) {
-                                               throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
-                                       }
+                                       final DataPacket dataPacket;
+                                       logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
+                                       dataPacket = source.get(4096);
+                                       logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
                                        List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
 
                                                @Override
@@ -405,13 +425,9 @@ public class Pipeline implements Iterable<Filter> {
 
                                                                @Override
                                                                public Void call() throws Exception {
-                                                                       try {
-                                                                               logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
-                                                                               sink.process(buffer);
-                                                                               logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
-                                                                       } catch (IOException ioe1) {
-                                                                               throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
-                                                                       }
+                                                                       logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
+                                                                       sink.process(dataPacket);
+                                                                       logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
                                                                        return null;
                                                                }
                                                        };
@@ -421,10 +437,9 @@ public class Pipeline implements Iterable<Filter> {
                                        for (Future<Void> future : futures) {
                                                future.get();
                                        }
-                                       counter += buffer.length;
+                                       counter += dataPacket.buffer().length;
                                } catch (IOException e) {
-                                       /* TODO */
-                                       e.printStackTrace();
+                                       ioException = Optional.of(e);
                                        break;
                                } catch (InterruptedException e) {
                                        /* TODO */