X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2FPipeline.java;h=75536fc732de25979706a27a1a3a4d9222852fe4;hb=07121c8fc483608738841207a92bdcc130ef0587;hp=ee67eec2f1fd89da67853fb1727b5bfb7ba64f7c;hpb=fa5196e029c48636f0c318311244395cfae49953;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java index ee67eec..75536fc 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -34,8 +34,8 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.MoreExecutors; @@ -71,6 +71,21 @@ public class Pipeline implements Iterable { private Pipeline(Source source, Multimap sinks) { this.source = Preconditions.checkNotNull(source, "source must not be null"); this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + for (ControlledComponent component : Lists.reverse(components())) { + logger.finest(String.format("Adding Listener to %s.", component)); + component.addMetadataListener(new MetadataListener() { + @Override + public void metadataUpdated(ControlledComponent component, Metadata metadata) { + if (!(component instanceof Source)) { + return; + } + for (ControlledComponent controlledComponent : sinks((Source) component)) { + logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent)); + controlledComponent.metadataUpdated(metadata); + } + } + }); + } } // @@ -172,7 +187,34 @@ public class Pipeline implements Iterable { @Override public Iterator iterator() { - return ImmutableSet.builder().add(source).addAll(sinks.values()).build().iterator(); + return components().iterator(); + } + + // + // PRIVATE METHODS + // + + /** + * Returns all components of this pipeline, listed breadth-first, starting with + * the source. + * + * @return All components of this pipeline + */ + public List components() { + ImmutableList.Builder components = ImmutableList.builder(); + List currentComponents = Lists.newArrayList(); + components.add(source); + currentComponents.add(source); + while (!currentComponents.isEmpty()) { + Collection sinks = this.sinks((Source) currentComponents.remove(0)); + for (Sink sink : sinks) { + components.add(sink); + if (sink instanceof Source) { + currentComponents.add(sink); + } + } + } + return components.build(); } // @@ -335,8 +377,6 @@ public class Pipeline implements Iterable { Metadata firstMetadata = null; while (!stopped.get()) { try { - final Metadata lastMetadata = firstMetadata; - final Metadata metadata = firstMetadata = source.metadata(); final byte[] buffer; try { logger.finest(String.format("Getting %d bytes from %s...", 4096, source)); @@ -353,9 +393,6 @@ public class Pipeline implements Iterable { @Override public Void call() throws Exception { - if (!metadata.equals(lastMetadata)) { - sink.metadataUpdated(metadata); - } try { logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink)); sink.process(buffer);