互斥锁

悲观锁(Pessimistic Locking)

悲观并发控制,又名悲观锁(Pessimistic Concurrency Control,PCC)是一种并发控制的方法。它可以阻止一个事务以影响其他用户的方式来修改数据。如果一个事务执行的操作都某行数据应用了锁,那只有当这个事务把锁释放,其他事务才能够执行与该锁冲突的操作。悲观并发控制主要用于数据争用激烈的环境,以及发生并发冲突时使用锁保护数据的成本要低于回滚事务的成本的环境中。

在编程语言中,悲观锁可能存在以下缺陷:

  • 在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。
  • 一个线程持有锁会导致其它所有需要此锁的线程挂起。
  • 如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

数据库中悲观锁主要由以下问题:悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。如果加锁的时间过长,其他用户长时间无法访问,影响了程序的并发访问性,同时这样对数据库性能开销影响也很大,特别是对长事务而言,这样的开销往往无法承受,特别是对长事务而言。如一个金融系统,当某个操作员读取用户的数据,并在读出的用户数据的基础上进行修改时(如更改用户帐户余额),如果采用悲观锁机制,也就意味着整个操作过程中(从操作员读出数据、开始修改直至提交修改结果的全过程,甚至还包括操作员中途去煮咖啡的时间),数据库记录始终处于加锁状态,可以想见,如果面对几百上千个并发,这样的情况将导致怎样的后果。

互斥锁/排他锁

互斥锁即对互斥量进行分加锁,和自旋锁类似,唯一不同的是竞争不到锁的线程会回去睡会觉,等到锁可用再来竞争,第一个切入的线程加锁后,其他竞争失败者继续回去睡觉直到再次接到通知、竞争。

互斥锁算是目前并发系统中最常用的一种锁,POSIX、C++11、Java 等均支持。处理 POSIX 的加锁比较普通外,C++ 和 Java 的加锁方式很有意思。C++ 中可以使用一种 AutoLock(常见于 chromium 等开源项目中)工作方式类似 auto_ptr 智 能指针,在 C++11 中官方将其标准化为 std::lock_guard 和 std::unique_lock。Java 中使用 synchronized 紧跟同步代码块(也可修饰方法)的方式同步代码,非常灵活。这两种实现都巧妙的利用各自语言特性实现了非常优雅的加锁方式。当然除此之外他们也支持传统的类 似于 POSIX 的加锁模式。

struct mutex {
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq;
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

This structure is defined in the include/linux/mutex.h header file and contains similar to the semaphore structure set of fields. The first field of the mutex structure is - count. Value of this field represents state of a mutex. In a case when the value of the count field is 1, a mutex is in unlocked state. When the value of the count field is zero, a mutex is in the locked state. Additionally value of the count field may be negative. In this case a mutex is in the locked state and has possible waiters.

互斥(Mutex)是一种用途非常广泛的内核对象。能够保证多个线程对同一共享资源的互斥访问。同临界区有些类似,只有拥有互斥对象的线程才具有访问资源 的权限,由于互斥对象只有一个,因此就决定了任何情况下此共享资源都不会同时被多个线程所访问。当前占据资源的线程在任务处理完后应将拥有的互斥对象交 出,以便其他线程在获得后得以访问资源。与其他几种内核对象不同,互斥对象在操作系统中拥有特殊代码,并由操作系统来管理,操作系统甚至还允许其进行一些 其他内核对象所不能进行的非常规操作。互斥量跟临界区很相似,只有拥有互斥对象的线程才具有访问资源的权限,由于互斥对象只有一个,因此就决定了任何情况 下此共享资源都不会同时被多个线程所访问。当前占据资源的线程在任务处理完后应将拥有的互斥对象交出,以便其他线程在获得后得以访问资源。互斥量比临界区 复杂。因为使用互斥不仅仅能够在同一应用程序不同线程中实现资源的安全共享,而且可以在不同应用程序的线程之间实现对资源的安全共享。

互斥量表现互斥现象的数据结构,也被当作二元信号灯。一个互斥基本上是一个多任务敏感的二元信号,它能用作同步多任务的行为,它常用作保护从中断来的临界段代码并且在共享同步使用的资源。

Mutex 本质上说就是一把锁,提供对资源的独占访问,所以 Mutex 主要的作用是用于互斥。Mutex 对象的值,只有 0 和 1 两个值。这两个值也分别代表了 Mutex 的两种状态。值为 0, 表示锁定状态,当前对象被锁定,用户进程/线程如果试图 Lock 临界资源,则进入排队等待;值为 1,表示空闲状态,当前对象为空闲,用户进程/线程可以 Lock 临界资源,之后 Mutex 值减 1 变为 0。

Mutex 可以被抽象为四个操作:

  • 创建 Create

  • 加锁 Lock

  • 解锁 Unlock

  • 销毁 Destroy

Mutex 被创建时可以有初始值,表示 Mutex 被创建后,是锁定状态还是空闲状态。在同一个线程中,为了防止死锁,系统不允许连续两次对 Mutex 加锁(系统一般会在第二次调用立刻返回)。也就是说,加锁和解锁这两个对应的操作,需要在同一个线程中完成。

不同操作系统中提供的 Mutex 函数: 动作\系统 Win32 Linyx Solaris

创建 CreateMutex pthread_mutex_init mutex_init

加锁 WaitForSingleObject pthread_mutex_lock mutex_lock

解锁 ReleaseMutex pthread_mutex_unlock mutex_unlock

销毁 CloseHandle pthread_mutex_destroy mutex_destroy

互斥量本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量。对互斥量进行枷锁以后,其他视图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都会变成可运行状态,第一个变成运行状态的线程可以对互斥量加锁,其他线程就会看到互斥量依然是锁着,只能再次阻塞等待它重新变成可用,这样,一次只有一个线程可以向前执行。

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);//互斥初始化
int pthread_mutex_destroy(pthread_mutex_t *mutex);//销毁互斥
int pthread_mutex_lock(pthread_mutex_t *mutex);//锁定互斥
int pthread_mutex_unlock(pthread_mutex_t *mutex);//解锁互斥
int pthread_mutex_trylock(pthread_mutex_t *mutex);//销毁互斥

pthread_t mutex;
pthread_mutex_init(&mutex, NULL);
pthread_mutex_lock(&mutex);
// ...
pthread_mutex_unlock(&mutex);
pthread_mutex_detroy(&mutex);

可递归锁/可重入锁

也叫做锁递归,就是获取一个已经获取的锁。不支持线程获取它已经获取且尚未解锁的方式叫做不可递归或不支持重入。带重入特性的锁在重入时会判断是否同一个线程,如果是,则使持锁计数器+1(0 代表没有被线程获取,又或者是锁被释放)。C++11 中同时支持两种锁,递归锁 std::recursive_mutex 和非递归 std::mutex。Java 的两种互斥锁实现以及读写锁实现均支持重入。POSIX 使用一种叫做重入函数的方法保证函数的线程安全,锁粒度是调用而非线程。

在所有的线程同步方法中,恐怕互斥锁(mutex)的出场率远远高于其它方法。互斥锁的理解和基本使用方法都很容易,这里不做更多介绍了。 Mutex 可以分为递归锁(recursive mutex)和非递归锁(non-recursive mutex)。可递归锁也可称为可重入锁(reentrant mutex),非递归锁又叫不可重入锁(non-reentrant mutex)。 二者唯一的区别是,同一个线程可以多次获取同一个递归锁,不会产生死锁。而如果一个线程多次获取同一个非递归锁,则会产生死锁。 Windows 下的 Mutex 和 Critical Section 是可递归的。Linux 下的 pthread_mutex_t 锁默认是非递归的。可以显示的设置 PTHREAD_MUTEX_RECURSIVE 属性,将 pthread_mutex_t 设为递归锁。

    MutexLock mutex;

    void foo()
    {
        mutex.lock();
        // do something
        mutex.unlock();
    }

    void bar()
    {
        mutex.lock();
        // do something
        foo();
        mutex.unlock();
    }

foo 函数和 bar 函数都获取了同一个锁,而 bar 函数又会调用 foo 函数。如果 MutexLock 锁是个非递归锁,则这个程序会立即死锁。因此在为一段程序加锁时要格外小心,否则很容易因为这种调用关系而造成死锁。 不要存在侥幸心理,觉得这种情况是很少出现的。当代码复杂到一定程度,被多个人维护,调用关系错综复杂时,程序中很容易犯这样的错误。庆幸的是,这种原因造成的死锁很容易被排除。 但 是这并不意味着应该用递归锁去代替非递归锁。递归锁用起来固然简单,但往往会隐藏某些代码问题。比如调用函数和被调用函数以为自己拿到了锁,都在修改同一 个对象,这时就很容易出现问题。因此在能使用非递归锁的情况下,应该尽量使用非递归锁,因为死锁相对来说,更容易通过调试发现。程序设计如果有问题,应该 暴露的越早越好。 为了避免上述情况造成的死锁,AUPE v2 一书在第 12 章提出了一种设计方法。即如果一个函数既有可能在已加锁的情况下使用,也有可能在未加锁的情况下使用,往往将这个函数拆成两个版本—加锁版本和不加锁版本(添加 nolock 后缀)。 例如将 foo()函数拆成两个函数。

// 不加锁版本
void foo_nolock()
{
	// do something
}
// 加锁版本
void fun()
{
    mutex.lock();
	foo_nolock();
	mutex.unlock();
}

为了接口的将来的扩展性,可以将 bar()函数用同样方法拆成 bar_withou_lock()函数和 bar()函数。 在 Douglas C. Schmidt(ACE 框架的主要编写者)的“Strategized Locking, Thread-safe Interface, and Scoped Locking”论文中,提出了一个基于 C++的线程安全接口模式(Thread-safe interface pattern),与 AUPE 的方法有异曲同工之妙。即在设计接口的时候,每个函数也被拆成两个函数,没有使用锁的函数是 private 或者 protected 类型,使用锁的的函数是 public 类型。接口如下:

class T
{
public:
	foo(); //加锁
	bar(); //加锁
private:
	foo_nolock();
	bar_nolock();
}

条件变量

条件变量是线程可用的另一种同步机制。互斥量用于上锁,条件变量则用于等待,并且条件变量总是需要与互斥量一起使用,运行线程以无竞争的方式等待特定的条件发生。

条件变量本身是由互斥量保护的,线程在改变条件变量之前必须首先锁住互斥量。其他线程在获得互斥量之前不会察觉到这种变化,因为互斥量必须在锁定之后才能计算条件。

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);//初始化条件变量
int pthread_cond_destroy(pthread_cond_t *cond);//销毁条件变量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);//无条件等待条件变量变为真
int pthread_cond_timewait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *tsptr);//在给定时间内,等待条件变量变为真
eg.pthread_mutex_t mutex;
pthread_cond_t cond;
// ...
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
// ...
pthread_mutex_unlock(&mutex);
// ...

注意,pthread_cond_wait 执行的流程首先将这个 mutex 解锁, 然后等待条件变量被唤醒, 如果没有被唤醒, 该线程将一直休眠, 也就是说, 该线程将一直阻塞在这个 pthread_cond_wait 调用中, 而当此线程被唤醒时, 将自动将这个 mutex 加锁,然后再进行条件变量判断(原因是“惊群效应”,如果是多个线程都在等待这个条件,而同时只能有一个线程进行处理,此时就必须要再次条件判断,以使只有一个线程进入临界区处理。),如果满足,则线程继续执行。

读写锁

读写锁与互斥量类似,不过读写锁拥有更高的并行性。互斥量要么是锁住状态,要么是不加锁状态,而且一次只有一个线程可以对其加锁。读写锁有 3 种状态:读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。当读写锁是写加锁状态时,在这个锁被解锁之前,所有视图对这个锁加锁的线程都会被阻塞。当读写锁在读加锁状态时,所有试图以读模式对它进行加锁的线程都可以得到访问权,但是任何希望以写模式对此锁进行加锁的线程都会阻塞,直到所有的线程释放它们的读锁为止。

#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *rwlockattr);//初始化读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//销毁读写锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读模式锁定读写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写模式锁定读写锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//解锁读写锁
eg.pthread_rwlock_t q_lock;
pthread_rwlock_init(&q_lock, NULL);
pthread_rwlock_rdlock(&q_lock);
...
pthread_rwlock_unlock(&q_lock);
pthread_rwlock_detroy(&q_lock);

支持两种模式的锁,当采用写模式上锁时与互斥锁相同,是独占模式。但读模式上锁可以被多个读线程读取。即写时使用互斥锁,读时采用共享锁,故又叫共享-独 占锁。一种常见的错误认为数据只有在写入时才需要锁,事实是即使是读操作也需要锁保护,如果不这么做的话,读写锁的读模式便毫无意义。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <list>

bool quitting = false;
int data = 0;

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_WRITER_NONRECURSIVE_INITIALIZER_NP;

class Reader {
	int id_;
	pthread_t thread_;

public:
	explicit Reader(int id) :
		id_(id)
	{
		pthread_create(&thread_, nullptr, &Reader::_run, this);
	}

	~Reader() {
		pthread_join(thread_, nullptr);
	}

	void run() {
		while (!quitting) {
			pthread_rwlock_rdlock(&rwlock);
			printf("[%d]: %d\n", id_, data);
			pthread_rwlock_unlock(&rwlock);

			pthread_yield(); // junk
		}
	}

private:
	static void* _run(void* p) {
		Reader* reader = static_cast<Reader*>(p);
		reader->run();
		return nullptr;
	}
};

void writer() {
	pthread_yield();
	for (;;) {
		sleep(1);

		printf("rwlock: Acquiring for WRITE\n");
		pthread_rwlock_wrlock(&rwlock);
		printf("rwlock: hardcore WRITE-action @.@ BEGIN\n");
		data = data + 1;
		quitting = data == 42;
		sleep(2);
		printf("rwlock: hardcore WRITE-action @.@ DONE\n");
		pthread_rwlock_unlock(&rwlock);
	}
}

int main(int argc, const char *argv[])
{
	srand(time(nullptr));

	std::list<Reader*> readers;

	for (int i = 1, e = (argc == 2 ? atoi(argv[1]) : 4); i <= e; ++i)
		readers.push_back(new Reader(i));

	writer();

	for (auto reader: readers)
		delete reader;

	return 0;
}
上一页
下一页