
Spawn tasks and talk to them via a channel with Axum

To share some background on my particular use-case: I'm processing a large CSV file (somewhere in the 800 MB range), and the task is triggered by a POST request from AWS S3.
The Axum request handler needs to return a 200 success response immediately and let the long-running work proceed on a spawned task, leaving the main task (running the Axum server) free to serve further requests.
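Before diving into the Axum specifics, the overall shape can be sketched with std's channel and a plain thread (names here are hypothetical, and the real code below uses tokio): the handler only enqueues the work and returns at once, while a worker drains the queue.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical job type standing in for the S3 key from the POST payload.
type Job = String;

// The "handler": it only enqueues the job and returns immediately,
// mirroring how the Axum handler returns 200 before the work is done.
fn handle_trigger(sender: &mpsc::Sender<Job>, key: &str) -> &'static str {
    sender.send(key.to_string()).expect("worker dropped");
    "200 OK"
}

// The "actor": drains the channel on its own thread until all senders close.
fn spawn_worker(receiver: mpsc::Receiver<Job>) -> thread::JoinHandle<Vec<Job>> {
    thread::spawn(move || receiver.into_iter().collect())
}
```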

Inside the Axum request handler 

I will first show you my implementation and discuss it afterwards. Here's the request handler. Notice the important call here: task_handle.run_task(&masterfile_key).await.
pub async fn handle_trigger(
    Extension(task_handle): Extension<crate::MyActorHandle>,
    extract::Json(payload): extract::Json<PostTrigger>,
) -> Result<Response, Error> {
    tracing::info!(
        "Starting to run Crawler at: {}",
        crate::models::get_utc_timestamp_as_rfc3339()
    );

    let masterfile_key = match payload.object.key {
        Some(value) => {
            if value.is_empty() {
                return Err(RequestNotSuccessful::new(
                    StatusCode::BAD_REQUEST,
                    "AWS IoT Analytics S3 masterfile key in POST request is blank!",
                )
                .into());
            }
            value
        }
        None => {
            return Err(RequestNotSuccessful::new(
                StatusCode::BAD_REQUEST,
                "AWS IoT Analytics S3 masterfile key missing in POST request!",
            )
            .into())
        }
    };

    task_handle.run_task(&masterfile_key).await;

    tracing::info!("POST /v1/trigger request handler completed");

    Ok(Json(json!({ "status": "success" })).into_response())
}

Inside main()

This is where we spawn our Axum server and also background tasks.
#[tokio::main]
async fn main() -> Result<(), Error> {
    dotenv().ok();

    // Set the RUST_LOG, if it hasn't been explicitly defined
    if std::env::var_os("RUST_LOG").is_none() {
        std::env::set_var("RUST_LOG", "example-rs=info,tower_http=debug")
    }
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_file(true)
        .with_line_number(true)
        .init();

    let config = config::config().await?;

    // Spin up our API
    let addr = env::var("SERVER_BIND_HOST_PORT").expect("SERVER_BIND_HOST_PORT is missing!");
    tracing::info!("[ OK ]: Listening on {}", addr);

    let (sender, receiver) = mpsc::channel(8);
    let mut actor = MyActor::new(receiver);
    let actor_handle = MyActorHandle { sender };
    let config_arc = Arc::new(Mutex::new(config.clone()));

    let backend = async move { http::serve(&config, &addr, actor_handle).await };

    tokio::spawn({
        let c = Arc::clone(&config_arc);
        async move { actor.run(&c).await }
    });

    tokio::join!(backend);

    Ok(())
}
There is nothing stopping you from having multiple actors and multiple handles, accordingly. Let's digest this in smaller chunks.
Inside main we create our MPSC (multiple-producer, single-consumer) channel. It provides a tx and an rx, which we call sender and receiver. Two instances are created here: (i) MyActor, which takes the receiver, and (ii) MyActorHandle, which takes the sender.
let (sender, receiver) = mpsc::channel(8);
let mut actor = MyActor::new(receiver);
let actor_handle = MyActorHandle { sender };
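tokio::sync::mpsc::channel(8) gives a bounded, multi-producer/single-consumer channel with room for 8 in-flight messages. std's sync_channel(8) has the same shape synchronously, which makes the semantics easy to demonstrate in a self-contained sketch (function name hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

// std's sync_channel(8) is the blocking analogue of tokio's mpsc::channel(8):
// bounded to 8 buffered messages, any number of cloned senders, one receiver.
fn fan_in(n_producers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::sync_channel::<usize>(8);

    let handles: Vec<_> = (0..n_producers)
        .map(|i| {
            let tx = tx.clone(); // multiple producers: clone the sender
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();

    drop(tx); // drop the original sender so the receiver sees the channel close
    for h in handles {
        h.join().unwrap();
    }

    let mut got: Vec<usize> = rx.into_iter().collect(); // single consumer
    got.sort();
    got
}
```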
Let's look at the actor handle first. The MyActorHandle struct has a single sender field, which is of type mpsc::Sender<ActorMessage>. ActorMessage is an enum with a RunTask struct variant, which carries the bucket key as its own field.
We can see that task_handle.run_task() builds the RunTask variant and uses the handle's sender to send that message over the channel to our actor.
#[derive(Debug)]
enum ActorMessage {
    RunTask { bucket_key: String },
}

impl fmt::Display for ActorMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorMessage::RunTask { bucket_key } => {
                write!(f, "ActorMessage::RunTask: {{ bucket_key: {} }}", bucket_key)
            }
        }
    }
}

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub async fn run_task(&self, masterfile_key: &str) {
        let msg = ActorMessage::RunTask {
            bucket_key: masterfile_key.to_string(),
        };

        if (self.sender.send(msg).await).is_err() {
            tracing::info!("receiver dropped");
            assert!(self.sender.is_closed());
        }
    }
}
We already instantiated MyActor with the channel receiver in main(); MyActor::new() simply stores that receiver. In main() we then run the actor on its own spawned task:
tokio::spawn({
    let c = Arc::clone(&config_arc);
    async move { actor.run(&c).await }
});
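The Arc::clone in that block is what lets the spawned task share the config with main: each clone is just a new reference-counted handle to the same value. The same pattern with a plain thread and a stand-in config struct (both hypothetical here, standing in for tokio::spawn and AppConfig):

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the blog's AppConfig struct.
struct AppConfig {
    bucket: String,
}

// Mirrors `let c = Arc::clone(&config_arc)` before tokio::spawn: the worker
// gets its own reference-counted handle to the shared config, so the
// original Arc stays usable in main.
fn spawn_with_config(config: &Arc<AppConfig>) -> thread::JoinHandle<String> {
    let c = Arc::clone(config);
    thread::spawn(move || c.bucket.clone())
}
```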
Notice how the actor.run() function loops over self.receiver.recv().await, which breaks when None is returned, i.e. when every sender has been dropped. This is where the message sent from MyActorHandle is processed.
The actual background work then runs inside self.handle_message().
struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor { receiver }
    }

    async fn handle_message(&mut self, msg: ActorMessage, config: &AppConfig) -> Result<(), Error> {
        match msg {
            ActorMessage::RunTask { bucket_key } => {
                tracing::info!("Running task...");
                match Crawler::process(&config, &bucket_key).await {
                    Ok(_) => tracing::info!("Done processing..."),
                    Err(err) => tracing::error!("Task execution error: {}", err),
                };
            }
        }
        Ok(())
    }

    async fn run(&mut self, config_arc: &Arc<tokio::sync::Mutex<AppConfig>>) -> Result<(), Error> {
        let config = match config_arc.try_lock() {
            Ok(value) => value,
            Err(err) => return Err(Error::new_internal_error().with(err)),
        };

        while let Some(msg) = self.receiver.recv().await {
            tracing::info!("Receiver: {}", &msg);
            self.handle_message(msg, &config).await?;
        }
        Ok(())
    }
}
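The while let loop above ends only once the channel is closed on the sending side. A minimal stdlib sketch of that shutdown behaviour (std's blocking recv() returns Err where tokio's async recv() returns None; the function name is hypothetical):

```rust
use std::sync::mpsc;

// Keeps receiving until every sender has been dropped, then returns
// whatever was seen -- the synchronous analogue of
// `while let Some(msg) = self.receiver.recv().await { ... }`.
fn drain_until_closed(rx: mpsc::Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    while let Ok(msg) = rx.recv() {
        seen.push(msg);
    }
    seen // recv() errored: all senders gone, loop over
}
```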
The most crucial aspect above is how we handle errors when the background task fails. We can do anything we like here, but we must not let the error propagate out of the loop (or panic), as that would kill the background task and drop the receiver; every future request handled by Axum would then log "receiver dropped" when run_task tries to send.
// Log the error and carry on -- the loop keeps running.
match theoretical_background_task.await {
    Ok(_) => tracing::info!("Done processing..."),
    Err(err) => tracing::error!("Task execution error: {}", err),
};

// We don't want to do this.
match theoretical_background_task.await {
    Ok(_) => tracing::info!("Done processing..."),
    Err(err) => return Err(/* ... */), // this will cause the rx receiver to drop
};
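To make the contrast concrete, here is a small synchronous sketch (std channels instead of tokio, and all names hypothetical) where a failing task is logged and the loop keeps draining later messages instead of dying:

```rust
use std::sync::mpsc;

// A task that can fail; the actor must not let that failure escape the loop.
fn fallible_task(input: &str) -> Result<(), String> {
    if input == "bad" {
        Err("boom".into())
    } else {
        Ok(())
    }
}

// Log-and-continue: a failing task does not end the loop, so later
// messages are still processed and senders never see a closed channel.
fn resilient_actor(rx: mpsc::Receiver<String>) -> usize {
    let mut processed = 0;
    while let Ok(msg) = rx.recv() {
        match fallible_task(&msg) {
            Ok(_) => processed += 1,
            Err(e) => eprintln!("Task execution error: {e}"), // logged, not returned
        }
    }
    processed
}
```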

Here's everything in main.rs

struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
}

#[derive(Debug)]
enum ActorMessage {
    RunTask { bucket_key: String },
}

impl fmt::Display for ActorMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorMessage::RunTask { bucket_key } => {
                write!(f, "ActorMessage::RunTask: {{ bucket_key: {} }}", bucket_key)
            }
        }
    }
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor { receiver }
    }

    async fn handle_message(&mut self, msg: ActorMessage, config: &AppConfig) -> Result<(), Error> {
        match msg {
            ActorMessage::RunTask { bucket_key } => {
                tracing::info!("Running task...");
                match Crawler::process(&config, &bucket_key).await {
                    Ok(_) => tracing::info!("Done processing..."),
                    Err(err) => tracing::error!("Task execution error: {}", err),
                };
            }
        }
        Ok(())
    }

    async fn run(&mut self, config_arc: &Arc<tokio::sync::Mutex<AppConfig>>) -> Result<(), Error> {
        let config = match config_arc.try_lock() {
            Ok(value) => value,
            Err(err) => return Err(Error::new_internal_error().with(err)),
        };

        while let Some(msg) = self.receiver.recv().await {
            tracing::info!("Receiver: {}", &msg);
            self.handle_message(msg, &config).await?;
        }
        Ok(())
    }
}

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub async fn run_task(&self, masterfile_key: &str) {
        let msg = ActorMessage::RunTask {
            bucket_key: masterfile_key.to_string(),
        };

        if (self.sender.send(msg).await).is_err() {
            tracing::info!("receiver dropped");
            assert!(self.sender.is_closed());
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    dotenv().ok();

    // Set the RUST_LOG, if it hasn't been explicitly defined
    if std::env::var_os("RUST_LOG").is_none() {
        std::env::set_var("RUST_LOG", "example-rs=info,tower_http=debug")
    }
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_file(true)
        .with_line_number(true)
        .init();

    let config = config::config().await?;

    // Spin up our API
    let addr = env::var("SERVER_BIND_HOST_PORT").expect("SERVER_BIND_HOST_PORT is missing!");
    tracing::info!("[ OK ]: Listening on {}", addr);

    let (sender, receiver) = mpsc::channel(8);
    let mut actor = MyActor::new(receiver);
    let actor_handle = MyActorHandle { sender };
    let config_arc = Arc::new(Mutex::new(config.clone()));

    let backend = async move { http::serve(&config, &addr, actor_handle).await };

    tokio::spawn({
        let c = Arc::clone(&config_arc);
        async move { actor.run(&c).await }
    });

    tokio::join!(backend);

    Ok(())
}
If you found this useful, give me a shout on Twitter, thanks!