diff --git a/src/lib.rs b/src/lib.rs index 7184f37..6e9cb5a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ use std::{ pub struct ThreadPool { workers: Vec, - sender: mpsc::Sender, + sender: mpsc::Sender, } impl ThreadPool { @@ -37,31 +37,68 @@ impl ThreadPool { { let job = Box::new(f); - self.sender.send(job).unwrap(); + self.sender.send(Message::NewJob(job)).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + println!("Sending terminate message to all workers."); + for _ in &self.workers { + // Panic is an acceptable, if we cannot shut down cleanly + self.sender.send(Message::Terminate).unwrap(); + } + + println!("Shutting down all workers."); + + for worker in &mut self.workers { + println!("Shutting down worker {}", worker.id); + + if let Some(thread) = worker.thread.take() { + // Panic is an acceptable, if we cannot shut down cleanly + thread.join().unwrap(); + } + } } } type Job = Box; +enum Message { + NewJob(Job), + Terminate, +} + struct Worker { id: usize, - thread: thread::JoinHandle<()>, + thread: Option>, } impl Worker { - fn new(id: usize, receiver: Arc>>) -> Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { let thread = thread::spawn(move || loop { - let job = receiver + let message = receiver .lock() .expect("Mutex poisoned; bailing out.") .recv() - .expect("Failed to receive job."); + .expect("Failed to receive message."); - println!("Worker {} got a job; executing.", id); + match message { + Message::NewJob(job) => { + println!("Worker {} got a job; executing.", id); - job(); + job(); + } + Message::Terminate => { + println!("Worker {} was told to terminate.", id); + break; + } + } }); - Worker { id, thread } + Worker { + id, + thread: Some(thread), + } } }