+ this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
+ for (Filter filter : Lists.reverse(filters())) {
+ logger.finest(String.format("Adding Listener to %s.", filter.name()));
+ filter.addMetadataListener(new MetadataListener() {
+
+ @Override
+ public void metadataUpdated(Filter filter, Metadata metadata) {
+ for (Filter sinks : filters(filter)) {
+ logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
+ sinks.metadataUpdated(metadata);
+ }
+ }
+ });
+ }
+ }
+
+ //
+ // ACCESSORS
+ //
+
+ /**
+ * Expose this pipeline’s source.
+ *
+ * @return This pipeline’s source
+ */
+ public Filter source() {
+ return source;
+ }
+
+ /**
+ * Returns all {@link Filter}s that are connected to the given filter.
+ *
+ * @param filter
+ * The filter to get the connected filters for
+ * @return The filters connected to the given filter, or an empty list if the
+ * filter does not exist in this pipeline, or is not connected to any filters
+ */
+ public List<Filter> filters(Filter filter) {
+ return filters.get(filter);
+ }
+
+ /**
+ * Returns the traffic counters of the given filter.
+ *
+ * @param filter
+ * The filter to get the traffic counters for
+ * @return The traffic counters for the given filter
+ */
+ public TrafficCounter trafficCounter(Filter filter) {
+ long input = -1;
+ long output = -1;
+ for (Connection connection : connections) {
+ /* the connection where the source matches knows the output. */
+ if (connection.source.equals(filter)) {
+ output = connection.counter();
+ } else if (connection.sinks.contains(filter)) {
+ input = connection.counter();
+ }
+ }
+ return new TrafficCounter(input, output);