第四节 任务退出和等待任务退出

上节中我们遗留了关于任务退出与等待其它任务退出的问题,这个问题的复杂性在于:任务有两个角色,一方面任务一定会在某个时刻退出,另一方面某个任务可能在运行中阻塞等待另一个任务的退出。关系如下:

wait-for-task

至于任务之间是如何形成这样一种相互等待关系的?回顾上一节开头的流程图,MainTask 对 AppTask 调用 join,建立等待关系,然后把自己状态设置为 Blocked,从运行队列 run_queue 转移到等待队列 wait_queue,然后触发重新调度让出执行权。直到 AppTask 退出时,MainTask 作为等待者被重新唤醒,继续执行。

为实现上述功能,我们需要为 Task 增加一个等待者列表 waiting list,记录那些等待它退出的其它 Task。修改 Task 结构:

pub struct Task {
    ... ...
    wait_for_exit: WaitQueue,
    exit_code: AtomicI32,
    ... ...
}

impl Task {
    pub(crate) fn notify_exit(&self, exit_code: i32, rq: &mut AxRunQueue) {
        self.exit_code.store(exit_code, Ordering::Release);
        self.wait_for_exit.notify_all_locked(rq);
    }
    pub fn join(&self) -> Option<i32> {
        self.wait_for_exit.wait_until(|| self.state() == TaskState::Exited);
        Some(self.exit_code.load(Ordering::Acquire))
    }
}

结构 Task 中增加 wait_for_exit 等待列表和退出码 exit_code。一方面实现 join 方法,挂起当前以等待目标任务的退出。相应的,实现 notify_exit 这个方法,用于记录退出码并通知所有等待者。

数据结构 WaitQueue 是类似于 AxRunQueue 的一个特殊队列,用于实现任务的等待与唤醒。

// axtask/src/wait_queue.rs
pub struct WaitQueue {
    queue: SpinRaw<VecDeque<AxTaskRef>>, // we already disabled IRQs when lock the `RUN_QUEUE`
}

impl WaitQueue {
    pub const fn new() -> Self {
        Self {
            queue: SpinRaw::new(VecDeque::new()),
        }
    }
    
    pub fn wait_until<F>(&self, condition: F)
    where
        F: Fn() -> bool,
    {
        loop {
            let mut rq = RUN_QUEUE.lock();
            if condition() {
                break;
            }
            rq.block_current(|task| {
                self.queue.lock().push_back(task);
            });
        }
    }

    pub(crate) fn notify_all_locked(&self, rq: &mut AxRunQueue) {
        while let Some(task) = self.queue.lock().pop_front() {
            rq.unblock_task(task);
        }
    }
}

// axtask/src/run_queue.rs
impl AxRunQueue {
    pub fn unblock_task(&mut self, task: AxTaskRef) {
        if task.is_blocked() {
            task.set_state(TaskState::Ready);
            self.add_task(task);
        }
    }
    pub fn block_current<F>(&mut self, wait_queue_push: F)
    where
        F: FnOnce(AxTaskRef),
    {
        let curr = current();
        curr.set_state(TaskState::Blocked);
        wait_queue_push(curr.clone());
        self.resched(false);
    }
}

从上面的实现可以看出,当前任务 CurrentTask 调用 join,进而通过 WaitQueue::wait_until 进入等待队列中阻塞等待;直到所等待的任务退出,被从等待队列中唤醒,再回到运行队列中准备被调度,这步最后是通过AxRunQueue::unblock_task(...) 实现的。

最后来给出 exit 的完整功能,接上节对 exit_current 的实现:

// axtask/src/run_queue.rs
impl AxRunQueue {
    pub fn exit_current(&mut self, exit_code: i32) -> ! {
        let curr = current();
        if curr.is_init() {
            axhal::misc::terminate();
        } else {
            curr.set_state(TaskState::Exited);
			// Save exit code and notify kernel to reclaim itself.
            // [Begin]
            curr.notify_exit(exit_code, self);
            // [End]
            self.resched(false);
        }
        unreachable!("task exited!");
    }
}

只是新增 [Begin]...[End] 中间的一行,任务调用 notify_exit 通知所有等待者一个消息 - “它正在退出”。

测试一下退出与等待功能,测试通过!