第四节 同步原语 - Mutex

xxx

/// A mutual-exclusion lock wrapping a value of type `T`.
///
/// Ownership is tracked by task ID in `owner_id`; tasks that fail to acquire
/// the lock in `lock` wait on `wq` until the mutex looks free again.
pub struct Mutex<T: ?Sized> {
    /// Wait queue used by contended `lock` callers.
    wq: AxWaitQueueHandle,
    /// ID of the task currently holding the lock; `0` means "unlocked".
    owner_id: AtomicU64,
    /// The protected value.
    data: UnsafeCell<T>,
}

/// A guard giving access to the data protected by a [`Mutex`].
///
/// The data is reached through the guard's [`Deref`] and [`DerefMut`]
/// implementations, and the lock is released when the guard is dropped.
pub struct MutexGuard<'a, T: ?Sized + 'a> {
    /// The mutex this guard was created from; it is unlocked on drop.
    lock: &'a Mutex<T>,
    /// Raw pointer to the protected value stored inside `lock`.
    data: *mut T,
}

// SAFETY (rationale for both impls): a `Mutex<T>` that is shared or moved
// between tasks lets each of them reach the inner `T`, hence the `T: Send`
// bound; `lock` serializes that access through the `owner_id` compare-exchange.
// NOTE(review): these are the same bounds `std::sync::Mutex` uses — confirm that
// nothing else in the crate hands out unsynchronized access to `data`.
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}

impl<T> Mutex<T> {
    /// Builds an unlocked [`Mutex`] that owns `data`.
    ///
    /// The new mutex has no owner (`owner_id` is `0`) and a fresh wait queue.
    #[inline(always)]
    pub const fn new(data: T) -> Self {
        Mutex {
            data: UnsafeCell::new(data),
            owner_id: AtomicU64::new(0),
            wq: AxWaitQueueHandle::new(),
        }
    }

    /// Destroys this [`Mutex`] and hands back the value it protected.
    #[inline(always)]
    pub fn into_inner(self) -> T {
        // Taking `self` by value proves that no guard or other borrow is
        // still alive, so the value can be moved out without locking.
        self.data.into_inner()
    }
}

impl<T: ?Sized> Mutex<T> {
    /// Returns `true` if the lock is currently held.
    ///
    /// # Note
    ///
    /// This is a relaxed load that provides no synchronization guarantees, so
    /// its result should be considered out of date the instant it is
    /// returned. Do not use it for synchronization purposes; it is only
    /// useful as a heuristic.
    #[inline(always)]
    pub fn is_locked(&self) -> bool {
        self.owner_id.load(Ordering::Relaxed) != 0
    }

    /// Acquires the mutex for the current task.
    ///
    /// If the mutex is owned by another task, the caller waits on the
    /// internal wait queue until the mutex looks unlocked and then retries.
    /// The returned guard gives access to the protected data and releases the
    /// lock when it is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the current task already owns this mutex: the lock is not
    /// reentrant.
    pub fn lock(&self) -> MutexGuard<'_, T> {
        let current_id = super::ax_current_task_id();
        loop {
            // The weak compare-exchange may fail spuriously even when the
            // mutex is free; that is fine because we retry in a loop.
            match self.owner_id.compare_exchange_weak(
                0,
                current_id,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(owner_id) => {
                    assert_ne!(
                        owner_id, current_id,
                        "Thread({}) tried to acquire mutex it already owns.",
                        current_id,
                    );
                    // Wait until the lock looks unlocked before retrying.
                    super::ax_wait_queue_wait(&self.wq, || !self.is_locked(), None);
                }
            }
        }
        MutexGuard {
            lock: self,
            // `UnsafeCell::get` already yields the raw pointer the guard
            // stores; no intermediate reference (and no `unsafe`) is needed.
            data: self.data.get(),
        }
    }

    /// Releases the mutex without going through a [`MutexGuard`] and wakes
    /// one task waiting on it.
    ///
    /// # Safety
    ///
    /// The caller must be the task that currently holds the lock, and no
    /// reference to the protected data obtained under that lock may be used
    /// after this call.
    ///
    /// # Panics
    ///
    /// Panics if the current task is not the owner of the mutex. The lock
    /// state is left untouched in that case.
    pub unsafe fn force_unlock(&self) {
        let current_id = super::ax_current_task_id();
        // Clear the owner only if it really is the current task, so that a
        // bogus call cannot release a lock held by somebody else before the
        // ownership check fires.
        if let Err(owner_id) = self.owner_id.compare_exchange(
            current_id,
            0,
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            panic!(
                "Thread({}) tried to release mutex it doesn't own (current owner id: {})",
                current_id, owner_id,
            );
        }
        // Wake up one waiting thread.
        super::ax_wait_queue_wake(&self.wq, 1);
    }

    /// Returns a mutable reference to the protected data without locking.
    ///
    /// The exclusive borrow of the mutex statically guarantees that no guard
    /// and no other reference to it exists, so no locking is required.
    pub fn get_mut(&mut self) -> &mut T {
        self.data.get_mut()
    }
}

impl<'a, T: ?Sized> Deref for MutexGuard<'a, T> {
    type Target = T;
    /// Returns a shared reference to the value protected by the mutex.
    #[inline(always)]
    fn deref(&self) -> &T {
        // SAFETY: `data` was taken from the mutex's `UnsafeCell` in
        // `Mutex::lock` after the lock had been acquired, and the lock is
        // released when this guard is dropped, so the pointee is valid and
        // only reachable through this guard (assuming nobody misuses
        // `force_unlock` while the guard is alive).
        unsafe { &*self.data }
    }
}

impl<'a, T: ?Sized> DerefMut for MutexGuard<'a, T> {
    /// Returns an exclusive reference to the value protected by the mutex.
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: `data` was taken from the mutex's `UnsafeCell` in
        // `Mutex::lock` after the lock had been acquired, and `&mut self`
        // makes this the only access path through the guard (assuming nobody
        // misuses `force_unlock` while the guard is alive).
        unsafe { &mut *self.data }
    }
}

impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
    /// The dropping of the [`MutexGuard`] will release the lock it was created from.
    fn drop(&mut self) {
        // SAFETY: a guard is only constructed by `Mutex::lock` once the lock
        // has been acquired, so the mutex is held for as long as the guard
        // exists. `force_unlock` additionally asserts that the current task
        // is the owner and then wakes one waiter.
        // NOTE(review): this relies on the guard being dropped by the task
        // that locked the mutex — confirm the guard is never sent elsewhere.
        unsafe { self.lock.force_unlock() }
    }
}

xxx

/// Waits on `wq` until `until_condition` is satisfied.
///
/// The waiting itself is delegated to the wait queue's `wait_until`, which
/// receives `until_condition` unchanged.
///
/// # Arguments
///
/// * `wq` - the wait queue to wait on.
/// * `until_condition` - predicate handed to `wait_until`.
/// * `timeout` - must be `None`; timed waits are not implemented.
///
/// # Returns
///
/// Always `false`.
/// NOTE(review): presumably this means "the wait did not time out" — confirm
/// against the callers.
///
/// # Panics
///
/// Panics if `timeout` is `Some(_)`.
pub fn ax_wait_queue_wait(
    wq: &AxWaitQueueHandle,
    until_condition: impl Fn() -> bool,
    timeout: Option<Duration>,
) -> bool {
    // A single check is enough: the former second `timeout.is_some()` branch
    // could never be reached because this one already diverges.
    if timeout.is_some() {
        unimplemented!("ax_wait_queue_wait: waiting with a `timeout` is not supported");
    }

    wq.0.wait_until(until_condition);
    false
}

/// Notifies tasks waiting on `wq`.
///
/// A `count` of `u32::MAX` issues a single `notify_all`; any other value
/// issues `notify_one` exactly `count` times (so `0` does nothing).
/// NOTE(review): the `true` flag is forwarded to the queue as-is; presumably
/// it asks for a reschedule — confirm against the wait queue API.
pub fn ax_wait_queue_wake(wq: &AxWaitQueueHandle, count: u32) {
    match count {
        u32::MAX => {
            wq.0.notify_all(true);
        }
        times => (0..times).for_each(|_| {
            wq.0.notify_one(true);
        }),
    }
}

xxx