+++ /dev/null
-/*
- * Sonitus - ConnectException.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-/**
- * Exception that signals an error when {@link Sink#connect(Source) connecting}
- * a {@link Source} to a {@link Sink}.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public class ConnectException extends Exception {
-
- /** Creates a new connect exception. */
- public ConnectException() {
- super();
- }
-
- /**
- * Creates a new connect exception.
- *
- * @param message
- * The message of the exception
- */
- public ConnectException(String message) {
- super(message);
- }
-
- /**
- * Creates a new connect exception.
- *
- * @param cause
- * The cause of the exception
- */
- public ConnectException(Throwable cause) {
- super(cause);
- }
-
- /**
- * Creates a new connect exception.
- *
- * @param message
- * The message of the exception
- * @param cause
- * The cause of the exception
- */
- public ConnectException(String message, Throwable cause) {
- super(message, cause);
- }
-
-}
+++ /dev/null
-/*
- * Sonitus - Connection.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A connection reads bytes from a {@link Source} and feeds it to a sink. This
- * class is meant to be subclassed by each {@link Sink}, overriding the {@link
- * #feed(byte[])} method to actually feed the data into the sink. The {@link
- * #feed(byte[])} method is also responsible for blocking for an appropriate
- * amount of time; this method determines how fast a {@link Source} is
- * consumed.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public abstract class Connection implements Runnable {
-
- /** The logger. */
- private static final Logger logger = Logger.getLogger(Connection.class.getName());
-
- /** The source to consume. */
- private final Source source;
-
- /**
- * Creates a new connection that will read from the given source.
- *
- * @param source
- * The source to read
- */
- public Connection(Source source) {
- this.source = source;
- }
-
- //
- // RUNNABLE METHODS
- //
-
- @Override
- public void run() {
- while (true) {
- byte[] buffer = null;
- try {
- buffer = source.get(bufferSize());
- } catch (IOException ioe1) {
- logger.log(Level.WARNING, "Source died!", ioe1);
- break;
- }
- try {
- feed(buffer);
- } catch (IOException ioe1) {
- logger.log(Level.WARNING, "Sink died!", ioe1);
- break;
- }
- }
- try {
- logger.info("Connection finished.");
- finish();
- } catch (IOException ioe1) {
- /* well, what can we do? nothing. */
- }
- }
-
- //
- // SUBCLASS METHODS
- //
-
- /**
- * Returns the number of bytes that will be requested from the source.
- *
- * @return The number of bytes that will be requested from the source
- */
- protected abstract int bufferSize();
-
- /**
- * Feeds the read data into the sink. The given buffer is always filled and
- * never contains excess elements.
- *
- * @param buffer
- * The data
- * @throws IOException
- * if an I/O error occurs
- */
- protected abstract void feed(byte[] buffer) throws IOException;
-
- /**
- * Notifies the sink that the source does not deliver any more data.
- *
- * @throws IOException
- * if an I/O error occurs
- */
- protected abstract void finish() throws IOException;
-
-}
package net.pterodactylus.sonitus.data;
/**
- * A filter processes an input to produce an output.
+ * A filter is both a {@link Source} and a {@link Sink}. It is used to process
+ * the audio date in whatever way seems appropriate.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
--- /dev/null
+/*
+ * Sonitus - Pipeline.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * A pipeline is responsible for streaming audio data from a {@link Source} to
+ * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class Pipeline {
+
+ /** The logger. */
+ private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
+
+ /** The source of the audio stream. */
+ private final Source source;
+
+ /** The sinks for each source. */
+ private final Multimap<Source, Sink> sinks;
+
+ /** All started feeders. */
+ private final List<Feeder> feeders = Lists.newArrayList();
+
+ /**
+ * Creates a new pipeline.
+ *
+ * @param source
+ * The source of the audio stream
+ * @param sinks
+ * The sinks for each source
+ */
+ private Pipeline(Source source, Multimap<Source, Sink> sinks) {
+ this.source = Preconditions.checkNotNull(source, "source must not be null");
+ this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+ }
+
+ //
+ // ACTIONS
+ //
+
+ /**
+ * Starts the pipeline.
+ *
+ * @throws IOException
+ * if any of the sinks can not be opened
+ * @throws IllegalStateException
+ * if the pipeline is already running
+ */
+ public void start() throws IOException, IllegalStateException {
+ if (!feeders.isEmpty()) {
+ throw new IllegalStateException("Pipeline is already running!");
+ }
+ List<Source> sources = Lists.newArrayList();
+ sources.add(source);
+ /* collect all source->sink pairs. */
+ while (!sources.isEmpty()) {
+ Source source = sources.remove(0);
+ Collection<Sink> sinks = this.sinks.get(source);
+ feeders.add(new Feeder(source, sinks));
+ for (Sink sink : sinks) {
+ sink.open(source.metadata());
+ if (sink instanceof Filter) {
+ sources.add((Source) sink);
+ }
+ }
+ }
+ for (Feeder feeder : feeders) {
+ logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
+ new Thread(feeder).start();
+ }
+ }
+
+ public void stop() {
+ if (!feeders.isEmpty()) {
+ /* pipeline is not running. */
+ return;
+ }
+ for (Feeder feeder : feeders) {
+ feeder.stop();
+ }
+ }
+
+ //
+ // STATIC METHODS
+ //
+
+ /**
+ * Returns a new pipeline builder.
+ *
+ * @param source
+ * The source at which to start
+ * @return A builder for a new pipeline
+ */
+ public static Builder builder(Source source) {
+ return new Builder(source);
+ }
+
+ /**
+ * A builder for a {@link Pipeline}.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+ public static class Builder {
+
+ /** The source of the pipeline. */
+ private final Source source;
+
+ /** The sinks to which each source streams. */
+ private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
+
+ /** The last added source. */
+ private Source lastSource;
+
+ /**
+ * Creates a new builder.
+ *
+ * @param source
+ * The source that starts the pipeline
+ */
+ private Builder(Source source) {
+ this.source = source;
+ lastSource = source;
+ }
+
+ /**
+ * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
+ * {@link Source}.
+ *
+ * @param sink
+ * The sink to add
+ * @return This builder
+ * @throws IllegalStateException
+ * if the last added {@link Sink} was not also a {@link Source}
+ */
+ public Builder to(Sink sink) {
+ Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
+ nextSinks.put(lastSource, sink);
+ lastSource = (sink instanceof Filter) ? (Source) sink : null;
+ return this;
+ }
+
+ /**
+ * Locates the given source and sets it as the last added node so that the
+ * next invocation of {@link #to(Sink)} can “fork” the pipeline.
+ *
+ * @param source
+ * The source to locate
+ * @return This builder
+ * @throws IllegalStateException
+ * if the given source was not previously added as a sink
+ */
+ public Builder find(Source source) {
+ Preconditions.checkState(nextSinks.containsValue(source));
+ lastSource = source;
+ return this;
+ }
+
+ /**
+ * Builds the pipeline.
+ *
+ * @return The created pipeline
+ */
+ public Pipeline build() {
+ return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
+ }
+
+ }
+
+ /**
+ * A feeder is responsible for streaming audio from one {@link Source} to an
+ * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
+ * creating a {@link Thread} wrapping it and starting said thread.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+ private class Feeder implements Runnable {
+
+ /** The source. */
+ private final Source source;
+
+ /** The sinks. */
+ private final Collection<Sink> sinks;
+
+ /** Whether the feeder was stopped. */
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+ /** The executor service. */
+ private final ExecutorService executorService;
+
+ /**
+ * Creates a new feeder.
+ *
+ * @param source
+ * The source of the stream
+ * @param sinks
+ * The sinks to which to stream
+ */
+ public Feeder(Source source, Collection<Sink> sinks) {
+ this.source = source;
+ this.sinks = sinks;
+ if (sinks.size() == 1) {
+ executorService = Executors.newSingleThreadExecutor();
+ } else {
+ executorService = Executors.newCachedThreadPool();
+ }
+ }
+
+ //
+ // ACTIONS
+ //
+
+ /** Stops this feeder. */
+ public void stop() {
+ stopped.set(true);
+ }
+
+ //
+ // RUNNABLE METHODS
+ //
+
+ @Override
+ public void run() {
+ Metadata firstMetadata = source.metadata();
+ while (!stopped.get()) {
+ try {
+ final Metadata lastMetadata = firstMetadata;
+ final Metadata metadata = firstMetadata = source.metadata();
+ final byte[] buffer = source.get(4096);
+ List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
+
+ @Override
+ public Callable<Void> apply(final Sink sink) {
+ return new Callable<Void>() {
+
+ @Override
+ public Void call() throws Exception {
+ if (!metadata.equals(lastMetadata)) {
+ sink.metadataUpdated(metadata);
+ }
+ sink.process(buffer);
+ return null;
+ }
+ };
+ }
+ }).toList());
+ /* check all threads for exceptions. */
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (IOException e) {
+ /* TODO */
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ /* TODO */
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ /* TODO */
+ e.printStackTrace();
+ }
+ }
+ }
+
+ }
+
+}
+++ /dev/null
-/*
- * Sonitus - MultiSourceSink.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-/**
- * Extension of the {@link Sink} interface that supports changing the source
- * without causing a reconnection in the sink.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public interface ReusableSink extends Sink {
-
-}
-/*
- * Sonitus - Sink.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
package net.pterodactylus.sonitus.data;
+import java.io.IOException;
+
/**
- * A sink is an endpoint for an audio stream. It might be a file on disk, it can
- * be an audio output in the current system, or it can be sent to a remote
- * streaming server for broadcasting.
+ * A sink is a destination for audio data. It can be played on speakers, it can
+ * be written to a file, or it can be sent to a remote streaming server.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
public interface Sink {
/**
- * Connects this sink to the given source.
+ * Opens this sink using the format parameters of the given metadata.
+ *
+ * @param metadata
+ * The metadata of the stream
+ * @throws IOException
+ * if an I/O error occurs
+ */
+ void open(Metadata metadata) throws IOException;
+
+ /** Closes this sink. */
+ void close();
+
+ /**
+ * Processes the given buffer of data.
*
- * @param source
- * The source to connect to
- * @throws ConnectException
- * if the source can not be connected, e.g. due to a {@link Metadata}
- * mismatch
+ * @param buffer
+ * The data to process
+ * @throws IOException
+ * if an I/O error occurs
*/
- void connect(Source source) throws ConnectException;
+ void process(byte[] buffer) throws IOException;
- /** Notifies the sink that a source has updated its metadata. */
- void metadataUpdated();
+ /**
+ * Notifies the sink that the metadata of the audio stream has changed. This
+ * method should return as fast as possible, i.e. every heavy lifting should be
+ * done from another thread.
+ *
+ * @param metadata
+ * The new metadata
+ */
+ void metadataUpdated(Metadata metadata);
}
-/*
- * Sonitus - Source.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
package net.pterodactylus.sonitus.data;
-import java.io.EOFException;
import java.io.IOException;
/**
- * Defines an arbitrary media source. This can be almost anything; an MP3 file,
- * a FastTracker module, or a decoded WAVE file.
+ * A source produces an audio stream and accompanying metadata.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
public interface Source {
/**
- * Returns the metadata of this source.
+ * Returns the metadata of the audio stream.
*
- * @return The metadata of this source
+ * @return The metadata of the audio stream
*/
Metadata metadata();
/**
- * Retrieves the given name of bytes from this source. The source should always
- * try to read as much data as was requested but is free to return a byte array
- * with less elements that requested. However, the byte array will always be
- * the same size as the data that was actually read, i.e. there are no excess
- * elements in the returned array.
+ * Retrieves data from the audio stream.
*
* @param bufferSize
- * The size of the buffer
- * @return A buffer that contains the read data
- * @throws EOFException
- * if the end of the source was reached
+ * The maximum amount of bytes to retrieve from the audio stream
+ * @return A buffer filled with up to {@code bufferSize} bytes of data; the
+ * returned buffer may contain less data than requested but will not
+ * contain excess elements
* @throws IOException
* if an I/O error occurs
*/
- byte[] get(int bufferSize) throws EOFException, IOException;
+ byte[] get(int bufferSize) throws IOException;
}
--- /dev/null
+/*
+ * Sonitus - AbstractFilter.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.filter;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+
+import net.pterodactylus.sonitus.data.Filter;
+import net.pterodactylus.sonitus.data.Metadata;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Dummy {@link Filter} implementation that pipes its input to its output.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class DummyFilter implements Filter {
+
+ /** The input stream from which to read. */
+ private InputStream inputStream;
+
+ /** The output stream to which to write. */
+ private OutputStream outputStream;
+
+ /** The current metadata. */
+ private Metadata metadata;
+
+ //
+ // FILTER METHODS
+ //
+
+ @Override
+ public void open(Metadata metadata) throws IOException {
+ this.metadata = metadata;
+ inputStream = createInputStream();
+ outputStream = createOutputStream();
+ }
+
+ @Override
+ public void close() {
+ try {
+ Closeables.close(outputStream, true);
+ Closeables.close(inputStream, true);
+ } catch (IOException e) {
+ /* won’t throw. */
+ }
+ }
+
+ @Override
+ public Metadata metadata() {
+ return metadata;
+ }
+
+ @Override
+ public void metadataUpdated(Metadata metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public void process(byte[] buffer) throws IOException {
+ outputStream.write(buffer);
+ outputStream.flush();
+ }
+
+ @Override
+ public byte[] get(int bufferSize) throws IOException {
+ byte[] buffer = new byte[bufferSize];
+ int read = inputStream.read(buffer);
+ if (read == -1) {
+ throw new EOFException();
+ }
+ return Arrays.copyOf(buffer, read);
+ }
+
+ //
+ // SUBCLASS METHODS
+ //
+
+ /**
+ * Creates the input stream from which {@link net.pterodactylus.sonitus.data.Pipeline}
+ * will read the audio data. If you override this, you have to override {@link
+ * #createOutputStream()}, too!
+ *
+ * @return The input stream to read from
+ * @throws IOException
+ * if an I/O error occurs
+ */
+ protected InputStream createInputStream() throws IOException {
+ return new PipedInputStream();
+ }
+
+ /**
+ * Creates the output stream to which {@link net.pterodactylus.sonitus.data.Pipeline}
+ * will write the audio data. If you override this, you have to override {@link
+ * #createInputStream()}, too!
+ *
+ * @return The output stream to write to
+ * @throws IOException
+ * if an I/O error occurs
+ */
+ protected OutputStream createOutputStream() throws IOException {
+ return new PipedOutputStream((PipedInputStream) inputStream);
+ }
+
+}
package net.pterodactylus.sonitus.data.filter;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
import java.util.logging.Logger;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
-import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
import net.pterodactylus.sonitus.io.InputStreamDrainer;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
/**
- * {@link Filter} implementation that runs its {@link Source} through an
- * external program.
+ * {@link net.pterodactylus.sonitus.data.Filter} implementation that runs its
+ * {@link net.pterodactylus.sonitus.data.Source} through an external program.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
-public abstract class ExternalFilter implements Filter {
+public abstract class ExternalFilter extends DummyFilter {
/** The logger. */
private final Logger logger = Logger.getLogger(getClass().getName());
- /** The source. */
- private Source source;
-
- private InputStream processInputStream;
+ /** The external process. */
+ private Process process;
//
// FILTER METHODS
//
@Override
- public Metadata metadata() {
- return source.metadata();
+ public void open(Metadata metadata) throws IOException {
+ process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.<String>builder().add(binary(metadata)).addAll(parameters(metadata)).build(), String.class));
+ InputStream processError = process.getErrorStream();
+ new Thread(new InputStreamDrainer(processError)).start();
+ super.open(metadata);
}
@Override
- public byte[] get(int bufferSize) throws EOFException, IOException {
- byte[] buffer = new byte[bufferSize];
- int read = processInputStream.read(buffer);
- if (read == -1) {
- throw new EOFException();
- }
- return Arrays.copyOf(buffer, read);
+ public void close() {
+ process.destroy();
}
+ //
+ // DUMMYFILTER METHODS
+ //
+
@Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
-
- this.source = source;
- try {
- final Process process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.<String>builder().add(binary(source.metadata())).addAll(parameters(source.metadata())).build(), String.class));
- processInputStream = process.getInputStream();
- final OutputStream processInput = process.getOutputStream();
- final InputStream processError = process.getErrorStream();
- final PipedOutputStream pipedOutputStream = new PipedOutputStream();
- new Thread(new InputStreamDrainer(processError)).start();
- new Thread(new Connection(source) {
-
- @Override
- protected int bufferSize() {
- return 4096;
- }
-
- @Override
- protected void feed(byte[] buffer) throws IOException {
- processInput.write(buffer);
- processInput.flush();
- }
-
- @Override
- protected void finish() throws IOException {
- processInput.close();
- processError.close();
- }
- }).start();
- } catch (IOException ioe1) {
-
- }
+ protected InputStream createInputStream() throws IOException {
+ return process.getInputStream();
}
@Override
- public void metadataUpdated() {
- /* ignore. */
+ protected OutputStream createOutputStream() throws IOException {
+ return process.getOutputStream();
}
//
package net.pterodactylus.sonitus.data.filter;
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.inject.internal.util.$Preconditions.checkState;
+
+import java.io.IOException;
-import net.pterodactylus.sonitus.data.ConnectException;
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
/**
- * Basic {@link ExternalFilter} implementation that verifies that the connected
- * source is MP3-encoded and returns a PCM-encoded stream.
+ * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter}
+ * implementation that verifies that the connected source is MP3-encoded and
+ * returns a PCM-encoded stream.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
}
@Override
- public void connect(Source source) throws ConnectException {
- checkNotNull(source, "source must not be null");
- checkState(source.metadata().encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded");
+ public void open(Metadata metadata) throws IOException {
+ checkNotNull(metadata, "metadata must not be null");
+ checkState(metadata.encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded");
- super.connect(source);
+ super.open(metadata);
}
}
package net.pterodactylus.sonitus.data.filter;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
-import com.google.common.base.Preconditions;
+import net.pterodactylus.sonitus.data.Metadata;
/**
- * Basic {@link ExternalFilter} implementation that verifies that the connected
- * source is PCM-encoded and that returns an MP3-encoded metadata.
+ * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter}
+ * implementation that verifies that the connected source is PCM-encoded and
+ * that returns an MP3-encoded metadata.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
}
@Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
- Preconditions.checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+ public void open(Metadata metadata) throws IOException {
+ checkNotNull(metadata, "metadata must not be null");
+ checkState(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
- super.connect(source);
+ super.open(metadata);
}
}
+++ /dev/null
-/*
- * Sonitus - MultiSource.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data.filter;
-
-import static com.google.common.base.Preconditions.*;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Filter;
-import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.ReusableSink;
-import net.pterodactylus.sonitus.data.Source;
-import net.pterodactylus.sonitus.data.event.SourceFinishedEvent;
-
-import com.google.common.eventbus.EventBus;
-import com.google.inject.Inject;
-
-/**
- * {@link ReusableSink} implementation that supports changing the source without
- * letting the {@link net.pterodactylus.sonitus.data.Sink} know.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public class MultiSourceFilter implements Filter, ReusableSink {
-
- /** The logger. */
- private static final Logger logger = Logger.getLogger(MultiSourceFilter.class.getName());
-
- /** The event bus. */
- private final EventBus eventBus;
-
- /** The current source. */
- private final AtomicReference<Source> source = new AtomicReference<Source>();
-
- @Inject
- public MultiSourceFilter(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- @Override
- public Metadata metadata() {
- return source.get().metadata();
- }
-
- @Override
- public byte[] get(int bufferSize) throws EOFException, IOException {
- try {
- return source.get().get(bufferSize);
- } catch (EOFException eofe1) {
- eventBus.post(new SourceFinishedEvent(source.get()));
- throw eofe1;
- }
- }
-
- @Override
- public void connect(Source source) throws ConnectException {
- checkNotNull(source, "source must not be null");
-
- Source oldSource = this.source.getAndSet(source);
- if (oldSource != null) {
- checkArgument(oldSource.metadata().channels() == source.metadata().channels(), "source’s channel count must equal existing source’s channel count");
- checkArgument(oldSource.metadata().frequency() == source.metadata().frequency(), "source’s frequency must equal existing source’s frequency");
- checkArgument(oldSource.metadata().encoding().equalsIgnoreCase(source.metadata().encoding()), "source’s encoding must equal existing source’s encoding");
- }
- }
-
- @Override
- public void metadataUpdated() {
- /* ignore. */
- }
-
-}
package net.pterodactylus.sonitus.data.filter;
-import net.pterodactylus.sonitus.data.ConnectException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
/**
}
@Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
- Preconditions.checkArgument(source.metadata().encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded");
+ public void open(Metadata metadata) throws IOException {
+ checkNotNull(metadata, "metadata must not be null");
+ checkArgument(metadata.encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded");
- super.connect(source);
+ super.open(metadata);
}
//
--- /dev/null
+/*
+ * Sonitus - PredicateFilter.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.filter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.pterodactylus.sonitus.data.Filter;
+import net.pterodactylus.sonitus.data.Metadata;
+
+import com.google.common.base.Predicate;
+
+/**
+ * {@link Filter} implementation that uses a {@link Predicate} to determine
+ * whether a filter will be used or the data will only be passed through.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class PredicateFilter extends DummyFilter {
+
+ /** The predicate. */
+ private final Predicate<Metadata> metadataPredicate;
+
+ /** The filter to use if the predicate matches. */
+ private final Filter filter;
+
+ /** Whether the predicate currently matches. */
+ private final AtomicBoolean metadataMatches = new AtomicBoolean(false);
+
+ /**
+ * Creates a new predicate filter.
+ *
+ * @param metadataPredicate
+ * The predicate to evaluate every time the metadata changes
+ * @param filter
+ * The filter to use if the predicate matches the metadata
+ */
+ public PredicateFilter(Predicate<Metadata> metadataPredicate, Filter filter) {
+ this.metadataPredicate = metadataPredicate;
+ this.filter = filter;
+ }
+
+ //
+ // FILTER METHODS
+ //
+
+ @Override
+ public void open(Metadata metadata) throws IOException {
+ checkNotNull(metadata, "metadata must not be null");
+
+ metadataMatches.set(metadataPredicate.apply(metadata));
+ if (metadataMatches.get()) {
+ filter.open(metadata);
+ } else {
+ super.open(metadata);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (metadataMatches.get()) {
+ filter.close();
+ } else {
+ super.close();
+ }
+ }
+
+ @Override
+ public void metadataUpdated(Metadata metadata) {
+ metadataMatches.set(metadataPredicate.apply(metadata));
+ if (metadataMatches.get()) {
+ filter.metadataUpdated(metadata);
+ } else {
+ super.metadataUpdated(metadata);
+ }
+ }
+
+ @Override
+ public void process(byte[] buffer) throws IOException {
+ if (metadataMatches.get()) {
+ filter.process(buffer);
+ } else {
+ super.process(buffer);
+ }
+ }
+
+ @Override
+ public Metadata metadata() {
+ if (metadataMatches.get()) {
+ return filter.metadata();
+ } else {
+ return super.metadata();
+ }
+ }
+
+ @Override
+ public byte[] get(int bufferSize) throws IOException {
+ if (metadataMatches.get()) {
+ return filter.get(bufferSize);
+ } else {
+ return super.get(bufferSize);
+ }
+ }
+
+}
package net.pterodactylus.sonitus.data.filter;
-import java.io.EOFException;
import java.io.IOException;
import java.util.logging.Logger;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
/**
* Rate limiting filter that only passes a specified amount of data per second
- * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}.
+ * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link
+ * net.pterodactylus.sonitus.data.Sink}.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
-public class RateLimitingFilter implements Filter {
+public class RateLimitingFilter extends DummyFilter {
/** The logger. */
private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName());
/** The limiting rate in bytes/second. */
private final int rate;
- /** The fast start time. */
- private final long fastStartTime;
-
- /** The source. */
- private Source source;
+ /** The start time. */
+ private long startTime;
- /** The remaining fast start time. */
- private long remainingFastStartTime;
+ /** The number of bytes. */
+ private long counter;
/**
* Creates a new rate limiting filter.
*/
public RateLimitingFilter(int rate, long fastStartTime) {
this.rate = rate;
- this.fastStartTime = fastStartTime;
- remainingFastStartTime = fastStartTime;
+ this.counter = (long) (-rate * (fastStartTime / 1000.0));
}
//
//
@Override
- public Metadata metadata() {
- return source.metadata();
+ public void open(Metadata metadata) throws IOException {
+ super.open(metadata);
+ startTime = System.currentTimeMillis();
}
@Override
- public byte[] get(int bufferSize) throws EOFException, IOException {
- long now = System.currentTimeMillis();
- byte[] buffer = source.get(bufferSize);
+ public void process(byte[] buffer) throws IOException {
+ super.process(buffer);
/* delay. */
- long waitTime = 1000 * buffer.length / rate;
- remainingFastStartTime = Math.max(remainingFastStartTime - waitTime, 0);
- while ((remainingFastStartTime == 0) && (System.currentTimeMillis() - now) < waitTime) {
+ counter += buffer.length;
+ long waitTime = (long) (counter / (rate / 1000.0));
+ while ((System.currentTimeMillis() - startTime) < waitTime) {
try {
- long limitDelay = waitTime - (System.currentTimeMillis() - now);
+ long limitDelay = waitTime - (System.currentTimeMillis() - startTime);
logger.finest(String.format("Waiting %d ms...", limitDelay));
Thread.sleep(limitDelay);
} catch (InterruptedException ie1) {
/* ignore, keep looping. */
}
}
- return buffer;
- }
-
- @Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
-
- this.source = source;
- }
-
- @Override
- public void metadataUpdated() {
- /* ignore. */
+ logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0)));
}
}
package net.pterodactylus.sonitus.data.filter;
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
-import net.pterodactylus.sonitus.data.ConnectException;
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
import com.google.common.collect.ImmutableList;
}
@Override
- public void connect(Source source) throws ConnectException {
- checkNotNull(source, "source must not be null");
- checkArgument(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+ public void open(Metadata metadata) throws IOException {
+ checkNotNull(metadata, "metadata must not be null");
+ checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
- super.connect(source);
+ super.open(metadata);
}
//
package net.pterodactylus.sonitus.data.sink;
-import static com.google.common.base.Preconditions.*;
-
+import java.io.IOException;
import java.util.logging.Logger;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.LineUnavailableException;
import javax.sound.sampled.SourceDataLine;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
+
+import com.google.common.base.Preconditions;
/**
- * {@link Sink} implementation that uses the JDK’s {@link
- * javax.sound.sampled.AudioSystem} to play all {@link net.pterodactylus.sonitus.data.Source}s.
+ * {@link net.pterodactylus.sonitus.data.Sink} implementation that uses the
+ * JDK’s {@link javax.sound.sampled.AudioSystem} to play all {@link
+ * net.pterodactylus.sonitus.data.Source}s.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
/** The logger. */
private static final Logger logger = Logger.getLogger(AudioSink.class.getName());
- /** The current source. */
- private Source source;
+ /** The current metadata. */
+ private Metadata metadata;
- @Override
- public void connect(Source source) throws ConnectException {
- this.source = checkNotNull(source, "source must not be null");
- checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+ /** The audio output. */
+ private SourceDataLine sourceDataLine;
- final Metadata sourceMetadata = source.metadata();
- AudioFormat audioFormat = new AudioFormat(sourceMetadata.frequency(), 16, sourceMetadata.channels(), true, false);
+ @Override
+ public void open(Metadata metadata) throws IOException {
+ Preconditions.checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+ AudioFormat audioFormat = new AudioFormat(metadata.frequency(), 16, metadata.channels(), true, false);
try {
- final SourceDataLine sourceDataLine = AudioSystem.getSourceDataLine(audioFormat);
+ sourceDataLine = AudioSystem.getSourceDataLine(audioFormat);
sourceDataLine.open(audioFormat);
sourceDataLine.start();
- new Thread(new Connection(source) {
-
- @Override
- protected int bufferSize() {
- return sourceMetadata.channels() * sourceMetadata.frequency() * 2;
- }
-
- @Override
- protected void feed(byte[] buffer) {
- sourceDataLine.write(buffer, 0, buffer.length);
- logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
- }
-
- @Override
- protected void finish() {
- sourceDataLine.stop();
- }
- }).start();
- metadataUpdated();
- } catch (LineUnavailableException lue1) {
- throw new ConnectException(lue1);
+ } catch (LineUnavailableException e) {
+ /* TODO */
+ throw new IOException(e);
}
}
@Override
- public void metadataUpdated() {
+ public void close() {
+ sourceDataLine.stop();
+ sourceDataLine.close();
+ }
+
+ @Override
+ public void metadataUpdated(Metadata metadata) {
/* ignore. */
}
+ @Override
+ public void process(byte[] buffer) {
+ sourceDataLine.write(buffer, 0, buffer.length);
+ logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
+ }
+
}
package net.pterodactylus.sonitus.data.sink;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.logging.Logger;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
+import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
/**
- * {@link Sink} that writes all received data into a file.
+ * {@link net.pterodactylus.sonitus.data.Sink} that writes all received data
+ * into a file.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
/** The path of the file to write to. */
private final String path;
+ private FileOutputStream fileOutputStream;
+
/**
* Creates a new file sink that will write to the given path.
*
}
@Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
+ public void open(Metadata metadata) throws IOException {
+ fileOutputStream = new FileOutputStream(path);
+ }
+ @Override
+ public void close() {
try {
- final FileOutputStream fileOutputStream = new FileOutputStream(path);
- new Thread(new Connection(source) {
-
- @Override
- protected int bufferSize() {
- return 65536;
- }
-
- @Override
- protected void feed(byte[] buffer) throws IOException {
- fileOutputStream.write(buffer);
- logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
- }
-
- @Override
- protected void finish() throws IOException {
- fileOutputStream.close();
- }
- }).start();
- } catch (FileNotFoundException fnfe1) {
- throw new ConnectException(fnfe1);
+ fileOutputStream.close();
+ } catch (IOException e) {
+ /* ignore. */
}
}
@Override
- public void metadataUpdated() {
+ public void metadataUpdated(Metadata metadata) {
/* ignore. */
}
+ @Override
+ public void process(byte[] buffer) throws IOException {
+ fileOutputStream.write(buffer);
+ logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
+ }
+
}
package net.pterodactylus.sonitus.data.sink;
-import static com.google.common.base.Preconditions.*;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
import net.pterodactylus.sonitus.io.InputStreamDrainer;
import com.google.common.base.Function;
import com.google.common.io.Closeables;
/**
- * {@link Sink} implementation that delivers all incoming data to an Icecast2
- * server.
+ * {@link net.pterodactylus.sonitus.data.Sink} implementation that delivers all
+ * incoming data to an Icecast2 server.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
/** Whether to publish the server. */
private final boolean publishServer;
- /** The connected source. */
- private Source source;
+ private OutputStream socketOutputStream;
/**
* Creates a new Icecast2 sink.
//
@Override
- public void connect(Source source) throws ConnectException {
- checkNotNull(source, "source must not be null");
+ public void open(Metadata metadata) throws IOException {
+ logger.info(String.format("Connecting to %s:%d...", server, port));
+ Socket socket = new Socket(server, port);
+ logger.info("Connected.");
+ socketOutputStream = socket.getOutputStream();
+ InputStream socketInputStream = socket.getInputStream();
+
+ sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint));
+ sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
+ sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(metadata)));
+ sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName));
+ sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription));
+ sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre));
+ sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0));
+ sendLine(socketOutputStream, "");
+ socketOutputStream.flush();
+
+ new Thread(new InputStreamDrainer(socketInputStream)).start();
+
+ metadataUpdated(metadata);
+ }
- this.source = source;
+ @Override
+ public void close() {
try {
- logger.info(String.format("Connecting to %s:%d...", server, port));
- final Socket socket = new Socket(server, port);
- logger.info("Connected.");
- final OutputStream socketOutputStream = socket.getOutputStream();
- final InputStream socketInputStream = socket.getInputStream();
-
- sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint));
- sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
- sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(source.metadata())));
- sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName));
- sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription));
- sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre));
- sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0));
- sendLine(socketOutputStream, "");
- socketOutputStream.flush();
-
- new Thread(new InputStreamDrainer(socketInputStream)).start();
- new Thread(new Connection(source) {
-
- private long counter;
-
- @Override
- protected int bufferSize() {
- return 4096;
- }
-
- @Override
- protected void feed(byte[] buffer) throws IOException {
- socketOutputStream.write(buffer);
- socketOutputStream.flush();
- counter += buffer.length;
- logger.finest(String.format("Wrote %d Bytes.", counter));
- }
-
- @Override
- protected void finish() throws IOException {
- Closeables.close(socketOutputStream, true);
- if (socket != null) {
- socket.close();
- }
- }
- }).start();
-
- metadataUpdated();
- } catch (IOException ioe1) {
- throw new ConnectException(ioe1);
+ Closeables.close(socketOutputStream, true);
+ } catch (IOException e) {
+ /* will never throw. */
}
}
@Override
- public void metadataUpdated() {
- Metadata metadata = source.metadata();
- String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function<Optional<String>, Object>() {
+ public void metadataUpdated(final Metadata metadata) {
+ new Thread(new Runnable() {
@Override
- public Object apply(Optional<String> input) {
- return input.orNull();
- }
- })), "Sonitus");
- logger.info(String.format("Updating metadata to %s", metadataString));
+ public void run() {
+ String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function<Optional<String>, Object>() {
- Socket socket = null;
- OutputStream socketOutputStream = null;
- try {
- socket = new Socket(server, port);
- socketOutputStream = socket.getOutputStream();
-
- sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8")));
- sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
- sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus"));
- sendLine(socketOutputStream, "");
- socketOutputStream.flush();
-
- new InputStreamDrainer(socket.getInputStream()).run();
- } catch (IOException ioe1) {
- logger.log(Level.WARNING, "Could not update metadata!", ioe1);
- } finally {
- try {
- Closeables.close(socketOutputStream, true);
- if (socket != null) {
- socket.close();
+ @Override
+ public Object apply(Optional<String> input) {
+ return input.orNull();
+ }
+ })), "Sonitus");
+ logger.info(String.format("Updating metadata to %s", metadataString));
+
+ Socket socket = null;
+ OutputStream socketOutputStream = null;
+ try {
+ socket = new Socket(server, port);
+ socketOutputStream = socket.getOutputStream();
+
+ sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8")));
+ sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
+ sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus"));
+ sendLine(socketOutputStream, "");
+ socketOutputStream.flush();
+
+ new InputStreamDrainer(socket.getInputStream()).run();
+ } catch (IOException ioe1) {
+ logger.log(Level.WARNING, "Could not update metadata!", ioe1);
+ } finally {
+ try {
+ Closeables.close(socketOutputStream, true);
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException ioe1) {
+ /* ignore. */
+ }
}
- } catch (IOException ioe1) {
- /* ignore. */
}
- }
+ }).start();
+ }
+
+ @Override
+ public void process(byte[] buffer) throws IOException {
+ socketOutputStream.write(buffer);
+ socketOutputStream.flush();
}
//
* The output stream to send the line to
* @param line
* The line to send
- * @throws IOException
+ * @throws java.io.IOException
* if an I/O error occurs
*/
private static void sendLine(OutputStream outputStream, String line) throws IOException {
* @param password
* The password to encode
* @return The encoded password
- * @throws UnsupportedEncodingException
+ * @throws java.io.UnsupportedEncodingException
* if the UTF-8 encoding is not supported (which can never happen)
*/
private static String generatePassword(String password) throws UnsupportedEncodingException {
package net.pterodactylus.sonitus.data.source;
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_CHANNELS;
import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_ENCODING;
import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_FREQUENCY;
import net.pterodactylus.sonitus.io.IdentifyingInputStream;
import com.google.common.base.Optional;
-import com.google.common.io.ByteStreams;
/**
- * A {@link Source} that is read from the local file system.
+ * A {@link net.pterodactylus.sonitus.data.Source} that is read from the local
+ * file system.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
*
* @param path
* The path of the file
- * @throws IOException
+ * @throws java.io.IOException
* if the file can not be found, or an I/O error occurs
*/
public FileSource(String path) throws IOException {
//
@Override
- public Metadata metadata() {
- return metadata;
- }
-
- @Override
public byte[] get(int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
- int read = ByteStreams.read(fileInputStream, buffer, 0, bufferSize);
- if (read == 0) {
+ int read = fileInputStream.read(buffer);
+ if (read == -1) {
throw new EOFException();
}
return Arrays.copyOf(buffer, read);
}
+ @Override
+ public Metadata metadata() {
+ return metadata;
+ }
+
//
// OBJECT METHODS
//
--- /dev/null
+/*
+ * Sonitus - MultiSource.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.source;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import net.pterodactylus.sonitus.data.Metadata;
+import net.pterodactylus.sonitus.data.Source;
+import net.pterodactylus.sonitus.data.event.SourceFinishedEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+
+/**
+ * {@link Source} implementation that simply forwards another source and
+ * supports changing the source without letting the {@link
+ * net.pterodactylus.sonitus.data.Sink} know.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class MultiSource implements Source {
+
+ /** The logger. */
+ private static final Logger logger = Logger.getLogger(MultiSource.class.getName());
+
+ /** The event bus. */
+ private final EventBus eventBus;
+
+ /** The current source. */
+ private final AtomicReference<Source> source = new AtomicReference<Source>();
+
+ /** Whether the source was changed. */
+ private boolean sourceChanged;
+
+ @Inject
+ public MultiSource(EventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ //
+ // ACTIONS
+ //
+
+ /**
+ * Sets the new source to use.
+ *
+ * @param source
+ * The new source to use
+ */
+ public void setSource(Source source) {
+ checkNotNull(source, "source must not be null");
+
+ Source oldSource = this.source.getAndSet(source);
+ if (oldSource != null) {
+ synchronized (this.source) {
+ sourceChanged = true;
+ this.source.notifyAll();
+ }
+ logger.info(String.format("Next Source set: %s", source));
+ }
+ }
+
+ //
+ // SOURCE METHODS
+ //
+
+ @Override
+ public Metadata metadata() {
+ return source.get().metadata();
+ }
+
+ @Override
+ public byte[] get(int bufferSize) throws EOFException, IOException {
+ while (true) {
+ try {
+ return source.get().get(bufferSize);
+ } catch (EOFException eofe1) {
+ eventBus.post(new SourceFinishedEvent(source.get()));
+ synchronized (source) {
+ while (!sourceChanged) {
+ try {
+ logger.info("Waiting for next Source...");
+ source.wait();
+ logger.info("Was notified.");
+ } catch (InterruptedException ioe1) {
+ /* ignore: we’ll end up here again if we were interrupted. */
+ }
+ }
+ }
+ } finally {
+ synchronized (source) {
+ sourceChanged = false;
+ }
+ }
+ }
+ }
+
+}