Zrythm v2.0.0-DEV
a highly automated and intuitive digital audio workstation
Loading...
Searching...
No Matches
mpmc_queue.h
1// SPDX-FileCopyrightText: © 2024 Alexandros Theodotou <alex@zrythm.org>
2// SPDX-License-Identifier: LicenseRef-ZrythmLicense
3/*
4 * This file incorporates work covered by the following copyright and
5 * permission notice:
6 *
7 * ---
8 *
9 * Copyright (C) 2010-2011 Dmitry Vyukov
10 * Copyright (C) 2017, 2019 Robin Gareus <robin@gareus.org>
11 *
12 * This program is free software: you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation, either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program. If not, see <https://www.gnu.org/licenses/>.
24 *
25 * ---
26 */
27
28#pragma once
29
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
32
33#define MPMC_USE_STD_ATOMIC 1
34
35#if MPMC_USE_STD_ATOMIC
36# include <atomic>
37# define MPMC_QUEUE_TYPE std::atomic<size_t>
38#else
39# define MPMC_QUEUE_TYPE unsigned int
40#endif
41
47
/**
 * Bounded FIFO queue built on atomic sequence counters.
 *
 * Storage is a ring of cells whose size is always a power of two, so a
 * position is mapped to a cell with a bit mask instead of a modulo. Every
 * cell carries its own sequence number:
 *
 *  - `sequence == pos`      : the cell is free for the push at position `pos`
 *  - `sequence == pos + 1`  : the cell holds the element for the pop at `pos`
 *
 * push_back() and pop_front() claim a position with a compare-exchange on
 * the enqueue/dequeue counter and never block; they report failure instead
 * (queue full / queue empty).
 *
 * reserve() and clear() replace or reset the ring without any
 * synchronisation and therefore must not run while another thread is
 * pushing or popping.
 *
 * @tparam T Element type. It has to be default-constructible (the ring is
 *   allocated with `new cell_t[n]`) and copy-assignable (elements are copied
 *   into and out of the cells).
 */
template <typename T> class MPMCQueue
{
public:
  /**
   * Creates a queue able to hold at least @p buffer_size elements.
   *
   * @param buffer_size Requested capacity; rounded up to a power of two
   *   (minimum 2), see power_of_two_size().
   */
  MPMCQueue (size_t buffer_size = 8) { reserve (buffer_size); }

  ~MPMCQueue () { delete[] _buffer; }

  // The queue owns a raw buffer and contains atomics, so it is neither
  // copyable nor movable. Spelled out so the intent does not depend on the
  // implicitly deleted operations of the atomic members.
  MPMCQueue (const MPMCQueue &) = delete;
  MPMCQueue &operator= (const MPMCQueue &) = delete;
  MPMCQueue (MPMCQueue &&) = delete;
  MPMCQueue &operator= (MPMCQueue &&) = delete;

  /** Returns the number of cells in the ring (always a power of two). */
  size_t capacity () const { return _buffer_mask + 1; }

  /**
   * Returns the smallest power of two that is >= @p sz and >= 2.
   *
   * The computation is done entirely in `size_t`, so requests larger than
   * 2^31 are handled correctly. If @p sz exceeds the largest power of two
   * representable in `size_t`, that largest power of two is returned.
   */
  static size_t power_of_two_size (size_t sz)
  {
    // Only the top bit set: the largest power of two a size_t can hold.
    constexpr size_t highest_pow2 = ~(~size_t{ 0 } >> 1);

    size_t result = 2;
    while (result < sz && result < highest_pow2)
      {
        result <<= 1;
      }
    return result;
  }

  /**
   * Grows the ring so that it can hold at least @p buffer_size elements.
   *
   * Does nothing when the current capacity is already large enough.
   * Otherwise a new ring is allocated and the queue is reset via clear(),
   * i.e. any elements still queued are discarded.
   *
   * Not safe to call concurrently with push_back()/pop_front().
   *
   * @param buffer_size Requested capacity; rounded up by
   *   power_of_two_size().
   */
  void reserve (size_t buffer_size)
  {
    buffer_size = power_of_two_size (buffer_size);
    assert ((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
    if (_buffer_mask >= buffer_size - 1)
      {
        return;
      }

    // Allocate first: if this throws, the queue still owns its old, valid
    // buffer and the destructor will not free a dangling pointer.
    cell_t * const new_buffer = new cell_t[buffer_size];
    delete[] _buffer;
    _buffer = new_buffer;
    _buffer_mask = buffer_size - 1;
    clear ();
  }

  /**
   * Resets the queue to the empty state.
   *
   * Each cell's sequence is set to its own index and both positions are set
   * to zero. Stored element values are not destroyed or reassigned. Uses
   * relaxed stores only, so it must not race with push_back()/pop_front().
   */
  void clear ()
  {
    for (size_t i = 0; i <= _buffer_mask; ++i)
      {
        _buffer[i]._sequence.store (i, std::memory_order_relaxed);
      }
    _enqueue_pos.store (0, std::memory_order_relaxed);
    _dequeue_pos.store (0, std::memory_order_relaxed);
  }

  /**
   * Appends a copy of @p data to the queue.
   *
   * @param data Element to copy into the queue.
   * @return true on success, false if the queue is full.
   */
  bool push_back (T const &data)
  {
    cell_t * cell;
    size_t   pos = _enqueue_pos.load (std::memory_order_relaxed);

    for (;;)
      {
        cell = &_buffer[pos & _buffer_mask];
        const size_t seq = cell->_sequence.load (std::memory_order_acquire);
        // Signed difference so the comparison stays correct when the
        // counters wrap around.
        const std::intptr_t dif =
          static_cast<std::intptr_t> (seq) - static_cast<std::intptr_t> (pos);
        if (dif == 0)
          {
            // Cell is free for this position: try to claim it. On failure
            // compare_exchange_weak reloads `pos` and the loop retries.
            if (_enqueue_pos.compare_exchange_weak (
                  pos, pos + 1, std::memory_order_relaxed))
              {
                break;
              }
          }
        else if (dif < 0)
          {
            // The cell still holds an element from one lap ago: full.
            return false;
          }
        else
          {
            // Another producer already took this position; start over from
            // the current enqueue position.
            pos = _enqueue_pos.load (std::memory_order_relaxed);
          }
      }

    cell->_data = data;
    // Publish the element: consumers wait for sequence == pos + 1.
    cell->_sequence.store (pos + 1, std::memory_order_release);

    return true;
  }

  /**
   * Removes the oldest element and copies it into @p data.
   *
   * @param[out] data Receives the element; left untouched on failure.
   * @return true on success, false if the queue is empty.
   */
  bool pop_front (T &data)
  {
    cell_t * cell;
    size_t   pos = _dequeue_pos.load (std::memory_order_relaxed);

    for (;;)
      {
        cell = &_buffer[pos & _buffer_mask];
        const size_t seq = cell->_sequence.load (std::memory_order_acquire);
        const std::intptr_t dif =
          static_cast<std::intptr_t> (seq)
          - static_cast<std::intptr_t> (pos + 1);
        if (dif == 0)
          {
            // Cell holds the element for this position: try to claim it.
            if (_dequeue_pos.compare_exchange_weak (
                  pos, pos + 1, std::memory_order_relaxed))
              {
                break;
              }
          }
        else if (dif < 0)
          {
            // Nothing has been published for this position yet: empty.
            return false;
          }
        else
          {
            // Another consumer already took this position; start over from
            // the current dequeue position.
            pos = _dequeue_pos.load (std::memory_order_relaxed);
          }
      }

    data = cell->_data;
    // Hand the cell back to producers for the next lap around the ring.
    cell->_sequence.store (pos + _buffer_mask + 1, std::memory_order_release);
    return true;
  }

private:
  // The counters are used through load/store/compare_exchange_weak, which
  // only exist on std::atomic, so the type is spelled out directly.
  using atomic_size = std::atomic<size_t>;

  /** One ring slot: the sequence number guarding it plus the payload. */
  struct cell_t
  {
    atomic_size _sequence;
    T           _data;
  };

  // The pads keep the buffer description, the enqueue position and the
  // dequeue position 64 bytes apart from each other.
  // NOTE(review): 64 presumably stands for the cache-line size, to keep
  // producers and consumers from sharing a line - confirm for the targets.
  char        _pad0[64] = {};
  cell_t *    _buffer{ nullptr };
  size_t      _buffer_mask{};
  char        _pad1[64 - sizeof (cell_t *) - sizeof (size_t)] = {};
  atomic_size _enqueue_pos{ 0 };
  char        _pad2[64 - sizeof (size_t)] = {};
  atomic_size _dequeue_pos{ 0 };
  char        _pad3[64 - sizeof (size_t)] = {};
};
188