Subversion Repositories splitter

Rev

Rev 438 | Blame | Compare with Previous | Last modification | View Log | RSS feed

/*
 * Copyright (c) 2010, Chris Miller
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 3 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 */

package uk.me.parabola.splitter;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackgroundInputStream extends InputStream {
        private static final int QUEUE_SIZE = 5;
        private static final int BUFFER_SIZE = 32768;
        private static final byte[] EOF_MARKER = new byte[0];

        // These variables are accessed from both threads
        private final BlockingQueue<byte[]> inQueue;
        private final BlockingQueue<byte[]> recycleQueue;
        private final int bufferSize;
        protected final InputStream sourceStream;
        protected volatile boolean closed;

        // These variables are only accessed from the reader thread
        private byte[] currentBuffer;
        private int currentIndex;
        private Thread loaderThread;

        public BackgroundInputStream(InputStream source) {
                this(source, QUEUE_SIZE, BUFFER_SIZE);
        }

        public BackgroundInputStream(InputStream source, int queueSize, int bufferSize)
        {
                inQueue = new ArrayBlockingQueue<byte[]>(queueSize);
                recycleQueue = new ArrayBlockingQueue<byte[]>(queueSize + 1);
                sourceStream = source;
                this.bufferSize = bufferSize;
        }

        @Override
        public int read() throws IOException {
                if (!ensureBuffer()) {
                        return -1;
                }
                int b = currentBuffer[currentIndex++];
                recycle();
                return b;
        }

        @Override
        public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
                int count = 0;
                while (len > 0) {
                        if (!ensureBuffer()) {
                                return count == 0 ? -1 : count;
                        }
                        int remaining = currentBuffer.length - currentIndex;
                        int bytesToCopy = Math.min(remaining, len);
                        System.arraycopy(currentBuffer, currentIndex, b, off, bytesToCopy);
                        count += bytesToCopy;
                        currentIndex += bytesToCopy;
                        off += bytesToCopy;
                        len -= bytesToCopy;
                        recycle();
                }
                return count;
        }

        private boolean ensureBuffer() throws IOException {
                if (loaderThread == null) {
                        loaderThread = new Thread(new Loader(), "BackgroundInputStream");
                        loaderThread.start();
                }
                if (currentBuffer == null) {
                        try {
                                currentBuffer = inQueue.take();
                        } catch (InterruptedException e) {
                                throw new IOException("Failed to take a buffer from the queue", e);
                        }
                        currentIndex = 0;
                }
                return currentBuffer != EOF_MARKER;
        }

        private void recycle() {
                if (currentIndex == currentBuffer.length) {
                        if (currentIndex == bufferSize) {
                                recycleQueue.offer(currentBuffer);
                        }
                        currentBuffer = null;
                }
        }

        @Override
        public int available() throws IOException {
                return currentBuffer == null ? 0 : currentBuffer.length;
        }

        @Override
        public void close() throws IOException {
                closed = true;
                inQueue.clear();
                recycleQueue.clear();
                currentBuffer = null;
        }

        private class Loader implements Runnable {
                @Override
                public void run() {
                        {
                                int bytesRead = 0;
                                while (!closed) {
                                        byte[] buffer = recycleQueue.poll();
                                        if (buffer == null) {
                                                buffer = new byte[bufferSize];
                                        }
                                        int offset = 0;
                                        try {
                                                while ((offset < bufferSize) && ((bytesRead = sourceStream.read(buffer, offset, bufferSize - offset)) != -1)) {
                                                        offset += bytesRead;
                                                }
                                        } catch (IOException e) {
                                                throw new RuntimeException("Unable to read from stream", e);
                                        }
                                        if (offset < bufferSize) {
                                                buffer = Arrays.copyOf(buffer, offset);
                                        }
                                        try {
                                                inQueue.put(buffer);
                                                if (bytesRead == -1) {
                                                        inQueue.put(EOF_MARKER);
                                                        closed = true;
                                                }
                                        } catch (InterruptedException e) {
                                                throw new RuntimeException("Unable to put data onto queue", e);
                                        }
                                        if (closed) {
                                                try {
                                                        sourceStream.close();
                                                } catch (IOException e) {
                                                        throw new RuntimeException("Unable to close source stream", e);
                                                }
                                        }
                                }
                        }
                }
        }
}