======================================================= DESIGN-futex-CV.txt-with-async.cancelable-wait ======================================================= pthread_cond_{timed}wait(); pthread_cond_signal(); pthread_cond_broadcast(); pthread_cond_destroy(); // Bounds for signal/broadcast counting #define MIN -2 // INT_MIN #define MAX 1 // INT_MAX // Just to release ALL waiters #define ALL UINT_MAX // End-Of-Cycle value for signalling event counting #define EOC(futex) ((m_futex) < 0 ? -1 : MAX) struct pthread_cond_t { unsigned int lock: internal mutex. signed int futex: internal futex. cycling (next value: always ++): 0... MAX: cycle change/futex=MIN/WAKE ALL. MIN...-1: cycle change/futex= 0/WAKE ALL. cycle change/WAKE ALL is done by the *last* waiter on the *current* cycle (not counting EOC-waiters). pthread_cond_destroy() *enforces* End-Of-Cycle just to ensure *safe* destruction - synchronization with respect to *exiting* waiters (if any). unsigned int wakeups: number of "pending" wakeups caused by signal/broadcast. unsigned int waiters[2]: number of threads waiting on the cv ["when(*)"]: [0] - EOC(cv->futex) != cv->futex. [1] - EOC(cv->futex) == cv->futex. (*) at the time of their {timed}wait() call, [0] = [1]; [1] = 0; at the cycle change. }; // auto "context" object inside wait() or extra pthread_t's fields struct cond_wait_context_t { pthread_cond_t* cv; pthread_mutex_t* mutex; int futex; }; cond_wait_cleanup_handler(cond_wait_context_t* ctx) { cond_wait_cleanup (ctx->cv, ctx->mutex, ctx->futex); } cond_wait_cleanup_handler2(cond_wait_context_t* ctx) { // cond_signal (ctx->cv); <-- WRONG /* broadcast is needed to prevent some "late" waiter from stealing a signal targeted at the old one. Example: two threads A and B waiting (that's reflected in shared data associated with mutex). Thread C locks the mutex and sees that A and B are waiting. C cancels A, issues a *signal* meant to unblock B and goes to wait on the same CV afterwards. Now, if A can consume a signal (that was meant to unblock B), it does need to make a *broadcast* to unblock B because a single signal can be delivered to C... waking it "spuriously" (that's OK) but leaving B blocked. */ cond_broadcast (ctx->cv); } cond_wait_timeout(cv, mutex, timeout) { lock(cv->lock); mutex_unlock(mutex); // here or below ++cv->waiters[ EOC(cv->futex) == (context.futex = cv->futex) ]; unlock(cv->lock); // mutex_unlock(mutex); context.cv = cv; context.mutex = mutex; pthread_cleanup_push (cond_wait_cleanup_handler, &ctx); pthread_cleanup_push (cond_wait_cleanup_handler2, &ctx); enable_async_cancel(); FUTEX WAIT (cv->futex, futex, timeout); disable_async_cancel(); pthread_cleanup_pop (0); // cond_wait_cleanup_handler2 pthread_cleanup_pop (1); // cond_wait_cleanup_handler } cond_wait_cleanup(cv, mutex, futex, canceled) { lock(cv->lock); if ( futex != cv->futex ) { --cv->waiters[0]; if ( 0 != cv->wakeups && 0 == --cv->wakeups && EOC(cv->futex) == cv->futex ) { cv->futex = ( cv->futex < 0 ) ? 0 : MIN; cv->waiters[0] = cv->wakeups = cv->waiters[1]; cv->waiters[1] = 0; FUTEX WAKE (cv->futex, ALL); } } else --cv->waiters[ EOC(cv->futex) == futex ]; unlock(cv->lock); mutex_lock(mutex); } cond_signal(cv) { unsigned int do_wakeup; lock(cv->lock); if ( do_wakeup = (cv->waiters[0] > cv->wakeups) ) { if ( EOC(cv->futex) == ++cv->futex ) { do_wakeup = ALL; cv->wakeups = cv->waiters[0]; } else ++cv->wakeups; } unlock(cv->lock); if (do_wakeup) FUTEX WAKE (cv->futex, do_wakeup); } cond_broadcast(cv) { bool do_wakeup; lock(cv->lock); if ( do_wakeup = (cv->waiters[0] > cv->wakeups) ) { cv->wakeups = cv->waiters[0]; ++cv->futex; } unlock(cv->lock); if (do_wakeup) FUTEX WAKE (cv->futex, ALL); } cond_destroy(cv) { unsigned int futex; int old_cancel_state; lock(cv->lock); // optional EBUSY checking if ( cv->waiters[0] > cv->wakeups ) { unlock(cv->lock); return EBUSY; } while ( cv->waiters[0] ) { futex = cv->futex = EOC(cv->futex); unlock(cv->lock); FUTEX WAIT (cv->futex, futex); // NOT a cancelation point lock(cv->lock); } // unlock(cv->lock); // now it's safe to reclaim } it might generate spurious wakeups in >>both<< signal() *and* broadcast() case, but those are allowed by POSIX. =======================================================