
Elaborating on use of Rust in a professional capacity

I was recently asked the following:
  1. Can you point to an implementation of an actor model in Rust that you've built?
  2. Can you give an example of how you've used Tokio?
  3. What have you built that uses async/await?
I will reflect on a tool that I built in a professional capacity during my time at E-Accent BV. The team had long abandoned Nagios for many reasons, but mainly because of its management overhead (a decision made well before I joined). Our production stack ran a MySQL production DB, which had a DB 'slave' with replication enabled.
We started to experience instances where the DB slave would fall out of sync with the master DB without alerting anyone. I decided to fix this with a tool written in Rust, as I was the only person on the team with C/C++ skills.
To clarify, I spent many years during both of my Engineering degrees coding in Assembly (for microcontrollers) and C/C++, targeting either x86 or 8-bit MCUs, and have had my share of fun with pointers and buffers blowing up.
Here are some of the dependencies I ended up using in project Sentinel:

    [package]
    name = "sentinel"
    version = "0.1.0"
    authors = ["Michael de Silva <[email protected]>"]
    edition = "2018"
    license = "MIT"
    readme = "README.md"
    publish = false

    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

    [dependencies]
    config = "^0.9"
    glob = "^0.3.0"
    hyper = "^0.13.4"
    hyper-tls = "^0.4.1"
    serde_json = "^1.0"
    rustc-serialize = "^0.3"
    futures = "^0.3.4"
    tokio = { version = "0.2.20", features = ["full"] }
    getopts = "^0.2.19"
    indicatif = "^0.13.0"
    once_cell = "^1.3.1"
    async-trait = "^0.1.30"
    sqlx = { version = "^0.3", default-features = false, features = [ "runtime-tokio", "macros", "mysql" ] }
    chrono = "^0.4.11"
    log = "^0.4.8"
    log4rs = "^0.12.0"
    regex = "^1.3.7"
    url = "^2.1.1"
The most important of these was the Tokio runtime, as I wanted to take advantage of async/await from the get-go. The ever-popular Serde crate helped a lot with building and parsing JSON.
Here are some of the objectives of this project:
  1. Functionality first, code beauty second.
  2. Built as a PoC; as such, the initial build lacks much of what would be considered "idiomatic" Rust.  I leveraged generics heavily.
  3. Used the MPSC concurrency model for sending notifications to Slack and Postmark (email alerts).
  4. Calls to Slack were done asynchronously.
  5. Any alert from the DB slave is recorded, and notifications are throttled by a configurable "anti-spam" duration. For example, with a 30-minute duration and an alert arriving every 2 minutes, the team sees only 1 notification in Slack/email for every 30-minute period after that very first alert.  This prevents sending 15 alerts to Slack every 30 minutes!
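The "anti-spam" behaviour in the last point can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the code from Sentinel: the `AlertThrottle` type, its `should_notify` method, and the `"replication"` key are hypothetical names, and the 30-minute window is taken from the example above.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Suppresses repeat notifications for the same alert key within a quiet window.
/// (Hypothetical type for illustration; not taken from the Sentinel codebase.)
struct AlertThrottle {
    window: Duration,
    last_sent: HashMap<String, Instant>,
}

impl AlertThrottle {
    fn new(window: Duration) -> Self {
        Self { window, last_sent: HashMap::new() }
    }

    /// Returns true if a notification should go out for `key` at time `now`,
    /// and records `now` as the last send time when it does.
    fn should_notify(&mut self, key: &str, now: Instant) -> bool {
        match self.last_sent.get(key) {
            // Still inside the quiet window: stay silent.
            Some(&sent) if now.duration_since(sent) < self.window => false,
            // First alert for this key, or the window has elapsed.
            _ => {
                self.last_sent.insert(key.to_string(), now);
                true
            }
        }
    }
}

fn main() {
    let mut throttle = AlertThrottle::new(Duration::from_secs(30 * 60));
    let start = Instant::now();
    // Simulate an alert arriving every 2 minutes for one hour (30 alerts in total).
    let sent = (0u64..30)
        .filter(|&i| throttle.should_notify("replication", start + Duration::from_secs(i * 120)))
        .count();
    println!("notifications sent: {}", sent); // prints "notifications sent: 2"
}
```

Passing `now` in as a parameter, rather than calling `Instant::now()` inside the method, keeps the throttle easy to test with simulated timestamps.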
Here's some sample code of a call to Postmark:

    use hyper::{Body, Response};
    use std::collections::HashMap;

    // Note: `post` is a helper from elsewhere in the project and is not shown here.
    pub async fn notify(
        data: &serde_json::Value,
    ) -> Result<(Response<Body>, serde_json::Value), Box<dyn std::error::Error + Send + Sync>> {
        let url = "https://api.postmarkapp.com/email";
        let payload = Body::from(data.to_string());
        let (response, body): (Response<Body>, hyper::body::Bytes) = match post(&url, payload).await {
            Ok(result) => result,
            Err(error) => panic!("Error [postmark]: {:#?}", error),
        };
        let body_string = String::from_utf8_lossy(&body);
        let json_value: serde_json::Value = match serde_json::from_str(&body_string) {
            Ok(value) => value,
            Err(error) => panic!("Err: parsing JSON {:#?} / body: {:#?}", error, body_string),
        };

        // Patch error from Postmark
        // "No Account or Server API tokens were supplied in the HTTP headers.
        // Please add a header for either X-Postmark-Server-Token or
        // X-Postmark-Account-Token."
        if response.status().eq(&422) && 10.eq(&json_value["ErrorCode"]) {
            panic!("Postmark Error: {:#?}, {}:{}", json_value, file!(), line!());
        }

        // Patch error from Postmark
        if response.status().eq(&422) && 300.eq(&json_value["ErrorCode"]) {
            panic!("Postmark Error: {:#?}, {}:{}", json_value, file!(), line!());
        }

        // Patch for success from Slack
        if response.status().eq(&200)
            && 0.eq(&json_value["ErrorCode"])
            && "OK".eq(&json_value["Message"])
        {
            let serde_ok = serde_json::json!("ok");
            let mut new_map: HashMap<&str, serde_json::Value> = HashMap::new();
            new_map.insert("success", serde_ok);
            let response_value = serde_json::to_value(&json_value).unwrap();
            new_map.insert("response", response_value.clone());
            return Ok((response, response_value));
        }

        Ok((response, json_value.clone()))
    }

Multithreading with MPSC (Multi-Producer, Single Consumer) channels

Much of the MPSC implementation was in line with the design provided in the Rust book.  There are several articles on this approach as well.
The main thread blocks while it polls the DB slave for its status, waiting a pre-configured delay between polls.  Any alerts raised during a poll are created in separate 'runner' threads (the multiple producers), and the main thread (the single consumer) then receives these and processes the notifications.
Take a look at the master branch for the original project on GitHub.  I started to rewrite this with an idiomatic Rust approach in a separate branch, but did not have the resources to finish it, considering the PoC approach worked just fine.
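The producer/consumer arrangement described above can be sketched with the standard library's `std::sync::mpsc` channel, in the style of the Rust book. This is an illustration under assumptions, not the Sentinel implementation: the `Notification` enum and the `spawn_runner` and `collect_notifications` functions are hypothetical names, and the actual calls to Slack and Postmark are replaced by a `println!`.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// A notification destined for one of the alerting services.
/// (Hypothetical type for illustration.)
#[derive(Debug, PartialEq)]
enum Notification {
    Slack(String),
    Email(String),
}

/// Spawns a 'runner' thread (a producer) that sends one notification over the channel.
fn spawn_runner(tx: Sender<Notification>, note: Notification) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Sending only fails if the receiver has already been dropped.
        tx.send(note).expect("receiver dropped");
    })
}

/// Runs two producers against a single consumer and returns what was received.
fn collect_notifications() -> Vec<Notification> {
    let (tx, rx) = mpsc::channel();
    let handles = vec![
        spawn_runner(tx.clone(), Notification::Slack("replica is out of sync".into())),
        spawn_runner(tx.clone(), Notification::Email("replica is out of sync".into())),
    ];
    // Drop the original sender so the receiver stops once every runner has finished.
    drop(tx);
    for handle in handles {
        handle.join().expect("runner panicked");
    }
    // Single consumer: drain messages until all senders are gone.
    rx.iter().collect()
}

fn main() {
    for note in collect_notifications() {
        // In a real tool this is where the Slack or email request would be made.
        println!("processing {:?}", note);
    }
}
```

The order in which the two notifications arrive is not deterministic, since each runner is scheduled independently.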

Actor model

While I did not use an Actor model in this project, I have read the docs for the popular Actix crate.

Highlights

I was able to create some really handy utilities to deal with timestamps and conversion for timezones.
The Slack and Postmark services are both Tokio-compatible and easily reusable.
All configurations can be loaded via a simple YAML file.
The codebase also includes a working Ansible playbook to deploy it, including a systemd unit configuration file.

Areas for improvement

Looking back at this original codebase, here are things I wish I had more time to spend on:
  • Module organisation: I feel that the level of organisation could be improved further.
  • Reusable modules such as utilities and services could have been extracted into dependencies; this is just a nice clean up task.
  • Better unit-testing.
  • Testing runtime-performance.
  • Checking if a trait-object-based approach would be less verbose than the heavy use of generics, and its impact on runtime performance vs. binary size.
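To make the last point concrete, here is a small sketch contrasting the two styles. The `Notifier` trait and the `Slack` and `Postmark` unit structs are hypothetical stand-ins, not the types used in Sentinel. A generic function is compiled once per concrete type (static dispatch, potentially a larger binary), whereas a function taking `dyn Notifier` is compiled once and dispatches through a vtable at runtime.

```rust
/// A notification backend (hypothetical trait for illustration).
trait Notifier {
    fn notify(&self, message: &str) -> String;
}

struct Slack;
struct Postmark;

impl Notifier for Slack {
    fn notify(&self, message: &str) -> String {
        format!("slack: {}", message)
    }
}

impl Notifier for Postmark {
    fn notify(&self, message: &str) -> String {
        format!("postmark: {}", message)
    }
}

/// Generic version: monomorphised per concrete type (static dispatch).
fn send_generic<N: Notifier>(notifier: &N, message: &str) -> String {
    notifier.notify(message)
}

/// Trait-object version: one compiled copy, dispatched dynamically at runtime.
/// It also allows different backends to live in the same collection.
fn send_all(notifiers: &[Box<dyn Notifier>], message: &str) -> Vec<String> {
    notifiers.iter().map(|n| n.notify(message)).collect()
}

fn main() {
    println!("{}", send_generic(&Slack, "replica lagging"));

    let notifiers: Vec<Box<dyn Notifier>> = vec![Box::new(Slack), Box::new(Postmark)];
    for line in send_all(&notifiers, "replica lagging") {
        println!("{}", line);
    }
}
```

Which style wins on verbosity, speed, or binary size depends on the codebase, so it would need to be measured rather than assumed.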