X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=923012e2640da80bcee51c1b20f07acc06aec9a7;hb=2548b73c91fa18f4e59fbe9d8f652d5580f643bc;hp=5bbcd636091231c4b64c74b528eaeacde258f21d;hpb=633a841142f978235ed9f745b6ba16c278963e62;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index 5bbcd63..923012e 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -72,19 +72,6 @@ public class Pipeline implements Iterable { private Pipeline(Filter source, Multimap filters) { this.source = Preconditions.checkNotNull(source, "source must not be null"); this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null")); - for (Filter filter : Lists.reverse(filters())) { - logger.finest(String.format("Adding Listener to %s.", filter.name())); - filter.addMetadataListener(new MetadataListener() { - - @Override - public void metadataUpdated(Filter filter, Metadata metadata) { - for (Filter sinks : filters(filter)) { - logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata)); - sinks.metadataUpdated(metadata); - } - } - }); - } } // @@ -151,6 +138,7 @@ public class Pipeline implements Iterable { } List filters = Lists.newArrayList(); filters.add(source); + source.open(Metadata.UNKNOWN); /* collect all source->sink pairs. */ while (!filters.isEmpty()) { Filter filter = filters.remove(0); @@ -308,7 +296,7 @@ public class Pipeline implements Iterable { * * @author David ‘Bombe’ Roden */ - public class Connection implements Runnable { + public static class Connection implements Runnable { /** The source. */ private final Filter source; @@ -388,11 +376,11 @@ public class Pipeline implements Iterable { startTime = System.currentTimeMillis(); while (!stopped.get()) { try { - final byte[] buffer; + final DataPacket dataPacket; try { logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name())); - buffer = source.get(4096); - logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name())); + dataPacket = source.get(4096); + logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name())); } catch (IOException ioe1) { throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1); } @@ -405,9 +393,9 @@ public class Pipeline implements Iterable { @Override public Void call() throws Exception { try { - logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name())); - sink.process(buffer); - logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name())); + logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name())); + sink.process(dataPacket); + logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name())); } catch (IOException ioe1) { throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1); } @@ -420,7 +408,7 @@ public class Pipeline implements Iterable { for (Future future : futures) { future.get(); } - counter += buffer.length; + counter += dataPacket.buffer().length; } catch (IOException e) { /* TODO */ e.printStackTrace();