서론
지난 글에서 다양한 언어에서의 async/await에 대해 알아보았습니다. 이번 글에서는 Rust의 async/await에 대해 더 자세히 알아보겠습니다.
std::future
Rust의 async/await는 std::future::Future
를 기반으로 합니다.
std::future::Future
trait은 std::future
에 정의되어 있습니다.
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Future
trait은 poll
함수를 정의하고 있습니다.
poll
함수는 Future
가 준비되었는지를 확인합니다.
Future
가 준비되었다면 Poll::Ready
를 반환하고, 아니라면 Poll::Pending
을 반환합니다.
자세히 살펴보면 poll
함수는 self
를 Pin<&mut Self>
로 받고 있습니다.
Pin
의 정의를 볼까요?
std::pin::Pin
pub struct Pin<P> {
pub pointer: P,
}
특별할게 없어 보입니다.
그럼 Pin
의 역할은 무엇일까요?
P
는 포인터 타입으로 Pin
은 포인터를 들고 있게 됩니다.
즉, 어떤 값의 포인터를 들고 있음으로써 그 값이 움직일 수 없게 만듭니다.
움직일 수 없는 값의 장점이 무엇일까요?
다음과 같은 코드가 있다고 가정해봅시다.
struct Foo {
x: String,
y: *mut String,
}
impl Foo {
fn new(x: String) -> Self {
let y = &x as *const String as *mut String;
let foo = Foo { x, y };
foo
}
}
위 그림은 위 코드를 나타낸 도식입니다.
이 코드는 x
와 y
가 같은 값을 가리키도록 만듭니다.
그런데 러스트의 모든 값은 이동할 수 있으므로 Foo
의 x
를 이동할 수 있습니다.
그러면 Foo
의 x
와 y
는 같은 값을 가리키지 않게 됩니다.
이런 문제를 방지하기 위해 Pin
을 사용합니다.
위 그림은 Foo
가 이동했을 때의 값의 변화를 나타낸 그림입니다.
impl Foo {
fn new(x: String) -> Pin<&mut Self> {
let y = &x as *const String as *mut String;
let foo = Foo { x, y };
let foo = Box::new(foo);
let foo = Box::leak(foo);
let foo = unsafe { Pin::new_unchecked(foo) };
foo
}
}
이렇게 Foo
의 x
를 Pin
으로 감싸면 Foo
의 x
를 이동할 수 없게 됩니다.
이제 Foo
의 x
와 y
는 항상 같은 값을 가리키게 됩니다.
std::future::Future
의 poll
그럼 std::future::Future
의 poll
은 왜 Pin<&mut Self>
를 받는 것일까요?
그 이유는 Future
는 한번에 실행되는 것이 아니라 여러 번 실행될 수 있기 때문입니다.
각 실행 사이에 값이 이동할 수 있기 때문에 Future
는 Pin
으로 감싸져야 합니다.
Tokio
tokio는 Rust의 async/await를 위한 라이브러리입니다. 지난 글에서는 단순히 tokio가 어떻게 작동하는지를 알아보았고 이번 글에서는 tokio가 어떻게 구성되어 있는지를 알아보도록 하겠습니다.
먼저 가장 기본적인 api부터 알아보겠습니다.
#[tokio::main]
async fn main() {
println!("Hello, world!");
}
#[tokio::main]
은 tokio의 runtime을 생성하고, main
함수를 실행합니다.
정확히 어떻게 구성되어 있는지 cargo expand
를 통해 확인해보겠습니다.
#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
fn main() {
let body = async {
{
::std::io::_print(format_args!("Hello, World!\n"));
};
};
#[allow(clippy::expect_used, clippy::diverging_sub_expression)]
{
return tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body);
}
}
#[tokio::main]
은 tokio::runtime::Builder::new_multi_thread().enable_all().build()
를 통해 runtime을 생성하고, runtime.block_on(body)
를 통해 main
함수를 실행합니다.
tokio::runtime::Builder
는 tokio의 runtime을 생성하기 위한 builder입니다.
new_multi_thread
는 multi-threaded runtime을 생성하기 위한 builder를 생성합니다.
enable_all
은 multi-threaded runtime에서 사용할 모든 feature를 활성화합니다.
build
는 builder를 통해 생성한 runtime을 생성합니다.
runtime.block_on(body)
는 body
를 실행하기 위해 runtime을 block합니다.
block_on
의 구조를 살펴보겠습니다.
impl Runtime {
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _enter = self.enter();
match &self.scheduler {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
}
}
}
위 코드는 tokio::runtime::Runtime
의 block_on
를 간략히 요약한 것입니다.
block_on
은 self.scheduler
에 따라 CurrentThread
의 block_on
또는 MultiThread
의 block_on
을 실행합니다.
MultiThread
의 block_on
도 계속 많은 block_on
을 호출해 결국 CachedParkThread
의 block_on
을 호출합니다.
CachedParkThread
의 block_on
을 살펴보겠습니다.
impl CachedParkThread {
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = f.as_mut().poll(&mut cx) {
return Ok(v);
}
self.park();
}
}
}
위 코드는 tokio::runtime::park::CachedParkThread
의 block_on
을 간략히 요약한 것입니다.
block_on
함수는 단순히 반복문을 돌면서 future을 계속 poll
하고 있습니다.
그리고 future가 끝나면 종료를 하는 단순한 알고리즘입니다.
그런데, waker
와 park
가 무엇일까요?
Thread Parking
park
는 thread를 park하는 것을 의미합니다.
이는 thread가 다른 thread에게 cpu를 양보하도록 합니다.
Thread yield와 비슷한 개념입니다.
하지만 yield
는 현재 thread를 지금 양보하는 것입니다.
즉, scheduler에 의해 언제든지 다시 실행될 수 있습니다.
반면, park
는 현재 thread를 양보하고 다시 실행되지 않도록 합니다.
즉, scheduler에 의해 다시 실행되지 않습니다.
그러면 언제 다시 실행될까요?
unpark
가 호출될 때까지 park 상태를 유지합니다.
unpark
는 다른 thread가 park
된 thread를 다시 실행하도록 합니다.
그러면 park
는 현재 쓰레드를 park
하는 것이고 unpark
는 다른 쓰레드가 park
된 쓰레드를 다시 실행하는 것입니다.
물론 unpark
한 thread가 곧바로 실행되는 것은 아닙니다.
Scheduler에 의해 실행될 때 실행됩니다.
위 그림은 설명을 그림으로 간단히 설명한 것입니다.
그럼 yield
를 하지 않고 park
을 하는 이유는 무엇일까요?
이는 IO를 기다리기 때문입니다.
IO가 완료되어야 다시 실행되도록 하기 위해 park
을 사용합니다.
Waker
그럼 언제 unpark
가 호출될까요?
unpark
는 waker
를 통해 호출됩니다.
waker
는 Future
가 poll
할 때 전달되는 Context
안에 들어있습니다.
요청한 IO가 완료되면 waker
를 통해 unpark
이 호출됩니다.
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if Instant::now() >= self.when {
Poll::Ready(())
} else {
let waker = cx.waker().clone();
let when = self.when;
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
위 코드는 정해진 시간 동안 sleep을 하는 Delay
future입니다.
새로운 thread를 생성하여 when
시간이 되면 waker
를 통해 unpark
을 호출합니다.
waker
를 조금 더 자세히 볼까요?
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
pub struct RawWaker {
data: *const (),
vtable: &'static RawWakerVTable,
}
pub struct Waker {
waker: RawWaker,
}
impl Waker {
pub fn wake(self) {
unsafe { (self.waker.vtable.wake)(self.waker.data) }
}
pub fn wake_by_ref(&self) {
unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
}
}
위 코드는 std::task::Waker
의 코드입니다.
waker
는 RawWaker
와 RawWakerVTable
로 구성되어 있습니다.
RawWakerVTable
은 clone
, wake
, wake_by_ref
, drop
함수 포인터를 가지고 있습니다.
즉, waker
는 틀인 셈입니다.
직접 함수를 구현해서 waker
를 만드는 방식입니다.
tokio
에서는 이를 Thread parking으로 구현한 것입니다.
Mio
mio는 tokio
의 IO를 다루는 라이브러리입니다.
tokio
에서 future
의 원리를 알았지만 언제 IO가 끝나는지 알고 waker
를 호출할까요?
mio
가 이를 담당합니다.
Non-blocking IO
Non-blocking IO는 IO가 끝나지 않아도 다른 작업을 할 수 있도록 하는 것입니다.
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
위 C 코드는 fd
에 대해 non-blocking IO를 설정하는 코드입니다.
이렇게 되면 fd
를 사용한 함수는 바로 리턴을 하고 IO가 끝나면 fd
에 대한 이벤트가 발생합니다.
epoll
epoll
은 Linux에서 IO 이벤트를 다루는 방법입니다.
앞서 non-blocking IO를 설정하면 IO가 끝나면 fd
에 대한 이벤트가 발생하는데, 이를 epoll
을 통해 다룰 수 있습니다.
epoll
은 fd
에 대한 이벤트를 관리하는데, fd
에 대한 이벤트가 발생하면 epoll
에 등록된 콜백을 호출합니다.
int epoll_fd = epoll_create1(0);
event.data.fd = fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
struct epoll_event events[10];
// ...
while (1) {
int n = epoll_wait(epoll_fd, events, 10, -1);
// ...
}
위 C 코드는 epoll
을 사용하는 코드입니다.
epoll_create1
을 통해 epoll
을 생성하고 epoll_ctl
을 통해 fd
에 대한 이벤트를 등록합니다.
epoll_wait
를 통해 epoll
에 등록된 이벤트를 기다립니다.
이벤트가 발생하면 epoll_wait
는 리턴하고 epoll
에 등록된 콜백을 호출합니다.
Mio에서의 epoll
mio는 내부적으로 epoll
을 사용하여 IO를 기다리고, 이벤트가 발생하면 waker
를 호출합니다.
epoll
은 Linux에서만 사용할 수 있습니다.
그래서 mio는 Linux에서는 epoll
, Windows에서는 IOCP
, macOS에서는 kqueue
를 사용합니다.
이렇게 다양한 API를 하나로 통일하기 위해 mio
를 사용합니다.
결론
tokio
는 future
를 사용하여 비동기 프로그래밍을 하고, mio
를 사용하여 IO를 다룹니다.
내부적으로는 복잡한 방법을 통해 비동기 프로그래밍을 하지만, future
를 사용하면 간단하게 비동기 프로그래밍을 할 수 있습니다.