diff --git a/src/main.rs b/src/bin/main.rs similarity index 95% rename from src/main.rs rename to src/bin/main.rs index 45f3ce2..7c6f168 100644 --- a/src/main.rs +++ b/src/bin/main.rs @@ -2,14 +2,20 @@ use httparse; use std::io::prelude::*; use std::net::{TcpListener, TcpStream}; +use myip::ThreadPool; + fn main() { println!("Replying with most probable peer's ip on port 7878"); let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); + for stream in listener.incoming() { let stream = stream.unwrap(); - handle_connection(stream); + pool.execute(|| { + handle_connection(stream); + }); } } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..7184f37 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,67 @@ +use std::{ + sync::{mpsc, Arc, Mutex}, + thread, +}; + +pub struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +impl ThreadPool { + /// Create a new ThreadPool. + /// + /// The size is the number of workers in the pool. + /// + /// # Panics + /// + /// The `new` function will panic if the size is zero. + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + let (sender, receiver) = mpsc::channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + // create some workers and store them in the vector + workers.push(Worker::new(id, Arc::clone(&receiver))) + } + ThreadPool { workers, sender } + } + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.send(job).unwrap(); + } +} + +type Job = Box; + +struct Worker { + id: usize, + thread: thread::JoinHandle<()>, +} + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let job = receiver + .lock() + .expect("Mutex poisoned; bailing out.") + .recv() + .expect("Failed to receive job."); + + println!("Worker {} got a job; executing.", id); + + job(); + }); + + Worker { id, thread } + } +}