}
List<Filter> filters = Lists.newArrayList();
filters.add(source);
- source.open(Metadata.UNKNOWN);
+ Metadata currentMetadata = Metadata.UNKNOWN;
/* collect all source->sink pairs. */
while (!filters.isEmpty()) {
Filter filter = filters.remove(0);
+ logger.info(String.format("Opening %s with %s...", filter.name(), currentMetadata));
+ filter.open(currentMetadata);
+ currentMetadata = filter.metadata();
Collection<Filter> sinks = this.filters.get(filter);
connections.add(new Connection(filter, sinks));
for (Filter sink : sinks) {
- logger.info(String.format("Opening %s with %s...", sink.name(), source.metadata()));
- sink.open(filter.metadata());
filters.add(sink);
}
}
/** The number of copied bytes. */
private long counter;
+ /** The exception that was encountered, if any. */
+ private Optional<IOException> ioException = Optional.absent();
+
/**
* Creates a new connection.
*
return counter;
}
+ /**
+ * Returns the I/O exception that was encountered while processing this
+ * connection.
+ *
+ * @return The I/O exception that occured, or {@link Optional#absent()} if no
+ * exception occured
+ */
+ public Optional<IOException> ioException() {
+ return ioException;
+ }
+
//
// ACTIONS
//
while (!stopped.get()) {
try {
final DataPacket dataPacket;
- try {
- logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
- dataPacket = source.get(4096);
- logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
- } catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
- }
+ logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
+ dataPacket = source.get(4096);
+ logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Filter, Callable<Void>>() {
@Override
@Override
public Void call() throws Exception {
- try {
- logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
- sink.process(dataPacket);
- logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
- } catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
- }
+ logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
+ sink.process(dataPacket);
+ logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
return null;
}
};
}
counter += dataPacket.buffer().length;
} catch (IOException e) {
- /* TODO */
- e.printStackTrace();
+ ioException = Optional.of(e);
break;
} catch (InterruptedException e) {
/* TODO */