}
}
for (Connection connection : connections) {
- logger.info(String.format("Starting Connection from %s to %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+ String threadName = String.format("%s → %s.", connection.source.name(), FluentIterable.from(connection.sinks).transform(new Function<Sink, String>() {
+
@Override
public String apply(Sink sink) {
return sink.name();
}
- })));
- new Thread(connection).start();
+ }));
+ logger.info(String.format("Starting Thread: %s", threadName));
+ new Thread(connection, threadName).start();
}
}
public Connection(Source source, Collection<Sink> sinks) {
this.source = source;
this.sinks = sinks;
- if (sinks.size() == 1) {
+ if (sinks.size() < 2) {
executorService = MoreExecutors.sameThreadExecutor();
} else {
executorService = Executors.newCachedThreadPool();
try {
final byte[] buffer;
try {
- logger.finest(String.format("Getting %d bytes from %s...", 4096, source));
+ logger.finest(String.format("Getting %d bytes from %s...", 4096, source.name()));
buffer = source.get(4096);
- logger.finest(String.format("Got %d bytes from %s.", buffer.length, source));
+ logger.finest(String.format("Got %d bytes from %s.", buffer.length, source.name()));
} catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while reading from %s.", source), ioe1);
+ throw new IOException(String.format("I/O error while reading from %s.", source.name()), ioe1);
}
List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
@Override
public Void call() throws Exception {
try {
- logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink));
+ logger.finest(String.format("Sending %d bytes to %s.", buffer.length, sink.name()));
sink.process(buffer);
- logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink));
+ logger.finest(String.format("Sent %d bytes to %s.", buffer.length, sink.name()));
} catch (IOException ioe1) {
- throw new IOException(String.format("I/O error while writing to %s", sink), ioe1);
+ throw new IOException(String.format("I/O error while writing to %s", sink.name()), ioe1);
}
return null;
}