- List<Source> sources = Lists.newArrayList(pipeline.source());
- while (!sources.isEmpty()) {
- Collection<Sink> sinks = pipeline.sinks(sources.remove(0));
- for (Sink sink : sinks) {
- /* only count real sinks, everything else is filter. */
- if (sink instanceof Filter) {
- sources.add((Filter) sink);
- } else {
- sinkCount++;
- }
+ for (ControlledComponent component : pipeline.components()) {
+ if (!(component instanceof Source)) {
+ logger.finest(String.format("%s is not a Source, skipping.", component.name()));
+ sinkCount++;
+ continue;
+ }
+ Collection<Sink> sinks = pipeline.sinks((Source) component);
+ logger.finest(String.format("%s has %d sinks: %s", component.name(), sinks.size(), sinks));
+ if (sinks.isEmpty()) {
+ sinkCount++;