线程同步

2014-09-29 19:23:31   最后更新: 2014-10-05 18:18:15   访问数量:1247




当多个控制线成共享相同的内存时,需要确保每个线程看到一致的数据视图

下图描述了两个线程读写相同变量的例子:

 

线程A读取变量然后给变量赋予一个新值,然后写入内存

但是,与此同时,B从内存中读取相同变量,由于A尚未将改变后的变量写入内存,所以B读到的是原值

如果此时线程A与线程B都要对内存或寄存器中的变量进行加1操作,由于线程A读到的是原值,所以A写入了原值加1,线程B在线程A写入前读取到该值,也进行加一操作,写入内存,则虽然两个线程对该变量进行了加1操作,事实上改变量却只加了1

解决这个问题的方案是加锁,让线程A在对该变量操作的过程中,不允许线程B对该变量进行操作,如下图所示:

 

如果修改操作是原子性的,则不存在竞争,但是在现代操作系统中,存储器访问总是需要多个总线周期,多处理器系统的总线周期常常是在多个处理器上交叉进行的

因此在线程中使用、修改变量的时候,需要对变量加锁

互斥量从本质上说就是一把锁,在访问资源前对互斥量加锁,访问后释放该锁

当锁释放时,所有等待该锁释放的线程都会变成可运行状态,第一个变为运行状态的线程可以对互斥量加锁,其他线程将会因此而继续等待,直到他继续变为可用,因此,在这种方式下,每次只会有一个线程可以向前执行

 

互斥量的初始化与销毁

互斥量用 pthread_mutex_t 数据类型表示,在使用互斥量以前,必须首先对它进行初始化

可以直接把互斥量置为 PTHREAD_MUTEX_INITIALIZER (只对静态分配的互斥量),这种方式称为静态初始化

也可以通过调用 pthread_mutex_init 函数进行初始化,这种方式称为动态初始化

动态初始化的互斥量必须在释放内存前调用 pthread_mutex_destroy

 

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutex_t *restrict attr); int pthread_mutex_destroy(pthread_mutex_t *mutex);

 

 

pthread_mutex_init 函数的第二个参数 attr 是互斥量属性,使用默认的属性则将 attr 置为 NULL

 

互斥量的加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_trylock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);

 

pthread_mutex_unlock 函数会在互斥量已经加锁的情况下阻塞线程直到互斥量被释放

pthread_mutex_trylock 函数则不会将线程阻塞,如果互斥量是加锁状态,则会返回 EBUSY

 

示例:使用互斥量保护数据结构

#include <stdlib.h> #include <pthread.h> struct foo { int f_count; pthread_mutex_t f_lock; /* more stuff */ }; struct foo *foo_alloc() { struct foo *fp; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } } return fp; } void foo_hold(struct foo *fp) { pthread_mutext_lock(&fp->f_lock); fp->f_count++; pthread_mutext_unlock(&fp->f_lock); } void foo_rele(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); if (--fp->f_count == 0) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else pthread_mutex_unlock(&fp->f_lock); }

 

在对引用计数加一减一以及检查引用计数是否为0这些操作中,都需要锁住互斥量

 

避免死锁

如果线程试图对同一个互斥量加锁两次,那么他自身就会陷入死锁

但是,还是有很多更不明显的方式也能产生死锁

当程序中使用多个互斥量,如果允许一个线程一直占有第一个互斥量,并且在试图锁住第二个互斥量时处于阻塞状态,而与此同时,拥有第二个互斥量的线程也在试图锁住第一个互斥量,此时,由于两个线程在相互请求另一个线程占用的资源,导致两个线程都无法向前继续运行,于是就会产生死锁

当然,死锁是可以避免的,例如,如果程序需要对互斥量A和B同时加锁,如果所有线程总是在对互斥量B加锁之前锁住互斥量A,那么使用这两个互斥量不会产生死锁,只有当一个线程试图以另一个线程相反的顺序锁住互斥量时,才可能出现死锁

示例:使用两个互斥量(避免死锁)

#include <stdlib.h> #include <pthread.h> #define NHASH 29 #define HASH(fp) (((unsigned long)fp)%NHASH) struct foo *fh[NHASH]; pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER; struct foo { int f_count; pthread_mutex_t f_lock; struct foo *f_next; int f_id; /* more stuff */ } struct foo *foo_alloc() { struct foo *fp; int inx; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } idx = HASH(fp); pthread_mutex_lock(&hash_lock); fp->f_next = fh[idx]; fh[idx] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); } return fp; } void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); } struct foo *foo_find(int fd) { struct foo *fp; int idx; idx = HASH(fp); pthread_mutex_lock(&hashlock); for (fp=fh[idx]; fp!=NULL; fp=fp->f_next) { if (fp->f_id == id) { foo_hold(fp); break; } } pthread_mutex_unlock(&hashlock); return fp; } void foo_rele() { struct foo *tfp; int idx; pthread_mutex_lock(&fp->f_lock); if (fp->f_count == 1) { pthread_mutex_unclock(&fp->f_lock); pthread_mutex_lock(&hashlock); pthread_mutex_lock(&fp->f_lock); if (fp->f_count != 1) { fp->f_count--; pthread_mutex_unlock(&fp->f_lock); pthread_mutex_unlock(&hashlock); return; } idx = HASH(fp); tfp = fh[idx]; if (tfp == fp) { fh[idx] = fp->f_next; } else { while (ftp->f_next != fp) tfp = tfp->f_next; tfp->f_next = fp->next; } pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); } else { fp->f_count++; pthread_mutex_unclock(&fp->f_lock); } }

 

分配函数中用了散列列表锁,将新的结构添加到散列表中,需要先锁住散列列表锁

在对结构的操作中,用到了两个互斥量 -- 散列列表锁和结构中的互斥锁

在对整个散列表进行操作的时候,需要首先对全局的散列列表锁进行加锁,以防止在操作中(包括查询),其他线程对散列表结构进行修改

在对结构内部的数据进行修改之前需要锁住结构的互斥量,但在此之前,需要首先锁住散列列表锁,只要保证每次锁住结构体以前先锁住散列列表锁就不会构成死锁

但是这样反复加锁、解锁太过复杂,下面的例子进行了重新的设计

例程 -- 简化的加、解锁

#include <stdlib.h> #include <pthread.h> #define NHASH 29 #define HASH(fp) (((unsigned long)fp)%NHASH) struct foo *fh[NHASH]; pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER; struct foo { int f_count; pthread_mutex_t f_lock; struct foo *f_next; int f_id; }; struct foo *foo_alloc() { struct foo *fp; int idx; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return NULL; } idx = HASH(fp); pthread_mutex_lock(&hashlock); fp->f_next = fh[idx]; fh[idx] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); } return fp; } void foo_hold(struct foo *fp) { pthread_mutex_lock(&hashlock); fp->f_count++; pthread_mutex_unlock(&hashlock); } struct foo *foo_find(int id) { struct foo *fp; int idx; idx = HASH(fp); pthread_mutex_lock(&hashlock); for (fp=fh[idx]; fp!=NULL; fp=fp->f_next) { if (fp->f_id == id) { fp->f_count++; break; } } pthread_mutex_unlock(&hashlock); return fp; } void foo_rele(struct foo *fp) { struct foo *tfp; int idx; pthread_mutex_lock(&hashlock); if (--fp->f_count == 0) { idx = HASH(fp); tfp = fh[idx]; if (tfp == fp) { fh[idx] = fp->f_next; } else { while (tfp->f_next != fp) tfp = tfp->f_next; tfp->f_next = fp->next; } pthread_mutex_unlock(&hashlock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else pthread_mutex_unlock(&hashlock); }

 

  • 不明白程序第36行为什么要对 fp->f_lock 加锁,而且只是对他加锁而不解锁,那么 fp->f_lock 存在的价值是什么?既然有 hashlock 存在,就可以保证每个操作都不会被其他线程中断,那么 fp->lock 存在的价值是什么?

 

在程序设计中,如果设计的锁过多,锁的开销会使系统性能受到影响,并且让代码看起来太复杂

而如果设计的锁过少,就会出现很多线程等待相同的锁

 

读写锁也称为“共享-独占锁”

读写锁与互斥量类似,但是读写锁允许更高的并行性,互斥量只有加锁和解锁两种状态,而且每次只能有一个线程对互斥量加锁

而读写锁具有三种状态:读模式加锁、写模式加锁、不加锁,一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁

写加锁状态的读写锁与互斥量一样,在解锁前所有试图对这个锁加锁的线程都会被阻塞,但读加锁状态的读写锁则允许所有线程对读写锁读加锁

 

读写锁很适合用在对于数据结构的读次数远大于写次数的情况中

 

读写锁的初始化和销毁

与互斥量一样,读写锁使用之前必须初始化,在释放前必须销毁

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

读写锁的加锁与解锁

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

与互斥量一样,读写锁也有不阻塞的函数版本

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

示例:读写锁的使用

#include <stdlib.h> #include <pthread.h> struct job { struct job *j_next; struct job *j_prev; pthread_t j_id; } struct queue { struct job *q_head; struct job *q_tail; pthread_rwlock_t q_lock; } int queue_init(struct queue *qp) { int err; qp->q_head = NULL; qp->q_tail = NULL; err = pthread_rwlock_init(&qp->q_lock, NULL); if (err != 0) return err; return 0; } void job_insert(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); jp->j_next = qp->q_head; jp->j_prev = NULL; if (qp->q_head != NULL) qp->q_head->q_prev = jp; else qp->q_tail = jp; qp->q_head = jp; pthread_rwlock_unlock(&qp->q_lock); } void job_append(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); jp->j_next = NULL; jp->j_prev = qp->q_tail; if (qp->q_tail != NULL) qp->q_tail->j_next = jp; else qp->q_head = jp; pthread_rwlock_unlock(&qp->q_lock); } void job_remove(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); if (jp == qp->q_head) { qp->q_head = jp->j_next; if (qp->q_tail == jp) qp->q_tail = NULL; } else if (jp == qp->q_tail) { qp->q_head = jp->j_prev; if (qp->q_head == jp) qp->q_head = NULL; } else { jp->j_prev->j_next = jp->j_next; jp->j_next->j_prev = jp->j_prev; } pthread_rwlock_unlock(&qp->q_lock); } struct job *job_find(struct queue *qp, pthread_t id) { struct job *jp; if (pthread_rwlock_rdlock(&qp->q_lock) != 0) return NULL; for (jp = qp->q_head; jp != NULL; jp = jp->j_next) if (pthread_equel(jp->j_id, id)) break; pthread_rwlock_unlock(&qp->q_lock); return jp; }

 

上面的例子展示了读写锁的使用

任务被作为队列的节点添加到了队列中,每个线程根据任务的 j_job 属性在队列中读取自己需要处理的任务,在进行增、删操作时,使用了读写锁的写状态锁,在进行查找操作时使用了读写锁的读状态锁,这样,当线程搜索队列的频率远远高于增删时,使用读写锁要比使用互斥量带来更大的性能提升

 

由于作业是按照线程id区分的,所以在同一时间,只会有一个线程占用,所以没有必要为每个作业增加额外的锁

 

条件变量是线程可以使用的另一种同步机制,用于让线程以武警正的方式等待某个特定的条件发生

条件变量也是需要使用互斥量保护的,线程在改变条件状态前,必须先锁住互斥量,其他线程在获得条件变量之前也必须锁住互斥量,这样才不会察觉到这种改

条件变量的初始化与销毁

与互斥量一样,条件变量可以用两种方式初始化

  1. 静态初始化 -- 直接把常量 PTHREAD_COND_INITIALIZER 付给静态分配的条件变量
  2. 动态初始化 -- 使用 pthread_cond_init 函数

对于动态初始化的条件变量,在释放底层的内存空间之前,需要使用 pthread_cond_destroy 函数销毁

int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr); int pthread_cond_destroy(pthread_cond_t *cond);

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

attr 为 NULL 时创建默认树形的条件变量

等待条件变量

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); int pthread_cond_timedwaite(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout); struct timespec { time_t tv_sec; /* seconds */ time_t tv_nsec; /* nanoseconds */ }

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

pthread_cond_wait 函数会等待直到条件变为真,如果超时,则返回一个出错码 -- ETIMEDOUT

pthread_cond_timedwaite 允许设定等待超时

  • 等待函数有下列几点需要注意

  1. 传递给等待函数的互斥量必须是锁住状态的,函数原子性的解锁、测试,这样线程就不会错过条件的任何变化,当等待函数返回时,互斥量将被再次锁住
  2. pthread_cond_timedwaite 的参数 timespec 结构是绝对时间而不是相对时间,比如需要等待三分钟,则需要将当前时间加上3分钟传递给 timespec 结构

 

下面的函数实现了到相对时间到 timespec 结构的转化

void maketimeout(struct timespec *tsp, long minutes) { struct timeval now; gettimeofday(&now); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 1000; tsp->tv_sec += minutes * 60; }

 

 

唤醒线程

int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_cond_t *cond);

 

定义于 pthread.h 中

调用成功返回0,否则返回错误编号

 

这两个函数用于通知线程条件已经满足

pthread_cond_signal 函数将唤醒某个等待该条件的线程,pthread_cond_broadcast 则唤醒所有等待该条件的线程

 

需要注意的是,一定要在改变条件状态以后再给线程发送信号

 

条件变量的使用

#include <pthread.h> struct msg { sturct msg *m_next; }; struct msg *workq; pthread_cond_t qready = PTHREAD_COND_INITIALIZER; pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; void process_msg() { struct msg *mp; while (1) { pthread_mutext_lock(&qlock); while (workq == NULL) pthread_cond_wait(&qready, &qlock); mp = workq; workq = mp->m_next; pthread_mutex_unlock(&qlock); } } void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); }

 

 






读书笔记      技术帖      龙潭书斋      apue      unix环境高级编程      进程      线程      互斥量      线程同步      mutex      pthread_mutex_init      pthread_mutext_destroy      pthread_mutex_lock      pthread_mutex_unlock      pthread_mutex_trylock      死锁     


京ICP备15018585号