在之前《第一个Linux驱动程序(一)——aMsg的open()、read()和write()(驱动程序的基本结构)》一文中,我们研究了Linux驱动程序的基本结构,以及不可或缺的几个重要操作——open()、read()和write()。不过呢,aMsg离真实的驱动程序还很远,很多内核功能都没有用上,比如今天要讲的主题——互斥与同步。
《Linux Device Driver》一书中花了整个第5章和第6章的前半段来讲解驱动程序以及Linux内核中的互斥与同步。不过呢,有多年Java多线程编程的经验,理解互斥与同步还是很容易的。虽然Linux内核中的互斥与同步工具略有不同,不过基本思想都是大同小异的。
==========阶段一:互斥(互斥量与自旋锁及其区别)=========
最最通用的互斥方法莫过于信号量semaphore(以下简称”sem”)。信号量sem其实就是一个包含一个整数值res的结构体和两个操作P(sem)和V(sem)。可以把res值理解为可用资源的个数,而P操作就是占用一个资源,而V操作则是释放一个资源。而互斥量mutex就是一个res值为1的信号量,使得同时最多只能有一个线程占用资源,可用于保护临界数据。如果某个线程对某个信号量上锁(P操作),而该信号量的res值为0,那么该线程就会被挂起,直到有可用资源为止(某个线程执行了V操作)。
除了信号量(互斥量)以外,另一个用于保护临界数据的常用工具就是自旋锁spinlock。自旋锁的原理很简单,就是使用一个循环不断检测直到某个等待的条件满足。可以想象spinlock包含了一个布尔值,如果是true,那么说明已经上锁,于是对自旋锁的上锁操作就会不停查询布尔值的状态,直到为false,于是立刻设置为true,完成上锁。当然,说着简单,事实上必须有硬件的配合才能做到原子操作,好在操作系统帮我们封装好了。
通过描述,就能发现互斥量和自旋锁一个最大的区别——在互斥访问临界数据时,如果资源已被别的线程锁定,互斥量把当前线程挂起(让出CPU),而自旋锁占着CPU忙等。乍一看,觉得当然是互斥量的做法妥当,因为自旋锁忙等肯定浪费时间。然而事实上,自旋锁往往能够达到更好的性能,内核中更多地选择使用自旋锁。这是为什么呢?不要忘了,挂起与恢复线程是需要时间开销的!如果线程在临界区中的滞留时间很短(比如只是修改一个整数值),那么发生冲突时,忙等的时间远小于挂起又恢复线程所需的时间。而驱动与内核中充满了这种小的临界区。
另一个需要注意的地方是,当某个线程在等待自旋锁时,该线程还是可能被抢占的。但是,如果某个线程已经获得了自旋锁但还未释放自旋锁(在临界区中)时,系统不会抢占该线程。之所以这么设计,是为了避免该线程在持有自旋锁时休眠。如果一个线程持有自旋锁休眠,那么将导致其他等待该自旋锁的线程毫无意义地忙等,往往更加浪费时间。
因此,如果临界区滞留时间很短而且不会中途休眠,那么就用自旋锁;否则使用互斥量!
在Linux内核与驱动程序中,要使用信号量,需要包含头文件:
#include <linux/semaphore.h>
初始化信号量要使用如下函数:
void sema_init(struct semaphore *sem, int val);
如果是互斥量(特殊的信号量),那么就是:
struct semaphore g_sem; sema_init(&g_sem,1);
对信号量进行上锁(P操作),可以使用如下函数之一:
void down(struct semaphore *sem); int down_interruptible(struct semaphore *sem); int down_trylock(struct semaphore *sem);
它们的行为应该能够从名称上看出来:第一个函数如果阻塞,那么就无法被中断,直到信号量被释放;第二个函数如果阻塞,可以被信号(比如Ctrl+C)中断,然后返回非零值;第三个函数会试图上锁,如果资源已经被占用,上锁失败,则返回非零值。
对信号量进行释放(V操作),可以使用如下函数:
void up(struct semaphore *sem);
如果要在内核中使用自旋锁,那么需要包含头文件:
#include <linux/spinlock.h>
初始化自旋锁使用函数:
void spin_lock_init(spinlock_t *lock);
自旋锁上锁使用函数:
void spin_lock(spinlock_t *lock);
解锁使用函数:
void spin_unlock(spinlock_t *lock);
事实上,自旋锁的加锁和解锁分别有四个函数,引用《Linux Device Driver》中的原话:
每一个加锁函数都有一个对应的解锁函数:void spin_unlock(spinlock_t *lock); void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); void spin_unlock_irq(spinlock_t *lock); void spin_unlock_bh(spinlock_t *lock);
后三个都与中断有关,等接触到中断再来研究。
=================阶段二:同步================
线程之间的关系除了互斥外,还有同步。通常表现为,线程A需要线程B先完成某事,然后才能进行接下来的任务。因此,需要某种同步机制,能够让一个线程通知另一个线程某事已经完成。Linux内核专门为这种需求设计了一种机制——完成量completion。
完成量很类似于Windows中的事件Event。若干个线程可以同时等待某个完成量,它们都被挂起。当另一个线程触发了完成量时,等待该完成量的若干个线程中的一个或者全部(看具体的API)就会被唤醒。
其实,完成量的本质就是信号量,但是它为了它的特殊目的而进行了改进。假设这么一个场景:线程A需要等待线程B完成某个任务。那么可以创建一个res值为0的信号量sem,然后线程A对sem执行P操作,从而挂起。然后,当B完成了任务之后,对sem执行V操作,从而唤醒A。那么问题来了,如果有多个等待线程A1,A2,A3…An呢?如果线程B需要唤醒这n个线程,就得调用n次V操作。如果中途数量上出错,可能就导致该信号量不可再重用(需要重新初始化)。虽然也不是什么难事,但是代码就会太繁琐,而且语义不详。这也就是为什么要单独弄出一个完成量的原因。
要在内核态使用完成量,需要引用头文件:
#include <linux/completion.h>
初始化完成量使用:
void init_completion(struct completion *c);
使用如下函数等待完成量:
void wait_for_completion(struct completion *c); int wait_for_completion_interruptible(struct completion *c); // 被唤醒返回0,被中断返回1
触发完成量则使用:
void complete(struct completion *c); void complete_all(struct completion *c);
第一个函数从等待的线程中随机选择一个释放,而第二个函数则释放所有线程。
==========阶段三:aMsg的阻塞式IO的4种实现==========
学以至用,我要用Linux的互斥与同步工具为aMsg实现阻塞式IO:
(1)如果当前缓冲区没有字符串,那么read()操作阻塞,直到某个线程调用了write()操作写入了数据,则返回该写入的字符串;
(2)如果当前缓冲区有字符串,那么read()则立刻返回字符串,并清空缓冲区;
(3)如果当前缓冲区没有字符串,那么write()操作写入字符串,并立刻返回;
(4)如果当前缓冲区有字符串,那么write()操作阻塞,直到某个线程调用了read()操作读取了数据,则写入字符串。
第一种实现是参考了“生产者-消费者”问题的解决方法。假设有一个长度为10的队列,生产者向队列放入产品,而消费者从队列取出产品。那么可以创建两个信号量sem_r和sem_w,分别表示可以取出的产品个数和可以放入产品的空位数,所以一开始sem_r的值是0,而sem_w的值是10。消费者在取出产品之前,对sem_r执行P操作,成功取出产品后,对sem_w执行V操作。相反地,生产者在放入产品之前,对sem_w执行P操作,成功放入产品后,对sem_r执行V操作。为了防止多个生产者、多个消费者同时操作队列而导致数据破坏,可以再使用一个锁lock来保护队列。
而在这个问题中,队列的长度只有1,于是可以退化一下,sem_r的值为0,而sem_w的值为1。如果锁lock使用互斥量来实现,那么代码就是这样的了:
#include <linux/module.h> #include <linux/init.h> #include <linux/fs.h> #include <asm/uaccess.h> #include <linux/semaphore.h> MODULE_LICENSE("Dual BSD/GPL"); //主设备号 #define AMSG_MAJOR 224 //缓冲区大小 #define AMSG_MAX_BUF_SZ 1024 //当前字符串长度 static int g_length=0; //缓冲区 static char g_buffer[AMSG_MAX_BUF_SZ]; //read信号量 struct semaphore g_sem_read; //write信号量 struct semaphore g_sem_write; //互斥锁 struct semaphore g_mutex; //read操作 static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; //等待有空间可读,为了防止无限等待,允许用户发送信号中断 if(down_interruptible(&g_sem_read)) return -ERESTARTSYS; //对缓冲区加锁访问 down(&g_mutex); //最多能够读取的字节数(p_count和g_length之间较小者) t_size=(p_count<g_length?p_count:g_length); //没有成功拷贝的字节数 t_rest=copy_to_user(p_buf,g_buffer,t_size); //不管结果如何,都清空缓冲区 g_length=0; //对缓冲区访问结束,解锁 up(&g_mutex); //增加一个可写空间 up(&g_sem_write); //返回成功拷贝的字节数 return t_size-t_rest; } //write操作 static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; //等待有空间可写,为了防止无限等待,允许用户发送信号中断 if(down_interruptible(&g_sem_write)) return -ERESTARTSYS; //对缓冲区加锁访问 down(&g_mutex); //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者) t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer)); //没有成功拷贝的字节数 t_rest=copy_from_user(g_buffer,p_buf,t_size); //成功拷贝的字节数,也就是字符串的长度 g_length=t_size-t_rest; //对缓冲区访问结束,解锁 up(&g_mutex); //增加一个可读空间 up(&g_sem_read); //返回成功拷贝的字节数 return g_length; } //填充file_operations结构体 static struct file_operations amsg_fops= { .owner=THIS_MODULE, .read=amsg_read, .write=amsg_write }; //模块初始化代码 static int amsg_init_module(void) { //注册字符设备(这是Old way) int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops); //注册失败的处理 if(t_ret<0) { printk("Unable to register\n"); return t_ret; } //初始化read信号量 sema_init(&g_sem_read,0); //初始化write信号量 sema_init(&g_sem_write,1); //初始化互斥量 sema_init(&g_mutex,1); return 0; } //模块清理代码 static void amsg_cleanup_module(void) { //注销字符设备 unregister_chrdev(AMSG_MAJOR,"aMsg"); printk("clean up!"); } module_init(amsg_init_module); module_exit(amsg_cleanup_module);
第一种实现能够理解的话,第二种就很容易了——把互斥量替换为自旋锁即可:
#include <linux/module.h> #include <linux/init.h> #include <linux/fs.h> #include <asm/uaccess.h> #include <linux/semaphore.h> #include <linux/spinlock.h> MODULE_LICENSE("Dual BSD/GPL"); //主设备号 #define AMSG_MAJOR 224 //缓冲区大小 #define AMSG_MAX_BUF_SZ 1024 //当前字符串长度 static int g_length=0; //缓冲区 static char g_buffer[AMSG_MAX_BUF_SZ]; //read信号量 struct semaphore g_sem_read; //write信号量 struct semaphore g_sem_write; //自旋锁 spinlock_t g_lock; //read操作 static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; //等待有空间可读,为了防止无限等待,允许用户发送信号中断 if(down_interruptible(&g_sem_read)) return -ERESTARTSYS; //对缓冲区加锁访问 spin_lock(&g_lock); //最多能够读取的字节数(p_count和g_length之间较小者) t_size=(p_count<g_length?p_count:g_length); //没有成功拷贝的字节数 t_rest=copy_to_user(p_buf,g_buffer,t_size); //不管结果如何,都清空缓冲区 g_length=0; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //增加一个可写空间 up(&g_sem_write); //返回成功拷贝的字节数 return t_size-t_rest; } //write操作 static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; //等待有空间可写,为了防止无限等待,允许用户发送信号中断 if(down_interruptible(&g_sem_write)) return -ERESTARTSYS; //对缓冲区加锁访问 spin_lock(&g_lock); //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者) t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer)); //没有成功拷贝的字节数 t_rest=copy_from_user(g_buffer,p_buf,t_size); //成功拷贝的字节数,也就是字符串的长度 g_length=t_size-t_rest; //对缓冲区访问结束,解锁 spin_unlock(&g_lock); //增加一个可读空间 up(&g_sem_read); //返回成功拷贝的字节数 return g_length; } //填充file_operations结构体 static struct file_operations amsg_fops= { .owner=THIS_MODULE, .read=amsg_read, .write=amsg_write }; //模块初始化代码 static int amsg_init_module(void) { //注册字符设备(这是Old way) int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops); //注册失败的处理 if(t_ret<0) { printk("Unable to register\n"); return t_ret; } //初始化read信号量 sema_init(&g_sem_read,0); //初始化write信号量 sema_init(&g_sem_write,1); //初始化自旋锁 spin_lock_init(&g_lock); return 0; } //模块清理代码 static void amsg_cleanup_module(void) { //注销字符设备 unregister_chrdev(AMSG_MAJOR,"aMsg"); printk("clean up!"); } module_init(amsg_init_module); module_exit(amsg_cleanup_module);
除了这种基于队列的思想以外,还有一个更加通用的办法。假设要检测的条件不是基于资源数量的,而是某个数学运算,那么就无法与信号量建立直接关系了。更加通用的办法的思想如下:首先获得互斥锁,然后检测条件是否成立,如果不成立,那么就等待。不过不要忘了,要先释放锁然后等待。通常等待完成量,等完成量被触发后,线程醒来,重新获得锁,再次检验,如此循环,直到条件成立为止。于是有了第三种实现:
#include <linux/module.h> #include <linux/init.h> #include <linux/fs.h> #include <asm/uaccess.h> #include <linux/semaphore.h> #include <linux/completion.h> MODULE_LICENSE("Dual BSD/GPL"); //主设备号 #define AMSG_MAJOR 224 //缓冲区大小 #define AMSG_MAX_BUF_SZ 1024 //当前字符串长度 static int g_length=0; //缓冲区 static char g_buffer[AMSG_MAX_BUF_SZ]; //互斥锁 static struct semaphore g_mutex; //read完成量 static struct completion g_comp_read; //write完成量 static struct completion g_comp_write; //read操作 static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //获取锁,以便访问临界资源 if(down_interruptible(&g_mutex)) return -ERESTARTSYS; //如果有数据,则跳出循环,开始读取数据 if(g_length>0) break; //释放锁 up(&g_mutex); //等待write操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_write)) return -ERESTARTSYS; } //最多能够读取的字节数(p_count和g_length之间较小者) t_size=(p_count<g_length?p_count:g_length); //没有成功拷贝的字节数 t_rest=copy_to_user(p_buf,g_buffer,t_size); //不管结果如何,都清空缓冲区 g_length=0; //触发read完成量,通知read操作完成 complete(&g_comp_read); //释放锁 up(&g_mutex); //返回成功拷贝的字节数 return t_size-t_rest; } //write操作 static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset) { int t_size,t_rest; while(1) { //获取锁,以便访问临界资源 if(down_interruptible(&g_mutex)) return -ERESTARTSYS; //如果没有数据,则跳出循环,开始写入数据 if(g_length==0) break; //释放锁 up(&g_mutex); //等待read操作的完成,为了防止无限等待,允许用户发送信号中断 if(wait_for_completion_interruptible(&g_comp_read)) return -ERESTARTSYS; } //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者) t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer)); //没有成功拷贝的字节数 t_rest=copy_from_user(g_buffer,p_buf,t_size); //成功拷贝的字节数,也就是字符串的长度 g_length=t_size-t_rest; //触发write完成量,通知write操作完成 complete(&g_comp_write); //释放锁 up(&g_mutex); //返回成功拷贝的字节数 return g_length; } //填充file_operations结构体 static struct file_operations amsg_fops= { .owner=THIS_MODULE, .read=amsg_read, .write=amsg_write }; //模块初始化代码 static int amsg_init_module(void) { //注册字符设备(这是Old way) int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops); //注册失败的处理 if(t_ret<0) { printk("Unable to register\n"); return t_ret; } //初始化互斥量 sema_init(&g_mutex,1); //初始化read完成量 init_completion(&g_comp_read); //初始化write完成量 init_completion(&g_comp_write); return 0; } //模块清理代码 static void amsg_cleanup_module(void) { //注销字符设备 unregister_chrdev(AMSG_MAJOR,"aMsg"); printk("clean up!"); } module_init(amsg_init_module); module_exit(amsg_cleanup_module);
如果把第三种实现中的互斥量换成自旋锁,那么就是第四种实现了~
===============阶段四:测试===============
四个版本的代码(最后一个版本没有直接给出)的效果都是一样的,选任意一份,make、insmod、mknod(具体查看《第一个Linux驱动程序(一)——aMsg的open()、read()和write()(驱动程序的基本结构)》)。
然后开启一个终端,切换为root权限,执行:
cat /dev/aMsg
可以观察到进程阻塞。
再开启一个终端,切换为root权限,执行:
echo hello > /dev/aMsg
可以发现命令立刻执行结束,而第一个终端中输出了cat,然后继续阻塞(因为cat命令再次调用了read())。
在第一个终端中按Ctrl+C,中断阻塞,结束进程。
在第二个终端中,连续两次执行:
echo hello > /dev/aMsg
会发现第一次执行立刻返回,成功写入字符串,而第二次则阻塞,因为此时缓冲区已经有字符串了,无法再写入。然后在第一个终端中输入:
cat /dev/aMsg
可以发现输出两个”hello”,而第二个终端中阻塞的写操作结束。
以上现象都与预期一致!
会用了信号量(互斥量)、自旋锁和完成量,那么几乎所有互斥与同步问题就能够解决了~