pthread_cond_wait源码分析

  • A+
所属分类:网站技术

三、Glibc的实现

1、数据结构

/* Data structure for conditional variable handling.  Thestructure of
the attribute type is not exposed on purpose.  */
typedef union
{
struct
{
int __lock;保护多线程中cond结构本身的变量操作不会并发,例如对于total_seq进而wakup_seq的使用和递增操作。
unsigned int __futex;另一个线程和这个线程之间在条件点上同步的方式,也就是如果需要和其它线程同步的话,使用这个互斥锁替换pthread_cond_wait传入的互斥锁进行同步。
__extension__ unsigned long long int __total_seq;这个表示在这个条件变量上有多少个线程在等待这个信号。
__extension__ unsigned long long int __wakeup_seq;已经在这个条件变量上执行了多少次唤醒操作。
__extension__ unsigned long long int __woken_seq;这个条件变量中已经被真正唤醒的线程数目。
void *__mutex;保存pthread_cond_wait传入的互斥锁,需要保证pthread_cond_wait和pthread_cond_signal传入的值都是相同值。

unsigned int __nwaiters;表示这个cond结构现在还有多少个线程在使用,当有人在使用的时候,pthread_cond_destroy需要等待所有的操作完成
unsigned int __broadcast_seq; 广播动作发生了多少次,也就是执行了多少次broadcast
} __data;
char __size[__SIZEOF_PTHREAD_COND_T];
__extension__ long long int __align;
} pthread_cond_t;

2、lll_futex_wait的意义

lll_futex_wait(&cond->__data.__futex, futex_val, pshared);
lll_futex_wake (&cond->__data.__nwaiters, 1,pshared);
对于第一个wait,需要传入一个我们用户态判断时使用的futex值,也就是这里的第二个参数futex_val,这样内核会判断进行真正的wait挂起的时候这个地址的是不是还是这个值,如果不是这个wait失败。但是进行wakup的时候不需要传入判断值,可能是假设此时已经获得互斥锁,所以不会有其它线程来竞争了吧。

这个要和pthread_mutex_lock使用的0、1、2三值区分开来,因为这些都是C库规定的语义,内核对他们没有任何特殊要求和语义判断,所以用户态可以随意的改变这个变量的值。

3、pthread_cond_wait的操作

int
__pthread_cond_wait (cond, mutex)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int err;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;

/* Make sure we are along.  */
lll_lock (cond->__data.__lock, pshared);即将对cond结构的成员进行操作和判断,所以首先获得结构本身保护互斥锁。

/* Now we can release the mutex.  */
err = __pthread_mutex_unlock_usercnt (mutex, 0);释放用户传入的互斥锁,此时另外一个执行pthread_cond_signal的线程可以通过pthread_mutex_lock执行可能的signal判断,但是我们还没有释放数据操作互斥锁,所以另一方执行pthread_cond_signal的时候依然可能会等待。
if (__builtin_expect (err, 0))
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}

/* We have one new user of the condvar.  */
++cond->__data.__total_seq;增加系统中所有需要执行的唤醒次数。
++cond->__data.__futex;增加futex,主要是为了保证用户态数据一致性。
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;增加cond结构的使用次数。

/* Remember the mutex we are using here.  If thereis already a
different address store this is a bad user bug. Do not store
anything for pshared condvars.  */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;

/* Prepare structure passed to cancellationhandler.  */
cbuffer.cond = cond;
cbuffer.mutex = mutex;

/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler.  */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);注册撤销点。

/* The current values of the wakeup counter.  The"woken" counter
must exceed this value.  */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter.  */
cbuffer.bc_seq = cond->__data.__broadcast_seq;

do
{
unsigned int futex_val =cond->__data.__futex;

/* Prepare to wait.  Releasethe condvar futex.  */
lll_unlock (cond->__data.__lock, pshared);此处真正释放cond操作互斥锁,我们已经不再对其中的变量进行操作。

/* Enable asynchronouscancellation.  Required by the standard.  */
cbuffer.oldtype = __pthread_enable_asynccancel();

/* Wait until woken by signal orbroadcast.  */
lll_futex_wait (&cond->__data.__futex,futex_val, pshared);等待在futex变量上,由于我们刚才保存了futex的原始值,所以如果在上面我们释放了data.lock之后另一个线程修改了这个变量的值,那么这里的lll_futex_wait将会返回失败,所以会继续进行下一轮的while循环,直到连个执行相同,说明我们做的判断时正确的。

/* Disable asynchronouscancellation.  */如果执行到这里,说明我们已经被signal唤醒。
__pthread_disable_asynccancel (cbuffer.oldtype);

/* We are going to look at shareddata again, so get the lock.  */
lll_lock (cond->__data.__lock, pshared);访问变量,需要获得互斥锁。

/* If a broadcast happened, weare done.  */
if (cbuffer.bc_seq !=cond->__data.__broadcast_seq)
goto bc_out;

/* Check whether we are eligiblefor wakeup.  */
val = cond->__data.__wakeup_seq;
}
while (val == seq || cond->__data.__woken_seq == val); 当val!=seq&&cond->data.wokenup!=val的时候可以进行唤醒,也就是另一个放修改了已经执行了唤醒的次数并且已经被唤醒的线程还有名额的时候。

/* Another thread woken up.  */
++cond->__data.__woken_seq;增加系统中已经被唤醒的线程的数目。

bc_out: broadcast跳转到这里。

cond->__data.__nwaiters -= 1 <<COND_NWAITERS_SHIFT;

/* If pthread_cond_destroy was called on this varaiblealready,
notify the pthread_cond_destroy caller all waitershave left
and it can be successfully destroyed.  */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1<< COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1,pshared);

/* We are done with the condvar.  */
lll_unlock (cond->__data.__lock, pshared);

/* The cancellation handling is back to normal, removethe handler.  */
__pthread_cleanup_pop (&buffer, 0);

/* Get the mutex before returning.  */
return __pthread_mutex_cond_lock (mutex);再次获得mutex互斥锁,可能会睡眠,因为我们的这个释放是对上层透明的,而在进入函数的时候我们已经释放了这个互斥锁,所以此时还要进行一次获得操作,从而配对。
}
4、pthread_cond_signal的操作

int
__pthread_cond_signal (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;

/* Make sure we are alone.  */
lll_lock (cond->__data.__lock, pshared);

/* Are there any waiters to be woken?  */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)如果待唤醒次数比已经唤醒的次数多,那么此时就进行一个唤醒操作。
{
/* Yes.  Mark one of them as woken. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;改变futex的值,这个值的具体意义并不重要,只是为了告诉另一方,这个值已经变化,如果另一方使用的是原始值,那么对futex的wait操作将会失败。

/* Wake one.  */
if (! __builtin_expect (lll_futex_wake_unlock(&cond->__data.__futex, 1,
1,&cond->__data.__lock,
pshared), 0))
return 0;

lll_futex_wake(&cond->__data.__futex, 1, pshared);
}

/* We are done.  */
lll_unlock (cond->__data.__lock, pshared);

return 0;
}
5、__pthread_cond_broadcast

int
__pthread_cond_broadcast (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone.  */
lll_lock (cond->__data.__lock, pshared);

/* Are there any waiters to be woken?  */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)判断是否有等待唤醒的线程。
{
/* Yes.  Mark them all as woken.  */
cond->__data.__wakeup_seq =cond->__data.__total_seq;
cond->__data.__woken_seq = cond->__data.__total_seq;
cond->__data.__futex = (unsigned int)cond->__data.__total_seq * 2;
int futex_val = cond->__data.__futex;
/* Signal that a broadcast happened.  */
++cond->__data.__broadcast_seq;

/* We are done.  */
lll_unlock (cond->__data.__lock, pshared);

/* Do not use requeue for psharedcondvars.  */
if (cond->__data.__mutex == (void *) ~0l)
goto wake_all;

/* Wake everybody.  */
pthread_mutex_t *mut = (pthread_mutex_t *)cond->__data.__mutex;

/* XXX: Kernel so far doesn'tsupport requeue to PI futex.  */
/* XXX: Kernel so far can only requeue to thesame type of futex,
in this case private (we don't requeue for pshared condvars).  */
if (__builtin_expect (mut->__data.__kind
& (PTHREAD_MUTEX_PRIO_INHERIT_NP
|PTHREAD_MUTEX_PSHARED_BIT), 0))
goto wake_all;

/* lll_futex_requeue returns 0for success and non-zero
for errors.  */
if (__builtin_expect (lll_futex_requeue(&cond->__data.__futex, 1,
INT_MAX,&mut->__data.__lock,
futex_val,LLL_PRIVATE), 0))把futex上的转移到data.lock中并唤醒,如果失败则直接唤醒而不转移。
{
/* The requeue functionality is not available.  */
wake_all:
lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);这里的INT_MAX就是告诉内核唤醒所有在这个变量上等待的线程。
}

/* That's all.  */
return 0;
}

/* We are done.  */
lll_unlock (cond->__data.__lock, pshared);

return 0;
}

  • 我的微信
  • 这是我的微信扫一扫
  • weinxin
  • 我的微信公众号
  • 我的微信公众号扫一扫
  • weinxin

发表评论

您必须才能发表评论!