break out message handler

This commit is contained in:
Love 2024-07-14 16:02:28 +02:00
parent cd191df5c5
commit e5a40d5ae7
3 changed files with 89 additions and 70 deletions

View File

@ -2,12 +2,14 @@ mod cloudflare;
mod config; mod config;
mod logging; mod logging;
mod public_ip; mod public_ip;
mod message_handler;
pub mod utils; pub mod utils;
pub use cloudflare::CloudflareClient; pub use cloudflare::CloudflareClient;
pub use config::{get_config_path, read_config, Config}; pub use config::{get_config_path, read_config, Config};
pub use logging::init_logger; pub use logging::init_logger;
pub use public_ip::get_current_public_ipv4; pub use public_ip::get_current_public_ipv4;
pub use message_handler::MessageHandler;
pub const PROGRAM_NAME: &'static str = "dynip-cloudflare"; pub const PROGRAM_NAME: &'static str = "dynip-cloudflare";
pub const MAX_ERORS_IN_ROW_DEFAULT: usize = 10; pub const MAX_ERORS_IN_ROW_DEFAULT: usize = 10;

View File

@ -4,13 +4,11 @@ use std::sync::{
}; };
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::{debug, error, info, log_enabled, Level}; use log::{error, info};
use netlink_packet_core::{NetlinkMessage, NetlinkPayload};
use netlink_packet_route::RouteNetlinkMessage as RtnlMessage;
use netlink_sys::{AsyncSocket, SocketAddr}; use netlink_sys::{AsyncSocket, SocketAddr};
use rtnetlink::new_connection; use rtnetlink::new_connection;
use dynip_cloudflare::{utils, CloudflareClient, MAX_ERORS_IN_ROW_DEFAULT}; use dynip_cloudflare::{utils, CloudflareClient, MessageHandler, MAX_ERORS_IN_ROW_DEFAULT};
use scopeguard::defer; use scopeguard::defer;
use tokio::{signal, sync::Notify}; use tokio::{signal, sync::Notify};
@ -76,80 +74,19 @@ async fn main() {
tokio::spawn(conn); tokio::spawn(conn);
info!("Listening for IPv4 address changes and interface connect/disconnect events..."); info!("Listening for IPv4 address changes and interface connect/disconnect events...");
let mut errs_counter: usize = 0; let mut handler = MessageHandler::new(
let errs_max = config.max_errors_in_row.unwrap_or(MAX_ERORS_IN_ROW_DEFAULT); &mut cloudflare,
config.max_errors_in_row.unwrap_or(MAX_ERORS_IN_ROW_DEFAULT),
);
while !should_exit.load(Ordering::SeqCst) { while !should_exit.load(Ordering::SeqCst) {
tokio::select! { tokio::select! {
_ = notify.notified() => break, _ = notify.notified() => break,
message = messages.next() => { message = messages.next() => {
if let Some((message, _)) = message { if let Some((message, _)) = message {
handle_message(message, &mut cloudflare, &mut errs_counter, errs_max).await; handler.handle_message(message).await;
} }
} }
} }
} }
} }
async fn handle_message(
message: NetlinkMessage<RtnlMessage>,
cloudflare: &mut CloudflareClient,
errs_counter: &mut usize,
errs_max: usize,
) {
match message.payload {
NetlinkPayload::InnerMessage(RtnlMessage::NewAddress(msg)) => {
if log_enabled!(Level::Debug) {
debug!("New IPv4 address message: {:?}", msg);
} else {
info!("New IPv4 address");
if let Err(e) = cloudflare.check().await {
*errs_counter += 1;
error!(
"Failed to check cloudflare ({}/{}): {:?}",
errs_counter, errs_max, &e
);
if *errs_counter >= errs_max {
return;
}
}
}
}
NetlinkPayload::InnerMessage(RtnlMessage::DelAddress(msg)) => {
if log_enabled!(Level::Debug) {
debug!("Deleted IPv4 address message: {:?}", msg);
} else {
info!("Deleted IPv4 address");
}
}
NetlinkPayload::InnerMessage(RtnlMessage::NewLink(link)) => {
if log_enabled!(Level::Debug) {
debug!("New link message (interface connected): {:?}", link);
} else {
info!("New link (interface connected)");
if let Err(e) = cloudflare.check().await {
*errs_counter += 1;
error!(
"Failed to check cloudflare ({}/{}): {:?}",
errs_counter, errs_max, &e
);
if *errs_counter >= errs_max {
return;
}
}
}
}
NetlinkPayload::InnerMessage(RtnlMessage::DelLink(link)) => {
if log_enabled!(Level::Debug) {
debug!("Deleted link message (interface disconnected): {:?}", link);
} else {
info!("Deleted link (interface disconnected)");
}
}
_ => {
if log_enabled!(Level::Debug) {
debug!("Unhandled message payload: {:?}", message.payload);
}
}
}
}

80
src/message_handler.rs Normal file
View File

@ -0,0 +1,80 @@
use log::{debug, error, info, log_enabled, Level};
use netlink_packet_core::{NetlinkMessage, NetlinkPayload};
use netlink_packet_route::RouteNetlinkMessage as RtnlMessage;
use crate::CloudflareClient;
pub struct MessageHandler<'a> {
cloudflare: &'a mut CloudflareClient,
errs_counter: usize,
errs_max: usize,
}
impl<'a> MessageHandler<'a> {
pub fn new(cloudflare: &'a mut CloudflareClient, errs_max: usize) -> Self {
Self {
cloudflare,
errs_counter: 0,
errs_max,
}
}
pub async fn handle_message(&mut self, message: NetlinkMessage<RtnlMessage>) -> Option<()> {
match message.payload {
NetlinkPayload::InnerMessage(RtnlMessage::NewAddress(msg)) => {
if log_enabled!(Level::Debug) {
debug!("New IPv4 address message: {:?}", msg);
} else {
info!("New IPv4 address");
if let Err(e) = self.cloudflare.check().await {
self.errs_counter += 1;
error!(
"Failed to check cloudflare ({}/{}): {:?}",
self.errs_counter, self.errs_max, &e
);
if self.errs_counter >= self.errs_max {
return None;
}
}
}
}
NetlinkPayload::InnerMessage(RtnlMessage::DelAddress(msg)) => {
if log_enabled!(Level::Debug) {
debug!("Deleted IPv4 address message: {:?}", msg);
} else {
info!("Deleted IPv4 address");
}
}
NetlinkPayload::InnerMessage(RtnlMessage::NewLink(link)) => {
if log_enabled!(Level::Debug) {
debug!("New link message (interface connected): {:?}", link);
} else {
info!("New link (interface connected)");
if let Err(e) = self.cloudflare.check().await {
self.errs_counter += 1;
error!(
"Failed to check cloudflare ({}/{}): {:?}",
self.errs_counter, self.errs_max, &e
);
if self.errs_counter >= self.errs_max {
return None;
}
}
}
}
NetlinkPayload::InnerMessage(RtnlMessage::DelLink(link)) => {
if log_enabled!(Level::Debug) {
debug!("Deleted link message (interface disconnected): {:?}", link);
} else {
info!("Deleted link (interface disconnected)");
}
}
_ => {
if log_enabled!(Level::Debug) {
debug!("Unhandled message payload: {:?}", message.payload);
}
}
}
Some(())
}
}