Currently pfifo_fast has both the TCQ_F_CAN_BYPASS and TCQ_F_NOLOCK flags set, but queue discipline bypass does not work for a lockless qdisc because the skb is always enqueued to the qdisc even when the qdisc is empty, see __dev_xmit_skb().
This patch calls sch_direct_xmit() to transmit the skb directly to the driver for an empty lockless qdisc too, which avoids the enqueue and dequeue operations. qdisc->empty is set to false whenever an skb is enqueued, and is set to true when dequeuing returns NULL, see pfifo_fast_dequeue().
Also, the qdisc is scheduled at the end of qdisc_run_end() when q->empty is false, to avoid the packet stuck problem.
The performance of the ip_forward test increases by about 10% with this patch.
Signed-off-by: Yunsheng Lin <linyunsheng@huawei.com>
---
 include/net/sch_generic.h |  7 +++++--
 net/core/dev.c            | 11 +++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 2d6eb60..6591356 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -161,7 +161,6 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 	if (qdisc->flags & TCQ_F_NOLOCK) {
 		if (!spin_trylock(&qdisc->seqlock))
 			return false;
-		WRITE_ONCE(qdisc->empty, false);
 	} else if (qdisc_is_running(qdisc)) {
 		return false;
 	}
@@ -176,8 +175,12 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 static inline void qdisc_run_end(struct Qdisc *qdisc)
 {
 	write_seqcount_end(&qdisc->running);
-	if (qdisc->flags & TCQ_F_NOLOCK)
+	if (qdisc->flags & TCQ_F_NOLOCK) {
 		spin_unlock(&qdisc->seqlock);
+
+		if (unlikely(!READ_ONCE(qdisc->empty)))
+			__netif_schedule(qdisc);
+	}
 }
 
 static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
diff --git a/net/core/dev.c b/net/core/dev.c
index 2bfdd52..fa8504d 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -3791,7 +3791,18 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	qdisc_calculate_pkt_len(skb, q);
 
 	if (q->flags & TCQ_F_NOLOCK) {
+		if (q->flags & TCQ_F_CAN_BYPASS && READ_ONCE(q->empty) && qdisc_run_begin(q)) {
+			qdisc_bstats_cpu_update(q, skb);
+
+			if (sch_direct_xmit(skb, q, dev, txq, NULL, true) && !READ_ONCE(q->empty))
+				__qdisc_run(q);
+
+			qdisc_run_end(q);
+			return NET_XMIT_SUCCESS;
+		}
+
 		rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
+		WRITE_ONCE(q->empty, false);
 		qdisc_run(q);
 
 		if (unlikely(to_free))
Hi Yunsheng,
On Sat, Mar 13, 2021 at 10:47:47AM +0800, Yunsheng Lin wrote:
> [...]
I can confirm the ~10% IP forwarding throughput improvement brought by this patch, but as you might be aware, there was a previous attempt to add qdisc bypass to pfifo_fast by Paolo Abeni:
https://lore.kernel.org/netdev/661cc33a-5f65-2769-cc1a-65791cb4b131@pengutro...
It was reverted because TX reordering was observed with SocketCAN (although, presumably it should also be seen with Ethernet and such).
In fact I have a setup with two NXP LS1028A-RDB boards (which use the drivers/net/can/flexcan.c driver and the pfifo_fast qdisc):
+-----------+                      +-----------+
|           |                      |           |
| Generator |                      |    DUT    |
|           |--------------------->|           |
| canfdtest |      reflector       | canfdtest |
|           |<---------------------|           |
|   can1    |                      |   can0    |
|           |                      |           |
+-----------+                      +-----------+
where reordering happens on the TX side of the DUT and is noticed on the RX side of the generator. The test frames are classic CAN, not CAN FD.
I was able to run the canfdtest described above successfully for several hours (10 million CAN frames) on the current net-next (HEAD at commit 34bb97512641 ("net: fddi: skfp: Mundane typo fixes throughout the file smt.h")) with no reordering.
Then, after applying your patch, I am seeing TX reordering within a few minutes (less than 100K frames sent), therefore this reintroduces the bug due to which Paolo's patch was reverted.
Sadly I am not knowledgeable enough to give you any hints as to what is going wrong, but in case you have any ideas for debugging, I would be glad to test them out on my boards.
Cc += linux-can@vger.kernel.org
On 3/14/21 1:03 AM, Vladimir Oltean wrote:
> On Sat, Mar 13, 2021 at 10:47:47AM +0800, Yunsheng Lin wrote:
>> [...]
> I can confirm the ~10% IP forwarding throughput improvement brought by this patch, but as you might be aware, there was a previous attempt to add qdisc bypass to pfifo_fast by Paolo Abeni: https://lore.kernel.org/netdev/661cc33a-5f65-2769-cc1a-65791cb4b131@pengutro... It was reverted because TX reordering was observed with SocketCAN (although, presumably it should also be seen with Ethernet and such).
Thanks for testing that, I just stumbled over this patch by accident.
Marc
On 2021/3/14 18:15, Marc Kleine-Budde wrote:
> Cc += linux-can@vger.kernel.org
> On 3/14/21 1:03 AM, Vladimir Oltean wrote:
>> On Sat, Mar 13, 2021 at 10:47:47AM +0800, Yunsheng Lin wrote:
>>> [...]
>> I can confirm the ~10% IP forwarding throughput improvement brought by this patch, but as you might be aware, there was a previous attempt to add qdisc bypass to pfifo_fast by Paolo Abeni: https://lore.kernel.org/netdev/661cc33a-5f65-2769-cc1a-65791cb4b131@pengutro...
Thanks for mentioning the previous attempt to add qdisc bypass to pfifo_fast.
>> It was reverted because TX reordering was observed with SocketCAN (although, presumably it should also be seen with Ethernet and such).
When writing this patch, I was focusing more on the packet stuck problem that arises when TCQ_F_CAN_BYPASS is added for a lockless qdisc.
Looking at flexcan_start_xmit(), used by the CAN driver you mentioned, it calls netif_stop_queue() to disable the queue when sending each skb, which may cause other skbs to be requeued, see dev_requeue_skb() called by sch_direct_xmit(). q->empty is still true when this happens, so another CPU may send an skb directly, bypassing the requeued skb and causing an out-of-order problem.
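The reordering described above can be replayed in a small userspace model. This is only a sketch under stated assumptions, not kernel code: struct toy_qdisc and every toy_* function are hypothetical names, requeued[] stands in for q->gso_skb, txq_stopped for the state set by netif_stop_queue(), and the check_requeued switch stands in for refusing the bypass while something is requeued.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX 8

/* Hypothetical userspace model of the bypass path; not kernel code. */
struct toy_qdisc {
	bool empty;              /* models q->empty */
	int requeued[TOY_MAX];   /* models q->gso_skb, kept in FIFO order */
	size_t n_requeued;
	bool txq_stopped;        /* models the netif_stop_queue() state */
	int wire[TOY_MAX];       /* frames in the order they hit the wire */
	size_t n_wire;
};

static void toy_requeue(struct toy_qdisc *q, int frame)
{
	q->requeued[q->n_requeued++] = frame;   /* q->empty is left untouched */
}

/* Models sch_direct_xmit(): send if the TX queue runs, else requeue.
 * The modelled driver stops its queue after every frame. */
static void toy_direct_xmit(struct toy_qdisc *q, int frame)
{
	if (q->txq_stopped) {
		toy_requeue(q, frame);
		return;
	}
	q->wire[q->n_wire++] = frame;
	q->txq_stopped = true;
}

/* Models the TX-complete path waking the queue again. */
static void toy_tx_complete(struct toy_qdisc *q)
{
	q->txq_stopped = false;
}

/* Models a qdisc run flushing requeued frames while the queue runs. */
static void toy_run(struct toy_qdisc *q)
{
	while (q->n_requeued > 0 && !q->txq_stopped) {
		int frame = q->requeued[0];

		for (size_t i = 1; i < q->n_requeued; i++)
			q->requeued[i - 1] = q->requeued[i];
		q->n_requeued--;
		q->wire[q->n_wire++] = frame;
		q->txq_stopped = true;
	}
}

/* Bypass decision. With check_requeued == false only q->empty is
 * consulted; with true, a pending requeued frame forces the new frame
 * to queue up behind it. */
static void toy_xmit(struct toy_qdisc *q, int frame, bool check_requeued)
{
	if (check_requeued && q->n_requeued > 0) {
		toy_requeue(q, frame);
		toy_run(q);
		return;
	}
	if (q->empty)
		toy_direct_xmit(q, frame);
}

static void toy_scenario(struct toy_qdisc *q, bool check_requeued)
{
	*q = (struct toy_qdisc){ .empty = true };
	toy_xmit(q, 1, check_requeued);   /* sent, queue now stopped */
	toy_xmit(q, 2, check_requeued);   /* queue stopped: frame 2 requeued */
	toy_tx_complete(q);
	toy_xmit(q, 3, check_requeued);   /* may overtake frame 2 */
	toy_tx_complete(q);
	toy_run(q);
	toy_tx_complete(q);
	toy_run(q);
}
```

Calling toy_scenario() with check_requeued set to false puts the frames on the wire as 1, 3, 2; with true the order is 1, 2, 3.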
I will try to deal with the requeued skb problem above, and see if there are other timing issues besides it.
Thanks again for the testing.
> Thanks for testing that, I just stumbled over this patch by accident.
> Marc
Currently pfifo_fast has both the TCQ_F_CAN_BYPASS and TCQ_F_NOLOCK flags set, but queue discipline bypass does not work for a lockless qdisc because the skb is always enqueued to the qdisc even when the qdisc is empty, see __dev_xmit_skb().
This patch calls sch_direct_xmit() to transmit the skb directly to the driver for an empty lockless qdisc too, which avoids the enqueue and dequeue operations. qdisc->empty is set to false whenever an skb is enqueued, see pfifo_fast_enqueue(), and is set to true when dequeuing returns NULL, see pfifo_fast_dequeue(). A spinlock is added to avoid the race between enqueue/dequeue and the setting of qdisc->empty.
If there is a requeued skb in q->gso_skb and qdisc->empty is true, do not allow bypassing the requeued skb. Enqueuing to and dequeuing from q->gso_skb is always protected by qdisc->seqlock, and so is the access to q->gso_skb by skb_queue_empty().
Also, the qdisc is scheduled at the end of qdisc_run_end() when q->empty is false, to avoid the packet stuck problem.
The performance of the ip_forward test increases by about 10% with this patch.
Signed-off-by: Yunsheng Lin <linyunsheng@huawei.com>
---
RFC V2: fix requeued skb out of order and data race problem.
---
 include/net/pkt_sched.h   |  2 ++
 include/net/sch_generic.h |  7 +++++--
 net/core/dev.c            | 14 ++++++++++++++
 net/sched/sch_generic.c   | 31 ++++++++++++++++++++++++++++++-
 4 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/include/net/pkt_sched.h b/include/net/pkt_sched.h
index f5c1bee..c760f6a 100644
--- a/include/net/pkt_sched.h
+++ b/include/net/pkt_sched.h
@@ -122,6 +122,8 @@ void qdisc_warn_nonwc(const char *txt, struct Qdisc *qdisc);
 bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
 		     struct net_device *dev, struct netdev_queue *txq,
 		     spinlock_t *root_lock, bool validate);
+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev);
 
 void __qdisc_run(struct Qdisc *q);
 
diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 2d6eb60..6591356 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -161,7 +161,6 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 	if (qdisc->flags & TCQ_F_NOLOCK) {
 		if (!spin_trylock(&qdisc->seqlock))
 			return false;
-		WRITE_ONCE(qdisc->empty, false);
 	} else if (qdisc_is_running(qdisc)) {
 		return false;
 	}
@@ -176,8 +175,12 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 static inline void qdisc_run_end(struct Qdisc *qdisc)
 {
 	write_seqcount_end(&qdisc->running);
-	if (qdisc->flags & TCQ_F_NOLOCK)
+	if (qdisc->flags & TCQ_F_NOLOCK) {
 		spin_unlock(&qdisc->seqlock);
+
+		if (unlikely(!READ_ONCE(qdisc->empty)))
+			__netif_schedule(qdisc);
+	}
 }
 
 static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
diff --git a/net/core/dev.c b/net/core/dev.c
index 2bfdd52..8f4afb6 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -3791,6 +3791,20 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	qdisc_calculate_pkt_len(skb, q);
 
 	if (q->flags & TCQ_F_NOLOCK) {
+		if (q->flags & TCQ_F_CAN_BYPASS && READ_ONCE(q->empty) &&
+		    qdisc_run_begin(q)) {
+			qdisc_bstats_cpu_update(q, skb);
+
+			if (sch_may_need_requeuing(skb, q, dev))
+				__qdisc_run(q);
+			else if (sch_direct_xmit(skb, q, dev, txq, NULL, true) &&
+				 !READ_ONCE(q->empty))
+				__qdisc_run(q);
+
+			qdisc_run_end(q);
+			return NET_XMIT_SUCCESS;
+		}
+
 		rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
 		qdisc_run(q);
 
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index 49eae93..0df1462 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -273,6 +273,23 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
 	return skb;
 }
 
+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev)
+{
+	bool again = false;
+
+	if (likely(skb_queue_empty(&q->gso_skb)))
+		return false;
+
+	/* need validating before requeuing */
+	skb = validate_xmit_skb_list(skb, dev, &again);
+	if (unlikely(!skb))
+		return true;
+
+	dev_requeue_skb(skb, q);
+	return true;
+}
+
 /*
  * Transmit possibly several skbs, and handle the return status as
  * required. Owning running seqcount bit guarantees that
@@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
  */
 struct pfifo_fast_priv {
 	struct skb_array q[PFIFO_FAST_BANDS];
+
+	/* protect against data race between enqueue/dequeue and
+	 * qdisc->empty setting
+	 */
+	spinlock_t lock;
 };
 
 static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
@@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
 	unsigned int pkt_len = qdisc_pkt_len(skb);
 	int err;
 
-	err = skb_array_produce(q, skb);
+	spin_lock(&priv->lock);
+	err = __ptr_ring_produce(&q->ring, skb);
+	WRITE_ONCE(qdisc->empty, false);
+	spin_unlock(&priv->lock);
 
 	if (unlikely(err)) {
 		if (qdisc_is_percpu_stats(qdisc))
@@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	struct sk_buff *skb = NULL;
 	int band;
 
+	spin_lock(&priv->lock);
 	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
 		struct skb_array *q = band2list(priv, band);
 
@@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	} else {
 		WRITE_ONCE(qdisc->empty, true);
 	}
+	spin_unlock(&priv->lock);
 
 	return skb;
 }
@@ -739,6 +766,8 @@ static int pfifo_fast_init(struct Qdisc *qdisc, struct nlattr *opt,
 
 	/* Can by-pass the queue discipline */
 	qdisc->flags |= TCQ_F_CAN_BYPASS;
+
+	spin_lock_init(&priv->lock);
 	return 0;
 }
On Mon, Mar 15, 2021 at 11:10:18AM +0800, Yunsheng Lin wrote:
> [...]
> RFC V2: fix requeued skb out of order and data race problem.
This is great. Looks like requeued skbs bypassing the qdisc were indeed the problem. I ran my stress test for 4:30 hours (7.7 million CAN frames) and all were received in the same order as canfdtest enqueued them in the socket.
I'll let the test run some more, just thought I'd let you know that things are looking good so far. I'll leave you a Tested-by when you submit the final version of the patch as non-RFC.
On 3/15/21 4:10 AM, Yunsheng Lin wrote:
> [...]
I gave it a short test on a single core imx. No problem here.
Marc
On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
> @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
>   */
>  struct pfifo_fast_priv {
>  	struct skb_array q[PFIFO_FAST_BANDS];
> +
> +	/* protect against data race between enqueue/dequeue and
> +	 * qdisc->empty setting
> +	 */
> +	spinlock_t lock;
>  };
>  
>  static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
> @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>  	unsigned int pkt_len = qdisc_pkt_len(skb);
>  	int err;
>  
> -	err = skb_array_produce(q, skb);
> +	spin_lock(&priv->lock);
> +	err = __ptr_ring_produce(&q->ring, skb);
> +	WRITE_ONCE(qdisc->empty, false);
> +	spin_unlock(&priv->lock);
>  
>  	if (unlikely(err)) {
>  		if (qdisc_is_percpu_stats(qdisc))
> @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  	struct sk_buff *skb = NULL;
>  	int band;
>  
> +	spin_lock(&priv->lock);
>  	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
>  		struct skb_array *q = band2list(priv, band);
>  
> @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
>  	} else {
>  		WRITE_ONCE(qdisc->empty, true);
>  	}
> +	spin_unlock(&priv->lock);
>  
>  	return skb;
>  }
I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
On 2021/3/16 2:53, Jakub Kicinski wrote:
> On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
>> [...]
> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
Yes, the lock breaks the "lockless" property of the lockless qdisc. For now, I do not know how to solve the data race below locklessly:

      CPU1:                                   CPU2:
      dequeue skb                              .
          .                                    .
          .                                enqueue skb
          .                                    .
          .                      WRITE_ONCE(qdisc->empty, false);
          .                                    .
          .                                    .
WRITE_ONCE(qdisc->empty, true);

If the above happens, qdisc->empty is true even though the qdisc holds an skb, which may cause an out-of-order or packet stuck problem.
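A minimal single-threaded replay of that interleaving, as a sketch only: struct toy_fifo and toy_replay_empty_race() are hypothetical names, len stands in for the ptr_ring contents, and plain assignments stand in for WRITE_ONCE(). It assumes CPU1's dequeue attempt finds the queue empty before CPU2 enqueues.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model, not kernel code. */
struct toy_fifo {
	int len;      /* number of skbs held, models the ptr_ring contents */
	bool empty;   /* models the qdisc->empty hint */
};

/* Replays the two CPUs' steps in the problematic order on one thread. */
static struct toy_fifo toy_replay_empty_race(void)
{
	struct toy_fifo q = { .len = 0, .empty = false };
	bool cpu1_got_skb;

	cpu1_got_skb = q.len > 0;    /* CPU1: dequeue finds nothing */
	q.len++;                     /* CPU2: enqueue skb */
	q.empty = false;             /* CPU2: WRITE_ONCE(qdisc->empty, false) */
	if (!cpu1_got_skb)
		q.empty = true;      /* CPU1: late WRITE_ONCE(qdisc->empty, true) */
	return q;
}
```

After the replay the model holds one skb while the hint claims the queue is empty, which is the state that lets a later sender bypass the queued skb or leaves it waiting with nobody scheduled to send it.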
It seems we may need to update the ptr_ring's status (empty or not) atomically with enqueuing/dequeuing, inside the ptr_ring implementation.
Any better idea?
On 2021/3/16 8:35, Yunsheng Lin wrote:
> On 2021/3/16 2:53, Jakub Kicinski wrote:
>> [...]
>> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
> [...]
> It seems we may need to update the ptr_ring's status (empty or not) atomically with enqueuing/dequeuing, inside the ptr_ring implementation.
> Any better idea?
It seems we can use __ptr_ring_empty() under the protection of qdisc->seqlock, because qdisc->seqlock clearly serves as r->consumer_lock.
On Tue, Mar 16, 2021 at 1:35 AM Yunsheng Lin <linyunsheng@huawei.com> wrote:
> On 2021/3/16 2:53, Jakub Kicinski wrote:
>> [...]
>> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
> Yes, the lock breaks the "lockless" property of the lockless qdisc. For now, I do not know how to solve the data race below locklessly:
>
>       CPU1:                                   CPU2:
>       dequeue skb                              .
>           .                                    .
>           .                                enqueue skb
>           .                                    .
>           .                      WRITE_ONCE(qdisc->empty, false);
>           .                                    .
>           .                                    .
> WRITE_ONCE(qdisc->empty, true);
Maybe it is time to fully document/explain how this can possibly work.
lockless qdisc used concurrently by multiple cpus, using WRITE_ONCE() and READ_ONCE() ?
Just say no to this.
> If the above happens, qdisc->empty is true even though the qdisc holds an skb, which may cause an out-of-order or packet stuck problem.
> It seems we may need to update the ptr_ring's status (empty or not) atomically with enqueuing/dequeuing, inside the ptr_ring implementation.
> Any better idea?
On 2021/3/16 16:15, Eric Dumazet wrote:
> On Tue, Mar 16, 2021 at 1:35 AM Yunsheng Lin <linyunsheng@huawei.com> wrote:
>> [...]
> Maybe it is time to fully document/explain how this can possibly work.
I might be able to provide some documentation/explanation of how the lockless qdisc works, since I have been looking through the code for the past few days.
By "lockless", I suppose it means there is no lock between enqueuing and dequeuing: whoever grabs qdisc->seqlock can dequeue skbs and send them out after enqueuing the skb it is trying to send, while other CPUs that fail to grab qdisc->seqlock just enqueue their skb and return, hoping that the CPU holding qdisc->seqlock will dequeue their skb and send it out.
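The scheme in the paragraph above can be sketched as a userspace model. This is a sketch under stated assumptions rather than the kernel's implementation: all toy_* names are hypothetical, a C11 atomic_flag stands in for the trylock on qdisc->seqlock, and the ring is a plain array that is only safe on a single thread, whereas pfifo_fast uses a ptr_ring guarded by producer_lock and consumer_lock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_RING 16

/* Hypothetical single-threaded model of a TCQ_F_NOLOCK qdisc. */
struct toy_nolock_qdisc {
	atomic_flag seqlock;     /* models spin_trylock(&qdisc->seqlock) */
	int ring[TOY_RING];      /* stands in for the ptr_ring */
	size_t head, tail;
	int sent[TOY_RING];      /* skbs handed to the driver, in order */
	size_t n_sent;
};

static void toy_init(struct toy_nolock_qdisc *q)
{
	atomic_flag_clear(&q->seqlock);
	q->head = q->tail = 0;
	q->n_sent = 0;
}

/* Models qdisc_run_begin(): true only for the caller that wins the lock. */
static bool toy_run_begin(struct toy_nolock_qdisc *q)
{
	return !atomic_flag_test_and_set(&q->seqlock);
}

/* Models qdisc_run_end(). */
static void toy_run_end(struct toy_nolock_qdisc *q)
{
	atomic_flag_clear(&q->seqlock);
}

static size_t toy_pending(const struct toy_nolock_qdisc *q)
{
	return q->tail - q->head;
}

/* Every sender enqueues; only the seqlock holder dequeues and sends.
 * Returns true if this caller drained the queue itself. */
static bool toy_dev_xmit(struct toy_nolock_qdisc *q, int skb)
{
	q->ring[q->tail++ % TOY_RING] = skb;

	if (!toy_run_begin(q))
		return false;    /* rely on the current holder to send it */

	while (q->head != q->tail)
		q->sent[q->n_sent++] = q->ring[q->head++ % TOY_RING];

	toy_run_end(q);
	return true;
}
```

Taking the lock with toy_run_begin() first, then calling toy_dev_xmit() for skb 1, leaves that skb pending because the caller loses the trylock. It is only sent once the lock is released and a later toy_dev_xmit() call wins it, which is also the window in which an skb can get stuck if no further sender arrives.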
For the locked qdisc (the one without the TCQ_F_NOLOCK flag set), qdisc_lock() is held when doing the enqueuing/dequeuing and sch_direct_xmit(). sch_direct_xmit() releases qdisc_lock() while doing skb validation and calling dev_hard_start_xmit() to send the skb to the driver, and takes qdisc_lock() again afterwards, so another CPU may grab qdisc_lock() and repeat the above process during that release period.
So the main difference between a lockless qdisc and a locked qdisc is whether there is a lock protecting both enqueuing and dequeuing. For example, pfifo_fast uses ptr_ring to be lockless between enqueuing and dequeuing, but it still needs producer_lock to protect concurrent enqueues and consumer_lock to protect concurrent dequeues. For other qdiscs that cannot be lockless between enqueuing and dequeuing, maybe we can implement the locking inside the specific qdisc implementation, so that it can still claim to be "lockless", like the locking added for pfifo_fast in this patch.
Ideally we can claim all qdiscs to be lockless as long as we make sure every qdisc either uses a lockless implementation or uses an internal lock to protect enqueuing against dequeuing, so that we can remove the locked qdisc and have less contention between enqueuing and dequeuing.
The patch below is a first attempt at removing the difference between the lockless qdisc and the locked qdisc, so that we can see whether the locked qdisc can be removed in the future:
https://patchwork.kernel.org/project/netdevbpf/patch/1615800610-34700-1-git-...
I may be wrong about the above analysis; if there is any error, please point it out.
> lockless qdisc used concurrently by multiple cpus, using WRITE_ONCE() and READ_ONCE() ?
> Just say no to this.
What do you mean by "no" here? Do you mean it is impossible to implement a lockless qdisc correctly? Or that TCQ_F_CAN_BYPASS for a lockless qdisc is the wrong direction? And what is the reason?
On Mon, Mar 15, 2021 at 2:07 PM Jakub Kicinski <kuba@kernel.org> wrote:
> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
It has never been truly lockless: it uses two spinlocks in the ring buffer implementation, and it introduced a q->seqlock recently. With this patch we now have priv->lock, 4 locks in total. So our "lockless" qdisc ends up having more locks than others. ;) I don't think we are going in the right direction...
Thanks.
On 2021/3/17 6:48, Cong Wang wrote:
> On Mon, Mar 15, 2021 at 2:07 PM Jakub Kicinski <kuba@kernel.org> wrote:
>> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
> It has never been truly lockless, it uses two spinlocks in the ring buffer implementation, and it introduced a q->seqlock recently, with this patch now we have priv->lock, 4 locks in total. So our "lockless" qdisc ends up having more locks than others. ;) I don't think we are going to a right direction...
Yes, we have 4 locks in total, but the lockless qdisc only uses two of them in this patch: priv->lock and q->seqlock.
A locked qdisc also uses at least two locks, qdisc_lock(q) and q->busylock, which seem to see more contention under concurrent access to the same qdisc.
If we want to reduce the total number of locks, we can use qdisc_lock(q) for the lockless qdisc and remove q->seqlock :)
Thanks.
Cong Wang <xiyou.wangcong@gmail.com> writes:
> On Mon, Mar 15, 2021 at 2:07 PM Jakub Kicinski <kuba@kernel.org> wrote:
>> I thought pfifo was supposed to be "lockless" and this change re-introduces a lock between producer and consumer, no?
> It has never been truly lockless, it uses two spinlocks in the ring buffer implementation, and it introduced a q->seqlock recently, with this patch now we have priv->lock, 4 locks in total. So our "lockless" qdisc ends up having more locks than others. ;) I don't think we are going to a right direction...
Just a thought, have you guys considered adopting the lockless MPSC ring buffer recently introduced into Wireguard in commit:
8b5553ace83c ("wireguard: queueing: get rid of per-peer ring buffers")
Jason indicated he was willing to work on generalising it into a reusable library if there was a use case for it. I haven't quite thought through the details of whether this would be such a use case, but figured I'd at least mention it :)
-Toke
On 3/17/21, Toke Høiland-Jørgensen <toke@redhat.com> wrote:
> Cong Wang <xiyou.wangcong@gmail.com> writes:
>> [...]
> Just a thought, have you guys considered adopting the lockless MPSC ring buffer recently introduced into Wireguard in commit:
> 8b5553ace83c ("wireguard: queueing: get rid of per-peer ring buffers")
> Jason indicated he was willing to work on generalising it into a reusable library if there was a use case for it. I haven't quite thought through the details of whether this would be such a use case, but figured I'd at least mention it :)
That offer definitely still stands. Generalization sounds like a lot of fun.
Keep in mind though that it's an eventually consistent queue, not an immediately consistent one, so that might not match all use cases. It works with wg because we always trigger the reader thread anew when it finishes, but that doesn't apply to everyone's queueing setup.
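To illustrate what "eventually consistent" can mean for such a queue, here is a minimal sketch of an intrusive multi-producer, single-consumer queue written with C11 atomics. It assumes a design in the style of Dmitry Vyukov's intrusive MPSC queue; it is not the WireGuard code, and the mpsc_* names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct mpsc_node {
	_Atomic(struct mpsc_node *) next;
	int value;
};

struct mpsc_queue {
	_Atomic(struct mpsc_node *) head;   /* producers swap themselves in here */
	struct mpsc_node *tail;             /* touched by the single consumer only */
	struct mpsc_node stub;              /* placeholder so the list is never empty */
};

static void mpsc_init(struct mpsc_queue *q)
{
	atomic_init(&q->stub.next, NULL);
	atomic_init(&q->head, &q->stub);
	q->tail = &q->stub;
}

/* Safe to call from several producers concurrently. */
static void mpsc_push(struct mpsc_queue *q, struct mpsc_node *n)
{
	struct mpsc_node *prev;

	atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
	prev = atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
	/* Window: n is already the head, but not yet linked from prev. */
	atomic_store_explicit(&prev->next, n, memory_order_release);
}

/* Single consumer only. May return NULL while a push is still in its
 * window, even though that node already counts as queued. */
static struct mpsc_node *mpsc_pop(struct mpsc_queue *q)
{
	struct mpsc_node *tail = q->tail;
	struct mpsc_node *next =
		atomic_load_explicit(&tail->next, memory_order_acquire);

	if (tail == &q->stub) {
		if (!next)
			return NULL;            /* nothing linked yet */
		q->tail = next;
		tail = next;
		next = atomic_load_explicit(&tail->next, memory_order_acquire);
	}
	if (next) {
		q->tail = next;
		return tail;
	}
	if (tail != atomic_load_explicit(&q->head, memory_order_acquire))
		return NULL;                    /* a producer is mid-push */

	mpsc_push(q, &q->stub);                 /* re-insert stub behind the last node */
	next = atomic_load_explicit(&tail->next, memory_order_acquire);
	if (next) {
		q->tail = next;
		return tail;
	}
	return NULL;
}
```

Because mpsc_pop() can report an empty queue while a producer sits between its exchange and its link store, a consumer that stops on the first NULL has to be woken again later, which is the caveat raised above about the reader being triggered anew.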
On 2021/3/17 21:45, Jason A. Donenfeld wrote:
That offer definitely still stands. Generalization sounds like a lot of fun.
Keep in mind though that it's an eventually consistent queue, not an immediately consistent one, so that might not match all use cases. It works with wg because we always trigger the reader thread anew when it finishes, but that doesn't apply to everyone's queueing setup.
Thanks for mentioning this.
"multi-producer, single-consumer" seems to match the lockless qdisc's paradigm too, for now concurrent enqueuing/dequeuing to the pfifo_fast's queues() is not allowed, it is protected by producer_lock or consumer_lock.
So it would be good to have lockless concurrent enqueuing, while dequeuing can be protected by qdisc_lock() or q->seqlock, which meets the "multi-producer, single-consumer" paradigm.
But right now the lockless qdisc has a packet stuck problem, which I tried to fix in [1].
If the packet stuck problem for the lockless qdisc can be fixed, then we can do more optimization on the lockless qdisc, including the one you mention :)
[1] https://patchwork.kernel.org/project/netdevbpf/patch/1616050402-37023-1-git-...
Linuxarm mailing list -- linuxarm@openeuler.org To unsubscribe send an email to linuxarm-leave@openeuler.org
On Thu, Mar 18, 2021 at 12:33 AM Yunsheng Lin linyunsheng@huawei.com wrote:
Thanks for mentioning this.
"multi-producer, single-consumer" seems to match the lockless qdisc's paradigm too, for now concurrent enqueuing/dequeuing to the pfifo_fast's queues() is not allowed, it is protected by producer_lock or consumer_lock.
So it would be good to have lockless concurrent enqueuing, while dequeuing can be protected by qdisc_lock() or q->seqlock, which meets the "multi-producer, single-consumer" paradigm.
I don't think so. Usually we have one queue for each CPU so we can expect each CPU has a lockless qdisc assigned, but we can not assume this in the code, so we still have to deal with multiple CPU's sharing a lockless qdisc, and we usually enqueue and dequeue in process context, so it means we could have multiple producers and multiple consumers.
On the other hand, I don't think the problems we have been fixing are the ring buffer implementation itself, they are about the high-level qdisc state transitions.
Thanks.
On 2021/3/20 2:15, Cong Wang wrote:
I don't think so. Usually we have one queue for each CPU so we can expect each CPU has a lockless qdisc assigned, but we can not assume this in the code, so we still have to deal with multiple CPU's sharing a lockless qdisc, and we usually enqueue and dequeue in process context, so it means we could have multiple producers and multiple consumers.
For lockless qdisc, dequeuing is always done between qdisc_run_begin() and qdisc_run_end(), so multiple consumers are serialized against each other by q->seqlock.
For enqueuing, multiple producers are serialized by producer_lock, see pfifo_fast_enqueue() -> skb_array_produce() -> ptr_ring_produce(). I am not sure if the lockless MPSC queue can work in process context, but even if not, the enqueuing is also protected by rcu_read_lock_bh(), which provides some kind of atomicity, so producer_lock can be removed when the lockless MPSC queue is used.
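The point about q->seqlock can be sketched in user space: if every dequeue path has to win a trylock first, at most one CPU is consuming at any time, however many CPUs attempt it. This is only a model using a C11 atomic_flag; the names demo_qdisc, demo_run_begin() and demo_run_end() are hypothetical stand-ins for the kernel's qdisc_run_begin() and qdisc_run_end(), not their implementation.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical model of a qdisc that only carries the seqlock. */
struct demo_qdisc {
	atomic_flag seqlock;    /* initialise with ATOMIC_FLAG_INIT */
};

/* Returns true if the caller became the (single) consumer. */
static bool demo_run_begin(struct demo_qdisc *q)
{
	/* test_and_set returns the previous value: false means we got the lock */
	return !atomic_flag_test_and_set_explicit(&q->seqlock,
						  memory_order_acquire);
}

/* Gives up the consumer role so another caller can take it. */
static void demo_run_end(struct demo_qdisc *q)
{
	atomic_flag_clear_explicit(&q->seqlock, memory_order_release);
}
```

A caller that loses the trylock does not wait; it returns and relies on the current lock holder to dequeue on its behalf.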
On the other hand, I don't think the problems we have been fixing are the ring buffer implementation itself, they are about the high-level qdisc state transitions.
Thanks.
On Sun, Mar 21, 2021 at 5:55 PM Yunsheng Lin linyunsheng@huawei.com wrote:
For lockless qdisc, dequeuing is always done between qdisc_run_begin() and qdisc_run_end(), so multiple consumers are serialized against each other by q->seqlock.
So are you saying you will never go lockless for lockless qdisc? I thought you really want to go lockless with Jason's proposal of MPMC ring buffer code.
For enqueuing, multiple producers are serialized by producer_lock, see pfifo_fast_enqueue() -> skb_array_produce() -> ptr_ring_produce().
I think you seriously misunderstand how we classify MPMC or MPSC, it is not about how we lock them, it is about whether we truly have a single or multiple consumers regardless of locks used, because the goal is to go lockless.
I am not sure if the lockless MPSC queue can work in process context, but even if not, the enqueuing is also protected by rcu_read_lock_bh(), which provides some kind of atomicity, so producer_lock can be removed when the lockless MPSC queue is used.
Not sure if I can even understand what you are saying here, Jason's code only disables preemption with busy wait, I can't see why it can not be used in the process context.
Thanks.
On 2021/3/24 9:49, Cong Wang wrote:
For lockless qdisc, dequeuing is always done between qdisc_run_begin() and qdisc_run_end(), so multiple consumers are serialized against each other by q->seqlock.
So are you saying you will never go lockless for lockless qdisc? I thought you really want to go lockless with Jason's proposal of MPMC ring buffer code.
I think we have different definitions of lockless qdisc.
As I understand it, dequeuing happens between qdisc_run_begin() and qdisc_run_end(), so it is currently always protected by q->seqlock for a lockless qdisc. By lockless qdisc I never meant lockless dequeuing, and I am not proposing lockless dequeuing now.
For pfifo_fast, the current lockless qdisc only means that there is no lock shared between dequeuing and enqueuing, so __qdisc_run() can be dequeuing a skb on one CPU while another CPU is enqueuing a skb.
But enqueuing is protected by producer_lock in skb_array_produce(), so only one CPU can enqueue at a time. That is why I am proposing to use Jason's proposal to let multiple CPUs enqueue concurrently without taking any lock.
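The two-lock arrangement described here can be sketched as a small pointer ring in user space. Producers serialize on one lock and consumers on another, and a NULL slot is what marks a position as free, so an enqueue and a dequeue never contend on the same lock. This is a simplified model of that idea and not the kernel's ptr_ring code: toy_ring, toy_produce() and toy_consume() are hypothetical names, and the spinlocks are plain C11 atomic_flag loops.

```c
#include <stdatomic.h>
#include <stddef.h>

#define TOY_RING_SIZE 4

/* Hypothetical user-space model of a ring with separate producer/consumer locks. */
struct toy_ring {
	_Atomic(void *) slot[TOY_RING_SIZE]; /* NULL means the slot is free */
	atomic_flag producer_lock;
	unsigned int producer;               /* next slot to fill */
	atomic_flag consumer_lock;
	unsigned int consumer;               /* next slot to drain */
};

static void toy_lock(atomic_flag *l)
{
	while (atomic_flag_test_and_set_explicit(l, memory_order_acquire))
		;                            /* spin until the flag was clear */
}

static void toy_unlock(atomic_flag *l)
{
	atomic_flag_clear_explicit(l, memory_order_release);
}

static void toy_ring_init(struct toy_ring *r)
{
	for (int i = 0; i < TOY_RING_SIZE; i++)
		atomic_init(&r->slot[i], NULL);
	atomic_flag_clear(&r->producer_lock);
	atomic_flag_clear(&r->consumer_lock);
	r->producer = 0;
	r->consumer = 0;
}

/* Returns 0 on success, -1 if the ring is full. Only producers contend here. */
static int toy_produce(struct toy_ring *r, void *ptr)
{
	int err = -1;

	toy_lock(&r->producer_lock);
	if (!atomic_load_explicit(&r->slot[r->producer], memory_order_acquire)) {
		atomic_store_explicit(&r->slot[r->producer], ptr,
				      memory_order_release);
		r->producer = (r->producer + 1) % TOY_RING_SIZE;
		err = 0;
	}
	toy_unlock(&r->producer_lock);
	return err;
}

/* Returns the oldest pointer, or NULL if empty. Only consumers contend here. */
static void *toy_consume(struct toy_ring *r)
{
	void *ptr;

	toy_lock(&r->consumer_lock);
	ptr = atomic_load_explicit(&r->slot[r->consumer], memory_order_acquire);
	if (ptr) {
		atomic_store_explicit(&r->slot[r->consumer], NULL,
				      memory_order_release);
		r->consumer = (r->consumer + 1) % TOY_RING_SIZE;
	}
	toy_unlock(&r->consumer_lock);
	return ptr;
}
```

Replacing toy_produce() with a lock-free multi-producer enqueue while keeping a single serialized consumer is the MPSC arrangement being proposed in this message.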
For enqueuing, multiple producers are serialized by producer_lock, see pfifo_fast_enqueue() -> skb_array_produce() -> ptr_ring_produce().
I think you seriously misunderstand how we classify MPMC or MPSC, it is not about how we lock them, it is about whether we truly have a single or multiple consumers regardless of locks used, because the goal is to go lockless.
I think I am only relying on MPSC (multi-producer, single-consumer), as explained above.
I am not sure if the lockless MPSC queue can work in process context, but even if not, the enqueuing is also protected by rcu_read_lock_bh(), which provides some kind of atomicity, so producer_lock can be removed when the lockless MPSC queue is used.
Not sure if I can even understand what you are saying here, Jason's code only disables preemption with busy wait, I can't see why it can not be used in the process context.
I am saying q->enqueue() is protected by rcu_read_lock_bh(). rcu_read_lock_bh() disables preemption for us in most configurations; otherwise the netdev_xmit_more() interface would break too, because it uses a per-CPU variable (softnet_data.xmit.more) and so relies on the CPU not being preempted.
Thanks.
On Thu, Mar 18, 2021 at 1:33 AM Yunsheng Lin linyunsheng@huawei.com wrote:
That offer definitely still stands. Generalization sounds like a lot of fun.
Keep in mind though that it's an eventually consistent queue, not an immediately consistent one, so that might not match all use cases. It works with wg because we always trigger the reader thread anew when it finishes, but that doesn't apply to everyone's queueing setup.
Thanks for mentioning this.
"multi-producer, single-consumer" seems to match the lockless qdisc's paradigm too, for now concurrent enqueuing/dequeuing to the pfifo_fast's queues() is not allowed, it is protected by producer_lock or consumer_lock.
The other thing is that if you've got memory for a ring buffer rather than a list queue, we worked on an MPMC ring structure for WireGuard a few years ago that we didn't wind up using in the end, but it lives here: https://git.zx2c4.com/wireguard-monolithic-historical/tree/src/mpmc_ptr_ring...
On 2021/3/20 3:03, Jason A. Donenfeld wrote:
The other thing is that if you've got memory for a ring buffer rather than a list queue, we worked on an MPMC ring structure for WireGuard a few years ago that we didn't wind up using in the end, but it lives here: https://git.zx2c4.com/wireguard-monolithic-historical/tree/src/mpmc_ptr_ring...
Thanks for mentioning that. It seems to be exactly what the pfifo_fast qdisc needs for lockless multi-producer enqueuing, because it only needs memory to store the skb pointers.
Does it have any limitations? More specifically, does it work in process or softirq context? If not, how about a context with RCU protection?
On 15.03.21 04:10, Yunsheng Lin wrote:
Currently pfifo_fast has both the TCQ_F_CAN_BYPASS and TCQ_F_NOLOCK flags set, but queue discipline bypass does not work for lockless qdisc because the skb is always enqueued to the qdisc even when the qdisc is empty, see __dev_xmit_skb().
This patch calls sch_direct_xmit() to transmit the skb directly to the driver for an empty lockless qdisc too, which avoids the enqueuing and dequeuing operations. qdisc->empty is set to false whenever a skb is enqueued, see pfifo_fast_enqueue(), and is set to true when skb dequeuing returns NULL, see pfifo_fast_dequeue(). A spinlock is added to avoid the race between enqueue/dequeue and qdisc->empty setting.
If there is a requeued skb in q->gso_skb and qdisc->empty is true, do not allow bypassing the requeued skb. Enqueuing and dequeuing in q->gso_skb is always protected by qdisc->seqlock, and so is the access of q->gso_skb by skb_queue_empty().
Also, the qdisc is scheduled at the end of qdisc_run_end() when q->empty is false to avoid the packet stuck problem.
The performance for the ip_forward test increases by about 10% with this patch.
Signed-off-by: Yunsheng Lin linyunsheng@huawei.com
RFC V2: fix requeued skb out of order and data race problem.
cansequence didn't find any frame reordering with 2 FlexCAN's communicating with each other on a dual core i.MX6. Feel free to add:
Tested-by: Ahmad Fatoum a.fatoum@pengutronix.de
 include/net/pkt_sched.h   |  2 ++
 include/net/sch_generic.h |  7 +++++--
 net/core/dev.c            | 14 ++++++++++++++
 net/sched/sch_generic.c   | 31 ++++++++++++++++++++++++++++++-
 4 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/include/net/pkt_sched.h b/include/net/pkt_sched.h
index f5c1bee..c760f6a 100644
--- a/include/net/pkt_sched.h
+++ b/include/net/pkt_sched.h
@@ -122,6 +122,8 @@ void qdisc_warn_nonwc(const char *txt, struct Qdisc *qdisc);
 bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
 		     struct net_device *dev, struct netdev_queue *txq,
 		     spinlock_t *root_lock, bool validate);
+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev);

 void __qdisc_run(struct Qdisc *q);

diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 2d6eb60..6591356 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -161,7 +161,6 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 	if (qdisc->flags & TCQ_F_NOLOCK) {
 		if (!spin_trylock(&qdisc->seqlock))
 			return false;
-		WRITE_ONCE(qdisc->empty, false);
 	} else if (qdisc_is_running(qdisc)) {
 		return false;
 	}
@@ -176,8 +175,12 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 static inline void qdisc_run_end(struct Qdisc *qdisc)
 {
 	write_seqcount_end(&qdisc->running);
-	if (qdisc->flags & TCQ_F_NOLOCK)
+	if (qdisc->flags & TCQ_F_NOLOCK) {
 		spin_unlock(&qdisc->seqlock);
+
+		if (unlikely(!READ_ONCE(qdisc->empty)))
+			__netif_schedule(qdisc);
+	}
 }

 static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
diff --git a/net/core/dev.c b/net/core/dev.c
index 2bfdd52..8f4afb6 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -3791,6 +3791,20 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	qdisc_calculate_pkt_len(skb, q);

 	if (q->flags & TCQ_F_NOLOCK) {
+		if (q->flags & TCQ_F_CAN_BYPASS && READ_ONCE(q->empty) &&
+		    qdisc_run_begin(q)) {
+			qdisc_bstats_cpu_update(q, skb);
+
+			if (sch_may_need_requeuing(skb, q, dev))
+				__qdisc_run(q);
+			else if (sch_direct_xmit(skb, q, dev, txq, NULL, true) &&
+				 !READ_ONCE(q->empty))
+				__qdisc_run(q);
+
+			qdisc_run_end(q);
+			return NET_XMIT_SUCCESS;
+		}
+
 		rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
 		qdisc_run(q);

diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index 49eae93..0df1462 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -273,6 +273,23 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
 	return skb;
 }

+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev)
+{
+	bool again = false;
+
+	if (likely(skb_queue_empty(&q->gso_skb)))
+		return false;
+
+	/* need validating before requeuing */
+	skb = validate_xmit_skb_list(skb, dev, &again);
+	if (unlikely(!skb))
+		return true;
+
+	dev_requeue_skb(skb, q);
+	return true;
+}
+
 /*
  * Transmit possibly several skbs, and handle the return status as
  * required. Owning running seqcount bit guarantees that
@@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
  */
 struct pfifo_fast_priv {
 	struct skb_array q[PFIFO_FAST_BANDS];
+
+	/* protect against data race between enqueue/dequeue and
+	 * qdisc->empty setting
+	 */
+	spinlock_t lock;
 };

 static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
@@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
 	unsigned int pkt_len = qdisc_pkt_len(skb);
 	int err;

-	err = skb_array_produce(q, skb);
+	spin_lock(&priv->lock);
+	err = __ptr_ring_produce(&q->ring, skb);
+	WRITE_ONCE(qdisc->empty, false);
+	spin_unlock(&priv->lock);

 	if (unlikely(err)) {
 		if (qdisc_is_percpu_stats(qdisc))
@@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	struct sk_buff *skb = NULL;
 	int band;

+	spin_lock(&priv->lock);
 	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
 		struct skb_array *q = band2list(priv, band);

@@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	} else {
 		WRITE_ONCE(qdisc->empty, true);
 	}
+	spin_unlock(&priv->lock);

 	return skb;
 }
@@ -739,6 +766,8 @@ static int pfifo_fast_init(struct Qdisc *qdisc, struct nlattr *opt,

 	/* Can by-pass the queue discipline */
 	qdisc->flags |= TCQ_F_CAN_BYPASS;
+
+	spin_lock_init(&priv->lock);
 	return 0;
 }
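The decision that this patch adds to __dev_xmit_skb() can be modelled in a few lines of single-threaded user-space C. This is a sketch under simplifying assumptions rather than the kernel logic: model_qdisc, model_xmit() and the three path names are hypothetical, the seqlock is reduced to a plain flag, and draining the queue via __qdisc_run() is left out. It only shows which of the three paths a packet takes.

```c
#include <stdbool.h>

/* Hypothetical model; the fields only mirror the kernel state they are named after. */
struct model_qdisc {
	bool can_bypass; /* stands in for TCQ_F_CAN_BYPASS */
	bool empty;      /* stands in for qdisc->empty */
	bool running;    /* stands in for holding qdisc->seqlock */
	int backlog;     /* packets enqueued in the qdisc */
	int requeued;    /* packets waiting in q->gso_skb */
};

enum model_path {
	MODEL_DIRECT,    /* handed straight to the driver */
	MODEL_REQUEUED,  /* queued behind already requeued packets */
	MODEL_ENQUEUED   /* normal enqueue path */
};

static enum model_path model_xmit(struct model_qdisc *q)
{
	if (q->can_bypass && q->empty && !q->running) {
		enum model_path path;

		q->running = true;              /* like qdisc_run_begin() */
		if (q->requeued > 0) {
			/* bypassing here would reorder packets, so requeue */
			q->requeued++;
			path = MODEL_REQUEUED;
		} else {
			path = MODEL_DIRECT;    /* like sch_direct_xmit() */
		}
		q->running = false;             /* like qdisc_run_end() */
		return path;
	}

	q->backlog++;                           /* like q->enqueue() */
	q->empty = false;
	return MODEL_ENQUEUED;
}
```

The requeue branch corresponds to the "RFC V2" changelog entry about requeued skbs going out of order: a new packet must not overtake packets that an earlier transmit attempt put back.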
On 2021/3/18 15:10, Ahmad Fatoum wrote:
cansequence didn't find any frame reordering with 2 FlexCAN's communicating with each other on a dual core i.MX6. Feel free to add:
Tested-by: Ahmad Fatoum a.fatoum@pengutronix.de
Thanks for testing. Actually, I have a newer implementation that can get rid of the priv->lock added in this patch. I am not sending it out yet:
1. There is a packet stuck problem for lockless qdisc that I am trying to fix, see [1], and I prefer not to add more optimization to lockless qdisc before we find the real cause. It would make backporting the packet stuck patch harder, and optimization is useless if there is still a basic bug in lockless qdisc.
2. I am still not convinced that the lockless implementation is clearer than the priv->lock implementation, I still need to do some thinking and testing.
On 18.03.21 08:46, Yunsheng Lin wrote:
Thanks for testing. Actually, I have a newer implementation that can get rid of the priv->lock added in this patch. I am not sending it out yet:
- There is a packet stuck problem for lockless qdisc that I am trying to fix, see [1], and I prefer not to add more optimization to lockless qdisc before we find the real cause. It would make backporting the packet stuck patch harder, and optimization is useless if there is still a basic bug in lockless qdisc.
- I am still not convinced that the lockless implementation is clearer than the priv->lock implementation, I still need to do some thinking and testing.
I see. Keep me in the loop for future revisions and I'll give them a spin to see if regressions pop up.
Cheers, Ahmad
 include/net/pkt_sched.h   |  2 ++
 include/net/sch_generic.h |  7 +++++--
 net/core/dev.c            | 14 ++++++++++++++
 net/sched/sch_generic.c   | 31 ++++++++++++++++++++++++++++++-
 4 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/include/net/pkt_sched.h b/include/net/pkt_sched.h
index f5c1bee..c760f6a 100644
--- a/include/net/pkt_sched.h
+++ b/include/net/pkt_sched.h
@@ -122,6 +122,8 @@ void qdisc_warn_nonwc(const char *txt, struct Qdisc *qdisc);
 bool sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
 		     struct net_device *dev, struct netdev_queue *txq,
 		     spinlock_t *root_lock, bool validate);
+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev);

 void __qdisc_run(struct Qdisc *q);
diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 2d6eb60..6591356 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -161,7 +161,6 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 	if (qdisc->flags & TCQ_F_NOLOCK) {
 		if (!spin_trylock(&qdisc->seqlock))
 			return false;
-		WRITE_ONCE(qdisc->empty, false);
 	} else if (qdisc_is_running(qdisc)) {
 		return false;
 	}
@@ -176,8 +175,12 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
 static inline void qdisc_run_end(struct Qdisc *qdisc)
 {
 	write_seqcount_end(&qdisc->running);
-	if (qdisc->flags & TCQ_F_NOLOCK)
+	if (qdisc->flags & TCQ_F_NOLOCK) {
 		spin_unlock(&qdisc->seqlock);
+
+		if (unlikely(!READ_ONCE(qdisc->empty)))
+			__netif_schedule(qdisc);
+	}
 }

 static inline bool qdisc_may_bulk(const struct Qdisc *qdisc)
diff --git a/net/core/dev.c b/net/core/dev.c
index 2bfdd52..8f4afb6 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -3791,6 +3791,20 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
 	qdisc_calculate_pkt_len(skb, q);

 	if (q->flags & TCQ_F_NOLOCK) {
+		if (q->flags & TCQ_F_CAN_BYPASS && READ_ONCE(q->empty) &&
+		    qdisc_run_begin(q)) {
+			qdisc_bstats_cpu_update(q, skb);
+
+			if (sch_may_need_requeuing(skb, q, dev))
+				__qdisc_run(q);
+			else if (sch_direct_xmit(skb, q, dev, txq, NULL, true) &&
+				 !READ_ONCE(q->empty))
+				__qdisc_run(q);
+
+			qdisc_run_end(q);
+			return NET_XMIT_SUCCESS;
+		}
+
 		rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
 		qdisc_run(q);
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index 49eae93..0df1462 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -273,6 +273,23 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
 	return skb;
 }

+bool sch_may_need_requeuing(struct sk_buff *skb, struct Qdisc *q,
+			    struct net_device *dev)
+{
+	bool again = false;
+
+	if (likely(skb_queue_empty(&q->gso_skb)))
+		return false;
+
+	/* need validating before requeuing */
+	skb = validate_xmit_skb_list(skb, dev, &again);
+	if (unlikely(!skb))
+		return true;
+
+	dev_requeue_skb(skb, q);
+	return true;
+}
+
 /*
  * Transmit possibly several skbs, and handle the return status as
  * required. Owning running seqcount bit guarantees that
@@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
  */
 struct pfifo_fast_priv {
 	struct skb_array q[PFIFO_FAST_BANDS];
+
+	/* protect against data race between enqueue/dequeue and
+	 * qdisc->empty setting
+	 */
+	spinlock_t lock;
 };

 static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
@@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
 	unsigned int pkt_len = qdisc_pkt_len(skb);
 	int err;

-	err = skb_array_produce(q, skb);
+	spin_lock(&priv->lock);
+	err = __ptr_ring_produce(&q->ring, skb);
+	WRITE_ONCE(qdisc->empty, false);
+	spin_unlock(&priv->lock);

 	if (unlikely(err)) {
 		if (qdisc_is_percpu_stats(qdisc))
@@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	struct sk_buff *skb = NULL;
 	int band;

+	spin_lock(&priv->lock);
 	for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
 		struct skb_array *q = band2list(priv, band);

@@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
 	} else {
 		WRITE_ONCE(qdisc->empty, true);
 	}
+	spin_unlock(&priv->lock);

 	return skb;
 }
@@ -739,6 +766,8 @@ static int pfifo_fast_init(struct Qdisc *qdisc, struct nlattr *opt,

 	/* Can by-pass the queue discipline */
 	qdisc->flags |= TCQ_F_CAN_BYPASS;
+
+	spin_lock_init(&priv->lock);
 	return 0;
 }
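
As a rough userspace illustration of what priv->lock is for in the pfifo_fast hunks, the sketch below keeps the ring update and the write to the empty flag inside one critical section, so a dequeuer cannot mark the queue empty between a producer's insert and its flag update. Everything here is an assumption made for the example: model_fifo and its helpers are hypothetical names, a C11 atomic_flag spinlock stands in for the kernel spinlock, and a single fixed-size ring replaces the three pfifo_fast bands.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define MODEL_RING_SIZE 4

/* Hypothetical userspace model; not the kernel data structure. */
struct model_fifo {
	atomic_flag lock;	/* stands in for priv->lock */
	atomic_bool empty;	/* stands in for qdisc->empty */
	void *ring[MODEL_RING_SIZE];
	size_t head, tail, len;
};

static void model_fifo_init(struct model_fifo *f)
{
	atomic_flag_clear(&f->lock);
	atomic_init(&f->empty, true);
	f->head = f->tail = f->len = 0;
}

static void model_lock(struct model_fifo *f)
{
	while (atomic_flag_test_and_set_explicit(&f->lock,
						 memory_order_acquire))
		;	/* spin until the lock is released */
}

static void model_unlock(struct model_fifo *f)
{
	atomic_flag_clear_explicit(&f->lock, memory_order_release);
}

/* Returns 0 on success, -1 if the ring is full. */
static int model_enqueue(struct model_fifo *f, void *pkt)
{
	int err = 0;

	model_lock(f);
	if (f->len == MODEL_RING_SIZE) {
		err = -1;
	} else {
		f->ring[f->tail] = pkt;
		f->tail = (f->tail + 1) % MODEL_RING_SIZE;
		f->len++;
	}
	/* As in the patch, the flag is cleared under the same lock. */
	atomic_store(&f->empty, false);
	model_unlock(f);
	return err;
}

/* Returns the oldest packet, or NULL and marks the queue empty. */
static void *model_dequeue(struct model_fifo *f)
{
	void *pkt = NULL;

	model_lock(f);
	if (f->len > 0) {
		pkt = f->ring[f->head];
		f->head = (f->head + 1) % MODEL_RING_SIZE;
		f->len--;
	} else {
		atomic_store(&f->empty, true);
	}
	model_unlock(f);
	return pkt;
}
```

As in the patch, the flag only becomes true when a dequeue attempt finds nothing, so it can remain false for a short time after the last packet has been removed.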