On 06/15/2014 08:47 AM, Peter Zijlstra wrote:> > > > +#ifdef CONFIG_PARAVIRT_SPINLOCKS > + > +/* > + * Write a comment about how all this works... > + */ > + > +#define _Q_LOCKED_SLOW (2U<< _Q_LOCKED_OFFSET) > + > +struct pv_node { > + struct mcs_spinlock mcs; > + struct mcs_spinlock __offset[3]; > + int cpu, head; > +};I am wondering why you need the separate cpu and head variables. I thought one will be enough here. The wait code put the cpu number in head, the the kick_cpu code kick the one in cpu which is just the cpu # of the tail.> + > +#define INVALID_HEAD -1 > +#define NO_HEAD nr_cpu_ids > +I think it is better to use a constant like -2 for NO_HEAD instead of an external variable.> +void __pv_init_node(struct mcs_spinlock *node) > +{ > + struct pv_node *pn = (struct pv_node *)node; > + > + BUILD_BUG_ON(sizeof(struct pv_node)> 5*sizeof(struct mcs_spinlock)); > + > + pn->cpu = smp_processor_id(); > + pn->head = INVALID_HEAD; > +} > + > +static inline struct pv_node *pv_decode_tail(u32 tail) > +{ > + return (struct pv_node *)decode_tail(tail); > +} > + > +void __pv_link_and_wait_node(u32 old, struct mcs_spinlock *node) > +{ > + struct pv_node *ppn, *pn = (struct pv_node *)node; > + unsigned int count; > + > + if (!(old& _Q_TAIL_MASK)) { > + pn->head = NO_HEAD; > + return; > + } > + > + ppn = pv_decode_tail(old); > + ACCESS_ONCE(ppn->mcs.next) = node; > + > + while (ppn->head == INVALID_HEAD) > + cpu_relax(); > + > + pn->head = ppn->head;A race can happen here as pn->head can be changed to the head cpu by the head waiter while being changed by this function at the same time. It is safer to use cmpxchg to make sure that there is no accidental overwriting of the head CPU number.> + > + for (;;) { > + count = SPIN_THRESHOLD; > + > + do { > + if (smp_load_acquire(&node->locked)) > + return; > + > + cpu_relax(); > + } while (--count); > + > + pv_wait(&node->locked, 1); > + } > +} > + > +void __pv_kick_node(struct mcs_spinlock *node) > +{ > + struct pv_node *pn = (struct pv_node *)node; > + > + pv_kick(pn->cpu); > +} > + > +void __pv_wait_head(struct qspinlock *lock) > +{ > + unsigned int count; > + struct pv_node *pn; > + int val, old, new; > + > + for (;;) { > + count = SPIN_THRESHOLD; > + > + do { > + val = smp_load_acquire(&lock->val.counter); > + if (!(val& _Q_LOCKED_PENDING_MASK)) > + return; > + } while (--count); > + > + do { > + pn = pv_decode_tail(atomic_read(&lock->val)); > + > + while (pn->head == INVALID_HEAD) > + cpu_relax(); > + > + pn->head = smp_processor_id(); > + > + } while (pn != pv_decode_tail(atomic_read(&lock->val))); > + > + /* > + * Set _Q_LOCKED_SLOW; bail when the lock is free. > + */ > + val = atomic_read(&lock->val); > + for (;;) { > + if (!(val& _Q_LOCKED_PENDING_MASK)) > + return; > + new = val | _Q_LOCKED_SLOW; > + old = atomic_cmpxchg(&lock->val, val, new); > + if (old == val) > + break; > + val = old; > + } > + > + /* XXX 16bit would be better */ > + pv_wait(&lock->val.counter, new); > + } > +} > + > +static void ___pv_kick_head(struct qspinlock *lock) > +{ > + struct pv_node *pn; > + > + pn = pv_decode_tail(atomic_read(&lock->val)); > + > + while (pn->head == INVALID_HEAD) > + cpu_relax(); > + > + if (WARN_ON_ONCE(pn->head == NO_HEAD)) > + return; > + > + pv_kick(pn->head); > +} > + > +void __pv_queue_unlock(struct qspinlock *lock) > +{ > + int val = atomic_read(&lock->val); > + > + native_queue_unlock(lock); > + > + if (val& _Q_LOCKED_SLOW) > + ___pv_kick_head(lock); > +} > +Again a race can happen here between the reading and writing of the lock value. I can't think of a good way to do that without using cmpxchg.> +#else > + > +static inline void pv_init_node(struct mcs_spinlock *node) { } > +static inline void pv_link_and_wait_node(u32 old, struct mcs_spinlock *node) { } > +static inline void pv_kick_node(struct mcs_spinlock *node) { } > + > +static inline void pv_wait_head(struct qspinlock *lock) { } > + > +#endif > + > /** > * queue_spin_lock_slowpath - acquire the queue spinlock > * @lock: Pointer to queue spinlock structure > @@ -247,6 +417,9 @@ void queue_spin_lock_slowpath(struct qsp > > BUILD_BUG_ON(CONFIG_NR_CPUS>= (1U<< _Q_TAIL_CPU_BITS)); > > + if (pv_enabled()) > + goto queue; > + > if (virt_queue_spin_lock(lock)) > return; > > @@ -323,6 +496,7 @@ void queue_spin_lock_slowpath(struct qsp > node += idx; > node->locked = 0; > node->next = NULL; > + pv_init_node(node); > > /* > * We touched a (possibly) cold cacheline in the per-cpu queue node; > @@ -343,6 +517,7 @@ void queue_spin_lock_slowpath(struct qsp > /* > * if there was a previous node; link it and wait. > */ > + pv_link_and_wait_node(old, node); > if (old& _Q_TAIL_MASK) { > prev = decode_tail(old); > ACCESS_ONCE(prev->next) = node; > @@ -358,6 +533,7 @@ void queue_spin_lock_slowpath(struct qsp > * > * *,x,y -> *,0,0 > */ > + pv_wait_head(lock); > while ((val = smp_load_acquire(&lock->val.counter))& > _Q_LOCKED_PENDING_MASK) > cpu_relax(); > @@ -391,6 +567,7 @@ void queue_spin_lock_slowpath(struct qsp > cpu_relax(); > > arch_mcs_spin_unlock_contended(&next->locked); > + pv_kick_node(next); >pv_kick_node is an expensive operation and it can significantly slow down the locking operation if we have to do it for every subsequent task in the queue. -Longman -------------- next part -------------- An HTML attachment was scrubbed... URL: <http://lists.linuxfoundation.org/pipermail/virtualization/attachments/20140616/4ac6ab07/attachment.html>
Il 17/06/2014 00:08, Waiman Long ha scritto:>> +void __pv_queue_unlock(struct qspinlock *lock) >> +{ >> + int val = atomic_read(&lock->val); >> + >> + native_queue_unlock(lock); >> + >> + if (val & _Q_LOCKED_SLOW) >> + ___pv_kick_head(lock); >> +} >> + > > Again a race can happen here between the reading and writing of the lock > value. I can't think of a good way to do that without using cmpxchg.Could you just use xchg on the locked byte? Paolo
On 06/18/2014 08:03 AM, Paolo Bonzini wrote:> Il 17/06/2014 00:08, Waiman Long ha scritto: >>> +void __pv_queue_unlock(struct qspinlock *lock) >>> +{ >>> + int val = atomic_read(&lock->val); >>> + >>> + native_queue_unlock(lock); >>> + >>> + if (val & _Q_LOCKED_SLOW) >>> + ___pv_kick_head(lock); >>> +} >>> + >> >> Again a race can happen here between the reading and writing of the lock >> value. I can't think of a good way to do that without using cmpxchg. > > Could you just use xchg on the locked byte? > > PaoloThe slowpath flag is just an indication that the queue head cpu might have been suspended. It may not be due to spurious wakeup. Releasing the lock unconditionally may cause the queue to be changed while it is being inspected. It really depending on how the cpu kicking is being handled. My patch delays the unlocking until all the inspections had been done to make sure that we don't waste time doing a cpu kick that is not needed. -Longman
On Mon, Jun 16, 2014 at 06:08:21PM -0400, Waiman Long wrote:> On 06/15/2014 08:47 AM, Peter Zijlstra wrote: > >+struct pv_node { > >+ struct mcs_spinlock mcs; > >+ struct mcs_spinlock __offset[3]; > >+ int cpu, head; > >+}; > > I am wondering why you need the separate cpu and head variables. I thought > one will be enough here. The wait code put the cpu number in head, the the > kick_cpu code kick the one in cpu which is just the cpu # of the tail.The @cpu is the current cpu, the @head is the encoded pointer to the queue head, they aren't necessarily the same. The @head thing is not unlike your next pointer, just backwards.> >+#define INVALID_HEAD -1 > >+#define NO_HEAD nr_cpu_ids > >+ > > I think it is better to use a constant like -2 for NO_HEAD instead of an > external variable.Sure..> >+void __pv_init_node(struct mcs_spinlock *node) > >+{ > >+ struct pv_node *pn = (struct pv_node *)node; > >+ > >+ BUILD_BUG_ON(sizeof(struct pv_node)> 5*sizeof(struct mcs_spinlock)); > >+ > >+ pn->cpu = smp_processor_id(); > >+ pn->head = INVALID_HEAD; > >+} > >+ > >+static inline struct pv_node *pv_decode_tail(u32 tail) > >+{ > >+ return (struct pv_node *)decode_tail(tail); > >+} > >+ > >+void __pv_link_and_wait_node(u32 old, struct mcs_spinlock *node) > >+{ > >+ struct pv_node *ppn, *pn = (struct pv_node *)node; > >+ unsigned int count; > >+ > >+ if (!(old& _Q_TAIL_MASK)) { > >+ pn->head = NO_HEAD; > >+ return; > >+ } > >+ > >+ ppn = pv_decode_tail(old); > >+ ACCESS_ONCE(ppn->mcs.next) = node; > >+ > >+ while (ppn->head == INVALID_HEAD) > >+ cpu_relax(); > >+ > >+ pn->head = ppn->head; > > A race can happen here as pn->head can be changed to the head cpu by the > head waiter while being changed by this function at the same time. It is > safer to use cmpxchg to make sure that there is no accidental overwriting of > the head CPU number.Ok, so I'm not entirely sure I see the race, although its entirely possible, this is far too fragile. But I couldn't get rid of the race with cmpxchg/xchg either. So the idea is 'simple'; have link_and_wait propagate the 'head' 'pointer' from the old to the new tail, and have wait_head set the 'head' pointer on the current tail every time the top waiter goes to sleep. There's the obvious race where both happen at the same time and you're not sure which 'head' 'pointer' won. To solve that what I did was: init: INVALID_HEAD link_and_wait: INVALID_HEAD -> pprev->head , NO_HEAD wait_head: !INVALID_HEAD -> new head This way wait_head must wait for link_and_wait to finish before writing the new head value. Furthermore, if we race such that we obtained the 'old' tail and link_and_wait propagated the 'old' head to the 'new' tail, wait_head will detect this by verifying the tail pointer after writing the new head. We don't need atomics here afaict, but we have wait loops, which of course suck arse for virt :/ I'm not too fond of this scheme; but I thought I'd try and get rid of that O(n) loop you had for finding the head, we simply cannot assume 'small' number of vcpus.> >+void __pv_queue_unlock(struct qspinlock *lock) > >+{ > >+ int val = atomic_read(&lock->val); > >+ > >+ native_queue_unlock(lock); > >+ > >+ if (val& _Q_LOCKED_SLOW) > >+ ___pv_kick_head(lock); > >+} > >+ > > Again a race can happen here between the reading and writing of the lock > value. I can't think of a good way to do that without using cmpxchg.Indeed so, xchg it is I suppose :/> >@@ -358,6 +533,7 @@ void queue_spin_lock_slowpath(struct qsp > > * > > * *,x,y -> *,0,0 > > */ > >+ pv_wait_head(lock); > > while ((val = smp_load_acquire(&lock->val.counter))& > > _Q_LOCKED_PENDING_MASK) > > cpu_relax(); > >@@ -391,6 +567,7 @@ void queue_spin_lock_slowpath(struct qsp > > cpu_relax(); > > > > arch_mcs_spin_unlock_contended(&next->locked); > >+ pv_kick_node(next); > > pv_kick_node is an expensive operation and it can significantly slow down > the locking operation if we have to do it for every subsequent task in the > queue.You might by now have noticed that I don't particularly care too much about (para)virt performance :-) Also, I'm very much trying to get 'simple' things working before making them more complex. -------------- next part -------------- A non-text attachment was scrubbed... Name: not available Type: application/pgp-signature Size: 836 bytes Desc: not available URL: <http://lists.linuxfoundation.org/pipermail/virtualization/attachments/20140707/fc0dabe1/attachment.sig>
On Wed, Jun 18, 2014 at 02:03:12PM +0200, Paolo Bonzini wrote:> Il 17/06/2014 00:08, Waiman Long ha scritto: > >>+void __pv_queue_unlock(struct qspinlock *lock) > >>+{ > >>+ int val = atomic_read(&lock->val); > >>+ > >>+ native_queue_unlock(lock); > >>+ > >>+ if (val & _Q_LOCKED_SLOW) > >>+ ___pv_kick_head(lock); > >>+} > >>+ > > > >Again a race can happen here between the reading and writing of the lock > >value. I can't think of a good way to do that without using cmpxchg. > > Could you just use xchg on the locked byte?I'll have to, indeed. This is racy. -------------- next part -------------- A non-text attachment was scrubbed... Name: not available Type: application/pgp-signature Size: 836 bytes Desc: not available URL: <http://lists.linuxfoundation.org/pipermail/virtualization/attachments/20140707/1a5b8a57/attachment-0001.sig>