+ for (ControlledComponent component : Lists.reverse(components())) {
+ logger.finest(String.format("Adding Listener to %s.", component));
+ component.addMetadataListener(new MetadataListener() {
+ @Override
+ public void metadataUpdated(ControlledComponent component, Metadata metadata) {
+ if (!(component instanceof Source)) {
+ return;
+ }
+ for (ControlledComponent controlledComponent : sinks((Source) component)) {
+ logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+ controlledComponent.metadataUpdated(metadata);
+ }
+ }
+ });
+ }
+ }
+
+ //
+ // ACCESSORS
+ //
+
+ /**
+ * Expose this pipeline’s source.
+ *
+ * @return This pipeline’s source
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ * Returns all {@link Sink}s (or {@link Filter}s, really) that are connected to
+ * the given source.
+ *
+ * @param source
+ * The source to get the sinks for
+ * @return The sinks connected to the given source, or an empty list if the
+ * source does not exist in this pipeline
+ */
+ public Collection<Sink> sinks(Source source) {
+ return sinks.get(source);
+ }
+
+ /**
+ * Returns the traffic counters of the given controlled component.
+ *
+ * @param controlledComponent
+ * The controlled component to get the traffic counters for
+ * @return The traffic counters for the given controlled component
+ */
+ public TrafficCounter trafficCounter(ControlledComponent controlledComponent) {
+ long input = -1;
+ long output = -1;
+ for (Connection connection : connections) {
+ /* the connection where the source matches knows the output. */
+ if (connection.source.equals(controlledComponent)) {
+ output = connection.counter();
+ } else if (connection.sinks.contains(controlledComponent)) {
+ input = connection.counter();
+ }
+ }
+ return new TrafficCounter(input, output);