}
List<Filter> filters = Lists.newArrayList();
filters.add(source);
- source.open(Metadata.UNKNOWN);
+ Metadata currentMetadata = Metadata.UNKNOWN;
/* collect all source->sink pairs. */
while (!filters.isEmpty()) {
Filter filter = filters.remove(0);
+ logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
+ filter.open(currentMetadata);
+ currentMetadata = filter.metadata();
Collection<Filter> sinks = this.filters.get(filter);
connections.add(new Connection(filter, sinks));
for (Filter sink : sinks) {
- logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
- sink.open(filter.metadata());
filters.add(sink);
}
}
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
- public class Connection implements Runnable {
+ public static class Connection implements Runnable {
/** The source. */
private final Filter source;
//
/**
+ * Returns the source of this connection.
+ *
+ * @return The source of this connection
+ */
+ public Filter source() {
+ return source;
+ }
+
+ /**
+ * Returns the sinks of this connection.
+ *
+ * @return The sinks of this connection
+ */
+ public Collection<Filter> sinks() {
+ return sinks;
+ }
+
+ /**
* Returns the time this connection was started.
*
* @return The time this connection was started (in milliseconds since Jan 1,