第一个Linux驱动程序(三)——aMsg的非阻塞式IO之select/poll

在《第一个Linux驱动程序(二)——aMsg的阻塞式IO(互斥与同步)》中讲了阻塞式IO。虽然阻塞式IO很常见很简单,但是在大型软件中用的更多的还是非阻塞式IO。非阻塞式IO大致有两种,一种是多路复用,具体到技术上就是select/poll操作,一种是异步回调,也就是aio。今天要研究的就是select/poll操作。

什么是select/poll呢?可以想象一个http服务器,比如apache,高并发情况下可能需要同时管理成千上万的socket,如果要使用阻塞式IO,那么就需要为每一个socket开启一个线程,这样开销非常大。而select/poll提供了一种能力,能够创建一个文件描述符池,对整个池执行阻塞式IO。当池中任意一个文件描述符可读或者可写,那么调用就返回,并告知应用程序哪个或哪些文件描述符可以做怎样的操作了。select/poll虽然本身是阻塞式的(可以设置超时时间),但是一次阻塞可以同时监控非常多的文件描述符,一旦某个文件描述符就绪,就可以以非阻塞的方式读或写。Java的nio就是基于select/poll的。很多服务器软件,比如http服务器apache、阿里巴巴的开源数据库中间件cobar,也是基于select/poll(当然事实上是更加高级的epoll),这样可以只开一个线程就能监控所有的socket。

通常,每一个使用select/poll操作的文件描述符都会被设置一个标志位O_NONBLOCK,告诉驱动程序,如果不可读则read()立即返回,或者,如果不可写则write()立即返回。这是因为,虽然select/poll操作返回后会指明某个文件描述符可读或可写,但是如果select/poll返回后、read()或write()调用之前,有线程抢先一步,那么该线程就可能阻塞。这也成了一种惯例,即使用select/poll的文件描述符需要设置O_NONBLOCK标志位。所以,接下来我先讲O_NONBLOCK标志位,然后讲select/poll操作。

在讲解之前,需要先来回顾一下《第一个Linux驱动程序(二)——aMsg的阻塞式IO(互斥与同步)》中未给出的所谓的第四段代码:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/spinlock.h>
#include <linux/completion.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//自旋锁
spinlock_t g_lock;
//read完成量
static struct completion g_comp_read;
//write完成量
static struct completion g_comp_write;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果有数据,则跳出循环,开始读取数据
        if(g_length>0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //等待write操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_write))
            return -ERESTARTSYS;
    }
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //触发read完成量,通知read操作完成
    complete(&g_comp_read);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果没有数据,则跳出循环,开始写入数据
        if(g_length==0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //等待read操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_read))
            return -ERESTARTSYS;
    }
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //触发write完成量,通知write操作完成
    complete(&g_comp_write);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return g_length;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化自旋锁
    spin_lock_init(&g_lock);
    //初始化read完成量
    init_completion(&g_comp_read);
    //初始化write完成量
    init_completion(&g_comp_write);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

显然,这段代码的read()和write()是阻塞式的。接下来我要先后实现非阻塞的read()、write()和select/poll调用。

=============阶段一:O_NONBLOCK标志=============

如果在应用程序中通过fcntl把一个文件描述符的O_NONBLOCK标志位置位,比如:

int flags=fcntl(fd,F_GETFL,0);
flags|=O_NONBLOCK;
fcntl(fd,F_SETFL,flags);

那么对fd的read()操作和write()操作都是非阻塞的。所谓非阻塞的,就是说,如果没有数据可读,那么read()就立刻返回-1,或者,如果数据暂不可写入,那么write()就立刻返回-1。事实上,是否真的实现了非阻塞不是应用程序说了算的,甚至不是Linux内核说了算的,而是相应的驱动程序决定的。请看下面这段代码:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/spinlock.h>
#include <linux/completion.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//自旋锁
spinlock_t g_lock;
//read完成量
static struct completion g_comp_read;
//write完成量
static struct completion g_comp_write;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果有数据,则跳出循环,开始读取数据
        if(g_length>0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //如果设置了O_NONBLOCK,那么直接返回-EAGAIN
        if(p_file->f_flags&O_NONBLOCK)
            return -EAGAIN;
        //等待write操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_write))
            return -ERESTARTSYS;
    }
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //触发read完成量,通知read操作完成
    complete(&g_comp_read);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果没有数据,则跳出循环,开始写入数据
        if(g_length==0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //如果设置了O_NONBLOCK,那么直接返回-EAGAIN
        if(p_file->f_flags&O_NONBLOCK)
            return -EAGAIN;
        //等待read操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_read))
            return -ERESTARTSYS;
    }
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //触发write完成量,通知write操作完成
    complete(&g_comp_write);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return g_length;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化自旋锁
    spin_lock_init(&g_lock);
    //初始化read完成量
    init_completion(&g_comp_read);
    //初始化write完成量
    init_completion(&g_comp_write);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

在read()的实现中,如果有数据可读,那么不管阻塞式还是非阻塞式IO,那么操作都是一样的——直接读取数据然后返回读取的字节数。相反,如果没有可读的数据,那么显式地判断了struct file的f_flags字段中O_NONBLOCK是否已经置位,如果是,说明应用层希望使用非阻塞IO,那么就立刻返回-EAGAIN;否则阻塞等待数据可读。换句话说,如果驱动中不理会O_NONBLOCK位,那么即使应用层已经把O_NONBLOCK置位,也没法使用阻塞式IO,最终的大权掌握在驱动的手里。write()的实现类似,不再累述。

这段驱动代码可以使用下面的应用层代码来测试:

nioTest.c:

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>

int main()
{
    int fd=open("/dev/aMsg",O_RDWR);
    if(fd<0)
    {
        perror("cannot open /dev/aMsg!");
        return 1;
    }
    //设置O_NONBLOCK位
    int flags=fcntl(fd,F_GETFL,0);
    flags|=O_NONBLOCK;
    fcntl(fd,F_SETFL,flags);
    //轮询
    while(1)
    {
        char buf[1024];
        int len=read(fd,buf,sizeof(buf));
        if(len<0)
            printf("no data available...\n");
        else
        {
            //字符串末尾置为0,防止溢出
            buf[len]=0;
            printf("len=%d,content='%s'\n",len,buf);
        }
        sleep(1);
    }
    return 0;
}

加载驱动、创建设备节点之后,使用命令编译测试应用程序:

gcc nioTest.c -o nioTest

然后运行:

sudo ./nioTest

可以看到每一秒钟输出一次“no data available…”:

此时,另开一个终端,在里面以root权限执行:

echo -e 'hello,nonblocking IO!\c' > /dev/aMsg

命令中,-e选项让echo启用转义,\c表示不要换行。命令执行后,可以发现nioTest输出了数据:

与预期一致,说明O_NONBLOCK选项已经按预期工作,应用层与驱动层里应外合了~

==============阶段二:select/poll操作==============

正式进入重点。select和poll其实是一回事,只不过select()调用是BSD Unix引入的,而poll()操作是System V引入的,他们完成的功能是一样的,连数据结构都是很相似的。动动脑子就能想到,select/poll操作肯定是需要驱动程序的辅助才能实现的。

假设应用程序把若干个文件描述符放入一个池,调用了select(),那么,Linux内核会依次找到每个文件描述符对应的驱动程序的file_operations.poll()函数。file_operations.poll()本身是非阻塞的,它的工作就是检查当前文件描述符是否可读或者可写,然后返回一个掩码,来指明状态。因此,内核可以通过一个循环,把池中所有的文件描述符的poll()函数都调用一遍。如果其中任意一个文件描述符是可读或者可写,那么select()就返回,并告知应用程序哪个文件描述符可读或可写。那如果所有的文件描述符都不可读不可写呢?此时,内核就会让select()阻塞,把线程挂起,等待通知。通知?谁来通知呢?这个问题问的很好,这就是驱动程序的file_operations.poll()函数需要完成的另一个工作——注册可能导致状态变化的“信号源”。比如在我们上面的代码中,一旦write()成功执行,那么就会触发完成量g_comp_write。而一旦read()执行成功,那么就会触发完成量g_comp_read。因此,驱动程序的file_operations.poll()函数需要向内核“注册”g_comp_write和g_comp_read,相当于告知内核说:“如果这两个中任意一个被触发,那么说明读写状况可能已经改变,麻烦你重新调用一下我的file_operations.poll()函数,看看是否真的可读或可写了”~

OK,直接看代码吧:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/poll.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//自旋锁
spinlock_t g_lock;
//read完成量
static struct completion g_comp_read;
//write完成量
static struct completion g_comp_write;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果有数据,则跳出循环,开始读取数据
        if(g_length>0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //如果设置了O_NONBLOCK,那么直接返回-EAGAIN
        if(p_file->f_flags&O_NONBLOCK)
            return -EAGAIN;
        //等待write操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_write))
            return -ERESTARTSYS;
    }
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //触发read完成量,通知read操作完成
    complete(&g_comp_read);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //对缓冲区加锁访问
        spin_lock(&g_lock);
        //如果没有数据,则跳出循环,开始写入数据
        if(g_length==0)
            break;
        //对缓冲区访问结束,解锁
        spin_unlock(&g_lock);
        //如果设置了O_NONBLOCK,那么直接返回-EAGAIN
        if(p_file->f_flags&O_NONBLOCK)
            return -EAGAIN;
        //等待read操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_read))
            return -ERESTARTSYS;
    }
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //触发write完成量,通知write操作完成
    complete(&g_comp_write);
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //返回成功拷贝的字节数
    return g_length;
}

//poll操作
static uint amsg_poll(struct file *p_file,poll_table *p_wait_table)
{
    uint t_mask=0;
    //注册g_comp_read.wait
    poll_wait(p_file,&(g_comp_read.wait),p_wait_table);
    //注册g_comp_write.wait
    poll_wait(p_file,&(g_comp_write.wait),p_wait_table);
    //对缓冲区加锁访问
    spin_lock(&g_lock);
    //根据数据状态设置状态掩码
    if(g_length==0)
        t_mask=POLLOUT|POLLWRNORM;
    else
        t_mask=POLLIN|POLLRDNORM;
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    printk("poll mask=%x\n",t_mask);
    return t_mask;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write,
    .poll=amsg_poll
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化自旋锁
    spin_lock_init(&g_lock);
    //初始化read完成量
    init_completion(&g_comp_read);
    //初始化write完成量
    init_completion(&g_comp_write);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

注意代码中的poll_wait()函数,这个函数的名称是我见过最最糟糕的!这个函数本身不阻塞,它的作用仅仅是把第二个参数(一个等待队列)加入到poll_table中!因此,叫做add_to_poll_wait_table()会好的多!

那么为什么是g_comp_read.wait和g_comp_write.wait呢?这个首先要看poll_wait()的原型:

void poll_wait(struct file *filp, wait_queue_head_t *queue, poll_table *wait);

它的第二个参数的类型是定义在<linux/wait.h>中的wait_queue_head_t,这是各种会导致挂起的阻塞操作中最最底层的东西。它其实就是一个队列,队列中的每一个元素都是一个挂起等待同一个事件的线程。当这个队列被触发一个事件时,队列中的线程会被唤醒(可能唤醒一个,也可能唤醒全部,看具体函数)。是不是觉得和完成量很像?是的,完成量就是对wait_queue_head_t的简单封装:

struct completion
{
    unsigned int done;
    wait_queue_head_t wait;
};

因此,只要传入g_comp_read.wait和g_comp_write.wait就行。当触发完成量时,本质上就是在触发等待队列,这样内核也会得到通知,然后重新调用amsg_poll()来查询状态。

可以使用如下应用层代码来测试:

pollTest.c:

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/select.h>

int main()
{
    int fd=open("/dev/aMsg",O_RDWR);
    if(fd<0)
    {
        perror("cannot open /dev/aMsg!");
        return 1;
    }
    fd_set rds;
    FD_ZERO(&rds);
    FD_SET(fd,&rds);
    int ret=select(fd+1,&rds,NULL,NULL,NULL);
    if(ret<0)
    {
        printf("select error!\n");
        return 1;
    }
    printf("select return\n");
    if(FD_ISSET(fd,&rds))
    {
        char buf[1024];
        int len=read(fd,buf,sizeof(buf));
        buf[len]=0;
        printf("len:%d,content='%s'\n",len,buf);
    }
    close(fd);
    return 0;
}

重新加载驱动后,编译测试程序:

gcc pollTest.c -o pollTest

然后运行:

sudo ./pollTest

会发现进程阻塞。在第二个终端中执行:

dmesg | tail -n 5

可以看到aMsg.ko输出的信息:

104就是POLLOUT|POLLWRNORM的值。在第三个终端中以root执行:

echo -e 'hello,nonblocking IO!\c' > /dev/aMsg

可以看到pollTest输出了信息:

此时再次在第二个终端中执行:

dmesg | tail -n 5

可以看到:

41就是POLLIN|POLLRDNORM的值。

这说明,在应用层调用了select()之后,内核调用了一次驱动程序的poll(),发现不可读(只是可写),于是挂起。当有数据写入后,触发了g_comp_write(本质上触发了等待队列g_comp_write.wait),于是内核重新调用了驱动程序的poll(),返回POLLIN|POLLRDNORM,说明可以读取,于是select()返回。

这些行为都与预期的一致,说明poll()正确实现了。