private Pipeline(Source source, Multimap<Source, Sink> sinks) {
this.source = Preconditions.checkNotNull(source, "source must not be null");
this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+ for (ControlledComponent component : Lists.reverse(components())) {
+ logger.finest(String.format("Adding Listener to %s.", component));
+ component.addMetadataListener(new MetadataListener() {
+ @Override
+ public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+ if (!(component instanceof Source)) {
+ return;
+ }
+ for (ControlledComponent controlledComponent : sinks((Source) component)) {
+ logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+ controlledComponent.metadataUpdated(metadata);
+ }
+ }
+ });
+ }
}
//