On 03/16/2015 09:16 AM, Peter Zijlstra wrote:> Implement simple paravirt support for the qspinlock. > > Provide a separate (second) version of the spin_lock_slowpath for > paravirt along with a special unlock path. > > The second slowpath is generated by adding a few pv hooks to the > normal slowpath, but where those will compile away for the native > case, they expand into special wait/wake code for the pv version. > > The actual MCS queue can use extra storage in the mcs_nodes[] array to > keep track of state and therefore uses directed wakeups. > > The head contender has no such storage available and reverts to the > per-cpu lock entry similar to the current kvm code. We can do a single > enrty because any nesting will wake the vcpu and cause the lower loop > to retry. > > Signed-off-by: Peter Zijlstra (Intel)<peterz at infradead.org> > --- > include/asm-generic/qspinlock.h | 3 > kernel/locking/qspinlock.c | 69 +++++++++++++- > kernel/locking/qspinlock_paravirt.h | 177 ++++++++++++++++++++++++++++++++++++ > 3 files changed, 248 insertions(+), 1 deletion(-) > > --- a/include/asm-generic/qspinlock.h > +++ b/include/asm-generic/qspinlock.h > @@ -118,6 +118,9 @@ static __always_inline bool virt_queue_s > } > #endif > > +extern void __pv_queue_spin_lock_slowpath(struct qspinlock *lock, u32 val); > +extern void __pv_queue_spin_unlock(struct qspinlock *lock); > + > /* > * Initializier > */ > --- a/kernel/locking/qspinlock.c > +++ b/kernel/locking/qspinlock.c > @@ -18,6 +18,9 @@ > * Authors: Waiman Long<waiman.long at hp.com> > * Peter Zijlstra<peterz at infradead.org> > */ > + > +#ifndef _GEN_PV_LOCK_SLOWPATH > + > #include<linux/smp.h> > #include<linux/bug.h> > #include<linux/cpumask.h> > @@ -65,13 +68,21 @@ > > #include "mcs_spinlock.h" > > +#ifdef CONFIG_PARAVIRT_SPINLOCKS > +#define MAX_NODES 8 > +#else > +#define MAX_NODES 4 > +#endif > + > /* > * Per-CPU queue node structures; we can never have more than 4 nested > * contexts: task, softirq, hardirq, nmi. 
> * > * Exactly fits one 64-byte cacheline on a 64-bit architecture. > + * > + * PV doubles the storage and uses the second cacheline for PV state. > */ > -static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[4]); > +static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]); > > /* > * We must be able to distinguish between no-tail and the tail at 0:0, > @@ -230,6 +241,32 @@ static __always_inline void set_locked(s > WRITE_ONCE(l->locked, _Q_LOCKED_VAL); > } > > + > +/* > + * Generate the native code for queue_spin_unlock_slowpath(); provide NOPs for > + * all the PV callbacks. > + */ > + > +static __always_inline void __pv_init_node(struct mcs_spinlock *node) { } > +static __always_inline void __pv_wait_node(struct mcs_spinlock *node) { } > +static __always_inline void __pv_kick_node(struct mcs_spinlock *node) { } > + > +static __always_inline void __pv_wait_head(struct qspinlock *lock) { } > + > +#define pv_enabled() false > + > +#define pv_init_node __pv_init_node > +#define pv_wait_node __pv_wait_node > +#define pv_kick_node __pv_kick_node > + > +#define pv_wait_head __pv_wait_head > + > +#ifdef CONFIG_PARAVIRT_SPINLOCKS > +#define queue_spin_lock_slowpath native_queue_spin_lock_slowpath > +#endif > + > +#endif /* _GEN_PV_LOCK_SLOWPATH */ > + > /** > * queue_spin_lock_slowpath - acquire the queue spinlock > * @lock: Pointer to queue spinlock structure > @@ -259,6 +296,9 @@ void queue_spin_lock_slowpath(struct qsp > > BUILD_BUG_ON(CONFIG_NR_CPUS>= (1U<< _Q_TAIL_CPU_BITS)); > > + if (pv_enabled()) > + goto queue; > + > if (virt_queue_spin_lock(lock)) > return; > > @@ -335,6 +375,7 @@ void queue_spin_lock_slowpath(struct qsp > node += idx; > node->locked = 0; > node->next = NULL; > + pv_init_node(node); > > /* > * We touched a (possibly) cold cacheline in the per-cpu queue node; > @@ -360,6 +401,7 @@ void queue_spin_lock_slowpath(struct qsp > prev = decode_tail(old); > WRITE_ONCE(prev->next, node); > > + pv_wait_node(node); > 
arch_mcs_spin_lock_contended(&node->locked); > } > > @@ -374,6 +416,7 @@ void queue_spin_lock_slowpath(struct qsp > * sequentiality; this is because the set_locked() function below > * does not imply a full barrier. > */ > + pv_wait_head(lock); > while ((val = smp_load_acquire(&lock->val.counter))& _Q_LOCKED_PENDING_MASK) > cpu_relax(); > > @@ -406,6 +449,7 @@ void queue_spin_lock_slowpath(struct qsp > cpu_relax(); > > arch_mcs_spin_unlock_contended(&next->locked); > + pv_kick_node(next); > > release: > /* > @@ -414,3 +458,26 @@ void queue_spin_lock_slowpath(struct qsp > this_cpu_dec(mcs_nodes[0].count); > } > EXPORT_SYMBOL(queue_spin_lock_slowpath); > + > +/* > + * Generate the paravirt code for queue_spin_unlock_slowpath(). > + */ > +#if !defined(_GEN_PV_LOCK_SLOWPATH)&& defined(CONFIG_PARAVIRT_SPINLOCKS) > +#define _GEN_PV_LOCK_SLOWPATH > + > +#undef pv_enabled > +#define pv_enabled() true > + > +#undef pv_init_node > +#undef pv_wait_node > +#undef pv_kick_node > + > +#undef pv_wait_head > + > +#undef queue_spin_lock_slowpath > +#define queue_spin_lock_slowpath __pv_queue_spin_lock_slowpath > + > +#include "qspinlock_paravirt.h" > +#include "qspinlock.c" > + > +#endif > --- /dev/null > +++ b/kernel/locking/qspinlock_paravirt.h > @@ -0,0 +1,177 @@ > +#ifndef _GEN_PV_LOCK_SLOWPATH > +#error "do not include this file" > +#endif > + > +/* > + * Implement paravirt qspinlocks; the general idea is to halt the vcpus instead > + * of spinning them. > + * > + * This relies on the architecture to provide two paravirt hypercalls: > + * > + * pv_wait(u8 *ptr, u8 val) -- suspends the vcpu if *ptr == val > + * pv_kick(cpu) -- wakes a suspended vcpu > + * > + * Using these we implement __pv_queue_spin_lock_slowpath() and > + * __pv_queue_spin_unlock() to replace native_queue_spin_lock_slowpath() and > + * native_queue_spin_unlock(). 
> + */ > + > +#define _Q_SLOW_VAL (2U<< _Q_LOCKED_OFFSET) > + > +enum vcpu_state { > + vcpu_running = 0, > + vcpu_halted, > +}; > + > +struct pv_node { > + struct mcs_spinlock mcs; > + struct mcs_spinlock __res[3]; > + > + int cpu; > + u8 state; > +}; > + > +/* > + * Initialize the PV part of the mcs_spinlock node. > + */ > +static void pv_init_node(struct mcs_spinlock *node) > +{ > + struct pv_node *pn = (struct pv_node *)node; > + > + BUILD_BUG_ON(sizeof(struct pv_node)> 5*sizeof(struct mcs_spinlock)); > + > + pn->cpu = smp_processor_id(); > + pn->state = vcpu_running; > +} > + > +/* > + * Wait for node->locked to become true, halt the vcpu after a short spin. > + * pv_kick_node() is used to wake the vcpu again. > + */ > +static void pv_wait_node(struct mcs_spinlock *node) > +{ > + struct pv_node *pn = (struct pv_node *)node; > + int loop; > + > + for (;;) { > + for (loop = SPIN_THRESHOLD; loop; loop--) { > + if (READ_ONCE(node->locked)) > + goto done; > + > + cpu_relax(); > + } > + > + /* > + * Order pn->state vs pn->locked thusly: > + * > + * [S] pn->state = vcpu_halted [S] next->locked = 1 > + * MB MB > + * [L] pn->locked [RmW] pn->state = vcpu_running > + * > + * Matches the xchg() from pv_kick_node(). > + */ > + (void)xchg(&pn->state, vcpu_halted); > + > + if (READ_ONCE(node->locked)) > + goto done; > + > + pv_wait(&pn->state, vcpu_halted); > + } > +done: > + pn->state = vcpu_running; > + > + /* > + * By now our node->locked should be 1 and our caller will not actually > + * spin-wait for it. We do however rely on our caller to do a > + * load-acquire for us. > + */ > +} > + > +/* > + * Called after setting next->locked = 1, used to wake those stuck in > + * pv_wait_node(). > + */ > +static void pv_kick_node(struct mcs_spinlock *node) > +{ > + struct pv_node *pn = (struct pv_node *)node; > + > + /* > + * Note that because node->locked is already set, this actual mcs_spinlock > + * entry could be re-used already. 
> + * > + * This should be fine however, kicking people for no reason is harmless. > + * > + * See the comment in pv_wait_node(). > + */ > + if (xchg(&pn->state, vcpu_running) == vcpu_halted) > + pv_kick(pn->cpu); > +} > + > +static DEFINE_PER_CPU(struct qspinlock *, __pv_lock_wait); > + > +/* > + * Wait for l->locked to become clear; halt the vcpu after a short spin. > + * __pv_queue_spin_unlock() will wake us. > + */ > +static void pv_wait_head(struct qspinlock *lock) > +{ > + struct __qspinlock *l = (void *)lock; > + int loop; > + > + for (;;) { > + for (loop = SPIN_THRESHOLD; loop; loop--) { > + if (!READ_ONCE(l->locked)) > + goto done; > + > + cpu_relax(); > + } > + > + this_cpu_write(__pv_lock_wait, lock);

We may run into the same problem of needing to have 4 queue nodes per CPU. If an interrupt happens just after the write and before the actual wait, and it goes through the same sequence, it will overwrite the __pv_lock_wait[] entry. So we may have a lost wakeup. That is why the pvticket lock code did that just before the actual wait with interrupts disabled. We probably couldn't disable interrupts here. So we may need to move the actual write to the KVM and Xen code if we keep the current logic.

> + /* > + * __pv_lock_wait must be set before setting _Q_SLOW_VAL > + * > + * [S] __pv_lock_wait = lock [RmW] l = l->locked = 0 > + * MB MB > + * [S] l->locked = _Q_SLOW_VAL [L] __pv_lock_wait > + * > + * Matches the xchg() in pv_queue_spin_unlock(). > + */ > + if (!cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_SLOW_VAL)) > + goto done; > + > + pv_wait(&l->locked, _Q_SLOW_VAL); > + } > +done: > + this_cpu_write(__pv_lock_wait, NULL); > + > + /* > + * Lock is unlocked now; the caller will acquire it without waiting. > + * As with pv_wait_node() we rely on the caller to do a load-acquire > + * for us. > + */ > +} > + > +/* > + * To be used in stead of queue_spin_unlock() for paravirt locks. Wakes > + * pv_wait_head() if appropriate. 
> + */ > +void __pv_queue_spin_unlock(struct qspinlock *lock) > +{ > + struct __qspinlock *l = (void *)lock; > + int cpu; > + > + if (xchg(&l->locked, 0) != _Q_SLOW_VAL) > + return; > + > + /* > + * At this point the memory pointed at by lock can be freed/reused, > + * however we can still use the pointer value to search in our cpu > + * array. > + * > + * XXX: get rid of this loop > + */ > + for_each_possible_cpu(cpu) { > + if (per_cpu(__pv_lock_wait, cpu) == lock) > + pv_kick(cpu); > + } > +}

I do want to get rid of this loop too. On average, we have to scan about half the number of CPUs available. So it isn't that different performance-wise compared with my original idea of following the list from tail to head. And how about your idea of propagating the current head down the linked list?

-Longman

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.linuxfoundation.org/pipermail/virtualization/attachments/20150318/70d21582/attachment.html>
On Wed, Mar 18, 2015 at 04:50:37PM -0400, Waiman Long wrote:
> >+ this_cpu_write(__pv_lock_wait, lock); > > We may run into the same problem of needing to have 4 queue nodes per CPU. > If an interrupt happens just after the write and before the actual wait and > it goes through the same sequence, it will overwrite the __pv_lock_wait[] > entry. So we may have lost wakeup. That is why the pvticket lock code did > that just before the actual wait with interrupt disabled. We probably > couldn't disable interrupt here. So we may need to move the actual write to > the KVM and Xen code if we keep the current logic.

Hmm, indeed. So I didn't actually mean to keep this code, but yes, I missed that.

> >+ /* > >+ * At this point the memory pointed at by lock can be freed/reused, > >+ * however we can still use the pointer value to search in our cpu > >+ * array. > >+ * > >+ * XXX: get rid of this loop > >+ */ > >+ for_each_possible_cpu(cpu) { > >+ if (per_cpu(__pv_lock_wait, cpu) == lock) > >+ pv_kick(cpu); > >+ } > >+} > > I do want to get rid of this loop too. On average, we have to scan about > half the number of CPUs available. So it isn't that different > performance-wise compared with my original idea of following the list from > tail to head. And how about your idea of propagating the current head down > the linked list?

Yeah, so I was half done with that patch when I fell asleep on the flight home. Didn't get around to 'fixing' it. It applies and builds but doesn't actually work. _However_ I'm not sure I actually like it much. The problem I have with it is these wait loops; they can generate the same problem we're trying to avoid. So I was now thinking of hashing the lock pointer; let me go and quickly put something together. 
--- kernel/locking/qspinlock.c | 8 +- kernel/locking/qspinlock_paravirt.h | 124 ++++++++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 31 deletions(-) --- a/kernel/locking/qspinlock.c +++ b/kernel/locking/qspinlock.c @@ -246,10 +246,10 @@ static __always_inline void set_locked(s */ static __always_inline void __pv_init_node(struct mcs_spinlock *node) { } -static __always_inline void __pv_wait_node(struct mcs_spinlock *node) { } +static __always_inline void __pv_wait_node(u32 old, struct mcs_spinlock *node) { } static __always_inline void __pv_kick_node(struct mcs_spinlock *node) { } -static __always_inline void __pv_wait_head(struct qspinlock *lock) { } +static __always_inline void __pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node) { } #define pv_enabled() false @@ -399,7 +399,7 @@ void queue_spin_lock_slowpath(struct qsp prev = decode_tail(old); WRITE_ONCE(prev->next, node); - pv_wait_node(node); + pv_wait_node(old, node); arch_mcs_spin_lock_contended(&node->locked); } @@ -414,7 +414,7 @@ void queue_spin_lock_slowpath(struct qsp * sequentiality; this is because the set_locked() function below * does not imply a full barrier. */ - pv_wait_head(lock); + pv_wait_head(lock, node); while ((val = smp_load_acquire(&lock->val.counter)) & _Q_LOCKED_PENDING_MASK) cpu_relax(); --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -29,8 +29,14 @@ struct pv_node { int cpu; u8 state; + struct pv_node *head; }; +static inline struct pv_node *pv_decode_tail(u32 tail) +{ + return (struct pv_node *)decode_tail(tail); +} + /* * Initialize the PV part of the mcs_spinlock node. 
*/ @@ -42,17 +48,49 @@ static void pv_init_node(struct mcs_spin pn->cpu = smp_processor_id(); pn->state = vcpu_running; + pn->head = NULL; +} + +static void pv_propagate_head(struct pv_node *pn, struct pv_node *ppn) +{ + /* + * When we race against the first waiter or ourselves we have to wait + * until the previous link updates its head pointer before we can + * propagate it. + */ + while (!READ_ONCE(ppn->head)) { + /* + * queue_spin_lock_slowpath could have been waiting for the + * node->next store above before setting node->locked. + */ + if (ppn->mcs.locked) + return; + + cpu_relax(); + } + /* + * If we interleave such that the store from pv_set_head() happens + * between our load and store we could have over-written the new head + * value. + * + * Therefore use cmpxchg to only propagate the old head value if our + * head value is unmodified. + */ + (void)cmpxchg(&pn->head, NULL, READ_ONCE(ppn->head)); } /* * Wait for node->locked to become true, halt the vcpu after a short spin. * pv_kick_node() is used to wake the vcpu again. 
*/ -static void pv_wait_node(struct mcs_spinlock *node) +static void pv_wait_node(u32 old, struct mcs_spinlock *node) { + struct pv_node *ppn = pv_decode_tail(old); struct pv_node *pn = (struct pv_node *)node; int loop; + pv_propagate_head(pn, ppn); + for (;;) { for (loop = SPIN_THRESHOLD; loop; loop--) { if (READ_ONCE(node->locked)) @@ -107,13 +145,33 @@ static void pv_kick_node(struct mcs_spin pv_kick(pn->cpu); } -static DEFINE_PER_CPU(struct qspinlock *, __pv_lock_wait); +static void pv_set_head(struct qspinlock *lock, struct pv_node *head) +{ + struct pv_node *tail, *new_tail; + + new_tail = pv_decode_tail(atomic_read(&lock->val)); + do { + tail = new_tail; + while (!READ_ONCE(tail->head)) + cpu_relax(); + + (void)xchg(&tail->head, head); + /* + * pv_set_head() pv_wait_node() + * + * [S] tail->head, head [S] lock->tail + * MB MB + * [L] lock->tail [L] tail->head + */ + new_tail = pv_decode_tail(atomic_read(&lock->val)); + } while (tail != new_tail); +} /* * Wait for l->locked to become clear; halt the vcpu after a short spin. * __pv_queue_spin_unlock() will wake us. */ -static void pv_wait_head(struct qspinlock *lock) +static void pv_wait_head(struct qspinlock *lock, struct mcs_spinlock *node) { struct __qspinlock *l = (void *)lock; int loop; @@ -121,28 +179,24 @@ static void pv_wait_head(struct qspinloc for (;;) { for (loop = SPIN_THRESHOLD; loop; loop--) { if (!READ_ONCE(l->locked)) - goto done; + return; cpu_relax(); } - this_cpu_write(__pv_lock_wait, lock); + pv_set_head(lock, (struct pv_node *)node); /* - * __pv_lock_wait must be set before setting _Q_SLOW_VAL - * - * [S] __pv_lock_wait = lock [RmW] l = l->locked = 0 - * MB MB - * [S] l->locked = _Q_SLOW_VAL [L] __pv_lock_wait + * The head must be set before we set _Q_SLOW_VAL such that + * when pv_queue_spin_unlock() observes _Q_SLOW_VAL it must + * also observe the head pointer. * - * Matches the xchg() in pv_queue_spin_unlock(). 
+ * We rely on the implied MB from the below cmpxchg() */ if (!cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_SLOW_VAL)) - goto done; + return; pv_wait(&l->locked, _Q_SLOW_VAL); } -done: - this_cpu_write(__pv_lock_wait, NULL); /* * Lock is unlocked now; the caller will acquire it without waiting. @@ -157,21 +211,37 @@ static void pv_wait_head(struct qspinloc */ void __pv_queue_spin_unlock(struct qspinlock *lock) { - struct __qspinlock *l = (void *)lock; - int cpu; + u32 old, val = atomic_read(&lock->val); + struct pv_node *pn = NULL; - if (xchg(&l->locked, 0) != _Q_SLOW_VAL) - return; + for (;;) { + if (val & _Q_SLOW_VAL) { + /* + * If we observe _Q_SLOW_VAL we must also observe + * pn->head; see pv_wait_head(); + */ + smp_rmb(); + pn = pv_decode_tail(val); + while (!READ_ONCE(pn->head)) + cpu_relax(); + pn = READ_ONCE(pn->head); + } + /* + * The cmpxchg() ensures the above loads are complete before + * we release the lock. + */ + old = atomic_cmpxchg(&lock->val, val, val & _Q_TAIL_MASK); + if (old == val) + break; + + val = old; + } /* - * At this point the memory pointed at by lock can be freed/reused, - * however we can still use the pointer value to search in our cpu - * array. - * - * XXX: get rid of this loop + * We've freed the lock so the lock storage can be gone; however since + * the pv_node structure is static storage we can safely use that + * still. */ - for_each_possible_cpu(cpu) { - if (per_cpu(__pv_lock_wait, cpu) == lock) - pv_kick(cpu); - } + if (pn) + pv_kick(pn->cpu); }
On Thu, Mar 19, 2015 at 11:12:42AM +0100, Peter Zijlstra wrote:
> So I was now thinking of hashing the lock pointer; let me go and quickly > put something together.

A little something like so; ideally we'd allocate the hashtable since NR_CPUS is kinda bloated, but it shows the idea I think. And while this has loops in it (the rehashing thing), their fwd progress does not depend on other CPUs. And I suspect that for the typical lock contention scenarios it's unlikely we ever really get into long rehashing chains.

--- include/linux/lfsr.h | 49 ++++++++++++ kernel/locking/qspinlock_paravirt.h | 143 ++++++++++++++++++++++++++++++++---- 2 files changed, 178 insertions(+), 14 deletions(-) --- /dev/null +++ b/include/linux/lfsr.h @@ -0,0 +1,49 @@ +#ifndef _LINUX_LFSR_H +#define _LINUX_LFSR_H + +/* + * Simple Binary Galois Linear Feedback Shift Register + * + * http://en.wikipedia.org/wiki/Linear_feedback_shift_register + * + */ + +extern void __lfsr_needs_more_taps(void); + +static __always_inline u32 lfsr_taps(int bits) +{ + if (bits == 1) return 0x0001; + if (bits == 2) return 0x0001; + if (bits == 3) return 0x0003; + if (bits == 4) return 0x0009; + if (bits == 5) return 0x0012; + if (bits == 6) return 0x0021; + if (bits == 7) return 0x0041; + if (bits == 8) return 0x008E; + if (bits == 9) return 0x0108; + if (bits == 10) return 0x0204; + if (bits == 11) return 0x0402; + if (bits == 12) return 0x0829; + if (bits == 13) return 0x100D; + if (bits == 14) return 0x2015; + + /* + * For more taps see: + * http://users.ece.cmu.edu/~koopman/lfsr/index.html + */ + __lfsr_needs_more_taps(); + + return 0; +} + +static inline u32 lfsr(u32 val, int bits) +{ + u32 bit = val & 1; + + val >>= 1; + if (bit) + val ^= lfsr_taps(bits); + return val; +} + +#endif /* _LINUX_LFSR_H */ --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -2,6 +2,9 @@ #error "do not include this file" #endif +#include <linux/hash.h> +#include <linux/lfsr.h> + /* * Implement paravirt 
qspinlocks; the general idea is to halt the vcpus instead * of spinning them. @@ -107,7 +110,120 @@ static void pv_kick_node(struct mcs_spin pv_kick(pn->cpu); } -static DEFINE_PER_CPU(struct qspinlock *, __pv_lock_wait); +/* + * Hash table using open addressing with an LFSR probe sequence. + * + * Since we should not be holding locks from NMI context (very rare indeed) the + * max load factor is 0.75, which is around the point where open addressing + * breaks down. + * + * Instead of probing just the immediate bucket we probe all buckets in the + * same cacheline. + * + * http://en.wikipedia.org/wiki/Hash_table#Open_addressing + * + */ + +#define HB_RESERVED ((struct qspinlock *)1) + +struct pv_hash_bucket { + struct qspinlock *lock; + int cpu; +}; + +/* + * XXX dynamic allocate using nr_cpu_ids instead... + */ +#define PV_LOCK_HASH_BITS (2 + NR_CPUS_BITS) + +#if PV_LOCK_HASH_BITS < 6 +#undef PV_LOCK_HASH_BITS +#define PB_LOCK_HASH_BITS 6 +#endif + +#define PV_LOCK_HASH_SIZE (1 << PV_LOCK_HASH_BITS) + +static struct pv_hash_bucket __pv_lock_hash[PV_LOCK_HASH_SIZE] ____cacheline_aligned; + +#define PV_HB_PER_LINE (SMP_CACHE_BYTES / sizeof(struct pv_hash_bucket)) + +static inline u32 hash_align(u32 hash) +{ + return hash & ~(PV_HB_PER_LINE - 1); +} + +static struct qspinlock **pv_hash(struct qspinlock *lock) +{ + u32 hash = hash_ptr(lock, PV_LOCK_HASH_BITS); + struct pv_hash_bucket *hb, *end; + + if (!hash) + hash = 1; + + hb = &__pv_lock_hash[hash_align(hash)]; + for (;;) { + for (end = hb + PV_HB_PER_LINE; hb < end; hb++) { + if (cmpxchg(&hb->lock, NULL, HB_RESERVED)) { + WRITE_ONCE(hb->cpu, smp_processor_id()); + /* + * Since we must read lock first and cpu + * second, we must write cpu first and lock + * second, therefore use HB_RESERVE to mark an + * entry in use before writing the values. 
+ * + * This can cause hb_hash_find() to not find a + * cpu even though _Q_SLOW_VAL, this is not a + * problem since we re-check l->locked before + * going to sleep and the unlock will have + * cleared l->locked already. + */ + smp_wmb(); /* matches rmb from pv_hash_find */ + WRITE_ONCE(hb->lock, lock); + goto done; + } + } + + hash = lfsr(hash, PV_LOCK_HASH_BITS); + hb = &__pv_lock_hash[hash_align(hash)]; + } + +done: + return &hb->lock; +} + +static int pv_hash_find(struct qspinlock *lock) +{ + u64 hash = hash_ptr(lock, PV_LOCK_HASH_BITS); + struct pv_hash_bucket *hb, *end; + int cpu = -1; + + if (!hash) + hash = 1; + + hb = &__pv_lock_hash[hash_align(hash)]; + for (;;) { + for (end = hb + PV_HB_PER_LINE; hb < end; hb++) { + struct qspinlock *l = READ_ONCE(hb->lock); + + /* + * If we hit an unused bucket, there is no match. + */ + if (!l) + goto done; + + if (l == lock) { + smp_rmb(); /* matches wmb from pv_hash() */ + cpu = READ_ONCE(hb->cpu); + goto done; + } + } + + hash = lfsr(hash, PV_LOCK_HASH_BITS); + hb = &__pv_lock_hash[hash_align(hash)]; + } +done: + return cpu; +} /* * Wait for l->locked to become clear; halt the vcpu after a short spin. @@ -116,6 +232,7 @@ static DEFINE_PER_CPU(struct qspinlock * static void pv_wait_head(struct qspinlock *lock) { struct __qspinlock *l = (void *)lock; + struct qspinlock **lp = NULL; int loop; for (;;) { @@ -126,13 +243,13 @@ static void pv_wait_head(struct qspinloc cpu_relax(); } - this_cpu_write(__pv_lock_wait, lock); + lp = pv_hash(lock); /* - * __pv_lock_wait must be set before setting _Q_SLOW_VAL + * lp must be set before setting _Q_SLOW_VAL * - * [S] __pv_lock_wait = lock [RmW] l = l->locked = 0 + * [S] lp = lock [RmW] l = l->locked = 0 * MB MB - * [S] l->locked = _Q_SLOW_VAL [L] __pv_lock_wait + * [S] l->locked = _Q_SLOW_VAL [L] lp * * Matches the xchg() in pv_queue_spin_unlock(). 
*/ @@ -142,7 +259,8 @@ static void pv_wait_head(struct qspinloc pv_wait(&l->locked, _Q_SLOW_VAL); } done: - this_cpu_write(__pv_lock_wait, NULL); + if (lp) + WRITE_ONCE(*lp, NULL); /* * Lock is unlocked now; the caller will acquire it without waiting. @@ -165,13 +283,10 @@ void __pv_queue_spin_unlock(struct qspin /* * At this point the memory pointed at by lock can be freed/reused, - * however we can still use the pointer value to search in our cpu - * array. - * - * XXX: get rid of this loop + * however we can still use the pointer value to search in our hash + * table. */ - for_each_possible_cpu(cpu) { - if (per_cpu(__pv_lock_wait, cpu) == lock) - pv_kick(cpu); - } + cpu = pv_hash_find(lock); + if (cpu >= 0) + pv_kick(cpu); }