Tokio channel sends, but doesn't receive

TL;DR I'm trying to have a background thread that's ID'd that is controlled via that ID and web calls, and the background threads doesn't seem to be getting the message via all the types of channels I've tried.

I've tried both the std channels as well as tokio's, and of those I've tried all but the watcher type from tokio. All have the same result which probably means that I've messed something up somewhere without realizing it, but I can't find the issue:

use std::collections::{
    hash_map::Entry::{Occupied, Vacant},
    HashMap,
};
use std::sync::Arc;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use uuid::Uuid;
use warp::{http, Filter};

#[derive(Default)]
pub struct Switcher {
    pub handle: Option<JoinHandle<bool>>,
    pub pipeline_end_tx: Option<UnboundedSender<String>>,
}

impl Switcher {
    pub fn set_sender(&mut self, tx: UnboundedSender<String>) {
        self.pipeline_end_tx = Some(tx);
    }
    pub fn set_handle(&mut self, handle: JoinHandle<bool>) {
        self.handle = Some(handle);
    }
}

const ADDR: [u8; 4] = [0, 0, 0, 0];
const PORT: u16 = 3000;

type RunningPipelines = Arc<RwLock<HashMap<String, Arc<RwLock<Switcher>>>>>;

#[tokio::main]
async fn main() {
    let running_pipelines = Arc::new(RwLock::new(HashMap::<String, Arc<RwLock<Switcher>>>::new()));

    let session_create = warp::post()
        .and(with_pipelines(running_pipelines.clone()))
        .and(warp::path("session"))
        .then(|pipelines: RunningPipelines| async move {
            println!("session requested OK!");
            let id = Uuid::new_v4();
            let mut switcher = Switcher::default();
            let (tx, mut rx) = mpsc::unbounded_channel::<String>();
            switcher.set_sender(tx);

            let t = tokio::spawn(async move {
                println!("Background going...");
                //This would be something processing in the background until it received the end signal
                match rx.recv().await {
                    Some(v) => {
                        println!(
                            "Got end message:{} YESSSSSS@!@@!!!!!!!!!!!!!!!!1111eleven",
                            v
                        );
                    }
                    None => println!("Error receiving end signal:"),
                }

                println!("ABORTING HANDLE");

                true
            });

            let ret = HashMap::from([("session_id", id.to_string())]);

            switcher.set_handle(t);

            {
                pipelines
                    .write()
                    .await
                    .insert(id.to_string(), Arc::new(RwLock::new(switcher)));
            }

            Ok(warp::reply::json(&ret))
        });

    let session_end = warp::delete()
        .and(with_pipelines(running_pipelines.clone()))
        .and(warp::path("session"))
        .and(warp::query::<HashMap<String, String>>())
        .then(
            |pipelines: RunningPipelines, p: HashMap<String, String>| async move {
                println!("session end requested OK!: {:?}", p);

                match p.get("session_id") {
                    None => Ok(warp::reply::with_status(
                        "Please specify session to end",
                        http::StatusCode::BAD_REQUEST,
                    )),
                    Some(id) => {
                        let mut pipe = pipelines.write().await;

                        match pipe.entry(String::from(id)) {
                            Occupied(handle) => {
                                println!("occupied");
                                let (k, v) = handle.remove_entry();
                                drop(pipe);
                                println!("removed from hashmap, key:{}", k);
                                let s = v.write().await;
                                if let Some(h) = &s.handle {
                                    if let Some(tx) = &s.pipeline_end_tx {
                                        match tx.send("goodbye".to_string()) {
                                            Ok(res) => {
                                                println!(
                                                "sent end message|{:?}| to fpipeline: {}",
                                                res, id
                                            );
                                            //Added this to try to get it to at least Error on the other side
                                            drop(tx);
                                        },
                                            Err(err) => println!(
                                                "ERROR sending end message to pipeline({}):{}",
                                                id, err
                                            ),
                                        };
                                    } else {
                                        println!("no sender channel found for pipeline: {}", id);
                                    };
                                    h.abort();
                                } else {
                                    println!(
                                        "no luck finding the value in handle in the switcher: {}",
                                        id
                                    );
                                };
                            }
                            Vacant(_) => {
                                println!("no luck finding the handle in the pipelines: {}", id)
                            }
                        };
                        Ok(warp::reply::with_status("done", http::StatusCode::OK))
                    }
                }
            },
        );

    let routes = session_create
        .or(session_end)
        .recover(handle_rejection)
        .with(warp::cors().allow_any_origin());

    println!("starting server...");
    warp::serve(routes).run((ADDR, PORT)).await;
}

async fn handle_rejection(
    err: warp::Rejection,
) -> Result<impl warp::Reply, std::convert::Infallible> {
    Ok(warp::reply::json(&format!("{:?}", err)))
}

fn with_pipelines(
    pipelines: RunningPipelines,
) -> impl Filter<Extract = (RunningPipelines,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || pipelines.clone())
}

depends:

[dependencies]
warp = "0.3"
tokio = { version = "1", features = ["full"] }
uuid = { version = "0.8.2", features = ["serde", "v4"] }

Results when I boot up, send a "create" request, and then an "end" request with the received ID:

starting server...
session requested OK!
Background going...
session end requested OK!: {"session_id": "6b984a45-38d8-41dc-bf95-422f75c5a429"}
occupied
removed from hashmap, key:6b984a45-38d8-41dc-bf95-422f75c5a429
sent end message|()| to fpipeline: 6b984a45-38d8-41dc-bf95-422f75c5a429

You'll notice that the background thread starts (and doesn't end) when the "create" request is made, but when the "end" request is made, while everything appears to complete successfully from the request(web) side, the background thread doesn't ever receive the message. As I've said I've tried all different channel types and moved things around to get it into this configuration... i.e. flattened and thread safetied as much as I could or at least could think of. I'm greener than I would like in rust, so any help would be VERY appreciated!


Solution 1:

I think that the issue here is that you are sending the message and then immediately aborting the background task:

tx.send("goodbye".to_string());
//...
h.abort();

And the background task does not have time to process the message, as the abort is of higher priority.

What you need is to join the task, not to abort it.

Curiously, tokio tasks handles do not have a join() method, instead you wait for the handle itself. But for that you need to own the handle, so first you have to extract the handle from the Switcher:

let mut s = v.write().await;
//steal the task handle
if let Some(h) = s.handle.take() {
   //...
   tx.send("goodbye".to_string());
   //...
   //join the task
   h.await.unwrap();
}

Note that joining a task may fail, in case the task is aborted or panicked. I'm just panicking in the code above, but you may want to do something different.

Or... you could not to wait for the task. In tokio if you drop a task handle, it will be detached. Then, it will finish when it finishes.