private Pipeline(Filter source, Multimap<Filter, Filter> filters) {
this.source = Preconditions.checkNotNull(source, "source must not be null");
this.filters = ArrayListMultimap.create(Preconditions.checkNotNull(filters, "filters must not be null"));
- for (Filter filter : Lists.reverse(filters())) {
- logger.finest(String.format("Adding Listener to %s.", filter.name()));
- filter.addMetadataListener(new MetadataListener() {
-
- @Override
- public void metadataUpdated(Filter filter, Metadata metadata) {
- for (Filter sinks : filters(filter)) {
- logger.fine(String.format("Updating Metadata from %s to %s as %s.", filter.name(), sinks.name(), metadata));
- sinks.metadataUpdated(metadata);
- }
- }
- });
- }
}
//
}
List<Filter> filters = Lists.newArrayList();
filters.add(source);
+ source.open(Metadata.UNKNOWN);
/* collect all source->sink pairs. */
while (!filters.isEmpty()) {
Filter filter = filters.remove(0);
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
- public class Connection implements Runnable {
+ public static class Connection implements Runnable {
/** The source. */
private final Filter source;
//
/**
+ * Returns the source of this connection.
+ *
+ * @return The source of this connection
+ */
+ public Filter source() {
+ return source;
+ }
+
+ /**
+ * Returns the sinks of this connection.
+ *
+ * @return The sinks of this connection
+ */
+ public Collection<Filter> sinks() {
+ return sinks;
+ }
+
+ /**
* Returns the time this connection was started.
*
* @return The time this connection was started (in milliseconds since Jan 1,
startTime = System.currentTimeMillis();
while (!stopped.get()) {
try {
- final byte[] buffer;
+ final DataPacket dataPacket;
try {
logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
- buffer = source.get(4096);
- logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
+ dataPacket = source.get(4096);
+ logger.finest(String.format("Got %d bytes from %s.", dataPacket.buffer().length, source.name()));
} catch (IOException ioe1) {
throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
}
@Override
public Void call() throws Exception {
try {
- logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
- sink.process(buffer);
- logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
+ logger.finest(String.format("Sending %d bytes to %s.", dataPacket.buffer().length, sink.name()));
+ sink.process(dataPacket);
+ logger.finest(String.format("Sent %d bytes to %s.", dataPacket.buffer().length, sink.name()));
} catch (IOException ioe1) {
throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
}
for (Future<Void> future : futures) {
future.get();
}
- counter += buffer.length;
+ counter += dataPacket.buffer().length;
} catch (IOException e) {
/* TODO */
e.printStackTrace();