On Thu, 6 May 2021 09:57:42 +0800 Yunsheng Lin wrote:
> @@ -159,8 +160,37 @@ static inline bool qdisc_is_empty(const struct Qdisc *qdisc)
>  static inline bool qdisc_run_begin(struct Qdisc *qdisc)
>  {
>  	if (qdisc->flags & TCQ_F_NOLOCK) {
> +		bool dont_retry = test_bit(__QDISC_STATE_MISSED,
> +					   &qdisc->state);
> +
> +		if (spin_trylock(&qdisc->seqlock))
> +			goto nolock_empty;
> +
> +		/* If the flag is set before doing the spin_trylock() and
> +		 * the above spin_trylock() return false, it means other cpu
> +		 * holding the lock will do dequeuing for us, or it wil see

s/wil/will/

> +		 * the flag set after releasing lock and reschedule the
> +		 * net_tx_action() to do the dequeuing.
> +		 */

I don't understand why MISSED is checked before the trylock.
Could you explain why it can't be tested directly here?
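
In other words, why can't it be roughly the below (completely
untested, and quite possibly missing the race the comment above
is describing):

	if (qdisc->flags & TCQ_F_NOLOCK) {
		if (spin_trylock(&qdisc->seqlock))
			goto nolock_empty;

		/* test the flag directly, after the trylock failed */
		if (test_bit(__QDISC_STATE_MISSED, &qdisc->state))
			return false;

		set_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* retry in case the lock owner missed the new flag */
		if (!spin_trylock(&qdisc->seqlock))
			return false;

nolock_empty:
		WRITE_ONCE(qdisc->empty, false);
	}
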
> +		if (dont_retry)
> +			return false;
> +
> +		/* We could do set_bit() before the first spin_trylock(),
> +		 * and avoid doing second spin_trylock() completely, then
> +		 * we could have multi cpus doing the set_bit(). Here use
> +		 * dont_retry to avoid doing the set_bit() and the second
> +		 * spin_trylock(), which has 5% performance improvement than
> +		 * doing the set_bit() before the first spin_trylock().
> +		 */
> +		set_bit(__QDISC_STATE_MISSED, &qdisc->state);
> +
> +		/* Retry again in case other CPU may not see the new flag
> +		 * after it releases the lock at the end of qdisc_run_end().
> +		 */
> +		if (!spin_trylock(&qdisc->seqlock))
> +			return false;
> +
> +nolock_empty:
>  		WRITE_ONCE(qdisc->empty, false);
>  	} else if (qdisc_is_running(qdisc)) {
>  		return false;
> @@ -176,8 +206,13 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
>  static inline void qdisc_run_end(struct Qdisc *qdisc)
>  {
>  	write_seqcount_end(&qdisc->running);
> -	if (qdisc->flags & TCQ_F_NOLOCK)
> +	if (qdisc->flags & TCQ_F_NOLOCK) {
>  		spin_unlock(&qdisc->seqlock);
> +
> +		if (unlikely(test_bit(__QDISC_STATE_MISSED,
> +				      &qdisc->state)))
> +			__netif_schedule(qdisc);
> +	}
>  }
> 
>  static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
> diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
> index 44991ea..9bc73ea 100644
> --- a/net/sched/sch_generic.c
> +++ b/net/sched/sch_generic.c
> @@ -640,8 +640,10 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  {
>  	struct pfifo_fast_priv *priv = qdisc_priv(qdisc);
>  	struct sk_buff *skb = NULL;
> +	bool need_retry = true;
>  	int band;
> 
> +retry:
>  	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>  		struct skb_array *q = band2list(priv, band);
> 
> @@ -652,6 +654,16 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  	}
> 
>  	if (likely(skb)) {
>  		qdisc_update_stats_at_dequeue(qdisc, skb);
> +	} else if (need_retry &&
> +		   test_and_clear_bit(__QDISC_STATE_MISSED,
> +				      &qdisc->state)) {

Why test_and_clear_bit() here? AFAICT this is the only place the bit is cleared. So the test and clear do not have to be atomic.
To my limited understanding on x86 test_bit() is never a locked operation, while test_and_clear_bit() is always locked. So we'd save an atomic operation in the uncontended case if we tested first and then cleared.

> +		/* do another dequeuing after clearing the flag to
> +		 * avoid calling __netif_schedule().
> +		 */
> +		smp_mb__after_atomic();

test_and_clear_bit() which returned true implies a memory barrier, AFAIU, so the barrier is not needed with the code as is. It will be needed if we switch to test_bit() + clear_bit(), but please clarify what it is pairing with.
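
FWIW the non-atomic variant I have in mind is roughly the below
(completely untested, and keeping the barrier on the assumption it
is still wanted ahead of the retried dequeue):

	} else if (need_retry &&
		   test_bit(__QDISC_STATE_MISSED, &qdisc->state)) {
		/* this is the only place the bit is cleared, so the
		 * test and the clear don't have to be one atomic RMW
		 */
		clear_bit(__QDISC_STATE_MISSED, &qdisc->state);

		/* do another dequeuing after clearing the flag to
		 * avoid calling __netif_schedule().
		 */
		smp_mb__after_atomic();

		need_retry = false;

		goto retry;
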
> +		need_retry = false;
> +
> +		goto retry;
>  	} else {
>  		WRITE_ONCE(qdisc->empty, true);
>  	}