Multiple readers for an InputStream in Java

2022-09-02 11:49:40

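I have an input stream from which I'm reading characters. I would like multiple readers to access this InputStream. A reasonable way to achieve this seems to be to write the incoming data to a StringBuffer or StringBuilder and have the readers read from that. Unfortunately, StringBufferInputStream is deprecated, and StringReader reads a String, not a mutable object that is continuously being updated. What are my options? Write my own?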


Answer 1

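Note: My other answer is more general (and, in my opinion, better).

As pointed out by @dimo414, the answer below requires the first reader to always be ahead of the second reader. If that is indeed your situation, this answer may still be preferable, since it builds on standard classes.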


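To create two readers that read independently from the same source, you have to make sure they don't consume data from the same stream.

This can be achieved by combining TeeInputStream from Apache Commons with a PipedInputStream and a PipedOutputStream, as follows: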

import java.io.*;
import org.apache.commons.io.input.TeeInputStream;
class Test {
    public static void main(String[] args) throws IOException {

        // Create the source input stream.
        InputStream is = new FileInputStream("filename.txt");

        // Create a piped input stream for one of the readers.
        PipedInputStream in = new PipedInputStream();

        // Create a tee-splitter for the other reader.
        TeeInputStream tee = new TeeInputStream(is, new PipedOutputStream(in));

        // Create the two buffered readers.
        BufferedReader br1 = new BufferedReader(new InputStreamReader(tee));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in));

        // Do some interleaved reads from them.
        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();

        System.out.println("Two lines from br2:");
        System.out.println(br2.readLine());
        System.out.println(br2.readLine());
        System.out.println();

        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();
    }
}

Output:

One line from br1:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start

Two lines from br2:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start
Line2: consectetur adipisicing elit,

One line from br1:
Line2: consectetur adipisicing elit,    <-- resumes on line 2
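
The ordering requirement mentioned at the top of this answer can be seen by watching how many bytes are queued in the pipe. The following is a minimal sketch, not the Apache Commons implementation: `SimpleTee` is a hypothetical JDK-only stand-in for TeeInputStream, and `pipeFillLevels` is a helper name made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

class PipeOrderDemo {

    // Hypothetical stand-in for TeeInputStream: every byte read from the
    // source is also written to the branch stream.
    static class SimpleTee extends FilterInputStream {
        private final OutputStream branch;

        SimpleTee(InputStream source, OutputStream branch) {
            super(source);
            this.branch = branch;
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b != -1) {
                branch.write(b);
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len);
            if (n > 0) {
                branch.write(buf, off, n);
            }
            return n;
        }
    }

    // Returns {bytes queued in the pipe before the tee is read,
    //          bytes queued after the leading reader consumed 5 bytes}.
    static int[] pipeFillLevels() throws IOException {
        byte[] data = "Line1\nLine2\n".getBytes(StandardCharsets.UTF_8);
        PipedInputStream pipe = new PipedInputStream();
        InputStream tee = new SimpleTee(new ByteArrayInputStream(data),
                                        new PipedOutputStream(pipe));

        int before = pipe.available();      // nothing has passed through the tee yet
        byte[] chunk = new byte[5];
        tee.read(chunk, 0, chunk.length);   // leading reader consumes 5 bytes
        int after = pipe.available();       // the same 5 bytes are now queued
        return new int[] {before, after};
    }

    public static void main(String[] args) throws IOException {
        int[] levels = pipeFillLevels();
        System.out.println("Queued before reading the tee: " + levels[0]);
        System.out.println("Queued after reading 5 bytes:  " + levels[1]);
    }
}
```

The second reader only ever sees what the first reader has already pulled through the tee, so reading from the pipe first would block on a single thread. A PipedInputStream created with the no-argument constructor also has a small internal buffer (1024 bytes), so on a single thread the leading reader cannot get arbitrarily far ahead either.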

Answer 2

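As you have probably noticed, once you have read a byte from an input stream it is gone forever (unless you saved it somewhere yourself).

The solution below saves the bytes until all subscribing input streams have read them.

It works as follows: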

// Create a SplittableInputStream from the originalStream
SplittableInputStream is  = new SplittableInputStream(originalStream);

// Fork this to get more input streams reading independently from originalStream
SplittableInputStream is2 = is.split();
SplittableInputStream is3 = is.split();

Each call to is.split() yields a new InputStream that reads bytes from the point of the split.

The SplittableInputStream looks as follows (copy and paste!):

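import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SplittableInputStream extends InputStream {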

    // Almost an input stream: The read-method takes an id.
    static class MultiplexedSource {

        static int MIN_BUF = 4096;

        // Underlying source
        private InputStream source;

        // Read positions of each SplittableInputStream
        private List<Integer> readPositions = new ArrayList<>();

        // Data to be read by the SplittableInputStreams
        int[] buffer = new int[MIN_BUF];

        // Next write position in buffer (one past the last valid entry)
        int writePosition = 0;

        public MultiplexedSource(InputStream source) {
            this.source = source;
        }

        // Add a multiplexed reader. Return new reader id.
        int addSource(int splitId) {
            readPositions.add(splitId == -1 ? 0 : readPositions.get(splitId));
            return readPositions.size() - 1;
        }

        // Make room for more data (and drop data that has been read by
        // all readers)
        private void readjustBuffer() {
            int from = Collections.min(readPositions);
            int to = Collections.max(readPositions);
            int newLength = Math.max((to - from) * 2, MIN_BUF);
            int[] newBuf = new int[newLength];
            System.arraycopy(buffer, from, newBuf, 0, to - from);
            for (int i = 0; i < readPositions.size(); i++)
                readPositions.set(i, readPositions.get(i) - from);
            writePosition -= from;
            buffer = newBuf;
        }

        // Read and advance position for given reader
        public int read(int readerId) throws IOException {

            // Enough data in buffer?
            if (readPositions.get(readerId) >= writePosition) {
                readjustBuffer();
                buffer[writePosition++] = source.read();
            }

            int pos = readPositions.get(readerId);
            int b = buffer[pos];
            if (b != -1)
                readPositions.set(readerId, pos + 1);
            return b;
        }
    }

    // Non-root fields
    MultiplexedSource multiSource;
    int myId;

    // Public constructor: Used for first SplittableInputStream
    public SplittableInputStream(InputStream source) {
        multiSource = new MultiplexedSource(source);
        myId = multiSource.addSource(-1);
    }

    // Private constructor: Used in split()
    private SplittableInputStream(MultiplexedSource multiSource, int splitId) {
        this.multiSource = multiSource;
        myId = multiSource.addSource(splitId);
    }

    // Returns a new InputStream that will read bytes from this position
    // onwards.
    public SplittableInputStream split() {
        return new SplittableInputStream(multiSource, myId);
    }

    @Override
    public int read() throws IOException {
        return multiSource.read(myId);
    }
}
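
The compaction step in readjustBuffer() may be easier to follow when traced in isolation. The sketch below repeats the same arithmetic as a standalone function; the class name `CompactionTrace`, the method `compact`, and the small `MIN_BUF` of 8 are assumptions made for this illustration (the class above uses 4096).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class CompactionTrace {

    // Small value for illustration only; the class above uses 4096.
    static final int MIN_BUF = 8;

    // Same arithmetic as readjustBuffer(): drop everything before the
    // slowest reader, shift all read positions, and return a new buffer.
    static int[] compact(int[] buffer, List<Integer> readPositions) {
        int from = Collections.min(readPositions);  // slowest reader
        int to = Collections.max(readPositions);    // fastest reader
        int[] newBuf = new int[Math.max((to - from) * 2, MIN_BUF)];
        System.arraycopy(buffer, from, newBuf, 0, to - from);
        for (int i = 0; i < readPositions.size(); i++) {
            readPositions.set(i, readPositions.get(i) - from);
        }
        return newBuf;
    }

    public static void main(String[] args) {
        int[] buffer = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
        // One reader has consumed 3 bytes, the other all 8.
        List<Integer> positions = new ArrayList<>(Arrays.asList(3, 8));

        int[] compacted = compact(buffer, positions);

        System.out.println(positions);            // [0, 5]
        System.out.println(compacted.length);     // 10
        System.out.println((char) compacted[0]);  // d
    }
}
```

Bytes 'a' to 'c' have been read by every reader and are dropped; the five bytes the slow reader still needs move to the front, and the buffer is sized to twice that backlog. Memory use therefore grows with the distance between the slowest and the fastest reader.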

Finally, a demo:

String str = "Lorem ipsum\ndolor sit\namet\n";
InputStream is = new ByteArrayInputStream(str.getBytes("UTF-8"));

// Create the two buffered readers.
SplittableInputStream is1 = new SplittableInputStream(is);
SplittableInputStream is2 = is1.split();

BufferedReader br1 = new BufferedReader(new InputStreamReader(is1));
BufferedReader br2 = new BufferedReader(new InputStreamReader(is2));

// Do some interleaved reads from them.
System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

System.out.println("Two lines from br2:");
System.out.println(br2.readLine());
System.out.println(br2.readLine());
System.out.println();

System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

Output:

One line from br1:
Lorem ipsum

Two lines from br2:
Lorem ipsum
dolor sit

One line from br1:
dolor sit
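
For comparison, when the source is finite and small enough to hold in memory, the "save it somewhere yourself" idea from the start of this answer reduces to draining the stream once and giving each reader its own view of the bytes. This is a simpler alternative sketched under that assumption, not part of SplittableInputStream; `BufferedCopies` and `copies` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

class BufferedCopies {

    // Drains the source once and returns `count` independent streams,
    // each starting at the first byte. Hypothetical helper, not a JDK API.
    static InputStream[] copies(InputStream source, int count) throws IOException {
        ByteArrayOutputStream saved = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = source.read(chunk)) != -1) {
            saved.write(chunk, 0, n);
        }
        byte[] bytes = saved.toByteArray();

        InputStream[] result = new InputStream[count];
        for (int i = 0; i < count; i++) {
            // All streams share the array, but each keeps its own position.
            result[i] = new ByteArrayInputStream(bytes);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "Lorem ipsum\ndolor sit\namet\n".getBytes(StandardCharsets.UTF_8);
        InputStream[] streams = copies(new ByteArrayInputStream(data), 2);

        BufferedReader br1 = new BufferedReader(
                new InputStreamReader(streams[0], StandardCharsets.UTF_8));
        BufferedReader br2 = new BufferedReader(
                new InputStreamReader(streams[1], StandardCharsets.UTF_8));

        System.out.println(br2.readLine()); // Lorem ipsum
        System.out.println(br2.readLine()); // dolor sit
        System.out.println(br1.readLine()); // Lorem ipsum
    }
}
```

Unlike SplittableInputStream, this reads the whole source up front, so it does not suit a stream that is still being written to or one that is too large to buffer.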
