在 2021/7/1 下午8:26, Yunsheng Lin 写道:
Currently r->queue[] clearing is done before r->consumer_head updating, which makes the __ptr_ring_empty() returning false positive result(the ring is non-empty, but __ptr_ring_empty() suggest that it is empty) if the checking is done after the r->queue clearing and before the consumer_head moving forward.
Move the r->queue[] clearing after consumer_head moving forward to avoid the above case.
As a side effect of above change, a consumer_head checking is avoided for the likely case, and it has noticeable performance improvement when it is tested using the ptr_ring_test selftest added in the previous patch.
Tested using the "perf stat -r 1000 ./ptr_ring_test -s 1000 -m 1 -N 100000000", comparing the elapsed time:
arch unpatched patched improvement arm64 2.087205 sec 1.888224 sec +9.5% X86 2.6538 sec 2.5422 sec +4.2%
I think we need the number of real workloads here.
Thanks
Signed-off-by: Yunsheng Lin linyunsheng@huawei.com
V3: adjust the title and comment log according to disscusion in V2, and update performance data using "perf stat -r". V2: Add performance data.
include/linux/ptr_ring.h | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h index 808f9d3..db9c282 100644 --- a/include/linux/ptr_ring.h +++ b/include/linux/ptr_ring.h @@ -261,8 +261,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty * to work correctly. */
- int consumer_head = r->consumer_head;
- int head = consumer_head++;
int consumer_head = r->consumer_head + 1;
/* Once we have processed enough entries invalidate them in
- the ring all at once so producer can reuse their space in the ring.
@@ -271,19 +270,27 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r) */ if (unlikely(consumer_head - r->consumer_tail >= r->batch || consumer_head >= r->size)) {
int tail = r->consumer_tail;
if (unlikely(consumer_head >= r->size)) {
r->consumer_tail = 0;
WRITE_ONCE(r->consumer_head, 0);
} else {
r->consumer_tail = consumer_head;
WRITE_ONCE(r->consumer_head, consumer_head);
}
- /* Zero out entries in the reverse order: this way we touch the
*/
- cache line that producer might currently be reading the last;
- producer won't make progress and touch other cache lines
- besides the first one until we write out all entries.
while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = consumer_head;
- }
- if (unlikely(consumer_head >= r->size)) {
consumer_head = 0;
r->consumer_tail = 0;
while (likely(--consumer_head >= tail))
r->queue[consumer_head] = NULL;
}return;
- /* matching READ_ONCE in __ptr_ring_empty for lockless tests */ WRITE_ONCE(r->consumer_head, consumer_head); }