diff --git a/Cargo.toml b/Cargo.toml index 4dc218e..9fc6fc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = "1.3.0" colored = "2.0.0" ctrlc = "3.2.4" directories = "4.0.1" -reqwest = { version = "0.11.13", features = ["blocking", "deflate", "gzip", "rustls", "rustls-tls"] } +futures-util = "0.3.25" +indicatif = { version = "0.17.3", features = ["tokio"] } +reqwest = { version = "0.11.13", features = ["blocking", "deflate", "gzip", "rustls", "rustls-tls", "stream"] } +tokio = { version = "1.24.2", features = ["full"] } +tokio-util = "0.7.4" diff --git a/src/downloader.rs b/src/downloader.rs new file mode 100644 index 0000000..c77cf56 --- /dev/null +++ b/src/downloader.rs @@ -0,0 +1,94 @@ +use std::{ + fmt, + fs::File, + io::{self, Read, Write}, +}; + +use bytes::Bytes; +use futures_util::StreamExt; +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use reqwest::{self, Client}; +use std::cmp; + +pub enum DualWriter { + File(File), + Buffer(Vec), +} + +impl DualWriter { + pub fn write(&mut self, bytes: Bytes) -> Result<(), std::io::Error> { + match self { + Self::Buffer(buffer) => { + bytes.into_iter().for_each(|byte| buffer.push(byte)); + } + Self::File(file) => { + file.write(&bytes)?; + } + } + + Ok(()) + } + + pub fn get_string(self) -> Result { + Ok(match self { + Self::Buffer(buffer) => { + String::from_utf8(buffer).or(Err("Failed to decode buffer".to_owned()))? + } + Self::File(file) => { + let mut buf = String::new(); + + // Well this is safe since I consume the file anyways + let ptr = &file as *const File as *mut File; + let file = unsafe { &mut *ptr }; + file.read_to_string(&mut buf) + .or(Err("Failed to read file".to_owned()))?; + buf + } + }) + } + pub fn new(file_name: Option) -> Result { + Ok(if let Some(file_name) = file_name { + Self::File(File::create(&file_name)?) + } else { + Self::Buffer(Vec::::new()) + }) + } +} + +pub async fn download_with_progress( + link: &str, + file_name: Option, +) -> Result { + let mut dw = DualWriter::new(file_name).or(Err("Failed to create file".to_owned()))?; + + let client = Client::builder() + .gzip(true) + .deflate(true) + .build() + .or(Err("Failed to create client".to_owned()))?; + let resp = client + .get(link) + .send() + .await + .or(Err("Failed to connect server".to_owned()))?; + let content_length = resp.content_length().unwrap(); + + let pb = ProgressBar::new(content_length); + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})").unwrap() + .with_key("eta", |state: &ProgressState, w: &mut dyn fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("#>-")); + + let mut downloaded: u64 = 0; + let mut stream = resp.bytes_stream(); + + while let Some(item) = stream.next().await { + let bytes = item.unwrap(); + downloaded = cmp::min(downloaded + (bytes.len() as u64), content_length); + dw.write(bytes) + .or(Err("Failed to write to file".to_owned()))?; + + pb.set_position(downloaded); + } + + Ok(dw) +} diff --git a/src/lib.rs b/src/lib.rs index 4fffe5d..c6aeae5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ use std::{ pub use m3u8::M3u8; pub use parser::Parser; mod config; - +mod downloader; use directories::ProjectDirs; pub fn setup() -> String { diff --git a/src/main.rs b/src/main.rs index 0a667cf..71028c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,10 @@ use std::process::Command; use std::rc::Rc; use iptvnator_rs::{setup, M3u8, Parser}; -fn main() { +#[tokio::main] +async fn main() { println!("Welcome to iptvnator_rs, the port of my iptvprogram written in python, now in rust BLAZINGLY FAST\n"); - let parser = Parser::new("iptv.m3u8".to_owned(), setup(), "watched.txt".to_owned()); + let parser = Parser::new("iptv.m3u8".to_owned(), setup(), "watched.txt".to_owned()).await; let stdin = io::stdin(); let mut stdout = stdout().lock(); @@ -60,7 +61,7 @@ fn main() { { let ptr = &parser as *const Parser as *mut Parser; let p = unsafe { &mut *ptr }; - p.forcefully_update(); + p.forcefully_update().await; } continue; } diff --git a/src/parser.rs b/src/parser.rs index b451daa..bca5687 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -4,6 +4,7 @@ use std::{fs, process}; use directories::ProjectDirs; +use crate::downloader::download_with_progress; use crate::m3u8::M3u8; const MAX_TRIES: usize = 4; @@ -16,7 +17,7 @@ pub struct Parser { } impl Parser { - pub fn new(file_name: String, iptv_url: String, watched_name: String) -> Self { + pub async fn new(file_name: String, iptv_url: String, watched_name: String) -> Self { let project_dirs = ProjectDirs::from("com", "billenius", "iptvnator_rs").unwrap(); let cache = project_dirs.cache_dir(); let _ = fs::create_dir_all(&cache); @@ -27,7 +28,7 @@ impl Parser { Self { watched_name: watched_name.clone(), - m3u8_items: Self::get_parsed_content(&ilovetv_url, &file_name, &watched_name), + m3u8_items: Self::get_parsed_content(&ilovetv_url, &file_name, &watched_name).await, ilovetv_url, file_name, } @@ -62,11 +63,11 @@ impl Parser { ) } - pub fn forcefully_update(&mut self) { + pub async fn forcefully_update(&mut self) { let mut counter = 0; let content = loop { counter += 1; - let content = Self::download(&self.ilovetv_url).ok(); + let content = Self::download(&self.ilovetv_url).await.ok(); if counter > MAX_TRIES { return; } else if content.is_some() { @@ -99,8 +100,17 @@ impl Parser { } } - fn get_parsed_content(link: &String, file_name: &PathBuf, watched_name: &PathBuf) -> Vec { - Self::parse_m3u8(Self::get_stringcontent(link, file_name, 0), watched_name) + async fn get_parsed_content( + link: &String, + file_name: &PathBuf, + watched_name: &PathBuf, + ) -> Vec { + Self::parse_m3u8( + Self::get_stringcontent(link, file_name) + .await + .expect("Failed to retrieve playlist"), + watched_name, + ) } fn parse_m3u8(content: String, watched_name: &PathBuf) -> Vec { @@ -149,24 +159,30 @@ impl Parser { m3u8_items } - fn get_stringcontent(link: &String, file_name: &PathBuf, tried: usize) -> String { + async fn get_stringcontent(link: &String, file_name: &PathBuf) -> Result { if !Self::should_update(file_name) { let content = fs::read_to_string(&file_name); if content.is_ok() { - return content.unwrap(); + return Ok(content.unwrap()); } } - let content = Self::download(link); - if content.is_err() && tried < 4 { - println!("Retrying {}/{}", tried + 1, MAX_TRIES); - Self::get_stringcontent(link, file_name, tried + 1); - } + let mut counter: usize = 0; + let content = loop { + counter += 1; + + if let Ok(content) = Self::download(link).await { + break Ok(content); + } else if counter > MAX_TRIES { + break Err("".to_owned()); + } + println!("Retrying {}/{}", counter + 1, MAX_TRIES); + }; match content { Ok(s) => { let _ = fs::write(&file_name, s.as_bytes()); - s + Ok(s) } Err(_) => { println!("Couldn't get m3u8 file!"); @@ -175,8 +191,10 @@ impl Parser { } } - fn download(link: &String) -> Result { - reqwest::blocking::get(link.clone()) - .and_then(|resp| Ok(resp.text().expect("Could not get m3u8 from server"))) + async fn download(link: &String) -> Result { + Ok(download_with_progress(link, None) + .await? + .get_string() + .unwrap()) } }