在《第一个Linux驱动程序(二)——aMsg的阻塞式IO(互斥与同步)》中讲了阻塞式IO。虽然阻塞式IO很常见很简单,但是在大型软件中用的更多的还是非阻塞式IO。非阻塞式IO大致有两种,一种是多路复用,具体到技术上就是select/poll操作,一种是异步回调,也就是aio。今天要研究的就是select/poll操作。
什么是select/poll呢?可以想象一个http服务器,比如apache,高并发情况下可能需要同时管理成千上万的socket,如果要使用阻塞式IO,那么就需要为每一个socket开启一个线程,这样开销非常大。而select/poll提供了一种能力,能够创建一个文件描述符池,对整个池执行阻塞式IO。当池中任意一个文件描述符可读或者可写,那么调用就返回,并告知应用程序哪个或哪些文件描述符可以做怎样的操作了。select/poll虽然本身是阻塞式的(可以设置超时时间),但是一次阻塞可以同时监控非常多的文件描述符,一旦某个文件描述符就绪,就可以以非阻塞的方式读或写。Java的nio就是基于select/poll的。很多服务器软件,比如http服务器apache、阿里巴巴的开源数据库中间件cobar,也是基于select/poll(当然事实上是更加高级的epoll),这样可以只开一个线程就能监控所有的socket。
通常,每一个使用select/poll操作的文件描述符都会被设置一个标志位O_NONBLOCK,告诉驱动程序,如果不可读则read()立即返回,或者,如果不可写则write()立即返回。这是因为,虽然select/poll操作返回后会指明某个文件描述符可读或可写,但是如果select/poll返回后、read()或write()调用之前,有线程抢先一步,那么该线程就可能阻塞。这也成了一种惯例,即使用select/poll的文件描述符需要设置O_NONBLOCK标志位。所以,接下来我先讲O_NONBLOCK标志位,然后讲select/poll操作。
在讲解之前,需要先来回顾一下《第一个Linux驱动程序(二)——aMsg的阻塞式IO(互斥与同步)》中未给出的所谓的第四段代码:
#include <linux/module.h> #include <linux/init.h> #include <linux/fs.h> #include <asm/uaccess.h> #include <linux/spinlock.h> #include <linux/completion.h> MODULE_LICENSE("Dual BSD/GPL"); //主设备号 #define AMSG_MAJOR 224 //缓冲区大小 #define AMSG_MAX_BUF_SZ 1024 //当前字符串长度 static int g_length=0; //缓冲区 static char g_buffer[AMSG_MAX_BUF_SZ]; //自旋锁 spinlock_t g_lock; //read完成量 static struct completion g_comp_read; //write完成量 static struct completion g_comp_write; //read操作 static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //对缓冲区加锁访问 spin_lock(&g_lock); //如果有数据,则跳出循环,开始读取数据 if(g_length>0) break; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //等待write操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_write)) return -ERESTARTSYS; } //最多能够读取的字节数(p_count和g_length之间较小者) t_size=(p_count<g_length?p_count:g_length); //没有成功拷贝的字节数 t_rest=copy_to_user(p_buf,g_buffer,t_size); //不管结果如何,都清空缓冲区 g_length=0; //触发read完成量,通知read操作完成 complete(&g_comp_read); //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //返回成功拷贝的字节数 return t_size-t_rest; } //write操作 static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //对缓冲区加锁访问 spin_lock(&g_lock); //如果没有数据,则跳出循环,开始写入数据 if(g_length==0) break; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //等待read操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_read)) return -ERESTARTSYS; } //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者) t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer)); //没有成功拷贝的字节数 t_rest=copy_from_user(g_buffer,p_buf,t_size); //成功拷贝的字节数,也就是字符串的长度 g_length=t_size-t_rest; //触发write完成量,通知write操作完成 complete(&g_comp_write); //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //返回成功拷贝的字节数 return g_length; } //填充file_operations结构体 static struct file_operations amsg_fops= { .owner=THIS_MODULE, .read=amsg_read, .write=amsg_write }; //模块初始化代码 static int amsg_init_module(void) { //注册字符设备(这是Old way) int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops); //注册失败的处理 if(t_ret<0) { printk("Unable to register\n"); return t_ret; } //初始化自旋锁 spin_lock_init(&g_lock); //初始化read完成量 init_completion(&g_comp_read); //初始化write完成量 init_completion(&g_comp_write); return 0; } //模块清理代码 static void amsg_cleanup_module(void) { //注销字符设备 unregister_chrdev(AMSG_MAJOR,"aMsg"); printk("clean up!"); } module_init(amsg_init_module); module_exit(amsg_cleanup_module);
显然,这段代码的read()和write()是阻塞式的。接下来我要先后实现非阻塞的read()、write()和select/poll调用。
=============阶段一:O_NONBLOCK标志=============
如果在应用程序中通过fcntl把一个文件描述符的O_NONBLOCK标志位置位,比如:
int flags=fcntl(fd,F_GETFL,0); flags|=O_NONBLOCK; fcntl(fd,F_SETFL,flags);
那么对fd的read()操作和write()操作都是非阻塞的。所谓非阻塞的,就是说,如果没有数据可读,那么read()就立刻返回-1,或者,如果数据暂不可写入,那么write()就立刻返回-1。事实上,是否真的实现了非阻塞不是应用程序说了算的,甚至不是Linux内核说了算的,而是相应的驱动程序决定的。请看下面这段代码:
#include <linux/module.h> #include <linux/init.h> #include <linux/fs.h> #include <asm/uaccess.h> #include <linux/spinlock.h> #include <linux/completion.h> MODULE_LICENSE("Dual BSD/GPL"); //主设备号 #define AMSG_MAJOR 224 //缓冲区大小 #define AMSG_MAX_BUF_SZ 1024 //当前字符串长度 static int g_length=0; //缓冲区 static char g_buffer[AMSG_MAX_BUF_SZ]; //自旋锁 spinlock_t g_lock; //read完成量 static struct completion g_comp_read; //write完成量 static struct completion g_comp_write; //read操作 static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //对缓冲区加锁访问 spin_lock(&g_lock); //如果有数据,则跳出循环,开始读取数据 if(g_length>0) break; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //如果设置了O_NONBLOCK,那么直接返回-EAGAIN if(p_file->f_flags&O_NONBLOCK) return -EAGAIN; //等待write操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_write)) return -ERESTARTSYS; } //最多能够读取的字节数(p_count和g_length之间较小者) t_size=(p_count<g_length?p_count:g_length); //没有成功拷贝的字节数 t_rest=copy_to_user(p_buf,g_buffer,t_size); //不管结果如何,都清空缓冲区 g_length=0; //触发read完成量,通知read操作完成 complete(&g_comp_read); //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //返回成功拷贝的字节数 return t_size-t_rest; } //write操作 static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //对缓冲区加锁访问 spin_lock(&g_lock); //如果没有数据,则跳出循环,开始写入数据 if(g_length==0) break; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //如果设置了O_NONBLOCK,那么直接返回-EAGAIN if(p_file->f_flags&O_NONBLOCK) return -EAGAIN; //等待read操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_read)) return -ERESTARTSYS; } //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者) t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer)); //没有成功拷贝的字节数 t_rest=copy_from_user(g_buffer,p_buf,t_size); //成功拷贝的字节数,也就是字符串的长度 g_length=t_size-t_rest; //触发write完成量,通知write操作完成 complete(&g_comp_write); //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //返回成功拷贝的字节数 return g_length; } //填充file_operations结构体 static struct file_operations amsg_fops= { .owner=THIS_MODULE, .read=amsg_read, .write=amsg_write }; //模块初始化代码 static int amsg_init_module(void) { //注册字符设备(这是Old way) int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops); //注册失败的处理 if(t_ret<0) { printk("Unable to register\n"); return t_ret; } //初始化自旋锁 spin_lock_init(&g_lock); //初始化read完成量 init_completion(&g_comp_read); //初始化write完成量 init_completion(&g_comp_write); return 0; } //模块清理代码 static void amsg_cleanup_module(void) { //注销字符设备 unregister_chrdev(AMSG_MAJOR,"aMsg"); printk("clean up!"); } module_init(amsg_init_module); module_exit(amsg_cleanup_module);
在read()的实现中,如果有数据可读,那么不管阻塞式还是非阻塞式IO,那么操作都是一样的——直接读取数据然后返回读取的字节数。相反,如果没有可读的数据,那么显式地判断了struct file的f_flags字段中O_NONBLOCK是否已经置位,如果是,说明应用层希望使用非阻塞IO,那么就立刻返回-EAGAIN;否则阻塞等待数据可读。换句话说,如果驱动中不理会O_NONBLOCK位,那么即使应用层已经把O_NONBLOCK置位,也没法使用阻塞式IO,最终的大权掌握在驱动的手里。write()的实现类似,不再累述。
这段驱动代码可以使用下面的应用层代码来测试:
nioTest.c:
#include <stdio.h> #include <unistd.h> #include <fcntl.h> int main() { int fd=open("/dev/aMsg",O_RDWR); if(fd<0) { perror("cannot open /dev/aMsg!"); return 1; } //设置O_NONBLOCK位 int flags=fcntl(fd,F_GETFL,0); flags|=O_NONBLOCK; fcntl(fd,F_SETFL,flags); //轮询 while(1) { char buf[1024]; int len=read(fd,buf,sizeof(buf)); if(len<0) printf("no data available...\n"); else { //字符串末尾置为0,防止溢出 buf[len]=0; printf("len=%d,content='%s'\n",len,buf); } sleep(1); } return 0; }
加载驱动、创建设备节点之后,使用命令编译测试应用程序:
gcc nioTest.c -o nioTest
然后运行:
sudo ./nioTest
可以看到每一秒钟输出一次“no data available…”:
此时,另开一个终端,在里面以root权限执行:
echo -e 'hello,nonblocking IO!\c' > /dev/aMsg
命令中,-e选项让echo启用转义,\c表示不要换行。命令执行后,可以发现nioTest输出了数据:
==============阶段二:select/poll操作==============
正式进入重点。select和poll其实是一回事,只不过select()调用是BSD Unix引入的,而poll()操作是System V引入的,他们完成的功能是一样的,连数据结构都是很相似的。动动脑子就能想到,select/poll操作肯定是需要驱动程序的辅助才能实现的。
假设应用程序把若干个文件描述符放入一个池,调用了select(),那么,Linux内核会依次找到每个文件描述符对应的驱动程序的file_operations.poll()函数。file_operations.poll()本身是非阻塞的,它的工作就是检查当前文件描述符是否可读或者可写,然后返回一个掩码,来指明状态。因此,内核可以通过一个循环,把池中所有的文件描述符的poll()函数都调用一遍。如果其中任意一个文件描述符是可读或者可写,那么select()就返回,并告知应用程序哪个文件描述符可读或可写。那如果所有的文件描述符都不可读不可写呢?此时,内核就会让select()阻塞,把线程挂起,等待通知。通知?谁来通知呢?这个问题问的很好,这就是驱动程序的file_operations.poll()函数需要完成的另一个工作——注册可能导致状态变化的“信号源”。比如在我们上面的代码中,一旦write()成功执行,那么就会触发完成量g_comp_write。而一旦read()执行成功,那么就会触发完成量g_comp_read。因此,驱动程序的file_operations.poll()函数需要向内核“注册”g_comp_write和g_comp_read,相当于告知内核说:“如果这两个中任意一个被触发,那么说明读写状况可能已经改变,麻烦你重新调用一下我的file_operations.poll()函数,看看是否真的可读或可写了”~
OK,直接看代码吧:
#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/poll.h>
MODULE_LICENSE("Dual BSD/GPL");
//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024
//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//自旋锁
spinlock_t g_lock;
//read完成量
static struct completion g_comp_read;
//write完成量
static struct completion g_comp_write;
//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
int t_size,t_rest;
while(1)
{
//对缓冲区加锁访问
spin_lock(&g_lock);
//如果有数据,则跳出循环,开始读取数据
if(g_length>0)
break;
//对缓冲区访问结束,解锁
spin_unlock(&g_lock);
//如果设置了O_NONBLOCK,那么直接返回-EAGAIN
if(p_file->f_flags&O_NONBLOCK)
return -EAGAIN;
//等待write操作的完成,为了防止无限等待,允许用户发送信号中断
if(wait_for_completion_interruptible(&g_comp_write))
return -ERESTARTSYS;
}
//最多能够读取的字节数(p_count和g_length之间较小者)
t_size=(p_count<g_length?p_count:g_length);
//没有成功拷贝的字节数
t_rest=copy_to_user(p_buf,g_buffer,t_size);
//不管结果如何,都清空缓冲区
g_length=0;
//触发read完成量,通知read操作完成
complete(&g_comp_read);
//对缓冲区访问结束,解锁
spin_unlock(&g_lock);
//返回成功拷贝的字节数
return t_size-t_rest;
}
//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
int t_size,t_rest;
while(1)
{
//对缓冲区加锁访问
spin_lock(&g_lock);
//如果没有数据,则跳出循环,开始写入数据
if(g_length==0)
break;
//对缓冲区访问结束,解锁
spin_unlock(&g_lock);
//如果设置了O_NONBLOCK,那么直接返回-EAGAIN
if(p_file->f_flags&O_NONBLOCK)
return -EAGAIN;
//等待read操作的完成,为了防止无限等待,允许用户发送信号中断
if(wait_for_completion_interruptible(&g_comp_read))
return -ERESTARTSYS;
}
//最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
//没有成功拷贝的字节数
t_rest=copy_from_user(g_buffer,p_buf,t_size);
//成功拷贝的字节数,也就是字符串的长度
g_length=t_size-t_rest;
//触发write完成量,通知write操作完成
complete(&g_comp_write);
//对缓冲区访问结束,解锁
spin_unlock(&g_lock);
//返回成功拷贝的字节数
return g_length;
}
//poll操作
static uint amsg_poll(struct file *p_file,poll_table *p_wait_table)
{
uint t_mask=0;
//注册g_comp_read.wait
poll_wait(p_file,&(g_comp_read.wait),p_wait_table);
//注册g_comp_write.wait
poll_wait(p_file,&(g_comp_write.wait),p_wait_table);
//对缓冲区加锁访问
spin_lock(&g_lock);
//根据数据状态设置状态掩码
if(g_length==0)
t_mask=POLLOUT|POLLWRNORM;
else
t_mask=POLLIN|POLLRDNORM;
//对缓冲区访问结束,解锁
spin_unlock(&g_lock);
printk("poll mask=%x\n",t_mask);
return t_mask;
}
//填充file_operations结构体
static struct file_operations amsg_fops=
{
.owner=THIS_MODULE,
.read=amsg_read,
.write=amsg_write,
.poll=amsg_poll
};
//模块初始化代码
static int amsg_init_module(void)
{
//注册字符设备(这是Old way)
int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
//注册失败的处理
if(t_ret<0)
{
printk("Unable to register\n");
return t_ret;
}
//初始化自旋锁
spin_lock_init(&g_lock);
//初始化read完成量
init_completion(&g_comp_read);
//初始化write完成量
init_completion(&g_comp_write);
return 0;
}
//模块清理代码
static void amsg_cleanup_module(void)
{
//注销字符设备
unregister_chrdev(AMSG_MAJOR,"aMsg");
printk("clean up!");
}
module_init(amsg_init_module);
module_exit(amsg_cleanup_module);
注意代码中的poll_wait()函数,这个函数的名称是我见过最最糟糕的!这个函数本身不阻塞,它的作用仅仅是把第二个参数(一个等待队列)加入到poll_table中!因此,叫做add_to_poll_wait_table()会好的多!
那么为什么是g_comp_read.wait和g_comp_write.wait呢?这个首先要看poll_wait()的原型:
void poll_wait(struct file *filp, wait_queue_head_t *queue, poll_table *wait);
它的第二个参数的类型是定义在<linux/wait.h>中的wait_queue_head_t,这是各种会导致挂起的阻塞操作中最最底层的东西。它其实就是一个队列,队列中的每一个元素都是一个挂起等待同一个事件的线程。当这个队列被触发一个事件时,队列中的线程会被唤醒(可能唤醒一个,也可能唤醒全部,看具体函数)。是不是觉得和完成量很像?是的,完成量就是对wait_queue_head_t的简单封装:
struct completion { unsigned int done; wait_queue_head_t wait; };
因此,只要传入g_comp_read.wait和g_comp_write.wait就行。当触发完成量时,本质上就是在触发等待队列,这样内核也会得到通知,然后重新调用amsg_poll()来查询状态。
可以使用如下应用层代码来测试:
pollTest.c:
#include <stdio.h> #include <unistd.h> #include <fcntl.h> #include <sys/select.h> int main() { int fd=open("/dev/aMsg",O_RDWR); if(fd<0) { perror("cannot open /dev/aMsg!"); return 1; } fd_set rds; FD_ZERO(&rds); FD_SET(fd,&rds); int ret=select(fd+1,&rds,NULL,NULL,NULL); if(ret<0) { printf("select error!\n"); return 1; } printf("select return\n"); if(FD_ISSET(fd,&rds)) { char buf[1024]; int len=read(fd,buf,sizeof(buf)); buf[len]=0; printf("len:%d,content='%s'\n",len,buf); } close(fd); return 0; }
重新加载驱动后,编译测试程序:
gcc pollTest.c -o pollTest
然后运行:
sudo ./pollTest
会发现进程阻塞。在第二个终端中执行:
dmesg | tail -n 5
可以看到aMsg.ko输出的信息:
echo -e 'hello,nonblocking IO!\c' > /dev/aMsg
可以看到pollTest输出了信息:
dmesg | tail -n 5
可以看到:
这说明,在应用层调用了select()之后,内核调用了一次驱动程序的poll(),发现不可读(只是可写),于是挂起。当有数据写入后,触发了g_comp_write(本质上触发了等待队列g_comp_write.wait),于是内核重新调用了驱动程序的poll(),返回POLLIN|POLLRDNORM,说明可以读取,于是select()返回。
这些行为都与预期的一致,说明poll()正确实现了。