第四节 同步原语 - Mutex
xxx
pub struct Mutex<T: ?Sized> {
wq: AxWaitQueueHandle,
owner_id: AtomicU64,
data: UnsafeCell<T>,
}
pub struct MutexGuard<'a, T: ?Sized + 'a> {
lock: &'a Mutex<T>,
data: *mut T,
}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
impl<T> Mutex<T> {
/// Creates a new [`Mutex`] wrapping the supplied data.
#[inline(always)]
pub const fn new(data: T) -> Self {
Self {
wq: AxWaitQueueHandle::new(),
owner_id: AtomicU64::new(0),
data: UnsafeCell::new(data),
}
}
/// Consumes this [`Mutex`] and unwraps the underlying data.
#[inline(always)]
pub fn into_inner(self) -> T {
// We know statically that there are no outstanding references to
// `self` so there's no need to lock.
let Mutex { data, .. } = self;
data.into_inner()
}
}
impl<T: ?Sized> Mutex<T> {
/// Returns `true` if the lock is currently held.
///
/// # Safety
///
/// This function provides no synchronization guarantees and so its result should be considered 'out of date'
/// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic.
#[inline(always)]
pub fn is_locked(&self) -> bool {
self.owner_id.load(Ordering::Relaxed) != 0
}
pub fn lock(&self) -> MutexGuard<T> {
let current_id = super::ax_current_task_id();
loop {
// Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock`
// when called in a loop.
match self.owner_id.compare_exchange_weak(
0,
current_id,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(owner_id) => {
assert_ne!(
owner_id, current_id,
"Thread({}) tried to acquire mutex it already owns.",
current_id,
);
// Wait until the lock looks unlocked before retrying
super::ax_wait_queue_wait(&self.wq, || !self.is_locked(), None);
}
}
}
MutexGuard {
lock: self,
data: unsafe { &mut *self.data.get() },
}
}
pub unsafe fn force_unlock(&self) {
let owner_id = self.owner_id.swap(0, Ordering::Release);
let current_id = super::ax_current_task_id();
assert_eq!(
owner_id, current_id,
"Thread({}) tried to release mutex it doesn't own",
current_id,
);
// wake up one waiting thread.
super::ax_wait_queue_wake(&self.wq, 1);
}
pub fn get_mut(&mut self) -> &mut T {
// We know statically that there are no other references to `self`, so
// there's no need to lock the inner mutex.
unsafe { &mut *self.data.get() }
}
}
impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> {
type Target = T;
#[inline(always)]
fn deref(&self) -> &T {
// We know statically that only we are referencing data
unsafe { &*self.data }
}
}
impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut T {
// We know statically that only we are referencing data
unsafe { &mut *self.data }
}
}
impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
/// The dropping of the [`MutexGuard`] will release the lock it was created from.
fn drop(&mut self) {
unsafe { self.lock.force_unlock() }
}
}
xxx
pub fn ax_wait_queue_wait(
wq: &AxWaitQueueHandle,
until_condition: impl Fn() -> bool,
timeout: Option<Duration>,
) -> bool {
if let Some(_dur) = timeout {
unimplemented!();
}
if timeout.is_some() {
panic!("ax_wait_queue_wait: the `timeout` argument is ignored without the `irq` feature");
}
wq.0.wait_until(until_condition);
false
}
pub fn ax_wait_queue_wake(wq: &AxWaitQueueHandle, count: u32) {
if count == u32::MAX {
wq.0.notify_all(true);
} else {
for _ in 0..count {
wq.0.notify_one(true);
}
}
}
xxx