To share some background on my particular use-case: I'm processing a large CSV file (somewhere in the 800 MB range), and the task is triggered by a POST request from AWS S3.
The Axum request handler needs to return a 200 success response immediately and hand the long-running work off to a spawned Tokio task, leaving the Axum server free to keep serving requests.
Inside the Axum request handler
I will first show you my implementation of this and discuss it afterwards. Here's the block for the request handler. Notice the important call here, which is task_handle.run_task(&masterfile_key).await.
pub async fn handle_trigger(
    Extension(task_handle): Extension<crate::MyActorHandle>,
    extract::Json(payload): extract::Json<PostTrigger>,
) -> Result<Response, Error> {
    tracing::info!(
        "Starting to run Crawler at: {}",
        crate::models::get_utc_timestamp_as_rfc3339()
    );
    let masterfile_key = match payload.object.key {
        Some(value) => {
            if value.is_empty() {
                return Err(RequestNotSuccessful::new(
                    StatusCode::BAD_REQUEST,
                    "AWS IoT Analytics S3 masterfile key in POST request is blank!",
                )
                .into());
            }
            value
        }
        None => {
            return Err(RequestNotSuccessful::new(
                StatusCode::BAD_REQUEST,
                "AWS IoT Analytics S3 masterfile key missing in POST request!",
            )
            .into())
        }
    };
    task_handle.run_task(&masterfile_key).await;
    tracing::info!("POST /v1/trigger request handler completed");
    Ok(Json(json!({ "status": "success" })).into_response())
}
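The Some/None/empty handling above boils down to a small pure function. Here's a runtime-free sketch of just that validation logic (validate_key is an illustrative helper, not part of the actual handler):

```rust
// Illustrative sketch of the key-validation logic in the handler above.
// `validate_key` is a hypothetical helper, not from the original code.
fn validate_key(key: Option<String>) -> Result<String, &'static str> {
    match key {
        None => Err("masterfile key missing in POST request"),
        Some(v) if v.is_empty() => Err("masterfile key in POST request is blank"),
        Some(v) => Ok(v),
    }
}

fn main() {
    // A missing or blank key is rejected; anything else passes through.
    assert!(validate_key(None).is_err());
    assert!(validate_key(Some(String::new())).is_err());
    assert_eq!(
        validate_key(Some("data/master.csv".to_string())).unwrap(),
        "data/master.csv"
    );
}
```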
Inside main()
This is where we spawn our Axum server and also background tasks.
#[tokio::main]
async fn main() -> Result<(), Error> {
    dotenv().ok();
    // Set the RUST_LOG, if it hasn't been explicitly defined
    if std::env::var_os("RUST_LOG").is_none() {
        std::env::set_var("RUST_LOG", "example-rs=info,tower_http=debug")
    }
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_file(true)
        .with_line_number(true)
        .init();
    let config = config::config().await?;
    // Spin up our API
    let addr = env::var("SERVER_BIND_HOST_PORT").expect("SERVER_BIND_HOST_PORT is missing!");
    tracing::info!("[ OK ]: Listening on {}", addr);
    let (sender, receiver) = mpsc::channel(8);
    let mut actor = MyActor::new(receiver);
    let actor_handle = MyActorHandle { sender };
    let config_arc = Arc::new(Mutex::new(config.clone()));
    let backend = async move { http::serve(&config, &addr, actor_handle).await };
    tokio::spawn({
        let c = Arc::clone(&config_arc);
        async move { actor.run(&c).await }
    });
    tokio::join!(backend);
    Ok(())
}
There is nothing stopping you from having multiple background tasks and multiple handles, accordingly. Let's digest this in smaller chunks.
Inside main() we create our MPSC (multiple-producer, single-consumer) channel. It provides both a tx and an rx, which we call sender and receiver. Two instances are created here: (i) MyActor and (ii) MyActorHandle.
let (sender, receiver) = mpsc::channel(8);
let mut actor = MyActor::new(receiver);
let actor_handle = MyActorHandle { sender };
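For the properties we rely on here, tokio's mpsc channel behaves like the standard library's, so the multiple-producer, single-consumer shape can be sketched without a runtime (fan_in is an illustrative name, not from the code above):

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of MPSC semantics using std::sync::mpsc: the sender can be cloned
// freely (many producers), but there is exactly one receiver draining them.
// Returns how many messages the single consumer received.
fn fan_in(messages: Vec<String>) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    let handles: Vec<_> = messages
        .into_iter()
        .map(|m| {
            let tx = tx.clone(); // one cloned producer handle per thread
            thread::spawn(move || tx.send(m).unwrap())
        })
        .collect();
    drop(tx); // drop the original so the channel closes once producers finish
    for h in handles {
        h.join().unwrap();
    }
    rx.iter().count() // the single consumer drains until all senders are gone
}

fn main() {
    assert_eq!(fan_in(vec!["a".into(), "b".into(), "c".into()]), 3);
}
```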
Let's look at the actor handle first. The MyActorHandle struct has a sender field, which is of type mpsc::Sender<ActorMessage>. And we can see that ActorMessage is an enum with a RunTask struct variant that carries its own field as well. We can see that task_handle.run_task() constructs the RunTask variant and uses the sender to send that message down the channel (to our Actor).
#[derive(Debug)]
enum ActorMessage {
    RunTask { bucket_key: String },
}

impl fmt::Display for ActorMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorMessage::RunTask { bucket_key } => {
                write!(f, "ActorMessage::RunTask: {{ bucket_key: {} }}", bucket_key)
            }
        }
    }
}

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub async fn run_task(&self, masterfile_key: &str) {
        let msg = ActorMessage::RunTask {
            bucket_key: masterfile_key.to_string(),
        };
        if (self.sender.send(msg).await).is_err() {
            tracing::info!("receiver dropped");
            assert!(self.sender.is_closed());
        }
    }
}
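Why does run_task bother checking the result of send? Because a send fails as soon as the receiving side is gone; tokio's Sender::send returns an Err once the Receiver has been dropped. A runtime-free sketch of that behavior using std::sync::mpsc (send_after_receiver_dropped is a hypothetical helper):

```rust
use std::sync::mpsc;

// Once the receiving half of the channel is dropped, every subsequent
// send fails. This is exactly the condition run_task logs as
// "receiver dropped" in the actor-handle code above.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = mpsc::channel::<u32>();
    drop(rx); // simulate the actor task having died
    tx.send(1).is_err()
}

fn main() {
    assert!(send_after_receiver_dropped());
}
```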
We already instantiated MyActor with the channel receiver in main(); creating a new instance simply stores that receiver. In main() we then run the actor on its own Tokio task:
tokio::spawn({
    let c = Arc::clone(&config_arc);
    async move { actor.run(&c).await }
});
Notice how the actor.run() function loops over self.receiver.recv().await, which breaks when None is returned. This is where the messages sent from MyActorHandle are processed. We then run our background task in self.handle_message().
struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor { receiver }
    }

    async fn handle_message(&mut self, msg: ActorMessage, config: &AppConfig) -> Result<(), Error> {
        match msg {
            ActorMessage::RunTask { bucket_key } => {
                tracing::info!("Running task...");
                match Crawler::process(&config, &bucket_key).await {
                    Ok(_) => tracing::info!("Done processing..."),
                    Err(err) => tracing::error!("Task execution error: {}", err),
                };
            }
        }
        Ok(())
    }

    async fn run(&mut self, config_arc: &Arc<tokio::sync::Mutex<AppConfig>>) -> Result<(), Error> {
        let config = match config_arc.try_lock() {
            Ok(value) => value,
            Err(err) => return Err(Error::new_internal_error().with(err)),
        };
        while let Some(msg) = self.receiver.recv().await {
            tracing::info!("Receiver: {}", &msg);
            self.handle_message(msg, &config).await?;
        }
        Ok(())
    }
}
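The shutdown condition of that loop can be illustrated in isolation: once every sender handle has been dropped, the receive loop terminates cleanly. A runtime-free sketch using std::sync::mpsc, where recv returns Err instead of tokio's None (drain is an illustrative name, not from the code above):

```rust
use std::sync::mpsc;

// Sketch of the actor's shutdown condition: the receive loop runs until
// every sender has been dropped, then ends cleanly. tokio's Receiver::recv
// returns None at that point; std's recv returns Err, so we loop on Ok.
fn drain(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    drop(tx); // no senders left: the loop below will terminate
    let mut out = Vec::new();
    while let Ok(msg) = rx.recv() {
        out.push(msg);
    }
    out
}

fn main() {
    // All queued messages are processed, then the loop exits.
    assert_eq!(drain(3), vec![0, 1, 2]);
}
```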
The most crucial aspect above is how we handle errors when the background task fails. We can do almost anything we like here, but we must prevent an error (or a panic) from propagating out of the loop, as that would kill the background task; every future request handled by Axum would then log "receiver dropped" via the tracing::info! call in run_task.
// We want to do this: log the error and keep the loop alive.
match theoretical_background_task.await {
    Ok(_) => tracing::info!("Done processing..."),
    Err(err) => tracing::error!("Task execution error: {}", err),
};

// We don't want to do this.
match theoretical_background_task.await {
    Ok(_) => tracing::info!("Done processing..."),
    Err(err) => return Err(/* ... */), // this will cause the rx receiver to drop
};
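Shrunk down to a synchronous sketch, the error-containment pattern looks like this (process and run_loop are illustrative stand-ins, not the real Crawler):

```rust
// Stand-in for the real background task: even inputs succeed, odd ones fail.
fn process(n: u32) -> Result<u32, String> {
    if n % 2 == 0 {
        Ok(n * 10)
    } else {
        Err(format!("bad input: {}", n))
    }
}

// Returns the number of successful tasks. Failures are logged and swallowed,
// never propagated, so the loop (and hence the receiver) stays alive.
fn run_loop(inputs: &[u32]) -> usize {
    let mut successes = 0;
    for &n in inputs {
        match process(n) {
            Ok(_) => successes += 1, // "Done processing..."
            Err(e) => eprintln!("Task execution error: {}", e), // log, don't return
        }
        // Either way, we keep looping.
    }
    successes
}

fn main() {
    // All four messages are seen even though two of them fail.
    assert_eq!(run_loop(&[1, 2, 3, 4]), 2);
}
```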
Here's everything in main.rs:
struct MyActor {
    receiver: mpsc::Receiver<ActorMessage>,
}

#[derive(Debug)]
enum ActorMessage {
    RunTask { bucket_key: String },
}

impl fmt::Display for ActorMessage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ActorMessage::RunTask { bucket_key } => {
                write!(f, "ActorMessage::RunTask: {{ bucket_key: {} }}", bucket_key)
            }
        }
    }
}

impl MyActor {
    fn new(receiver: mpsc::Receiver<ActorMessage>) -> Self {
        MyActor { receiver }
    }

    async fn handle_message(&mut self, msg: ActorMessage, config: &AppConfig) -> Result<(), Error> {
        match msg {
            ActorMessage::RunTask { bucket_key } => {
                tracing::info!("Running task...");
                match Crawler::process(&config, &bucket_key).await {
                    Ok(_) => tracing::info!("Done processing..."),
                    Err(err) => tracing::error!("Task execution error: {}", err),
                };
            }
        }
        Ok(())
    }

    async fn run(&mut self, config_arc: &Arc<tokio::sync::Mutex<AppConfig>>) -> Result<(), Error> {
        let config = match config_arc.try_lock() {
            Ok(value) => value,
            Err(err) => return Err(Error::new_internal_error().with(err)),
        };
        while let Some(msg) = self.receiver.recv().await {
            tracing::info!("Receiver: {}", &msg);
            self.handle_message(msg, &config).await?;
        }
        Ok(())
    }
}

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender<ActorMessage>,
}

impl MyActorHandle {
    pub async fn run_task(&self, masterfile_key: &str) {
        let msg = ActorMessage::RunTask {
            bucket_key: masterfile_key.to_string(),
        };
        if (self.sender.send(msg).await).is_err() {
            tracing::info!("receiver dropped");
            assert!(self.sender.is_closed());
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    dotenv().ok();
    // Set the RUST_LOG, if it hasn't been explicitly defined
    if std::env::var_os("RUST_LOG").is_none() {
        std::env::set_var("RUST_LOG", "example-rs=info,tower_http=debug")
    }
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_file(true)
        .with_line_number(true)
        .init();
    let config = config::config().await?;
    // Spin up our API
    let addr = env::var("SERVER_BIND_HOST_PORT").expect("SERVER_BIND_HOST_PORT is missing!");
    tracing::info!("[ OK ]: Listening on {}", addr);
    let (sender, receiver) = mpsc::channel(8);
    let mut actor = MyActor::new(receiver);
    let actor_handle = MyActorHandle { sender };
    let config_arc = Arc::new(Mutex::new(config.clone()));
    let backend = async move { http::serve(&config, &addr, actor_handle).await };
    tokio::spawn({
        let c = Arc::clone(&config_arc);
        async move { actor.run(&c).await }
    });
    tokio::join!(backend);
    Ok(())
}
If you found this useful give me shout on Twitter, thanks!