buttercrab's profile image

buttercrab

March 15, 2023 00:00

Rust의 async/await

async , Rust , tokio

서론

지난 글에서 다양한 언어에서의 async/await에 대해 알아보았습니다. 이번 글에서는 Rust의 async/await에 대해 더 자세히 알아보겠습니다.

std::future

Rust의 async/await는 std::future::Future를 기반으로 합니다. std::future::Future trait은 std::future에 정의되어 있습니다.

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Future trait은 poll 함수를 정의하고 있습니다. poll 함수는 Future가 준비되었는지를 확인합니다. Future가 준비되었다면 Poll::Ready를 반환하고, 아니라면 Poll::Pending을 반환합니다.

자세히 살펴보면 poll 함수는 selfPin<&mut Self>로 받고 있습니다. Pin의 정의를 볼까요?

std::pin::Pin

pub struct Pin<P> {
    pub pointer: P,
}

특별할게 없어 보입니다. 그럼 Pin의 역할은 무엇일까요? P는 포인터 타입으로 Pin은 포인터를 들고 있게 됩니다. 즉, 어떤 값의 포인터를 들고 있음으로써 그 값이 움직일 수 없게 만듭니다. 움직일 수 없는 값의 장점이 무엇일까요?

다음과 같은 코드가 있다고 가정해봅시다.


struct Foo {
    x: String,
    y: *mut String,
}

impl Foo {
    fn new(x: String) -> Self {
        let y = &x as *const String as *mut String;
        let foo = Foo { x, y };
        foo
    }
}

위 그림은 위 코드를 나타낸 도식입니다.

이 코드는 xy가 같은 값을 가리키도록 만듭니다. 그런데 러스트의 모든 값은 이동할 수 있으므로 Foox를 이동할 수 있습니다. 그러면 Fooxy는 같은 값을 가리키지 않게 됩니다. 이런 문제를 방지하기 위해 Pin을 사용합니다.

위 그림은 Foo가 이동했을 때의 값의 변화를 나타낸 그림입니다.

impl Foo {
    fn new(x: String) -> Pin<&mut Self> {
        let y = &x as *const String as *mut String;
        let foo = Foo { x, y };
        let foo = Box::new(foo);
        let foo = Box::leak(foo);
        let foo = unsafe { Pin::new_unchecked(foo) };
        foo
    }
}

이렇게 FooxPin으로 감싸면 Foox를 이동할 수 없게 됩니다. 이제 Fooxy는 항상 같은 값을 가리키게 됩니다.

std::future::Futurepoll

그럼 std::future::Futurepoll은 왜 Pin<&mut Self>를 받는 것일까요? 그 이유는 Future는 한번에 실행되는 것이 아니라 여러 번 실행될 수 있기 때문입니다. 각 실행 사이에 값이 이동할 수 있기 때문에 FuturePin으로 감싸져야 합니다.

Tokio

tokio는 Rust의 async/await를 위한 라이브러리입니다. 지난 글에서는 단순히 tokio가 어떻게 작동하는지를 알아보았고 이번 글에서는 tokio가 어떻게 구성되어 있는지를 알아보도록 하겠습니다.

먼저 가장 기본적인 api부터 알아보겠습니다.

#[tokio::main]
async fn main() {
    println!("Hello, world!");
}

#[tokio::main]은 tokio의 runtime을 생성하고, main 함수를 실행합니다. 정확히 어떻게 구성되어 있는지 cargo expand를 통해 확인해보겠습니다.

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
fn main() {
    let body = async {
        {
            ::std::io::_print(format_args!("Hello, World!\n"));
        };
    };
    #[allow(clippy::expect_used, clippy::diverging_sub_expression)]
    {
        return tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(body);
    }
}

#[tokio::main]tokio::runtime::Builder::new_multi_thread().enable_all().build()를 통해 runtime을 생성하고, runtime.block_on(body)를 통해 main 함수를 실행합니다.

tokio::runtime::Builder는 tokio의 runtime을 생성하기 위한 builder입니다. new_multi_thread는 multi-threaded runtime을 생성하기 위한 builder를 생성합니다. enable_all은 multi-threaded runtime에서 사용할 모든 feature를 활성화합니다. build는 builder를 통해 생성한 runtime을 생성합니다.

runtime.block_on(body)body를 실행하기 위해 runtime을 block합니다.

block_on의 구조를 살펴보겠습니다.

impl Runtime {
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        let _enter = self.enter();

        match &self.scheduler {
            Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
            Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
        }
    }
}

위 코드는 tokio::runtime::Runtimeblock_on를 간략히 요약한 것입니다. block_onself.scheduler에 따라 CurrentThreadblock_on 또는 MultiThreadblock_on을 실행합니다. MultiThreadblock_on도 계속 많은 block_on을 호출해 결국 CachedParkThreadblock_on을 호출합니다.

CachedParkThreadblock_on을 살펴보겠습니다.

impl CachedParkThread {
    pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
        let waker = self.waker()?;
        let mut cx = Context::from_waker(&waker);

        pin!(f);

        loop {
            if let Ready(v) = f.as_mut().poll(&mut cx) {
                return Ok(v);
            }

            self.park();
        }
    }
}

위 코드는 tokio::runtime::park::CachedParkThreadblock_on을 간략히 요약한 것입니다. block_on 함수는 단순히 반복문을 돌면서 future을 계속 poll하고 있습니다. 그리고 future가 끝나면 종료를 하는 단순한 알고리즘입니다. 그런데, wakerpark가 무엇일까요?

Thread Parking

park는 thread를 park하는 것을 의미합니다. 이는 thread가 다른 thread에게 cpu를 양보하도록 합니다. Thread yield와 비슷한 개념입니다. 하지만 yield는 현재 thread를 지금 양보하는 것입니다. 즉, scheduler에 의해 언제든지 다시 실행될 수 있습니다. 반면, park는 현재 thread를 양보하고 다시 실행되지 않도록 합니다. 즉, scheduler에 의해 다시 실행되지 않습니다.

그러면 언제 다시 실행될까요? unpark가 호출될 때까지 park 상태를 유지합니다. unpark는 다른 thread가 park된 thread를 다시 실행하도록 합니다. 그러면 park는 현재 쓰레드를 park하는 것이고 unpark는 다른 쓰레드가 park된 쓰레드를 다시 실행하는 것입니다. 물론 unpark한 thread가 곧바로 실행되는 것은 아닙니다. Scheduler에 의해 실행될 때 실행됩니다.

위 그림은 설명을 그림으로 간단히 설명한 것입니다.

그럼 yield를 하지 않고 park을 하는 이유는 무엇일까요? 이는 IO를 기다리기 때문입니다. IO가 완료되어야 다시 실행되도록 하기 위해 park을 사용합니다.

Waker

그럼 언제 unpark가 호출될까요? unparkwaker를 통해 호출됩니다. wakerFuturepoll할 때 전달되는 Context 안에 들어있습니다. 요청한 IO가 완료되면 waker를 통해 unpark이 호출됩니다.

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            let waker = cx.waker().clone();
            let when = self.when;

            thread::spawn(move || {
                let now = Instant::now();

                if now < when {
                    thread::sleep(when - now);
                }

                waker.wake();
            });

            Poll::Pending
        }
    }
}

위 코드는 정해진 시간 동안 sleep을 하는 Delay future입니다. 새로운 thread를 생성하여 when 시간이 되면 waker를 통해 unpark을 호출합니다.

waker를 조금 더 자세히 볼까요?

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

pub struct Waker {
    waker: RawWaker,
}

impl Waker {
    pub fn wake(self) {
        unsafe { (self.waker.vtable.wake)(self.waker.data) }
    }

    pub fn wake_by_ref(&self) {
        unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
    }
}

위 코드는 std::task::Waker의 코드입니다.

wakerRawWakerRawWakerVTable로 구성되어 있습니다. RawWakerVTableclone, wake, wake_by_ref, drop 함수 포인터를 가지고 있습니다. 즉, waker는 틀인 셈입니다. 직접 함수를 구현해서 waker를 만드는 방식입니다. tokio에서는 이를 Thread parking으로 구현한 것입니다.

Mio

miotokio의 IO를 다루는 라이브러리입니다. tokio에서 future의 원리를 알았지만 언제 IO가 끝나는지 알고 waker를 호출할까요? mio가 이를 담당합니다.

Non-blocking IO

Non-blocking IO는 IO가 끝나지 않아도 다른 작업을 할 수 있도록 하는 것입니다.

int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

위 C 코드는 fd에 대해 non-blocking IO를 설정하는 코드입니다. 이렇게 되면 fd를 사용한 함수는 바로 리턴을 하고 IO가 끝나면 fd에 대한 이벤트가 발생합니다.

epoll

epoll은 Linux에서 IO 이벤트를 다루는 방법입니다. 앞서 non-blocking IO를 설정하면 IO가 끝나면 fd에 대한 이벤트가 발생하는데, 이를 epoll을 통해 다룰 수 있습니다. epollfd에 대한 이벤트를 관리하는데, fd에 대한 이벤트가 발생하면 epoll에 등록된 콜백을 호출합니다.

int epoll_fd = epoll_create1(0);
event.data.fd = fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);

struct epoll_event events[10];
// ...

while (1) {
    int n = epoll_wait(epoll_fd, events, 10, -1);
    // ...
}

위 C 코드는 epoll을 사용하는 코드입니다. epoll_create1을 통해 epoll을 생성하고 epoll_ctl을 통해 fd에 대한 이벤트를 등록합니다. epoll_wait를 통해 epoll에 등록된 이벤트를 기다립니다. 이벤트가 발생하면 epoll_wait는 리턴하고 epoll에 등록된 콜백을 호출합니다.

Mio에서의 epoll

mio는 내부적으로 epoll을 사용하여 IO를 기다리고, 이벤트가 발생하면 waker를 호출합니다. epoll은 Linux에서만 사용할 수 있습니다. 그래서 mio는 Linux에서는 epoll, Windows에서는 IOCP, macOS에서는 kqueue를 사용합니다. 이렇게 다양한 API를 하나로 통일하기 위해 mio를 사용합니다.

결론

tokiofuture를 사용하여 비동기 프로그래밍을 하고, mio를 사용하여 IO를 다룹니다. 내부적으로는 복잡한 방법을 통해 비동기 프로그래밍을 하지만, future를 사용하면 간단하게 비동기 프로그래밍을 할 수 있습니다.