
Managing Processes in Rust



Introduction

In a recent side project, I needed to send an HLS stream from my webcam to a TV. The goal was to delay the stream, allowing ballroom dancers to review their figures and improve their performance. I decided to use Rust as the programming language for this project, as I wanted to enhance my Rust skills.

Why Run a Sub-process

While researching this project, I found that controlling a webcam from Rust and sending the data out as an HLS stream is not easy. There are awesome projects like nokhwa, but there is no plug-and-play solution.

On the brink of despair, a friend chimed in and told me that FFmpeg might be able to do what I needed. After fiddling around with the options and configurations, I found a working FFmpeg command that took my webcam as an input and produced an HLS stream ready to consume. I created an Axum backend that serves the stream and a frontend that reads it, and boom, done! Almost: I still needed to be able to start and stop the video streams from within the Rust application.

You might wonder why; there are several reasons, the most important being that the backend has to be able to start and stop the FFmpeg process on demand.

The solution

Before I go into the details of the hows and whys, I’d like to show you what I came up with:

use std::{process::Stdio, sync::Arc};

use tokio::{
    io::{AsyncBufReadExt, BufReader},
    process::{Child, Command},
    sync::RwLock,
};
use tracing::debug;

pub async fn run(
    display_name: String,
    cmd: String,
    args: Vec<String>,
    current_dir: Option<String>,
    environment_variables: Option<Vec<(String, String)>>,
) -> Result<Arc<RwLock<Child>>, String> {
    let mut command = Command::new(cmd);
    if let Some(dir) = current_dir {
        command.current_dir(dir);
    }

    for arg in args.iter() {
        command.arg(arg);
    }

    if let Some(env) = environment_variables {
        for (key, val) in env.iter() {
            command.env(key, val);
        }
    }

    command
        // Output gets piped and read in separate tokio thread
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    let child = command
        .spawn()
        .map_err(|e| format!("Failed to start {}: {}", display_name, e))?;
    let child_ref = Arc::new(RwLock::new(child));
    let result = Ok(child_ref.clone());

    tokio::spawn(async move {
        let mut child = child_ref.write().await;
        let stdout = child
            .stdout
            .take()
            .expect("child did not have a handle to stdout");
        let stderr = child
            .stderr
            .take()
            .expect("child did not have a handle to stderr");

        let mut stdout_reader = BufReader::new(stdout).lines();
        let mut stderr_reader = BufReader::new(stderr).lines();

        drop(child);

        // Track which stream has reached EOF so the loop can end once both
        // pipes are closed instead of spinning forever on `Ok(None)`.
        let mut stdout_done = false;
        let mut stderr_done = false;

        loop {
            tokio::select! {
                result = stdout_reader.next_line(), if !stdout_done => {
                    match result {
                        Ok(Some(line)) => debug!("{}: {}", display_name, line),
                        // EOF or read error: stop polling stdout
                        _ => stdout_done = true,
                    }
                }
                result = stderr_reader.next_line(), if !stderr_done => {
                    match result {
                        Ok(Some(line)) => debug!("{}: {}", display_name, line),
                        // EOF or read error: stop polling stderr
                        _ => stderr_done = true,
                    }
                }
                // Both streams are finished, the child has exited.
                else => break,
            }
        }
    });

    result
}

As you can see, I’m using the tokio crate as the async runtime, since axum, which I use for all of the HTTP handling, is built on top of it. To get access to the tokio::process module you need at least the process feature enabled, maybe more; I ran it with full and never looked back.
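
To make the signature concrete, here is a hypothetical call site; it is not from the original project, and the FFmpeg arguments and paths are placeholders rather than the real webcam-to-HLS command.

// Hypothetical usage of the `run` function above. The arguments shown here
// ("-i /dev/video0", the /tmp/hls directory) are illustrative placeholders.
async fn start_ffmpeg() -> Result<(), String> {
    let _ffmpeg = run(
        "ffmpeg".to_string(),
        "ffmpeg".to_string(),
        vec!["-i".to_string(), "/dev/video0".to_string()],
        Some("/tmp/hls".to_string()),
        None,
    )
    .await?;

    Ok(())
}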

Now let me show you the problems that led to this solution.

Running a process

To run a process, all you need is the tokio::process::Command struct. It wraps executing a command from your Rust code nicely; you can, for example, get the output of ls -l -a through a call like this:

use std::process::Output;
use tokio::process::Command;

#[tokio::main]
async fn main() {
    println!("Let me run ls -l -a");
    match Command::new("ls").arg("-l").arg("-a").output().await {
        Ok(Output {
            stdout,
            status,
            stderr,
        }) => println!(
            "ls -la had exit code ({}) stdout: {} stderr: {}",
            status,
            String::from_utf8(stdout).unwrap(),
            String::from_utf8(stderr).unwrap()
        ),
        Err(_) => println!("Something went wrong"),
    }
}

// =>
// Let me run ls -l -a
// ls -la had exit code (exit status: 0) stdout: total 40
// drwxr-xr-x  8 danielschmidt  staff   256 Mar 23 20:29 .
// drwx------  3 danielschmidt  staff    96 Mar 23 20:28 ..
// drwxr-xr-x  9 danielschmidt  staff   288 Mar 23 20:28 .git
// -rw-r--r--  1 danielschmidt  staff     8 Mar 23 20:28 .gitignore
// -rw-r--r--  1 danielschmidt  staff  8790 Mar 23 20:32 Cargo.lock
// -rw-r--r--  1 danielschmidt  staff   224 Mar 23 20:32 Cargo.toml
// drwxr-xr-x  3 danielschmidt  staff    96 Mar 23 20:28 src
// drwxr-xr-x@ 5 danielschmidt  staff   160 Mar 23 20:29 target
//  stderr:

It’s worth noting that calling output is necessary to actually run the command. The Future it returns resolves to an Output struct containing the exit status, stdout, and stderr, so it’s a great option for running a one-off command. I use this method, for example, to list the attached cameras using v4l2-ctl --list-devices.
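
As a sketch of how that one-shot pattern can be wrapped (this helper is my own assumption, not code from the project):

use tokio::process::Command;

// Minimal sketch: run `v4l2-ctl --list-devices` once and return its stdout
// as a String. Errors are reduced to Strings to keep the example short.
async fn list_devices() -> Result<String, String> {
    let output = Command::new("v4l2-ctl")
        .arg("--list-devices")
        .output()
        .await
        .map_err(|e| format!("Failed to run v4l2-ctl: {}", e))?;

    if !output.status.success() {
        return Err(format!("v4l2-ctl exited with {}", output.status));
    }

    String::from_utf8(output.stdout).map_err(|e| format!("Invalid UTF-8 output: {}", e))
}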

I could have run ffmpeg through this method as well, but I wouldn’t have received any logs from FFmpeg while it was running. Since determining the right ffmpeg command took a lot of trial and error, it became clear to me that live logs were necessary. Let’s explore how to obtain them:

Getting logs out of a running process

To obtain the logs, I need to tell the Command that I want to pipe stdout and stderr so I can consume them later. Otherwise the output is collected internally but not readable until the process concludes. I then need to spawn the program and read the stdout and stderr buffers while it is running.

To be able to move the spawned child into a task created by tokio::spawn, we need to wrap the child in an RwLock so we can safely access it from another task. I also put it into an Arc so that I can return the handle from the function.

The most interesting part is taking stdout and stderr out of the child and using tokio::io::BufReader to read lines whenever they come in. You have to bring the tokio::io::AsyncBufReadExt trait into scope to get the nice line-by-line reading methods on the BufReader.

Within the task we start a loop with the loop keyword and use tokio::select! to read from either the stdout or the stderr reader, whichever Future finishes first, until both streams are exhausted.

use std::{process::Stdio, sync::Arc, time::Duration};
use tokio::{io::AsyncBufReadExt, time::sleep};
use tokio::{io::BufReader, process::Command, sync::RwLock};

#[tokio::main]
async fn main() {
    println!("Let me run ls -l -a");
    let mut command = Command::new("ls");
    command.arg("-l").arg("-a");
    command
        // Output gets piped and read in separate tokio thread
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let child = command
        .spawn()
        .expect("Expect ls -l -a to start");
    let child_ref = Arc::new(RwLock::new(child));

    tokio::spawn(async move {
        let mut child = child_ref.write().await;
        let stdout = child
            .stdout
            .take()
            .expect("child did not have a handle to stdout");
        let stderr = child
            .stderr
            .take()
            .expect("child did not have a handle to stderr");

        let mut stdout_reader = BufReader::new(stdout).lines();
        let mut stderr_reader = BufReader::new(stderr).lines();

        drop(child);

        // Track which stream has reached EOF so the loop can end once both
        // pipes are closed instead of spinning forever on `Ok(None)`.
        let mut stdout_done = false;
        let mut stderr_done = false;

        loop {
            tokio::select! {
                result = stdout_reader.next_line(), if !stdout_done => {
                    match result {
                        Ok(Some(line)) => println!("ls said: {}", line),
                        // EOF or read error: stop polling stdout
                        _ => stdout_done = true,
                    }
                }
                result = stderr_reader.next_line(), if !stderr_done => {
                    match result {
                        Ok(Some(line)) => println!("ls complained: {}", line),
                        // EOF or read error: stop polling stderr
                        _ => stderr_done = true,
                    }
                }
                // Both streams are finished, the child has exited.
                else => break,
            }
        }
    });

    // We need to add this to make the parent process live long enough
    sleep(Duration::from_secs(1)).await
}

// =>
// Running `target/debug/foo`
// Let me run ls -l -a
// ls said: total 40
// ls said: drwxr-xr-x  8 danielschmidt  staff   256 Mar 23 20:29 .
// ls said: drwx------  3 danielschmidt  staff    96 Mar 23 20:28 ..
// ls said: drwxr-xr-x  9 danielschmidt  staff   288 Mar 23 20:28 .git
// ls said: -rw-r--r--  1 danielschmidt  staff     8 Mar 23 20:28 .gitignore
// ls said: -rw-r--r--  1 danielschmidt  staff  8790 Mar 23 20:32 Cargo.lock
// ls said: -rw-r--r--  1 danielschmidt  staff   224 Mar 23 20:32 Cargo.toml
// ls said: drwxr-xr-x  3 danielschmidt  staff    96 Mar 23 20:28 src
// ls said: drwxr-xr-x@ 5 danielschmidt  staff   160 Mar 23 20:29 target

This was the hardest part, now let’s take a brief look at sending data to the process.

Two-way communication with the spawned process

To get FFmpeg to shut down gracefully (and, for example, finish writing .mp4 files that work and are not corrupted), you need to send the letter q to its stdin.

Let’s say child is the child you spawned using command.spawn() in the examples above. All you need to do is call the write_all method (from the tokio::io::AsyncWriteExt trait) on its stdin handle:

use tokio::io::AsyncWriteExt;

let child_stdin = child.stdin.as_mut().unwrap();
child_stdin
    .write_all(b"q\n")
    .await
    .expect("Failed to write to stdin");
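
Putting it together with the run function from earlier, a graceful shutdown could look roughly like this. It is a minimal sketch under my own assumptions: it takes the Arc<RwLock<Child>> handle returned by run, and the stop name is hypothetical.

use std::sync::Arc;

use tokio::{io::AsyncWriteExt, process::Child, sync::RwLock};

// Sketch: ask the child (e.g. FFmpeg) to quit by writing "q" to its stdin,
// then wait for it to exit so it can finish writing its output files.
pub async fn stop(child_ref: Arc<RwLock<Child>>) -> Result<(), String> {
    let mut child = child_ref.write().await;

    if let Some(stdin) = child.stdin.as_mut() {
        stdin
            .write_all(b"q\n")
            .await
            .map_err(|e| format!("Failed to write to stdin: {}", e))?;
    }

    child
        .wait()
        .await
        .map_err(|e| format!("Failed to wait for child: {}", e))?;

    Ok(())
}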

Disclaimer

I’m new to Rust, so please assume this code to be imperfect. Also, if you have questions or feedback, please reach out to me on Twitter.



