TimeoutableReader 带有超时功能的行读取器(Java)

在与各种IO设备交互的过程中,按行读取是一个很常见的需求。一般而言,按行读取可以使用BufferedReader类的readLine()方法。不过呢,BufferedReader的readLine()方法有两个问题:

(1)如果换行符迟迟不到达,那么就会一直阻塞;

(2)如果InputStream中的内容有时需要按行读取,而有时又需要按字节读取,那么由于BufferedReader有“预读取”功能,导致InputStream中之后的字节也被BufferedReader读走了,破坏程序正常逻辑。

为此,我有必要自己封装一个行读取器,首先是“不该读的不要读”,其次是要有超时功能,即在指定时间内读不到数据就返回false,而且需要把已读数据放入缓冲区里面,以备下次读取。

代码并不复杂,而且关键地方都已经上了注释。之所以要用一篇博客记录一下,就是因为这个功能太常用了,而BufferedReader太不合适了。这样以后再遇到这样的问题,直接CTRL-C加CTRL-V就行了~

代码中用到了ByteStream,详见《结合链表与数组各自优点的字节流ByteStream》。而TimeUtil的代码如下:

TimeUtil.java

package zjs.util;

/**
 * 时间相关的工具类
 */
public class TimeUtil
{

    /**
     * 延时
     * @param ms 毫秒
     * @return 如果没有被打断,则返回true,否则false
     */
    public static final boolean delay(long ms)
    {
        try
        {
            Thread.sleep(ms);
            return true;
        }
        catch(Exception e)
        {
            return false;
        }
    }

}

TimeoutableReader.java

package zjs.util;

import java.io.IOException;
import java.io.InputStream;

/**
 * 带有超时功能的按行读取器
 */
public class TimeoutableReader
{

    private static final int WAIT_TICK=50;

    //底层输入流
    private InputStream input;
    //字节缓冲区
    private ByteStream buffer;

    /**
     * 基于原始输入流构建读取器
     * @param in 输入流
     */
    public TimeoutableReader(InputStream in)
    {
        input=in;
        buffer=new ByteStream();
    }

    /**
     * 清空输入流中所有已存在的数据
     * @throws IOException 输入流出错
     */
    public void clear() throws IOException
    {
        while(input.available()>0)
            input.read();
        while(buffer.getLength()>0)
            buffer.take();
    }

    /**
     * 关闭输入流
     * @throws IOException 输入流出错
     */
    public void close() throws IOException
    {
        input.close();
    }

    /**
     * 读取一行数据
     * @param timeout 超时时间(毫秒)
     * @param filterEmpty 是否过滤空行
     * @return 一行(已去除换行符),如果超时则返回null
     * @throws IOException 输入流出错
     */
    public String readLine(int timeout,boolean filterEmpty) throws IOException
    {
        StringBuilder sb=new StringBuilder();
        //开始时间
        long startTime=System.currentTimeMillis();
        while(true)
        {
            //如果有数据
            if(available()>0)
            {
                //读取一个字符
                char aChar=(char)readByte();
                //如果读到'\n',说明要换行
                if(aChar=='\n')
                {
                    //获取收到的长度
                    int length=sb.length();
                    //如果最后一个字符是'\r',则删除'\r'
                    if(length>0&&sb.charAt(length-1)=='\r')
                        sb.setLength(length-1);
                    //获取当前的行
                    String line=sb.toString();
                    //清空数据缓存(此行有用,因为下面那行不一定能够return)
                    sb.setLength(0);
                    //如果数据不为空,或者不要求过滤空行,则返回
                    if(!(filterEmpty&&line.equals("")))
                        return line;
                }
                //其他字符则追加到缓存
                else
                    sb.append(aChar);
            }
            else
                TimeUtil.delay(WAIT_TICK);
            //查看当前时间
            long nowTime=System.currentTimeMillis();
            //判断超时
            if(nowTime-startTime>timeout)
            {
                //如果超时,把已读数据放回去
                for(int i=sb.length()-1;i>=0;i--)
                    buffer.untake((byte)sb.charAt(i));
                return null;
            }
        }
    }

    /**
     * 当前可读长度
     * @return 可读长度
     * @throws IOException
     */
    private int available() throws IOException
    {
        return buffer.getLength()+input.available();
    }

    /**
     * 读取一个字节
     * @return 一个字节
     * @throws IOException
     */
    private int readByte() throws IOException
    {
        if(buffer.getLength()>0)
            return buffer.take();
        if(input.available()>0)
            return input.read();
        return 0;
    }

}

可以看见readByte()方法中会优先读取ByteStream中的内容。而readLine()中超时处理的部分里,则把已经读出来的字节倒序塞回ByteStream。这样的话,如果这次读取超时了,下次读取时回重新读,而不是丢弃数据。

可以写这么一个Demo:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import zjs.util.TimeoutableReader;

public class Test
{

    public static void main(String[] args) throws IOException
    {
        PipedInputStream in=new PipedInputStream();
        PipedOutputStream out=new PipedOutputStream();
        in.connect(out);
        TimeoutableReader reader=new TimeoutableReader(in);
        out.write("hello,".getBytes());
        System.out.println(reader.readLine(1000,true));
        out.write("world!\n".getBytes());
        System.out.println(reader.readLine(1000,true));
    }

}

运行结果:

null
hello,world!

一开始,InputStream中只有”hello,”六个字符,那么按行读取以后就会超时,返回null。接着InputStream中多了”world!\n”,那么再次按行读以后,可以立即返回,而且读到了”hello,world!\n”。说明之前的数据没有被丢弃。