From 9eb2d1416bd9d5df921aed451354863d396cdeec Mon Sep 17 00:00:00 2001 From: q1-silver Date: Mon, 25 Oct 2021 12:47:27 +0300 Subject: [PATCH] ic --- .gitignore | 1 + Cargo.lock | 16 ++++ Cargo.toml | 9 +++ src/main.rs | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 232 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..7e9afaa --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,16 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "libc" +version = "0.2.105" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013" + +[[package]] +name = "tepoll" +version = "0.1.0" +dependencies = [ + "libc", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..460733b --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "tepoll" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libc = "*" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..8ca40a7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,206 @@ +use std::collections::HashMap; +use std::io::{self, prelude::*}; +use std::net::{TcpListener, TcpStream}; +use std::os::unix::io::{AsRawFd, RawFd}; + +#[allow(unused_macros)] +macro_rules! syscall { + ($fn:ident ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*)}; + if res == -1 { + Err (std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +const HTTP_RESP: &[u8] = b"HTTP/1.1 200 OK +content-type: text/html +content-length: 5 + +Hello"; + +const READ_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLIN; +const WRITE_FLAGS: i32 = libc::EPOLLONESHOT | libc::EPOLLOUT; + +fn main() -> io::Result<()> { + let mut request_contexts: HashMap = HashMap::new(); + let mut events: Vec = Vec::with_capacity(1024); + let mut key = 100; + + let listener = TcpListener::bind("[::1]:7878")?; + listener.set_nonblocking(true)?; + let listener_fd = listener.as_raw_fd(); + + let epoll_fd = epoll_create().expect("cannot create epoll queue"); + add_interest(epoll_fd, listener_fd, listener_read_event(key))?; + + loop { + println!("\nrequests in flight: {}", request_contexts.len()); + events.clear(); + let res = match syscall!(epoll_wait( + epoll_fd, + events.as_mut_ptr() as *mut libc::epoll_event, + 1024, + 1000 as libc::c_int, + )) { + Ok(v) => v, + Err(e) => panic!("error during epoll wait: {}", e), + }; + + // safe as long as the kernel does nothing wrong - copied from mio + unsafe { events.set_len(res as usize) }; + + for ev in &events { + match ev.u64 { + 100 => { + match listener.accept() { + Ok((stream, addr)) => { + stream.set_nonblocking(true)?; + println!("new client: {}", addr); + key += 1; + add_interest(epoll_fd, stream.as_raw_fd(), listener_read_event(key))?; + request_contexts.insert(key, RequestContext::new(stream)); + } + Err(e) => eprintln!("couldn't accept: {}", e), + }; + modify_interest(epoll_fd, listener_fd, listener_read_event(100))?; + } + key => { + let mut to_delete = None; + if let Some(context) = request_contexts.get_mut(&key) { + let events = ev.events; + match events { + v if v as i32 & libc::EPOLLIN == libc::EPOLLIN => { + context.read_cb(key, epoll_fd)?; + } + v if v as i32 & libc::EPOLLOUT == libc::EPOLLOUT => { + context.write_cb(key, epoll_fd)?; + to_delete = Some(key); + } + v => println!("unexpected events: {}", v), + }; + } + if let Some(key) = to_delete { + request_contexts.remove(&key); + } + } + } + } + } +} + +fn epoll_create() -> io::Result { + let fd = syscall!(epoll_create1(0))?; + if let Ok(flags) = syscall!(fcntl(fd, libc::F_GETFD)) { + let _ = syscall!(fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)); + } + + Ok(fd) +} + +fn add_interest(epoll_fd: RawFd, fd: RawFd, mut event: libc::epoll_event) -> io::Result<()> { + syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut event))?; + Ok(()) +} + +fn listener_read_event(key: u64) -> libc::epoll_event { + libc::epoll_event { + events: READ_FLAGS as u32, + u64: key, + } +} + +fn listener_write_event(key: u64) -> libc::epoll_event { + libc::epoll_event { + events: WRITE_FLAGS as u32, + u64: key, + } +} + +fn modify_interest(epoll_fd: RawFd, fd: RawFd, mut event: libc::epoll_event) -> io::Result<()> { + syscall!(epoll_ctl(epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut event))?; + Ok(()) +} + +fn remove_interest(epoll_fd: RawFd, fd: RawFd) -> io::Result<()> { + syscall!(epoll_ctl( + epoll_fd, + libc::EPOLL_CTL_DEL, + fd, + std::ptr::null_mut() + ))?; + Ok(()) +} + +fn close(fd: RawFd) { + let _ = syscall!(close(fd)); +} + +#[derive(Debug)] +pub struct RequestContext { + pub stream: TcpStream, + pub context_length: usize, + pub buf: Vec, +} + +impl RequestContext { + fn new(stream: TcpStream) -> Self { + Self { + stream, + buf: Vec::new(), + context_length: 0, + } + } + + fn read_cb(&mut self, key: u64, epoll_fd: RawFd) -> io::Result<()> { + let mut buf = [0u8; 4096]; + match self.stream.read(&mut buf) { + Ok(_) => { + if let Ok(data) = std::str::from_utf8(&buf) { + self.parse_and_set_content_length(data); + } + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + }; + self.buf.extend_from_slice(&buf); + if self.buf.len() >= self.context_length { + println!("got all data: {} bytes", self.buf.len()); + modify_interest(epoll_fd, self.stream.as_raw_fd(), listener_write_event(key))?; + } else { + modify_interest(epoll_fd, self.stream.as_raw_fd(), listener_read_event(key))?; + } + Ok(()) + } + + fn write_cb(&mut self, key: u64, epoll_fd: RawFd) -> io::Result<()> { + match self.stream.write(HTTP_RESP) { + Ok(_) => println!("answered to request {}", key), + Err(e) => eprintln!("could not answer to request {}: {}", key, e), + } + self.stream.shutdown(std::net::Shutdown::Both)?; + let fd = self.stream.as_raw_fd(); + remove_interest(epoll_fd, fd)?; + close(fd); + Ok(()) + } + + fn parse_and_set_content_length(&mut self, data: &str) { + if data.contains("HTTP") { + if let Some(context_length) = data + .lines() + .find(|l| l.to_lowercase().starts_with("content-length: ")) + { + if let Some(len) = context_length + .to_lowercase() + .strip_prefix("content-length: ") + { + self.context_length = len.parse::().expect("content-length is invalid"); + println!("set content length: {} bytes", self.context_length); + } + } + } + } +}