第3篇:锁
在多处理器系统上实现并发,我们需要考虑到很多问题,特别是对于共享区域的互斥执行。在操作系统内核中存在着大量的共享数据结构,如果不仔细进行并发控制,将会出现各种各样令人头疼的问题。所以我们需要锁,对于共享的数据,我们在访问的时候上锁,有助于我们解决并发问题。
自旋锁
在xv6中,提供了两种类型锁的实现,一种是自旋锁,一种是睡眠锁,我们先来看看自旋锁。xv6的自旋锁是自旋锁实现的一个典范,非常值得学习。
首先我们查看
kernel/spinlock.h
中自旋锁结构的定义:struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};
可以看到,自旋锁结构体定义了一个是否上锁的状态,该锁的名字和该锁由哪个CPU占有。有了这个,我们可以设置在不同情况下使用不同的锁,可以提高并发的性能,而且当出现并发bug时,可以通过锁的名字和CPU定位bug出现的地方。
接下来是获得锁的过程,在阅读代码之前,我们要先搞清楚几个关于互斥的基本假设:
- 需要一个原子指令
- 需要有编译优化屏障
- 需要内存屏障
这三个条件,是防止硬件和编译器对指令进行乱序的调整,导致发生不正确的行为,从而引起并发bug。
下面在
kernel/spinlock.c
中查看获取锁的代码:// Acquire the lock.
// Loops (spins) until the lock is acquired.
void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}
可以看到,获取锁的函数第一件事是关闭中断。中断是会引发并发问题的,事实上,最早的操作系统并发性就是来自于中断。这里的复杂性是:我们允许在中断处理程序中使用自旋锁保护多处理器共享的数据结构,但是我们又不希望中断处理程序被其他中断打断。如果我们在上锁时没有关中断,此时中断处于打开状态,如果其他处理器触发了同样的中断,上了同样的锁,一把锁就会被上两次,然后就出现了AA型死锁的情况。所以,获取锁的第一件事是关中断,相应的,释放锁的最后一件事是开中断。
当然,其实不仅仅是获取锁第一件事关中断,释放锁最后一件事是开中断就可以防止问题的发生。在xv6中,实际上开关中断的是
intr_on()
和intr_off()
函数,那为什么这里不是直接使用这两个函数呢?实际上,如果在释放锁时开中断,也就意味着此时允许再来一次中断,如果在释放锁的函数开中断之前执行一次获取锁操作,然后执行开中断,就有可能会发生线程安全问题。所以xv6在开关中断的过程中,使用CPU相对应的一些数据结构来避免这个问题,它记录了上锁的嵌套,在开中断时只有计数器为0,才允许开中断。
void
push_off(void)
{
int old = intr_get();
intr_off();
if(mycpu()->noff == 0)
mycpu()->intena = old;
mycpu()->noff += 1;
}
void
pop_off(void)
{
struct cpu *c = mycpu();
if(intr_get())
panic("pop_off - interruptible");
if(c->noff < 1)
panic("pop_off");
c->noff -= 1;
if(c->noff == 0 && c->intena)
intr_on();
}
xv6这样做不仅是确保当一个自旋锁被中断处理程序使用时,永远不能持有该锁,更是做出了更严格的限制:当CPU获取任何锁时,总是禁止该CPU上的中断。而在其他CPU上,中断仍然是可以发生的,相应的记录数据结构只是存在于某一个相应的CPU上。
并且,xv6还进行了防御性编程,使用
if(holding(lk)) panic();
进行了AA型死锁检查。接着,在自旋锁的自旋循环中,使用了原子指令
__sync_lock_test_and_set
,这满足了对互斥的第一个假设,这个指令实现了原子交换,而原子交换就是实现互斥的关键。然后还有熟悉的内存屏障代码
__sync_synchronize()
,这个指令可以防止编译器和硬件对读写指令执行的顺序做优化,于是我们就满足了互斥的三个基本假设了,得到了一个正确且巧妙的自旋锁的实现。睡眠锁
然而,自旋锁也是有缺陷的,有时候内核需要长时间保持锁,例如在磁盘上读写文件时,磁盘的读写需要很长的时间,而此时除了进入临界区的进程,其他进程想要获取自旋锁就必须长时间空转,会浪费很长时间的CPU。而且,一个进程在持有自旋锁时不会让出CPU,然而我们希望持有锁的进程在等待磁盘I/O时其他进程也可以使用CPU。于是xv6设计了睡眠锁,它可以在等待获取锁时让出CPU,也可以在持有锁时让步。
睡眠锁的结构体定义也与自旋锁类似,多了一个被自旋锁保护的字段(定义在
kernel/sleeplock.h
中)。struct sleeplock {
uint locked; // Is the lock held?
struct spinlock lk; // spinlock protecting this sleep lock
// For debugging:
char *name; // Name of lock.
int pid; // Process holding lock
};
睡眠锁获取的实现和自旋锁不一样,因为睡眠锁允许中断,所以不需要关中断。并且,睡眠锁需要让出CPU,于是我们使用
sleep
和wakeup
实现了睡眠锁在获取和释放时的操作,事实上这里也就实现了一个同步机制,我们可以把睡眠锁理解为条件变量。在kernel/sleeplock.c
中查看代码:void
acquiresleep(struct sleeplock *lk)
{
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}
void
releasesleep(struct sleeplock *lk)
{
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}
在
sleep
的具体实现中,会释放原有的lock,然后通过进程状态为SLEEPING
使进程进入睡眠状态,再调用sched()
释放CPU。而对于唤醒操作
wakeup
,会将该通道的所有睡眠进程唤醒。在kernel/proc.c
中可以查看sleep
和wakeup
的代码:// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
acquire(&p->lock); //DOC: sleeplock1
release(lk);
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
release(&p->lock);
acquire(lk);
}
// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {
if(p != myproc()){
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}
在xv6中有了睡眠锁,我们也可以实现同步的生产者消费者模型。
可以看到的是xv6中的自旋锁和睡眠锁的实现是相当不一样的,两者的应用场景也各不相同。
对于自旋锁,更适合在操作系统内核的并发数据结构(短临界区)中使用,这种场景下锁的持有者在很短时间内可以释放锁,且操作系统可以关闭中断,在中断处理程序中是必须使用自旋锁的,睡眠锁是中断使能的。
而在需要等待长时间的场景下,睡眠锁的效果更好,当然,自旋锁也是可以在这种场景下使用的,只是性能会很差。也就是说,睡眠锁不能在自旋锁的临界区中使用,而自旋锁可以在睡眠锁的临界区中使用。
Loading Comments...