2 * Sonitus - MultiSource.java - Copyright © 2013 David Roden
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 package net.pterodactylus.sonitus.data.filter;
20 import static com.google.common.base.Preconditions.*;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.io.PipedInputStream;
27 import java.io.PipedOutputStream;
28 import java.util.Arrays;
29 import java.util.logging.Level;
30 import java.util.logging.Logger;
32 import net.pterodactylus.sonitus.data.ConnectException;
33 import net.pterodactylus.sonitus.data.Filter;
34 import net.pterodactylus.sonitus.data.Format;
35 import net.pterodactylus.sonitus.data.Metadata;
36 import net.pterodactylus.sonitus.data.ReusableSink;
37 import net.pterodactylus.sonitus.data.Source;
38 import net.pterodactylus.sonitus.data.event.SourceFinishedEvent;
40 import com.google.common.eventbus.EventBus;
41 import com.google.inject.Inject;
44 * {@link ReusableSink} implementation that supports changing the source without
45 * letting the {@link net.pterodactylus.sonitus.data.Sink} know.
47 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
49 public class MultiSourceFilter implements Filter, ReusableSink {
52 private static final Logger logger = Logger.getLogger(MultiSourceFilter.class.getName());
54 /** Object used for synchronization. */
55 private final Object syncObject = new Object();
58 private final EventBus eventBus;
60 /** The connection. */
61 private Connection connection;
64 public MultiSourceFilter(EventBus eventBus) {
65 this.eventBus = eventBus;
69 public Format format() {
70 synchronized (syncObject) {
71 return connection.source.format();
76 public Metadata metadata() {
77 synchronized (syncObject) {
78 return connection.source.metadata();
83 public byte[] get(int bufferSize) throws EOFException, IOException {
84 byte[] buffer = new byte[bufferSize];
85 InputStream inputStream;
86 synchronized (syncObject) {
87 inputStream = connection.pipedInputStream;
89 int read = inputStream.read(buffer);
90 return Arrays.copyOf(buffer, read);
94 public void connect(Source source) throws ConnectException {
95 checkNotNull(source, "source must not be null");
96 if ((connection != null) && (connection.source != null)) {
97 checkArgument(connection.source.format().equals(source.format()), "source’s format must equal this sink’s format");
100 if (connection == null) {
101 connection = new Connection();
102 new Thread(connection).start();
105 connection.source(source);
106 } catch (IOException ioe1) {
107 throw new ConnectException(ioe1);
112 public void metadataUpdated() {
117 * The connection feeds the input from the currently connected source to the
118 * input stream that {@link #get(int)} will get its data from.
120 * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
122 private class Connection implements Runnable {
124 /** The currently connected source. */
125 /* synchronized by syncObject. */
126 private Source source;
128 /** The input stream that {@link #get(int)} will read from. */
129 /* synchronized by syncObject. */
130 private PipedInputStream pipedInputStream;
132 /** The output stream that the source will be fed into. */
133 /* synchronized by syncObject. */
134 private PipedOutputStream pipedOutputStream;
137 * Changes the source of the connection.
140 * The new source of the connection
141 * @return This connection
142 * @throws IOException
143 * if an I/O error occurs
145 public Connection source(Source source) throws IOException {
146 synchronized (syncObject) {
147 if (this.source != null) {
148 eventBus.post(new SourceFinishedEvent(this.source));
150 this.source = source;
151 pipedInputStream = new PipedInputStream();
152 pipedOutputStream = new PipedOutputStream(pipedInputStream);
153 syncObject.notifyAll();
161 /* wait for source to be set. */
162 OutputStream outputStream;
164 logger.finest("Entering synchronized block...");
165 synchronized (syncObject) {
166 logger.finest("Entered synchronized block.");
167 source = this.source;
168 while (source == null) {
170 logger.finest("Waiting for source to connect...");
172 } catch (InterruptedException ie1) {
173 /* ignore, keep waiting. */
175 source = this.source;
177 outputStream = pipedOutputStream;
179 logger.finest("Exited synchronized block.");
181 byte[] buffer = null;
182 boolean readSuccessful = false;
183 while (!readSuccessful) {
185 buffer = source.get(4096);
186 logger.finest(String.format("Read %d Bytes.", buffer.length));
187 if (buffer.length > 0) {
188 readSuccessful = true;
190 } catch (IOException e) {
191 /* TODO - notify & wait */
196 outputStream.write(buffer);
197 logger.finest(String.format("Wrote %d Bytes.", buffer.length));
198 } catch (IOException ioe1) {
199 /* okay, the sink has died, exit. */
200 logger.log(Level.WARNING, "Could not write to pipe!", ioe1);
205 logger.info("Exiting.");