
refactor connection remain

pull/24/head · Sun · 3 years ago · commit 8f64b65a75
Changed files:
1. src/apps/mod.rs (2)
2. src/group/mod.rs (13)
3. src/layer.rs (265)
4. src/server.rs (158)

src/apps/mod.rs (2)

@@ -66,7 +66,7 @@ pub(crate) async fn app_layer_handle(
.and_modify(|f| f.push(index))
.or_insert(vec![index]);
if index == 0 {
results.rpcs.push(session_lost(&session.s_id));
results.rpcs.push(session_lost(&session.sid));
} else {
if let Ok(mid) = group::Member::get_id(&db, &session.db_id, addr) {
results

src/group/mod.rs (13)

@@ -209,4 +209,17 @@ impl GroupSession {
self.remain = 6; // keep-alive 10~11 minutes (checked once per 120s tick)
}
}
pub fn clear(&mut self) -> bool {
if self.suspend_me && self.suspend_remote {
if self.remain == 0 {
true
} else {
self.remain -= 1;
false
}
} else {
false
}
}
}
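
A minimal sketch (not part of the commit) of how the new `clear()` countdown is driven: the `session_remain` task added in `src/server.rs` below polls it roughly every 120 seconds, and a session is only dropped once both sides are suspended and the counter reaches zero. The `KeepAlive` struct here is hypothetical and only mirrors the fields `clear()` touches:

```rust
// Hypothetical stand-in for GroupSession/LayerSession: only the fields clear() uses.
struct KeepAlive {
    suspend_me: bool,     // suspended by this side
    suspend_remote: bool, // suspended by the remote side
    remain: u16,          // remaining 120-second ticks before the session is dropped
}

impl KeepAlive {
    /// Same countdown as GroupSession::clear / LayerSession::clear in this commit:
    /// returns true only when both sides are suspended and the countdown has expired.
    fn clear(&mut self) -> bool {
        if self.suspend_me && self.suspend_remote {
            if self.remain == 0 {
                true
            } else {
                self.remain -= 1;
                false
            }
        } else {
            false
        }
    }
}

fn main() {
    let mut s = KeepAlive { suspend_me: true, suspend_remote: true, remain: 6 };
    // Polled once per 120s tick: with remain = 6 the session survives six polls,
    // and the seventh returns true, at which point the caller closes it.
    let polls_until_close = (0..).position(|_: u32| s.clear()).unwrap();
    assert_eq!(polls_until_close, 6);
}
```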

src/layer.rs (265)

@@ -12,7 +12,6 @@ use tokio::sync::RwLock;
use crate::account::User;
use crate::group::GroupEvent;
//use crate::apps::group::{group_conn, GROUP_ID};
use crate::own::Own;
use crate::session::{Session, SessionType};
@@ -121,96 +120,6 @@ impl Layer {
false
}
// pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerId, GroupId> {
// // check close the stable connection.
// let mut addrs: HashMap<PeerId, GroupId> = HashMap::new();
// if let Some(running) = self.runnings.remove(gid) {
// for (addr, fgid) in running.remove_onlines() {
// addrs.insert(addr, fgid);
// }
// }
// let mut need_keep = vec![];
// for (_, running) in &self.runnings {
// for addr in addrs.keys() {
// if running.check_addr_online(addr) {
// need_keep.push(*addr);
// }
// }
// }
// for i in need_keep {
// addrs.remove(&i);
// }
// addrs
// }
// pub fn remove_all_running(&mut self) -> HashMap<PeerId, GroupId> {
// let mut addrs: HashMap<PeerId, GroupId> = HashMap::new();
// for (_, running) in self.runnings.drain() {
// for (addr, fgid) in running.remove_onlines() {
// addrs.insert(addr, fgid);
// }
// }
// addrs
// }
// pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<(i64, i64)> {
// debug!("onlines: {:?}, find: {:?}", self.runnings.keys(), mgid);
// self.running(mgid)?.get_online_id(fgid)
// }
// pub fn remove_online(&mut self, gid: &GroupId, fgid: &GroupId) -> Option<PeerId> {
// self.running_mut(gid).ok()?.remove_online(fgid)
// }
// pub async fn all_layer_conns(&self) -> Result<HashMap<GroupId, Vec<(GroupId, SendType)>>> {
// let mut conns = HashMap::new();
// let own_lock = self.group.read().await;
// for mgid in self.runnings.keys() {
// let mut vecs = vec![];
// let db = own_lock.session_db(&mgid)?;
// let sessions = Session::list(&db)?;
// drop(db);
// for s in sessions {
// match s.s_type {
// SessionType::Chat => {
// let proof = own_lock.prove_addr(mgid, &s.addr)?;
// vecs.push((s.gid, chat_conn(proof, Peer::peer(s.addr))));
// }
// SessionType::Group => {
// let proof = own_lock.prove_addr(mgid, &s.addr)?;
// vecs.push((GROUP_ID, group_conn(proof, Peer::peer(s.addr), s.gid)));
// }
// _ => {}
// }
// }
// conns.insert(*mgid, vecs);
// }
// Ok(conns)
// }
// pub fn is_addr_online(&self, faddr: &PeerId) -> bool {
// for (_, running) in &self.runnings {
// if running.check_addr_online(faddr) {
// return true;
// }
// }
// return false;
// }
// pub fn is_online(&self, gid: &GroupId, fgid: &GroupId) -> bool {
// if let Some(running) = self.runnings.get(gid) {
// running.is_online(fgid)
// } else {
// false
// }
// }
// pub fn broadcast(&self, user: User, results: &mut HandleResult) {
// let info = GroupEvent::InfoRes(user);
// let data = bincode::serialize(&info).unwrap_or(vec![]);
@@ -219,50 +128,13 @@ impl Layer {
// }
}
// pub(crate) struct OnlineSession {
// pub pid: PeerId,
// /// session database id.
// pub id: i64,
// /// session ref's service(friend/group) database id.
// pub fid: i64,
// pub suspend_me: bool,
// pub suspend_remote: bool,
// pub remain: u16, // keep-alive remain minutes
// }
// impl OnlineSession {
// fn new(online: Online, db_id: i64, db_fid: i64) -> Self {
// Self {
// online,
// db_id,
// db_fid,
// suspend_me: false,
// suspend_remote: false,
// remain: 0,
// }
// }
// fn close_suspend(&mut self) -> bool {
// if self.suspend_me && self.suspend_remote {
// if self.remain == 0 {
// true
// } else {
// self.remain -= 1;
// false
// }
// } else {
// false
// }
// }
// }
/// online connected layer session.
pub(crate) struct LayerSession {
pub height: i64,
/// session network addr.
pub addrs: Vec<PeerId>,
/// session database id.
pub s_id: i64,
pub sid: i64,
/// layer service database id.
pub db_id: i64,
/// if session is suspend by me.
@@ -274,9 +146,9 @@ pub(crate) struct LayerSession {
}
impl LayerSession {
fn new(addr: PeerId, s_id: i64, db_id: i64, height: i64) -> Self {
fn new(addr: PeerId, sid: i64, db_id: i64, height: i64) -> Self {
Self {
s_id,
sid,
db_id,
height,
addrs: vec![addr],
@@ -287,7 +159,7 @@ impl LayerSession {
}
pub fn info(&self) -> (i64, i64, i64, PeerId) {
(self.height, self.s_id, self.db_id, self.addrs[0])
(self.height, self.sid, self.db_id, self.addrs[0])
}
pub fn increased(&mut self) -> i64 {
@@ -325,121 +197,16 @@ impl LayerSession {
}
}
// pub fn get_online_id(&self, gid: &GroupId) -> Result<(i64, i64)> {
// debug!("onlines: {:?}, find: {:?}", self.sessions.keys(), gid);
// self.sessions
// .get(gid)
// .map(|online| (online.db_id, online.db_fid))
// .ok_or(anyhow!("remote not online"))
// }
// /// get online peer's addr.
// pub fn online(&self, gid: &GroupId) -> Result<PeerId> {
// self.sessions
// .get(gid)
// .map(|online| *online.online.addr())
// .ok_or(anyhow!("remote not online"))
// }
// pub fn online_direct(&self, gid: &GroupId) -> Result<PeerId> {
// if let Some(online) = self.sessions.get(gid) {
// match online.online {
// Online::Direct(addr) => return Ok(addr),
// _ => {}
// }
// }
// Err(anyhow!("no direct online"))
// }
// /// get all online peer.
// pub fn onlines(&self) -> Vec<(&GroupId, &PeerId)> {
// self.sessions
// .iter()
// .map(|(fgid, online)| (fgid, online.online.addr()))
// .collect()
// }
// /// check add online.
// /// check offline, and return is direct.
// pub fn check_offline(&mut self, gid: &GroupId, addr: &PeerId) -> bool {
// if let Some(online) = self.sessions.remove(gid) {
// if online.online.addr() != addr {
// return false;
// }
// match online.online {
// Online::Direct(..) => {
// return true;
// }
// _ => {}
// }
// }
// false
// }
// pub fn remove_online(&mut self, gid: &GroupId) -> Option<PeerId> {
// self.sessions
// .remove(gid)
// .map(|online| *online.online.addr())
// }
// /// remove all onlines peer.
// pub fn remove_onlines(self) -> Vec<(PeerId, GroupId)> {
// let mut peers = vec![];
// for (fgid, online) in self.sessions {
// match online.online {
// Online::Direct(addr) => peers.push((addr, fgid)),
// _ => {}
// }
// }
// peers
// }
// /// check if addr is online.
// pub fn check_addr_online(&self, addr: &PeerId) -> bool {
// for (_, online) in &self.sessions {
// if online.online.addr() == addr {
// return true;
// }
// }
// false
// }
// /// peer leave, remove online peer.
// pub fn peer_leave(&mut self, addr: &PeerId) -> Vec<i64> {
// let mut peers = vec![];
// let mut deletes = vec![];
// for (fgid, online) in &self.sessions {
// if online.online.addr() == addr {
// peers.push(online.db_id);
// deletes.push(*fgid);
// }
// }
// for i in &deletes {
// self.sessions.remove(&i);
// }
// peers
// }
// /// list all onlines groups.
// pub fn close_suspend(&mut self, self_addr: &PeerId) -> Vec<(GroupId, PeerId, i64)> {
// let mut needed = vec![];
// for (fgid, online) in &mut self.sessions {
// // when online is self. skip.
// if online.online == Online::Direct(*self_addr) {
// continue;
// }
// if online.close_suspend() {
// needed.push((*fgid, *online.online.addr(), online.db_id));
// }
// }
// for (gid, _, _) in needed.iter() {
// self.sessions.remove(gid);
// }
// needed
// }
pub fn clear(&mut self) -> bool {
if self.suspend_me && self.suspend_remote {
if self.remain == 0 {
true
} else {
self.remain -= 1;
false
}
} else {
false
}
}
}

src/server.rs (158)

@@ -10,22 +10,17 @@ use tdn::{
primitives::{HandleResult, Result},
},
};
use tokio::{
sync::mpsc::{error::SendError, Sender},
sync::RwLock,
};
use tdn_storage::local::DStorage;
use tokio::{sync::mpsc::Sender, sync::RwLock};
use crate::account::Account;
use crate::apps::app_layer_handle;
use crate::global::Global;
use crate::group::group_handle;
use crate::layer::Layer;
use crate::migrate::{main_migrate, ACCOUNT_DB};
use crate::own::{handle as own_handle, Own};
use crate::own::handle as own_handle;
use crate::primitives::network_seeds;
use crate::rpc::{init_rpc, inner_rpc};
use crate::rpc::{init_rpc, inner_rpc, session_lost};
pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:7366";
pub const DEFAULT_LOG_FILE: &'static str = "esse.log.txt";
@@ -85,8 +80,8 @@ pub async fn start(db_path: String) -> Result<()> {
// //let mut group_rpcs: HashMap<u64, GroupId> = HashMap::new();
let mut now_rpc_uid = 0;
// // running session remain task.
// tokio::spawn(session_remain(peer_id, layer.clone(), sender.clone()));
// running session remain task.
tokio::spawn(session_remain(global.clone()));
while let Some(message) = self_recv.recv().await {
match message {
@@ -128,15 +123,6 @@ pub async fn start(db_path: String) -> Result<()> {
global
.send(SendMessage::Network(NetworkType::NetworkReboot))
.await?;
// let t_sender = tdn_send.clone();
// let g_conns = group.read().await.all_distribute_conns();
// let l_conns = layer
// .read()
// .await
// .all_layer_conns()
// .await
// .unwrap_or(HashMap::new());
// tokio::spawn(sleep_waiting_reboot(t_sender, g_conns, l_conns));
}
}
}
@@ -144,78 +130,68 @@ pub async fn start(db_path: String) -> Result<()> {
Ok(())
}
// #[inline]
// async fn sleep_waiting_reboot(
// sender: Sender<SendMessage>,
// groups: HashMap<GroupId, Vec<SendType>>,
// layers: HashMap<GroupId, Vec<(GroupId, SendType)>>,
// ) -> std::result::Result<(), SendError<SendMessage>> {
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// for (gid, conns) in groups {
// for conn in conns {
// sender.send(SendMessage::Group(gid, conn)).await?;
// }
// }
// for (fgid, conns) in layers {
// for (tgid, conn) in conns {
// sender.send(SendMessage::Layer(fgid, tgid, conn)).await?;
// }
// }
// Ok(())
// }
// async fn session_remain(
// self_addr: PeerId,
// layer: Arc<RwLock<Layer>>,
// sender: Sender<SendMessage>,
// ) -> Result<()> {
// loop {
// tokio::time::sleep(std::time::Duration::from_secs(120)).await;
// if let Some(uid) = RPC_WS_UID.get() {
// let mut layer_lock = layer.write().await;
// let mut rpcs = vec![];
// let mut addrs = HashMap::new();
// for (_, running) in layer_lock.runnings.iter_mut() {
// let closed = running.close_suspend(&self_addr);
// for (gid, addr, sid) in closed {
// addrs.insert(addr, false);
// rpcs.push(crate::rpc::session_lost(gid, &sid));
// }
// }
// drop(layer_lock);
// let layer_lock = layer.read().await;
// for (_, running) in layer_lock.runnings.iter() {
// for (addr, keep) in addrs.iter_mut() {
// if running.check_addr_online(addr) {
// *keep = true;
// }
// }
// }
// drop(layer_lock);
// for rpc in rpcs {
// let _ = sender.send(SendMessage::Rpc(*uid, rpc, true)).await;
// }
// for (addr, keep) in addrs {
// if !keep {
// let _ = sender
// .send(SendMessage::Layer(
// GroupId::default(),
// GroupId::default(),
// SendType::Disconnect(addr),
// ))
// .await;
// }
// }
// }
// }
// }
async fn session_remain(global: Arc<Global>) -> Result<()> {
loop {
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
if let Some(uid) = RPC_WS_UID.get() {
let mut rpcs = vec![];
let mut addrs = vec![];
// clear group connections.
let mut group_lock = global.group.write().await;
let mut closed = vec![];
for (pid, session) in group_lock.sessions.iter_mut() {
if session.clear() {
closed.push((*pid, session.sid));
addrs.push(*pid);
}
}
for (pid, sid) in closed {
group_lock.rm_online(&pid);
rpcs.push(session_lost(&sid));
}
drop(group_lock);
// clear layer connections.
let mut layer_lock = global.layer.write().await;
let mut closed = vec![];
for (gcid, session) in layer_lock.groups.iter_mut() {
if session.clear() {
closed.push((*gcid, session.sid));
for addr in &session.addrs {
addrs.push(*addr);
}
}
}
for (gcid, sid) in closed {
layer_lock.group_del(&gcid);
rpcs.push(session_lost(&sid));
}
drop(layer_lock);
for rpc in rpcs {
let _ = global.send(SendMessage::Rpc(*uid, rpc, true)).await;
}
for addr in addrs {
if global.group.read().await.is_online(&addr) {
continue;
}
if global.layer.read().await.is_addr_online(&addr) {
continue;
}
let _ = global
.send(SendMessage::Layer(
GroupId::default(),
SendType::Disconnect(addr),
))
.await;
}
}
}
}
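
Note on the rewritten task above: a single peer address can back both a group session and a layer session, so `session_remain` first collects every session whose countdown expired, pushes a `session_lost` RPC for each, and only sends `SendType::Disconnect` for an address after confirming it is no longer online in either the group state or the layer state. This keeps the intent of the `need_keep` bookkeeping in the old commented-out version, but the check is now made per address against both stores instead of iterating the removed runnings map.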
#[inline]
async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, global: &Arc<Global>) {
