import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;
private Pipeline(Source source, Multimap<Source, Sink> sinks) {
this.source = Preconditions.checkNotNull(source, "source must not be null");
this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+ for (ControlledComponent component : Lists.reverse(components())) {
+ logger.finest(String.format("Adding Listener to %s.", component));
+ component.addMetadataListener(new MetadataListener() {
+ @Override
+ public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+ if (!(component instanceof Source)) {
+ return;
+ }
+ for (ControlledComponent controlledComponent : sinks((Source) component)) {
+ logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+ controlledComponent.metadataUpdated(metadata);
+ }
+ }
+ });
+ }
}
//
@Override
public Iterator<ControlledComponent> iterator() {
- return ImmutableSet.<ControlledComponent>builder().add(source).addAll(sinks.values()).build().iterator();
+ return components().iterator();
+ }
+
+ //
+ // PRIVATE METHODS
+ //
+
+ /**
+ * Returns all components of this pipeline, listed breadth-first, starting with
+ * the source.
+ *
+ * @return All components of this pipeline
+ */
+ public List<ControlledComponent> components() {
+ ImmutableList.Builder<ControlledComponent> components = ImmutableList.builder();
+ List<ControlledComponent> currentComponents = Lists.newArrayList();
+ components.add(source);
+ currentComponents.add(source);
+ while (!currentComponents.isEmpty()) {
+ Collection<Sink> sinks = this.sinks((Source) currentComponents.remove(0));
+ for (Sink sink : sinks) {
+ components.add(sink);
+ if (sink instanceof Source) {
+ currentComponents.add(sink);
+ }
+ }
+ }
+ return components.build();
}
//