第一个Linux驱动程序(二)——aMsg的阻塞式IO(互斥与同步)

在之前《第一个Linux驱动程序(一)——aMsg的open()、read()和write()(驱动程序的基本结构)》一文中,我们研究了Linux驱动程序的基本结构,以及不可或缺的几个重要操作——open()、read()和write()。不过呢,aMsg离真实的驱动程序还很远,很多内核功能都没有用上,比如今天要讲的主题——互斥与同步。

《Linux Device Driver》一书中花了整个第5章和第6章的前半段来讲解驱动程序以及Linux内核中的互斥与同步。不过呢,有多年Java多线程编程的经验,理解互斥与同步还是很容易的。虽然Linux内核中的互斥与同步工具略有不同,不过基本思想都是大同小异的。

==========阶段一:互斥(互斥量与自旋锁及其区别)=========

最最通用的互斥方法莫过于信号量semaphore(以下简称”sem”)。信号量sem其实就是一个包含一个整数值res的结构体和两个操作P(sem)和V(sem)。可以把res值理解为可用资源的个数,而P操作就是占用一个资源,而V操作则是释放一个资源。而互斥量mutex就是一个res值为1的信号量,使得同时最多只能有一个线程占用资源,可用于保护临界数据。如果某个线程对某个信号量上锁(P操作),而该信号量的res值为0,那么该线程就会被挂起,直到有可用资源为止(某个线程执行了V操作)。

除了信号量(互斥量)以外,另一个用于保护临界数据的常用工具就是自旋锁spinlock。自旋锁的原理很简单,就是使用一个循环不断检测直到某个等待的条件满足。可以想象spinlock包含了一个布尔值,如果是true,那么说明已经上锁,于是对自旋锁的上锁操作就会不停查询布尔值的状态,直到为false,于是立刻设置为true,完成上锁。当然,说着简单,事实上必须有硬件的配合才能做到原子操作,好在操作系统帮我们封装好了。

通过描述,就能发现互斥量和自旋锁一个最大的区别——在互斥访问临界数据时,如果资源已被别的线程锁定,互斥量把当前线程挂起(让出CPU),而自旋锁占着CPU忙等。乍一看,觉得当然是互斥量的做法妥当,因为自旋锁忙等肯定浪费时间。然而事实上,自旋锁往往能够达到更好的性能,内核中更多地选择使用自旋锁。这是为什么呢?不要忘了,挂起与恢复线程是需要时间开销的!如果线程在临界区中的滞留时间很短(比如只是修改一个整数值),那么发生冲突时,忙等的时间远小于挂起又恢复线程所需的时间。而驱动与内核中充满了这种小的临界区。

另一个需要注意的地方是,当某个线程在等待自旋锁时,该线程还是可能被抢占的。但是,如果某个线程已经获得了自旋锁但还未释放自旋锁(在临界区中)时,系统不会抢占该线程。之所以这么设计,是为了避免该线程在持有自旋锁时休眠。如果一个线程持有自旋锁休眠,那么将导致其他等待该自旋锁的线程毫无意义地忙等,往往更加浪费时间。

因此,如果临界区滞留时间很短而且不会中途休眠,那么就用自旋锁;否则使用互斥量!

在Linux内核与驱动程序中,要使用信号量,需要包含头文件:

#include <linux/semaphore.h>

初始化信号量要使用如下函数:

void sema_init(struct semaphore *sem, int val);

如果是互斥量(特殊的信号量),那么就是:

struct semaphore g_sem;
sema_init(&g_sem,1);

对信号量进行上锁(P操作),可以使用如下函数之一:

void down(struct semaphore *sem);
int down_interruptible(struct semaphore *sem);
int down_trylock(struct semaphore *sem);

它们的行为应该能够从名称上看出来:第一个函数如果阻塞,那么就无法被中断,直到信号量被释放;第二个函数如果阻塞,可以被信号(比如Ctrl+C)中断,然后返回非零值;第三个函数会试图上锁,如果资源已经被占用,上锁失败,则返回非零值。

对信号量进行释放(V操作),可以使用如下函数:

void up(struct semaphore *sem);

如果要在内核中使用自旋锁,那么需要包含头文件:

#include <linux/spinlock.h>

初始化自旋锁使用函数:

void spin_lock_init(spinlock_t *lock);

自旋锁上锁使用函数:

void spin_lock(spinlock_t *lock);

解锁使用函数:

void spin_unlock(spinlock_t *lock);

事实上,自旋锁的加锁和解锁分别有四个函数,引用《Linux Device Driver》中的原话:

每一个加锁函数都有一个对应的解锁函数:

void spin_unlock(spinlock_t *lock);
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);

后三个都与中断有关,等接触到中断再来研究。

=================阶段二:同步================

线程之间的关系除了互斥外,还有同步。通常表现为,线程A需要线程B先完成某事,然后才能进行接下来的任务。因此,需要某种同步机制,能够让一个线程通知另一个线程某事已经完成。Linux内核专门为这种需求设计了一种机制——完成量completion。

完成量很类似于Windows中的事件Event。若干个线程可以同时等待某个完成量,它们都被挂起。当另一个线程触发了完成量时,等待该完成量的若干个线程中的一个或者全部(看具体的API)就会被唤醒。

其实,完成量的本质就是信号量,但是它为了它的特殊目的而进行了改进。假设这么一个场景:线程A需要等待线程B完成某个任务。那么可以创建一个res值为0的信号量sem,然后线程A对sem执行P操作,从而挂起。然后,当B完成了任务之后,对sem执行V操作,从而唤醒A。那么问题来了,如果有多个等待线程A1,A2,A3…An呢?如果线程B需要唤醒这n个线程,就得调用n次V操作。如果中途数量上出错,可能就导致该信号量不可再重用(需要重新初始化)。虽然也不是什么难事,但是代码就会太繁琐,而且语义不详。这也就是为什么要单独弄出一个完成量的原因。

要在内核态使用完成量,需要引用头文件:

#include <linux/completion.h>

初始化完成量使用:

void init_completion(struct completion *c);

使用如下函数等待完成量:

void wait_for_completion(struct completion *c);
int wait_for_completion_interruptible(struct completion *c);
// 被唤醒返回0,被中断返回1

触发完成量则使用:

void complete(struct completion *c);
void complete_all(struct completion *c);

第一个函数从等待的线程中随机选择一个释放,而第二个函数则释放所有线程。

==========阶段三:aMsg的阻塞式IO的4种实现==========

学以至用,我要用Linux的互斥与同步工具为aMsg实现阻塞式IO:

(1)如果当前缓冲区没有字符串,那么read()操作阻塞,直到某个线程调用了write()操作写入了数据,则返回该写入的字符串;

(2)如果当前缓冲区有字符串,那么read()则立刻返回字符串,并清空缓冲区;

(3)如果当前缓冲区没有字符串,那么write()操作写入字符串,并立刻返回;

(4)如果当前缓冲区有字符串,那么write()操作阻塞,直到某个线程调用了read()操作读取了数据,则写入字符串。

第一种实现是参考了“生产者-消费者”问题的解决方法。假设有一个长度为10的队列,生产者向队列放入产品,而消费者从队列取出产品。那么可以创建两个信号量sem_r和sem_w,分别表示可以取出的产品个数和可以放入产品的空位数,所以一开始sem_r的值是0,而sem_w的值是10。消费者在取出产品之前,对sem_r执行P操作,成功取出产品后,对sem_w执行V操作。相反地,生产者在放入产品之前,对sem_w执行P操作,成功放入产品后,对sem_r执行V操作。为了防止多个生产者、多个消费者同时操作队列而导致数据破坏,可以再使用一个锁lock来保护队列。

而在这个问题中,队列的长度只有1,于是可以退化一下,sem_r的值为0,而sem_w的值为1。如果锁lock使用互斥量来实现,那么代码就是这样的了:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/semaphore.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//read信号量
struct semaphore g_sem_read;
//write信号量
struct semaphore g_sem_write;
//互斥锁
struct semaphore g_mutex;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    //等待有空间可读,为了防止无限等待,允许用户发送信号中断
    if(down_interruptible(&g_sem_read))
        return -ERESTARTSYS;
    //对缓冲区加锁访问
    down(&g_mutex);
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //对缓冲区访问结束,解锁
    up(&g_mutex);
    //增加一个可写空间
    up(&g_sem_write);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    //等待有空间可写,为了防止无限等待,允许用户发送信号中断
    if(down_interruptible(&g_sem_write))
        return -ERESTARTSYS;
    //对缓冲区加锁访问
    down(&g_mutex);
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //对缓冲区访问结束,解锁
    up(&g_mutex);
    //增加一个可读空间
    up(&g_sem_read);
    //返回成功拷贝的字节数
    return g_length;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化read信号量
    sema_init(&g_sem_read,0);
    //初始化write信号量
    sema_init(&g_sem_write,1);
    //初始化互斥量
    sema_init(&g_mutex,1);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

第一种实现能够理解的话,第二种就很容易了——把互斥量替换为自旋锁即可:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/semaphore.h>
#include <linux/spinlock.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//read信号量
struct semaphore g_sem_read;
//write信号量
struct semaphore g_sem_write;
//自旋锁
spinlock_t g_lock;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    //等待有空间可读,为了防止无限等待,允许用户发送信号中断
    if(down_interruptible(&g_sem_read))
        return -ERESTARTSYS;
    //对缓冲区加锁访问
    spin_lock(&g_lock);
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //增加一个可写空间
    up(&g_sem_write);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    //等待有空间可写,为了防止无限等待,允许用户发送信号中断
    if(down_interruptible(&g_sem_write))
        return -ERESTARTSYS;
    //对缓冲区加锁访问
    spin_lock(&g_lock);
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //对缓冲区访问结束,解锁
    spin_unlock(&g_lock);
    //增加一个可读空间
    up(&g_sem_read);
    //返回成功拷贝的字节数
    return g_length;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化read信号量
    sema_init(&g_sem_read,0);
    //初始化write信号量
    sema_init(&g_sem_write,1);
    //初始化自旋锁
    spin_lock_init(&g_lock);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

除了这种基于队列的思想以外,还有一个更加通用的办法。假设要检测的条件不是基于资源数量的,而是某个数学运算,那么就无法与信号量建立直接关系了。更加通用的办法的思想如下:首先获得互斥锁,然后检测条件是否成立,如果不成立,那么就等待。不过不要忘了,要先释放锁然后等待。通常等待完成量,等完成量被触发后,线程醒来,重新获得锁,再次检验,如此循环,直到条件成立为止。于是有了第三种实现:

#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <asm/uaccess.h>
#include <linux/semaphore.h>
#include <linux/completion.h>

MODULE_LICENSE("Dual BSD/GPL");

//主设备号
#define AMSG_MAJOR 224
//缓冲区大小
#define AMSG_MAX_BUF_SZ 1024

//当前字符串长度
static int g_length=0;
//缓冲区
static char g_buffer[AMSG_MAX_BUF_SZ];
//互斥锁
static struct semaphore g_mutex;
//read完成量
static struct completion g_comp_read;
//write完成量
static struct completion g_comp_write;

//read操作
static ssize_t amsg_read(struct file* p_file,char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //获取锁,以便访问临界资源
        if(down_interruptible(&g_mutex))
            return -ERESTARTSYS;
        //如果有数据,则跳出循环,开始读取数据
        if(g_length>0)
            break;
        //释放锁
        up(&g_mutex);
        //等待write操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_write))
            return -ERESTARTSYS;
    }
    //最多能够读取的字节数(p_count和g_length之间较小者)
    t_size=(p_count<g_length?p_count:g_length);
    //没有成功拷贝的字节数
    t_rest=copy_to_user(p_buf,g_buffer,t_size);
    //不管结果如何,都清空缓冲区
    g_length=0;
    //触发read完成量,通知read操作完成
    complete(&g_comp_read);
    //释放锁
    up(&g_mutex);
    //返回成功拷贝的字节数
    return t_size-t_rest;
}

//write操作
static ssize_t amsg_write(struct file* p_file,const char* p_buf,size_t p_count,loff_t* p_offset)
{
    int t_size,t_rest;
    while(1)
    {
        //获取锁,以便访问临界资源
        if(down_interruptible(&g_mutex))
            return -ERESTARTSYS;
        //如果没有数据,则跳出循环,开始写入数据
        if(g_length==0)
            break;
        //释放锁
        up(&g_mutex);
        //等待read操作的完成,为了防止无限等待,允许用户发送信号中断
        if(wait_for_completion_interruptible(&g_comp_read))
            return -ERESTARTSYS;
    }
    //最多能够写入的字节数(p_count和sizeof(g_buffer)之间较小者)
    t_size=(p_count<sizeof(g_buffer)?p_count:sizeof(g_buffer));
    //没有成功拷贝的字节数
    t_rest=copy_from_user(g_buffer,p_buf,t_size);
    //成功拷贝的字节数,也就是字符串的长度
    g_length=t_size-t_rest;
    //触发write完成量,通知write操作完成
    complete(&g_comp_write);
    //释放锁
    up(&g_mutex);
    //返回成功拷贝的字节数
    return g_length;
}

//填充file_operations结构体
static struct file_operations amsg_fops=
{
    .owner=THIS_MODULE,
    .read=amsg_read,
    .write=amsg_write
};

//模块初始化代码
static int amsg_init_module(void)
{
    //注册字符设备(这是Old way)
    int t_ret=register_chrdev(AMSG_MAJOR,"aMsg",&amsg_fops);
    //注册失败的处理
    if(t_ret<0)
    {
        printk("Unable to register\n");
        return t_ret;
    }
    //初始化互斥量
    sema_init(&g_mutex,1);
    //初始化read完成量
    init_completion(&g_comp_read);
    //初始化write完成量
    init_completion(&g_comp_write);
    return 0;
}

//模块清理代码
static void amsg_cleanup_module(void)
{
    //注销字符设备
    unregister_chrdev(AMSG_MAJOR,"aMsg");
    printk("clean up!");
}

module_init(amsg_init_module);
module_exit(amsg_cleanup_module);

如果把第三种实现中的互斥量换成自旋锁,那么就是第四种实现了~

===============阶段四:测试===============

四个版本的代码(最后一个版本没有直接给出)的效果都是一样的,选任意一份,make、insmod、mknod(具体查看《第一个Linux驱动程序(一)——aMsg的open()、read()和write()(驱动程序的基本结构)》)。

然后开启一个终端,切换为root权限,执行:

cat /dev/aMsg

可以观察到进程阻塞。

再开启一个终端,切换为root权限,执行:

echo hello > /dev/aMsg

可以发现命令立刻执行结束,而第一个终端中输出了cat,然后继续阻塞(因为cat命令再次调用了read())。

在第一个终端中按Ctrl+C,中断阻塞,结束进程。

在第二个终端中,连续两次执行:

echo hello > /dev/aMsg

会发现第一次执行立刻返回,成功写入字符串,而第二次则阻塞,因为此时缓冲区已经有字符串了,无法再写入。然后在第一个终端中输入:

cat /dev/aMsg

可以发现输出两个”hello”,而第二个终端中阻塞的写操作结束。

以上现象都与预期一致!

会用了信号量(互斥量)、自旋锁和完成量,那么几乎所有互斥与同步问题就能够解决了~