diff --git a/src/apps/chat/layer.rs b/src/apps/chat/layer.rs index 90c2e74..bd8041b 100644 --- a/src/apps/chat/layer.rs +++ b/src/apps/chat/layer.rs @@ -11,11 +11,11 @@ use tdn::{ }; use tdn_did::{user::User, Proof}; -use crate::event::{InnerEvent, StatusEvent}; +use crate::event::InnerEvent; use crate::layer::{Layer, Online}; use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; -use crate::session::{Session, SessionType}; +use crate::session::{connect_session, Session, SessionType}; use crate::storage::{ chat_db, read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image, @@ -24,35 +24,6 @@ use crate::storage::{ use super::models::{Friend, Message, MessageType, NetworkMessage, Request}; use super::rpc; -/// Layer Request for stable connected. -/// Params: User, remote_id, remark. -/// this user if already friend, only has gid. -#[derive(Serialize, Deserialize)] -enum LayerRequest { - /// Requst for connect, had friendship. - /// params: signature with PeerAddr. - Connect(Proof), - /// Requst for make friendship. - /// Params: remote_user, me_id, remark. - Friend(User, String), -} - -/// Layer Response for stable connected. -#[derive(Serialize, Deserialize)] -pub(crate) enum LayerResponse { - /// Connected with stable, had friendship. - Connect(Proof), - /// Agree a friend request. - /// Params: User, remote_id. - /// this user if already, only has gid. - Agree(User, Proof), - /// Reject a friend request. - /// Params: me_id, remote_id. - Reject, - // TODO service connected info. - //Service, -} - /// ESSE chat layer Event. #[derive(Serialize, Deserialize)] pub(crate) enum LayerEvent { @@ -66,6 +37,12 @@ pub(crate) enum LayerEvent { OnlinePing, /// receiver gid, sender gid. as BaseLayerEvent. OnlinePong, + /// make friendship request. + Request(User, String), + /// agree friendship request. + Agree(User, Proof), + /// reject friendship request. + Reject, /// receiver gid, sender gid, message. Message(EventId, NetworkMessage), /// receiver gid, sender user. @@ -84,145 +61,31 @@ pub(crate) async fn handle( let mut layer = arc_layer.write().await; match msg { - RecvType::Connect(addr, data) => { - let request: LayerRequest = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize request friend failure"))?; - - match request { - LayerRequest::Connect(proof) => { - let friend = load_friend(&layer.base, &mgid, &fgid)?; - if friend.is_none() { - let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]); - let msg = SendType::Result(0, addr, false, false, data); - results.layers.push((mgid, fgid, msg)); - return Ok(results); - } - let f = friend.unwrap(); // safe. - - // 0. get session. TODO - let sid = 0; - - // 1. check verify. - proof.verify(&fgid, &addr, &layer.addr)?; - // 2. online this group. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - f.id, - )?; - // 3. update remote addr. TODO - if f.addr != addr { - let db = chat_db(&layer.base, &mgid)?; - let _ = Friend::addr_update(&db, f.id, &addr); - drop(db); - } - // 4. online to UI. TODO - - // 5. connected. - let msg = conn_res_message(&layer, &mgid, addr).await?; - results.layers.push((mgid, fgid, msg)); - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } - LayerRequest::Friend(remote, remark) => { - let some_friend = load_friend(&layer.base, &mgid, &fgid)?; - if some_friend.is_none() { - // check if exist request. - let db = chat_db(&layer.base, &mgid)?; - if let Some(req) = Request::get(&db, &remote.id)? { - req.delete(&db)?; // delete the old request. - results.rpcs.push(rpc::request_delete(mgid, req.id)); - } - let mut request = Request::new( - remote.id, - remote.addr, - remote.name.clone(), - remark.clone(), - false, - true, - ); - // save to db. - request.insert(&db)?; - drop(db); - // save the avatar. - write_avatar_sync(&layer.base, &mgid, &request.gid, remote.avatar.clone())?; - - layer.group.write().await.broadcast( - &mgid, - InnerEvent::SessionRequestCreate(false, remote, remark), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - - results.rpcs.push(rpc::request_create(mgid, &request)); - return Ok(results); - } - let mut friend = some_friend.unwrap(); // safe checked. - - // already friendship & update. - - // 0. get session. TODO - let sid = 0; - - // 1. online this group. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - friend.id, - )?; - // 2. update remote user. - friend.name = remote.name; - friend.addr = remote.addr; - let db = chat_db(&layer.base, &mgid)?; - friend.remote_update(&db)?; - drop(db); - write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; - - // 3. online to UI. - - // TODO UPDATE SESSION - - results.rpcs.push(rpc::friend_info(mgid, &friend)); - // 4. connected. - let msg = conn_agree_message(&mut layer, 0, &mgid, addr).await?; - results.layers.push((mgid, fgid, msg)); - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } - } - } RecvType::Leave(addr) => { - let group_pin = layer.group.clone(); - let mut group_lock = group_pin.write().await; for (mgid, running) in &mut layer.runnings { - let peers = running.peer_leave(&addr); - for (fgid, fid) in peers { - results - .rpcs - .push(crate::apps::group_chat::rpc::group_offline( - *mgid, fid, &fgid, - )); - - group_lock.status( - &mgid, - StatusEvent::SessionFriendOffline(fgid), - &mut results, - )?; + for sid in running.peer_leave(&addr) { + results.rpcs.push(session_lost(*mgid, &sid)); } } } + RecvType::Connect(addr, data) | RecvType::ResultConnect(addr, data) => { + // ESSE chat layer connect date structure. + if handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)? { + let msg = conn_res_message(&layer, &mgid, addr).await?; + results.layers.push((mgid, fgid, msg)); + } else { + let msg = SendType::Result(0, addr, false, false, vec![]); + results.layers.push((mgid, fgid, msg)); + } + } RecvType::Result(addr, is_ok, data) => { - // check to close. - if !is_ok { + // ESSE chat layer result date structure. + if is_ok { + if !handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)? { + let msg = SendType::Result(0, addr, false, false, vec![]); + results.layers.push((mgid, fgid, msg)); + } + } else { let db = chat_db(&layer.base, &mgid)?; if let Some(friend) = Friend::get_it(&db, &fgid)? { if friend.contains_addr(&addr) { @@ -230,260 +93,6 @@ pub(crate) async fn handle( friend.close(&db)?; } } - drop(db); - - let response: LayerResponse = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize result failure"))?; - match response { - LayerResponse::Reject => { - let db = chat_db(&layer.base, &mgid)?; - if let Some(mut request) = Request::get(&db, &fgid)? { - layer.group.write().await.broadcast( - &mgid, - InnerEvent::SessionRequestHandle(request.gid, false, vec![]), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_over = true; - request.is_ok = false; - request.update(&db)?; - results.rpcs.push(rpc::request_reject(mgid, request.id)); - } - drop(db); - } - _ => {} - } - - return Ok(results); - } - - let response: LayerResponse = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize result failure"))?; - - match response { - LayerResponse::Connect(proof) => { - // 1. check verify. - proof.verify(&fgid, &addr, &layer.addr)?; - // 2. check has this remove. - let some_friend = load_friend(&layer.base, &mgid, &fgid)?; - if some_friend.is_none() { - return Ok(results); - } - let fid = some_friend.unwrap().id; // safe. - - // 0. get session. TODO - let sid = 0; - - // 3. online this group. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - fid, - )?; - - // 4. update remote addr. - let db = chat_db(&layer.base, &mgid)?; - Friend::addr_update(&db, fid, &addr)?; - drop(db); - - // 5. online to UI. - - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } - LayerResponse::Agree(remote, proof) => { - // 1. check verify. - proof.verify(&fgid, &addr, &layer.addr)?; - if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? { - // 0. get session. TODO - let sid = 0; - - // already friendship. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - friend.id, - )?; - - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } else { - // agree request for friend. - let db = chat_db(&layer.base, &mgid)?; - if let Some(mut request) = Request::get(&db, &remote.id)? { - layer.group.write().await.broadcast( - &mgid, - InnerEvent::SessionRequestHandle( - request.gid, - true, - remote.avatar.clone(), - ), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_over = true; - request.is_ok = true; - request.update(&db)?; - let request_id = request.id; - let friend = Friend::from_request(&db, request)?; - write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; - results - .rpcs - .push(rpc::request_agree(mgid, request_id, &friend)); - - // ADD NEW SESSION. - let s_db = session_db(&layer.base, &mgid)?; - let mut session = Session::new( - friend.id, - friend.gid, - friend.addr, - SessionType::Chat, - friend.name, - friend.datetime, - ); - session.insert(&s_db)?; - results.rpcs.push(session_create(mgid, &session)); - } - drop(db); - } - - let data = postcard::to_allocvec(&LayerEvent::OnlinePing).unwrap_or(vec![]); - let msg = SendType::Event(0, addr, data); - results.layers.push((mgid, fgid, msg)); - } - LayerResponse::Reject => {} - } - } - RecvType::ResultConnect(addr, data) => { - let response: LayerResponse = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize result failure"))?; - - match response { - LayerResponse::Connect(proof) => { - // 1. check verify. - proof.verify(&fgid, &addr, &layer.addr)?; - // 2. check has this remove. - let some_friend = load_friend(&layer.base, &mgid, &fgid)?; - if some_friend.is_none() { - return Ok(results); - } - let fid = some_friend.unwrap().id; // safe. - - // 0. get session. TODO - let sid = 0; - - // 3. online this group. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - fid, - )?; - // 4. update remote addr. - let db = chat_db(&layer.base, &mgid)?; - Friend::addr_update(&db, fid, &addr)?; - drop(db); - // 5. online to UI. TODO - - // 6. connected. - let msg = conn_res_message(&layer, &mgid, addr).await?; - results.layers.push((mgid, fgid, msg)); - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } - LayerResponse::Agree(remote, proof) => { - // 1. check verify. - proof.verify(&fgid, &addr, &layer.addr)?; - if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? { - // 0. get session. TODO - let sid = 0; - - // already friendship. - layer.running_mut(&mgid)?.check_add_online( - fgid, - Online::Direct(addr), - sid, - friend.id, - )?; - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; - } else { - // agree request for friend. - let db = chat_db(&layer.base, &mgid)?; - if let Some(mut request) = Request::get(&db, &remote.id)? { - layer.group.write().await.broadcast( - &mgid, - InnerEvent::SessionRequestHandle( - request.gid, - true, - remote.avatar.clone(), - ), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_over = true; - request.is_ok = true; - request.update(&db)?; - let request_id = request.id; - let friend = Friend::from_request(&db, request)?; - write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; - results - .rpcs - .push(rpc::request_agree(mgid, request_id, &friend)); - - // ADD NEW SESSION. - let s_db = session_db(&layer.base, &mgid)?; - let mut session = Session::new( - friend.id, - friend.gid, - friend.addr, - SessionType::Chat, - friend.name, - friend.datetime, - ); - session.insert(&s_db)?; - results.rpcs.push(session_create(mgid, &session)); - } - drop(db); - } - - let msg = conn_res_message(&layer, &mgid, addr).await?; - results.layers.push((mgid, fgid, msg)); - } - LayerResponse::Reject => { - let db = chat_db(&layer.base, &mgid)?; - if let Some(mut request) = Request::get(&db, &fgid)? { - layer.group.write().await.broadcast( - &mgid, - InnerEvent::SessionRequestHandle(request.gid, false, vec![]), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_over = true; - request.is_ok = false; - request.update(&db)?; - results.rpcs.push(rpc::request_reject(mgid, request.id)); - } - drop(db); - } } } RecvType::Event(addr, bytes) => { @@ -524,6 +133,45 @@ pub(crate) async fn handle( Ok(results) } +fn handle_connect( + mgid: &GroupId, + fgid: &GroupId, + addr: &PeerAddr, + data: Vec, + layer: &mut Layer, + results: &mut HandleResult, +) -> Result { + // 0. deserialize connect data. + let proof: Proof = postcard::from_bytes(&data) + .map_err(|_e| new_io_error("Deseralize chat layer connect failure"))?; + + // 1. check verify. + proof.verify(fgid, addr, &layer.addr)?; + + // 2. check friendship. + let friend = update_friend(&layer.base, mgid, fgid, addr)?; + if friend.is_none() { + return Ok(false); + } + let fid = friend.unwrap().id; // safe. + + // 3. get session. + let session_some = connect_session(&layer.base, mgid, &SessionType::Chat, &fid, addr)?; + if session_some.is_none() { + return Ok(false); + } + let sid = session_some.unwrap().id; + + // 4. active this session. + layer + .running_mut(mgid)? + .check_add_online(*fgid, Online::Direct(*addr), sid, fid)?; + + // 5. session online to UI. + results.rpcs.push(session_connect(*mgid, &sid, addr)); + Ok(true) +} + impl LayerEvent { pub async fn handle( fgid: GroupId, @@ -534,25 +182,127 @@ impl LayerEvent { ) -> Result { let event: LayerEvent = postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?; - let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; let mut results = HandleResult::new(); match event { LayerEvent::Offline(_) => { + let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?; layer.running_mut(&mgid)?.check_offline(&fgid, &addr); results.rpcs.push(session_lost(mgid, &sid)); } LayerEvent::Suspend(_) => { + let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?; if layer.running_mut(&mgid)?.suspend(&fgid, false)? { results.rpcs.push(session_suspend(mgid, &sid)); } } LayerEvent::Actived(_) => { + let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?; let _ = layer.running_mut(&mgid)?.active(&fgid, false); results.rpcs.push(session_connect(mgid, &sid, &addr)); } + LayerEvent::Request(remote, remark) => { + if load_friend(&layer.base, &mgid, &fgid)?.is_none() { + // check if exist request. + let db = chat_db(&layer.base, &mgid)?; + if let Some(req) = Request::get(&db, &remote.id)? { + req.delete(&db)?; // delete the old request. + results.rpcs.push(rpc::request_delete(mgid, req.id)); + } + let mut request = Request::new( + remote.id, + remote.addr, + remote.name.clone(), + remark.clone(), + false, + true, + ); + // save to db. + request.insert(&db)?; + drop(db); + // save the avatar. + write_avatar_sync(&layer.base, &mgid, &request.gid, remote.avatar.clone())?; + + layer.group.write().await.broadcast( + &mgid, + InnerEvent::SessionRequestCreate(false, remote, remark), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + + results.rpcs.push(rpc::request_create(mgid, &request)); + return Ok(results); + } else { + let msg = conn_agree_message(layer, 0, &mgid, addr).await?; + results.layers.push((mgid, fgid, msg)); + } + } + LayerEvent::Agree(remote, proof) => { + // 0. check verify. + proof.verify(&fgid, &addr, &layer.addr)?; + // 1. check friendship. + if load_friend(&layer.base, &mgid, &fgid)?.is_none() { + // agree request for friend. + let db = chat_db(&layer.base, &mgid)?; + if let Some(mut request) = Request::get(&db, &remote.id)? { + layer.group.write().await.broadcast( + &mgid, + InnerEvent::SessionRequestHandle( + request.gid, + true, + remote.avatar.clone(), + ), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + request.is_over = true; + request.is_ok = true; + request.update(&db)?; + let request_id = request.id; + let friend = Friend::from_request(&db, request)?; + write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; + results + .rpcs + .push(rpc::request_agree(mgid, request_id, &friend)); + + // ADD NEW SESSION. + let s_db = session_db(&layer.base, &mgid)?; + let mut session = Session::new( + friend.id, + friend.gid, + friend.addr, + SessionType::Chat, + friend.name, + friend.datetime, + ); + session.insert(&s_db)?; + results.rpcs.push(session_create(mgid, &session)); + } + drop(db); + } + } + LayerEvent::Reject => { + let db = chat_db(&layer.base, &mgid)?; + if let Some(mut request) = Request::get(&db, &fgid)? { + layer.group.write().await.broadcast( + &mgid, + InnerEvent::SessionRequestHandle(request.gid, false, vec![]), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + request.is_over = true; + request.is_ok = false; + request.update(&db)?; + results.rpcs.push(rpc::request_reject(mgid, request.id)); + } + drop(db); + } LayerEvent::Message(hash, m) => { + let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; let db = chat_db(&layer.base, &mgid)?; if !Message::exist(&db, &hash)? { let msg = m.clone().handle(false, mgid, &layer.base, &db, fid, hash)?; @@ -601,6 +351,7 @@ impl LayerEvent { } } LayerEvent::Info(remote) => { + let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; let avatar = remote.avatar.clone(); let db = chat_db(&layer.base, &mgid)?; let mut f = Friend::get_id(&db, fid)?.ok_or(new_io_error(""))?; @@ -620,11 +371,7 @@ impl LayerEvent { results.rpcs.push(rpc::friend_info(mgid, &f)); } LayerEvent::OnlinePing => { - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; + let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; layer .running_mut(&mgid)? .check_add_online(fgid, Online::Direct(addr), sid, fid)?; @@ -634,16 +381,13 @@ impl LayerEvent { results.layers.push((mgid, fgid, msg)); } LayerEvent::OnlinePong => { - layer.group.write().await.status( - &mgid, - StatusEvent::SessionFriendOnline(fgid), - &mut results, - )?; + let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; layer .running_mut(&mgid)? .check_add_online(fgid, Online::Direct(addr), sid, fid)?; } LayerEvent::Close => { + let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?; layer.group.write().await.broadcast( &mgid, InnerEvent::SessionFriendClose(fgid), @@ -747,13 +491,31 @@ fn load_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId) -> Result Result> { + let db = chat_db(base, mgid)?; + if let Some(friend) = Friend::get(&db, fgid)? { + if &friend.addr != addr { + let _ = Friend::addr_update(&db, friend.id, addr); + } + Ok(Some(friend)) + } else { + Ok(None) + } +} + pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType { // update delivery. let uid = layer.delivery.len() as u64 + 1; layer.delivery.insert(uid, (me.id, request.id)); - let req = LayerRequest::Friend(me, request.remark); + let req = LayerEvent::Request(me, request.remark); let data = postcard::to_allocvec(&req).unwrap_or(vec![]); - SendType::Connect(uid, request.addr, None, None, data) + SendType::Event(uid, request.addr, data) } pub(super) fn reject_message( @@ -762,10 +524,10 @@ pub(super) fn reject_message( addr: PeerAddr, me_id: GroupId, ) -> SendType { - let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]); + let data = postcard::to_allocvec(&LayerEvent::Reject).unwrap_or(vec![]); let uid = layer.delivery.len() as u64 + 1; layer.delivery.insert(uid, (me_id, tid)); - SendType::Result(uid, addr, false, false, data) + SendType::Event(uid, addr, data) } pub(super) fn event_message( @@ -782,13 +544,13 @@ pub(super) fn event_message( } pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType { - let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]); + let data = postcard::to_allocvec(&proof).unwrap_or(vec![]); SendType::Connect(0, addr, None, None, data) } async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result { let proof = layer.group.read().await.prove_addr(mgid, &addr)?; - let data = postcard::to_allocvec(&LayerResponse::Connect(proof)).unwrap_or(vec![]); + let data = postcard::to_allocvec(&proof).unwrap_or(vec![]); Ok(SendType::Result(0, addr, true, false, data)) } @@ -804,8 +566,8 @@ async fn conn_agree_message( let proof = group_lock.prove_addr(mgid, &addr)?; let me = group_lock.clone_user(mgid)?; drop(group_lock); - let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]); - Ok(SendType::Result(uid, addr, true, false, data)) + let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]); + Ok(SendType::Event(uid, addr, data)) } pub(super) fn rpc_agree_message( @@ -818,11 +580,11 @@ pub(super) fn rpc_agree_message( ) -> Result { let uid = layer.delivery.len() as u64 + 1; layer.delivery.insert(uid, (*mgid, tid)); - let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]); - Ok(SendType::Result(uid, addr, true, false, data)) + let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]); + Ok(SendType::Event(uid, addr, data)) } // maybe need if gid or addr in blocklist. fn res_reject() -> Vec { - postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]) + postcard::to_allocvec(&LayerEvent::Reject).unwrap_or(vec![]) } diff --git a/src/layer.rs b/src/layer.rs index c473f30..cc6045f 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -7,7 +7,7 @@ use tdn::{ types::{ group::GroupId, message::SendType, - primitive::{new_io_error, HandleResult, PeerAddr, Result}, + primitive::{new_io_error, PeerAddr, Result}, }, }; @@ -351,17 +351,19 @@ impl RunningAccount { } /// peer leave, remove online peer. - pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<(GroupId, i64)> { + pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec { let mut peers = vec![]; + let mut deletes = vec![]; for (fgid, online) in &self.sessions { if online.online.addr() == addr { - peers.push((*fgid, online.db_id)) + peers.push(online.db_id); + deletes.push(*fgid); } } - - for i in &peers { - self.sessions.remove(&i.0); + for i in &deletes { + self.sessions.remove(&i); } + peers } diff --git a/src/rpc.rs b/src/rpc.rs index df3ddfb..dccb9be 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -91,7 +91,7 @@ pub(crate) fn session_last( } #[inline] -pub(crate) fn session_update( +pub(crate) fn _session_update( mgid: GroupId, id: &i64, addr: &PeerAddr, diff --git a/src/session.rs b/src/session.rs index 00f2dd5..3cfc674 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use tdn::types::{ group::GroupId, primitive::{new_io_error, PeerAddr, Result}, @@ -5,6 +6,8 @@ use tdn::types::{ }; use tdn_storage::local::{DStorage, DsValue}; +use crate::storage::session_db; + pub(crate) enum SessionType { Chat, Group, @@ -43,7 +46,7 @@ impl SessionType { } pub(crate) struct Session { - id: i64, + pub id: i64, fid: i64, pub gid: GroupId, pub addr: PeerAddr, @@ -184,3 +187,31 @@ impl Session { )) } } + +#[inline] +pub(crate) fn connect_session( + base: &PathBuf, + mgid: &GroupId, + s_type: &SessionType, + fid: &i64, + addr: &PeerAddr, +) -> Result> { + let db = session_db(base, mgid)?; + + let sql = format!("SELECT id, fid, gid, addr, s_type, name, is_top, is_close, last_datetime, last_content, last_readed FROM sessions WHERE s_type = {} AND fid = {}", s_type.to_int(), fid); + + let mut matrix = db.query(&sql)?; + if matrix.len() > 0 { + let session = Session::from_values(matrix.pop().unwrap()); // safe unwrap() + + let _ = db.update(&format!( + "UPDATE sessions SET addr = '{}' WHERE id = {}", + addr.to_hex(), + session.id, + )); + + Ok(Some(session)) + } else { + Ok(None) + } +}