diff --git a/src/apps/mod.rs b/src/apps/mod.rs index 888ff12..1b35e29 100644 --- a/src/apps/mod.rs +++ b/src/apps/mod.rs @@ -66,7 +66,7 @@ pub(crate) async fn app_layer_handle( .and_modify(|f| f.push(index)) .or_insert(vec![index]); if index == 0 { - results.rpcs.push(session_lost(&session.s_id)); + results.rpcs.push(session_lost(&session.sid)); } else { if let Ok(mid) = group::Member::get_id(&db, &session.db_id, addr) { results diff --git a/src/group/mod.rs b/src/group/mod.rs index a769965..d6012b1 100644 --- a/src/group/mod.rs +++ b/src/group/mod.rs @@ -209,4 +209,17 @@ impl GroupSession { self.remain = 6; // keep-alive 10~11 minutes 120s/time } } + + pub fn clear(&mut self) -> bool { + if self.suspend_me && self.suspend_remote { + if self.remain == 0 { + true + } else { + self.remain -= 1; + false + } + } else { + false + } + } } diff --git a/src/layer.rs b/src/layer.rs index c328a2e..e8c6dc2 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -12,7 +12,6 @@ use tokio::sync::RwLock; use crate::account::User; use crate::group::GroupEvent; -//use crate::apps::group::{group_conn, GROUP_ID}; use crate::own::Own; use crate::session::{Session, SessionType}; @@ -121,96 +120,6 @@ impl Layer { false } - // pub fn remove_running(&mut self, gid: &GroupId) -> HashMap { - // // check close the stable connection. - // let mut addrs: HashMap = HashMap::new(); - // if let Some(running) = self.runnings.remove(gid) { - // for (addr, fgid) in running.remove_onlines() { - // addrs.insert(addr, fgid); - // } - // } - - // let mut need_keep = vec![]; - // for (_, running) in &self.runnings { - // for addr in addrs.keys() { - // if running.check_addr_online(addr) { - // need_keep.push(*addr); - // } - // } - // } - // for i in need_keep { - // addrs.remove(&i); - // } - - // addrs - // } - - // pub fn remove_all_running(&mut self) -> HashMap { - // let mut addrs: HashMap = HashMap::new(); - // for (_, running) in self.runnings.drain() { - // for (addr, fgid) in running.remove_onlines() { - // addrs.insert(addr, fgid); - // } - // } - // addrs - // } - - // pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<(i64, i64)> { - // debug!("onlines: {:?}, find: {:?}", self.runnings.keys(), mgid); - // self.running(mgid)?.get_online_id(fgid) - // } - - // pub fn remove_online(&mut self, gid: &GroupId, fgid: &GroupId) -> Option { - // self.running_mut(gid).ok()?.remove_online(fgid) - // } - - // pub async fn all_layer_conns(&self) -> Result>> { - // let mut conns = HashMap::new(); - // let own_lock = self.group.read().await; - // for mgid in self.runnings.keys() { - // let mut vecs = vec![]; - - // let db = own_lock.session_db(&mgid)?; - // let sessions = Session::list(&db)?; - // drop(db); - - // for s in sessions { - // match s.s_type { - // SessionType::Chat => { - // let proof = own_lock.prove_addr(mgid, &s.addr)?; - // vecs.push((s.gid, chat_conn(proof, Peer::peer(s.addr)))); - // } - // SessionType::Group => { - // let proof = own_lock.prove_addr(mgid, &s.addr)?; - // vecs.push((GROUP_ID, group_conn(proof, Peer::peer(s.addr), s.gid))); - // } - // _ => {} - // } - // } - - // conns.insert(*mgid, vecs); - // } - - // Ok(conns) - // } - - // pub fn is_addr_online(&self, faddr: &PeerId) -> bool { - // for (_, running) in &self.runnings { - // if running.check_addr_online(faddr) { - // return true; - // } - // } - // return false; - // } - - // pub fn is_online(&self, gid: &GroupId, fgid: &GroupId) -> bool { - // if let Some(running) = self.runnings.get(gid) { - // running.is_online(fgid) - // } else { - // false - // } - // } - // pub fn broadcast(&self, user: User, results: &mut HandleResult) { // let info = GroupEvent::InfoRes(user); // let data = bincode::serialize(&info).unwrap_or(vec![]); @@ -219,50 +128,13 @@ impl Layer { // } } -// pub(crate) struct OnlineSession { -// pub pid: PeerId, -// /// session database id. -// pub id: i64, -// /// session ref's service(friend/group) database id. -// pub fid: i64, -// pub suspend_me: bool, -// pub suspend_remote: bool, -// pub remain: u16, // keep-alive remain minutes -// } - -// impl OnlineSession { -// fn new(online: Online, db_id: i64, db_fid: i64) -> Self { -// Self { -// online, -// db_id, -// db_fid, -// suspend_me: false, -// suspend_remote: false, -// remain: 0, -// } -// } - -// fn close_suspend(&mut self) -> bool { -// if self.suspend_me && self.suspend_remote { -// if self.remain == 0 { -// true -// } else { -// self.remain -= 1; -// false -// } -// } else { -// false -// } -// } -// } - /// online connected layer session. pub(crate) struct LayerSession { pub height: i64, /// session network addr. pub addrs: Vec, /// session database id. - pub s_id: i64, + pub sid: i64, /// layer service database id. pub db_id: i64, /// if session is suspend by me. @@ -274,9 +146,9 @@ pub(crate) struct LayerSession { } impl LayerSession { - fn new(addr: PeerId, s_id: i64, db_id: i64, height: i64) -> Self { + fn new(addr: PeerId, sid: i64, db_id: i64, height: i64) -> Self { Self { - s_id, + sid, db_id, height, addrs: vec![addr], @@ -287,7 +159,7 @@ impl LayerSession { } pub fn info(&self) -> (i64, i64, i64, PeerId) { - (self.height, self.s_id, self.db_id, self.addrs[0]) + (self.height, self.sid, self.db_id, self.addrs[0]) } pub fn increased(&mut self) -> i64 { @@ -325,121 +197,16 @@ impl LayerSession { } } - // pub fn get_online_id(&self, gid: &GroupId) -> Result<(i64, i64)> { - // debug!("onlines: {:?}, find: {:?}", self.sessions.keys(), gid); - // self.sessions - // .get(gid) - // .map(|online| (online.db_id, online.db_fid)) - // .ok_or(anyhow!("remote not online")) - // } - - // /// get online peer's addr. - // pub fn online(&self, gid: &GroupId) -> Result { - // self.sessions - // .get(gid) - // .map(|online| *online.online.addr()) - // .ok_or(anyhow!("remote not online")) - // } - - // pub fn online_direct(&self, gid: &GroupId) -> Result { - // if let Some(online) = self.sessions.get(gid) { - // match online.online { - // Online::Direct(addr) => return Ok(addr), - // _ => {} - // } - // } - // Err(anyhow!("no direct online")) - // } - - // /// get all online peer. - // pub fn onlines(&self) -> Vec<(&GroupId, &PeerId)> { - // self.sessions - // .iter() - // .map(|(fgid, online)| (fgid, online.online.addr())) - // .collect() - // } - - // /// check add online. - - // /// check offline, and return is direct. - // pub fn check_offline(&mut self, gid: &GroupId, addr: &PeerId) -> bool { - // if let Some(online) = self.sessions.remove(gid) { - // if online.online.addr() != addr { - // return false; - // } - - // match online.online { - // Online::Direct(..) => { - // return true; - // } - // _ => {} - // } - // } - // false - // } - - // pub fn remove_online(&mut self, gid: &GroupId) -> Option { - // self.sessions - // .remove(gid) - // .map(|online| *online.online.addr()) - // } - - // /// remove all onlines peer. - // pub fn remove_onlines(self) -> Vec<(PeerId, GroupId)> { - // let mut peers = vec![]; - // for (fgid, online) in self.sessions { - // match online.online { - // Online::Direct(addr) => peers.push((addr, fgid)), - // _ => {} - // } - // } - // peers - // } - - // /// check if addr is online. - // pub fn check_addr_online(&self, addr: &PeerId) -> bool { - // for (_, online) in &self.sessions { - // if online.online.addr() == addr { - // return true; - // } - // } - // false - // } - - // /// peer leave, remove online peer. - // pub fn peer_leave(&mut self, addr: &PeerId) -> Vec { - // let mut peers = vec![]; - // let mut deletes = vec![]; - // for (fgid, online) in &self.sessions { - // if online.online.addr() == addr { - // peers.push(online.db_id); - // deletes.push(*fgid); - // } - // } - // for i in &deletes { - // self.sessions.remove(&i); - // } - - // peers - // } - - // /// list all onlines groups. - // pub fn close_suspend(&mut self, self_addr: &PeerId) -> Vec<(GroupId, PeerId, i64)> { - // let mut needed = vec![]; - // for (fgid, online) in &mut self.sessions { - // // when online is self. skip. - // if online.online == Online::Direct(*self_addr) { - // continue; - // } - - // if online.close_suspend() { - // needed.push((*fgid, *online.online.addr(), online.db_id)); - // } - // } - - // for (gid, _, _) in needed.iter() { - // self.sessions.remove(gid); - // } - // needed - // } + pub fn clear(&mut self) -> bool { + if self.suspend_me && self.suspend_remote { + if self.remain == 0 { + true + } else { + self.remain -= 1; + false + } + } else { + false + } + } } diff --git a/src/server.rs b/src/server.rs index 67be828..664b9c5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,22 +10,17 @@ use tdn::{ primitives::{HandleResult, Result}, }, }; -use tokio::{ - sync::mpsc::{error::SendError, Sender}, - sync::RwLock, -}; - use tdn_storage::local::DStorage; +use tokio::{sync::mpsc::Sender, sync::RwLock}; use crate::account::Account; use crate::apps::app_layer_handle; use crate::global::Global; use crate::group::group_handle; -use crate::layer::Layer; use crate::migrate::{main_migrate, ACCOUNT_DB}; -use crate::own::{handle as own_handle, Own}; +use crate::own::handle as own_handle; use crate::primitives::network_seeds; -use crate::rpc::{init_rpc, inner_rpc}; +use crate::rpc::{init_rpc, inner_rpc, session_lost}; pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:7366"; pub const DEFAULT_LOG_FILE: &'static str = "esse.log.txt"; @@ -85,8 +80,8 @@ pub async fn start(db_path: String) -> Result<()> { // //let mut group_rpcs: HashMap = HashMap::new(); let mut now_rpc_uid = 0; - // // running session remain task. - // tokio::spawn(session_remain(peer_id, layer.clone(), sender.clone())); + // running session remain task. + tokio::spawn(session_remain(global.clone())); while let Some(message) = self_recv.recv().await { match message { @@ -128,15 +123,6 @@ pub async fn start(db_path: String) -> Result<()> { global .send(SendMessage::Network(NetworkType::NetworkReboot)) .await?; - // let t_sender = tdn_send.clone(); - // let g_conns = group.read().await.all_distribute_conns(); - // let l_conns = layer - // .read() - // .await - // .all_layer_conns() - // .await - // .unwrap_or(HashMap::new()); - // tokio::spawn(sleep_waiting_reboot(t_sender, g_conns, l_conns)); } } } @@ -144,78 +130,68 @@ pub async fn start(db_path: String) -> Result<()> { Ok(()) } -// #[inline] -// async fn sleep_waiting_reboot( -// sender: Sender, -// groups: HashMap>, -// layers: HashMap>, -// ) -> std::result::Result<(), SendError> { -// tokio::time::sleep(std::time::Duration::from_secs(10)).await; - -// for (gid, conns) in groups { -// for conn in conns { -// sender.send(SendMessage::Group(gid, conn)).await?; -// } -// } - -// for (fgid, conns) in layers { -// for (tgid, conn) in conns { -// sender.send(SendMessage::Layer(fgid, tgid, conn)).await?; -// } -// } - -// Ok(()) -// } - -// async fn session_remain( -// self_addr: PeerId, -// layer: Arc>, -// sender: Sender, -// ) -> Result<()> { -// loop { -// tokio::time::sleep(std::time::Duration::from_secs(120)).await; -// if let Some(uid) = RPC_WS_UID.get() { -// let mut layer_lock = layer.write().await; -// let mut rpcs = vec![]; -// let mut addrs = HashMap::new(); - -// for (_, running) in layer_lock.runnings.iter_mut() { -// let closed = running.close_suspend(&self_addr); -// for (gid, addr, sid) in closed { -// addrs.insert(addr, false); -// rpcs.push(crate::rpc::session_lost(gid, &sid)); -// } -// } -// drop(layer_lock); - -// let layer_lock = layer.read().await; -// for (_, running) in layer_lock.runnings.iter() { -// for (addr, keep) in addrs.iter_mut() { -// if running.check_addr_online(addr) { -// *keep = true; -// } -// } -// } -// drop(layer_lock); - -// for rpc in rpcs { -// let _ = sender.send(SendMessage::Rpc(*uid, rpc, true)).await; -// } - -// for (addr, keep) in addrs { -// if !keep { -// let _ = sender -// .send(SendMessage::Layer( -// GroupId::default(), -// GroupId::default(), -// SendType::Disconnect(addr), -// )) -// .await; -// } -// } -// } -// } -// } +async fn session_remain(global: Arc) -> Result<()> { + loop { + tokio::time::sleep(std::time::Duration::from_secs(120)).await; + if let Some(uid) = RPC_WS_UID.get() { + let mut rpcs = vec![]; + let mut addrs = vec![]; + + // clear group connections. + let mut group_lock = global.group.write().await; + let mut closed = vec![]; + for (pid, session) in group_lock.sessions.iter_mut() { + if session.clear() { + closed.push((*pid, session.sid)); + addrs.push(*pid); + } + } + for (pid, sid) in closed { + group_lock.rm_online(&pid); + rpcs.push(session_lost(&sid)); + } + drop(group_lock); + + // clear layer connections. + let mut layer_lock = global.layer.write().await; + let mut closed = vec![]; + for (gcid, session) in layer_lock.groups.iter_mut() { + if session.clear() { + closed.push((*gcid, session.sid)); + for addr in &session.addrs { + addrs.push(*addr); + } + } + } + for (gcid, sid) in closed { + layer_lock.group_del(&gcid); + rpcs.push(session_lost(&sid)); + } + drop(layer_lock); + + for rpc in rpcs { + let _ = global.send(SendMessage::Rpc(*uid, rpc, true)).await; + } + + for addr in addrs { + if global.group.read().await.is_online(&addr) { + continue; + } + + if global.layer.read().await.is_addr_online(&addr) { + continue; + } + + let _ = global + .send(SendMessage::Layer( + GroupId::default(), + SendType::Disconnect(addr), + )) + .await; + } + } + } +} #[inline] async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, global: &Arc) {