X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=5ba05185e99a7993da0accee4ab4c02d7b200a4f;hb=5a7138b599f351dfcd28337adef24aae0c6d4b4b;hp=46d23749c79069e201b7fc46382437ff5b3ef42b;hpb=3a12209e82233cd79677a0d847321f41b41aa9a5;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 46d2374..5ba0518 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -93,7 +93,8 @@ public class Pipeline implements Iterable { * @param filter * The filter to get the connected filters for * @return The filters connected to the given filter, or an empty list if the - * filter does not exist in this pipeline, or is not connected to any filters + * filter does not exist in this pipeline, or is not connected to any + * filters */ public List filters(Filter filter) { return filters.get(filter); @@ -138,15 +139,16 @@ public class Pipeline implements Iterable { } List filters = Lists.newArrayList(); filters.add(source); - source.open(Metadata.UNKNOWN); + Metadata currentMetadata = Metadata.UNKNOWN; /* collect all source->sink pairs. */ while (!filters.isEmpty()) { Filter filter = filters.remove(0); + logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata)); + filter.open(currentMetadata); + currentMetadata = filter.metadata(); Collection sinks = this.filters.get(filter); connections.add(new Connection(filter, sinks)); for (Filter sink : sinks) { - logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata())); - sink.open(filter.metadata()); filters.add(sink); } } @@ -296,7 +298,10 @@ public class Pipeline implements Iterable { * * @author David ‘Bombe’ Roden */ - public class Connection implements Runnable { + public static class Connection implements Runnable { + + /** The logger. */ + private static final Logger logger = Logger.getLogger(Connection.class.getName()); /** The source. */ private final Filter source; @@ -316,6 +321,9 @@ public class Pipeline implements Iterable { /** The number of copied bytes. */ private long counter; + /** The exception that was encountered, if any. */ + private Optional ioException = Optional.absent(); + /** * Creates a new connection. * @@ -339,6 +347,24 @@ public class Pipeline implements Iterable { // /** + * Returns the source of this connection. + * + * @return The source of this connection + */ + public Filter source() { + return source; + } + + /** + * Returns the sinks of this connection. + * + * @return The sinks of this connection + */ + public Collection sinks() { + return sinks; + } + + /** * Returns the time this connection was started. * * @return The time this connection was started (in milliseconds since Jan 1, @@ -358,6 +384,17 @@ public class Pipeline implements Iterable { return counter; } + /** + * Returns the I/O exception that was encountered while processing this + * connection. + * + * @return The I/O exception that occured, or {@link Optional#absent()} if no + * exception occured + */ + public Optional ioException() { + return ioException; + } + // // ACTIONS // @@ -377,13 +414,9 @@ public class Pipeline implements Iterable { while (!stopped.get()) { try { final DataPacket dataPacket; - try { - logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name())); - dataPacket = source.get(4096); - logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name())); - } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1); - } + logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name())); + dataPacket = source.get(4096); + logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name())); List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { @Override @@ -392,13 +425,9 @@ public class Pipeline implements Iterable { @Override public Void call() throws Exception { - try { - logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name())); - sink.process(dataPacket); - logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name())); - } catch (IOException ioe1) { - throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1); - } + logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name())); + sink.process(dataPacket); + logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name())); return null; } }; @@ -410,8 +439,7 @@ public class Pipeline implements Iterable { } counter += dataPacket.buffer().length; } catch (IOException e) { - /* TODO */ - e.printStackTrace(); + ioException = Optional.of(e); break; } catch (InterruptedException e) { /* TODO */