this.source = Preconditions.checkNotNull(source, "source must not be null");
this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
for (ControlledComponent component : Lists.reverse(components())) {
- logger.finest(String.format("Adding Listener to %s.", component));
+ logger.finest(String.format("Adding Listener to %s.", component.name()));
component.addMetadataListener(new MetadataListener() {
@Override
public void metadataUpdated(ControlledComponent component, Metadata metadata) {
return;
}
for (ControlledComponent controlledComponent : sinks((Source) component)) {
- logger.fine(String.format("Updating Metadata from %s to %s.", component, controlledComponent));
+ logger.fine(String.format("Updating Metadata from %s to %s as %s.", component.name(), controlledComponent.name(), metadata));
controlledComponent.metadataUpdated(metadata);
}
}
Collection<Sink> sinks = this.sinks.get(source);
connections.add(new Connection(source, sinks));
for (Sink sink : sinks) {
+ logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
sink.open(source.metadata());
if (sink instanceof Filter) {
sources.add((Source) sink);
}
}
for (Connection connection : connections) {
- logger.info(String.format("Starting Connection from %s to %s.", connection.source, connection.sinks));
+ logger.info(String.format("Starting Connection from %s to %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+ @Override
+ public String apply(Sink sink) {
+ return sink.name();
+ }
+ })));
new Thread(connection).start();
}
}
/** The executor service. */
private final ExecutorService executorService;
+ /** The time the connection was started. */
+ private long startTime;
+
/** The number of copied bytes. */
private long counter;
public Connection(Source source, Collection<Sink> sinks) {
this.source = source;
this.sinks = sinks;
- if (sinks.size() == 1) {
+ if (sinks.size() < 2) {
executorService = MoreExecutors.sameThreadExecutor();
} else {
executorService = Executors.newCachedThreadPool();
//
/**
+ * Returns the time this connection was started.
+ *
+ * @return The time this connection was started (in milliseconds since Jan 1,
+ * 1970 UTC)
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
* Returns the number of bytes that this connection has received from its
* source during its lifetime.
*
@Override
public void run() {
- Metadata firstMetadata = null;
+ startTime = System.currentTimeMillis();
while (!stopped.get()) {
try {
final byte[] buffer;