Register metadata listeners at all components for metadata updating.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / Pipeline.java
index c24502f..75536fc 100644 (file)
@@ -34,8 +34,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -71,6 +71,21 @@ public class Pipeline implements Iterable<ControlledComponent> {
        private Pipeline(Source source, Multimap<Source, Sink> sinks) {
                this.source = Preconditions.checkNotNull(source, "source must not be null");
                this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+               for (ControlledComponent component : Lists.reverse(components())) {
+                       logger.finest(String.format("Adding Listener to %s.", component));
+                       component.addMetadataListener(new MetadataListener() {
+                               @Override
+                               public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+                                       if (!(component instanceof Source)) {
+                                               return;
+                                       }
+                                       for (ControlledComponent controlledComponent : sinks((Source) component)) {
+                                               logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+                                               controlledComponent.metadataUpdated(metadata);
+                                       }
+                               }
+                       });
+               }
        }
 
        //
@@ -172,7 +187,34 @@ public class Pipeline implements Iterable<ControlledComponent> {
 
        @Override
        public Iterator<ControlledComponent> iterator() {
-               return ImmutableSet.<ControlledComponent>builder().add(source).addAll(sinks.values()).build().iterator();
+               return components().iterator();
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       /**
+        * Returns all components of this pipeline, listed breadth-first, starting with
+        * the source.
+        *
+        * @return All components of this pipeline
+        */
+       public List<ControlledComponent> components() {
+               ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
+               List<ControlledComponent> currentComponents = Lists.newArrayList();
+               components.add(source);
+               currentComponents.add(source);
+               while (!currentComponents.isEmpty()) {
+                       Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
+                       for (Sink sink : sinks) {
+                               components.add(sink);
+                               if (sink instanceof Source) {
+                                       currentComponents.add(sink);
+                               }
+                       }
+               }
+               return components.build();
        }
 
        //