diff --git a/src/apps/chat/layer.rs b/src/apps/chat/layer.rs index f3b1dfd..a50b4d4 100644 --- a/src/apps/chat/layer.rs +++ b/src/apps/chat/layer.rs @@ -50,19 +50,19 @@ pub(crate) enum LayerResponse { //Service, } -/// Esse app's Event. +/// ESSE chat layer Event. #[derive(Serialize, Deserialize)] pub(crate) enum LayerEvent { + /// receiver gid, sender gid. as BaseLayerEvent. + OnlinePing, + /// receiver gid, sender gid. as BaseLayerEvent. + OnlinePong, + /// receiver gid, sender gid. as BaseLayerEvent. + Offline, /// receiver gid, sender gid, message. Message(EventId, NetworkMessage), /// receiver gid, sender user. Info(User), - /// receiver gid, sender gid. - OnlinePing, - /// receiver gid, sender gid. - OnlinePong, - /// receiver gid, sender gid. - Offline, /// close friendship. Close, } @@ -83,19 +83,27 @@ pub(crate) async fn handle( match request { LayerRequest::Connect(proof) => { - let fid = layer.get_remote_id(&mgid, &fgid)?; + let friend = load_friend(&layer.base, &mgid, &fgid)?; + if friend.is_none() { + let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]); + let msg = SendType::Result(0, addr, false, false, data); + results.layers.push((mgid, fgid, msg)); + return Ok(results); + } + let f = friend.unwrap(); // safe. + // 1. check verify. proof.verify(&fgid, &addr, &layer.addr)?; // 2. online this group. layer .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + .check_add_online(fgid, Online::Direct(addr), f.id)?; // 3. update remote addr. TODO let db = session_db(&layer.base, &mgid)?; - Friend::addr_update(&db, fid, &addr)?; + Friend::addr_update(&db, f.id, &addr)?; drop(db); // 4. online to UI. - results.rpcs.push(rpc::friend_online(mgid, fid, addr)); + results.rpcs.push(rpc::friend_online(mgid, f.id, addr)); // 5. connected. let msg = conn_res_message(&layer, &mgid, addr).await?; results.layers.push((mgid, fgid, msg)); @@ -106,8 +114,8 @@ pub(crate) async fn handle( )?; } LayerRequest::Friend(remote, remark) => { - let some_fid = layer.get_remote_id(&mgid, &fgid); - if some_fid.is_err() { + let some_friend = load_friend(&layer.base, &mgid, &fgid)?; + if some_friend.is_none() { // check if exist request. let db = session_db(&layer.base, &mgid)?; if let Some(req) = Request::get(&db, &remote.id)? { @@ -139,15 +147,23 @@ pub(crate) async fn handle( results.rpcs.push(rpc::request_create(mgid, &request)); return Ok(results); } - let fid = some_fid.unwrap(); // safe checked. + let mut friend = some_friend.unwrap(); // safe checked. // already friendship & update. // 1. online this group. - layer - .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + layer.running_mut(&mgid)?.check_add_online( + fgid, + Online::Direct(addr), + friend.id, + )?; // 2. update remote user. - let mut friend = layer.update_friend(&mgid, fid, remote)?; + friend.name = remote.name; + friend.addr = remote.addr; + let db = session_db(&layer.base, &mgid)?; + friend.remote_update(&db)?; + drop(db); + write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; + // 3. online to UI. friend.online = true; results.rpcs.push(rpc::friend_info(mgid, &friend)); @@ -223,11 +239,16 @@ pub(crate) async fn handle( // 1. check verify. proof.verify(&fgid, &addr, &layer.addr)?; // 2. check has this remove. - let fid = layer.get_remote_id(&mgid, &fgid)?; + let some_friend = load_friend(&layer.base, &mgid, &fgid)?; + if some_friend.is_none() { + return Ok(results); + } + let fid = some_friend.unwrap().id; // safe. + // 3. online this group. layer .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + .check_add_online(fgid, Online::Direct(addr), fid)?; // 4. update remote addr. let db = session_db(&layer.base, &mgid)?; Friend::addr_update(&db, fid, &addr)?; @@ -243,12 +264,14 @@ pub(crate) async fn handle( LayerResponse::Agree(remote, proof) => { // 1. check verify. proof.verify(&fgid, &addr, &layer.addr)?; - if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) { + if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? { // already friendship. - layer - .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; - results.rpcs.push(rpc::friend_online(mgid, fid, addr)); + layer.running_mut(&mgid)?.check_add_online( + fgid, + Online::Direct(addr), + friend.id, + )?; + results.rpcs.push(rpc::friend_online(mgid, friend.id, addr)); layer.group.write().await.status( &mgid, StatusEvent::SessionFriendOnline(fgid), @@ -275,7 +298,6 @@ pub(crate) async fn handle( let request_id = request.id; let friend = Friend::from_request(&db, request)?; write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; - layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id); results .rpcs .push(rpc::request_agree(mgid, request_id, &friend)); @@ -299,11 +321,16 @@ pub(crate) async fn handle( // 1. check verify. proof.verify(&fgid, &addr, &layer.addr)?; // 2. check has this remove. - let fid = layer.get_remote_id(&mgid, &fgid)?; + let some_friend = load_friend(&layer.base, &mgid, &fgid)?; + if some_friend.is_none() { + return Ok(results); + } + let fid = some_friend.unwrap().id; // safe. + // 3. online this group. layer .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + .check_add_online(fgid, Online::Direct(addr), fid)?; // 4. update remote addr. let db = session_db(&layer.base, &mgid)?; Friend::addr_update(&db, fid, &addr)?; @@ -322,12 +349,14 @@ pub(crate) async fn handle( LayerResponse::Agree(remote, proof) => { // 1. check verify. proof.verify(&fgid, &addr, &layer.addr)?; - if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) { + if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? { // already friendship. - layer - .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; - results.rpcs.push(rpc::friend_online(mgid, fid, addr)); + layer.running_mut(&mgid)?.check_add_online( + fgid, + Online::Direct(addr), + friend.id, + )?; + results.rpcs.push(rpc::friend_online(mgid, friend.id, addr)); layer.group.write().await.status( &mgid, StatusEvent::SessionFriendOnline(fgid), @@ -354,7 +383,6 @@ pub(crate) async fn handle( let request_id = request.id; let friend = Friend::from_request(&db, request)?; write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; - layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id); results .rpcs .push(rpc::request_agree(mgid, request_id, &friend)); @@ -432,7 +460,7 @@ impl LayerEvent { ) -> Result { let event: LayerEvent = postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?; - let fid = layer.get_remote_id(&mgid, &fgid)?; + let fid = layer.get_running_remote_id(&mgid, &fgid)?; let mut results = HandleResult::new(); @@ -453,7 +481,14 @@ impl LayerEvent { } LayerEvent::Info(remote) => { let avatar = remote.avatar.clone(); - let f = layer.update_friend(&mgid, fid, remote)?; + let db = session_db(&layer.base, &mgid)?; + let mut f = Friend::get_id(&db, fid)?.ok_or(new_io_error(""))?; + f.name = remote.name; + f.addr = remote.addr; + f.remote_update(&db)?; + drop(db); + write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; + layer.group.write().await.broadcast( &mgid, InnerEvent::SessionFriendInfo(f.gid, f.addr, f.name.clone(), avatar), @@ -471,7 +506,7 @@ impl LayerEvent { )?; layer .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + .check_add_online(fgid, Online::Direct(addr), fid)?; results.rpcs.push(rpc::friend_online(mgid, fid, addr)); let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]); let msg = SendType::Event(0, addr, data); @@ -485,7 +520,7 @@ impl LayerEvent { )?; layer .running_mut(&mgid)? - .check_add_online(fgid, Online::Direct(addr))?; + .check_add_online(fgid, Online::Direct(addr), fid)?; results.rpcs.push(rpc::friend_online(mgid, fid, addr)); } LayerEvent::Offline => { @@ -505,7 +540,7 @@ impl LayerEvent { fid, &mut results, )?; - layer.remove_friend(&mgid, &fgid); + layer.remove_online(&mgid, &fgid); let db = session_db(&layer.base, &mgid)?; Friend::id_close(&db, fid)?; drop(db); @@ -593,6 +628,12 @@ impl LayerEvent { } } +#[inline] +fn load_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId) -> Result> { + let db = session_db(base, mgid)?; + Friend::get(&db, fgid) +} + pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType { // update delivery. let uid = layer.delivery.len() as u64 + 1; @@ -627,14 +668,9 @@ pub(super) fn event_message( SendType::Event(uid, addr, data) } -pub(crate) async fn conn_req_message( - layer: &Layer, - mgid: &GroupId, - addr: PeerAddr, -) -> Result { - let proof = layer.group.read().await.prove_addr(mgid, &addr)?; +pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType { let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]); - Ok(SendType::Connect(0, addr, None, None, data)) + SendType::Connect(0, addr, None, None, data) } async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result { @@ -674,6 +710,6 @@ pub(super) fn rpc_agree_message( } // maybe need if gid or addr in blocklist. -fn _res_reject() -> Vec { +fn res_reject() -> Vec { postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]) } diff --git a/src/apps/chat/mod.rs b/src/apps/chat/mod.rs index d8153f4..d649fe1 100644 --- a/src/apps/chat/mod.rs +++ b/src/apps/chat/mod.rs @@ -2,7 +2,7 @@ mod layer; mod models; pub(crate) mod rpc; -pub(crate) use layer::conn_req_message; +pub(crate) use layer::chat_conn; pub(crate) use layer::handle as layer_handle; pub(crate) use layer::LayerEvent; pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request}; diff --git a/src/apps/chat/rpc.rs b/src/apps/chat/rpc.rs index 3da13d5..09a9dbe 100644 --- a/src/apps/chat/rpc.rs +++ b/src/apps/chat/rpc.rs @@ -122,7 +122,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "chat-friend-list", |gid: GroupId, _params: Vec, state: Arc| async move { - let friends = state.layer.read().await.all_friends_with_online(&gid)?; + let layer_lock = state.layer.read().await; + let db = session_db(&layer_lock.base, &gid)?; + let mut friends = Friend::all(&db)?; + drop(db); + + let gids: Vec<&GroupId> = friends.iter().map(|f| &f.gid).collect(); + let onlines = layer_lock.merge_online(&gid, gids)?; + for (index, online) in onlines.iter().enumerate() { + friends[index].online = *online; + } + Ok(HandleResult::rpc(friend_list(friends))) }, ); @@ -182,7 +192,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { friend.close(&db)?; drop(db); - let online = layer_lock.remove_friend(&gid, &friend.gid); + let online = layer_lock.remove_online(&gid, &friend.gid); drop(layer_lock); if let Some(faddr) = online { @@ -222,7 +232,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { friend.delete(&db)?; drop(db); - let online = layer_lock.remove_friend(&gid, &friend.gid); + let online = layer_lock.remove_online(&gid, &friend.gid); delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; drop(layer_lock); @@ -346,7 +356,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { request.update(&db)?; let f = Friend::from_request(&db, request)?; - layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id); results.rpcs.push(json!([id, f.to_rpc()])); let proof = group_lock.prove_addr(&gid, &f.addr)?; diff --git a/src/apps/group_chat/layer.rs b/src/apps/group_chat/layer.rs index d8b4e45..31940c5 100644 --- a/src/apps/group_chat/layer.rs +++ b/src/apps/group_chat/layer.rs @@ -3,12 +3,13 @@ use tdn::{ smol::lock::RwLock, types::{ group::GroupId, - message::RecvType, - primitive::{new_io_error, HandleResult, Result}, + message::{RecvType, SendType}, + primitive::{new_io_error, HandleResult, PeerAddr, Result}, }, }; -use group_chat_types::GroupResult; +use group_chat_types::{GroupConnect, GroupResult, JoinProof}; +use tdn_did::Proof; //use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType}; use crate::layer::Layer; @@ -72,3 +73,9 @@ pub(crate) async fn handle( Ok(results) } + +pub(crate) fn group_chat_conn(proof: Proof, addr: PeerAddr, gid: GroupId, height: u64) -> SendType { + let data = postcard::to_allocvec(&GroupConnect::Join(gid, JoinProof::Had(proof), height)) + .unwrap_or(vec![]); + SendType::Connect(0, addr, None, None, data) +} diff --git a/src/apps/group_chat/mod.rs b/src/apps/group_chat/mod.rs index 52cd2d8..c6b2d00 100644 --- a/src/apps/group_chat/mod.rs +++ b/src/apps/group_chat/mod.rs @@ -6,11 +6,12 @@ use tdn::types::{group::GroupId, message::SendType, primitive::HandleResult}; /// Group chat server to ESSE. #[inline] -pub(super) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { +pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { results.layers.push((gid, GROUP_ID, msg)); } pub(crate) use models::GroupChat; pub(crate) mod rpc; +pub(crate) use layer::group_chat_conn; pub(crate) use layer::handle as layer_handle; pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/group_chat/models.rs b/src/apps/group_chat/models.rs index 79379a7..f75887f 100644 --- a/src/apps/group_chat/models.rs +++ b/src/apps/group_chat/models.rs @@ -59,7 +59,7 @@ pub(crate) struct GroupChat { /// group chat type. g_type: GroupType, /// group chat server addresse. - g_addr: PeerAddr, + pub g_addr: PeerAddr, /// group chat name. g_name: String, /// group chat simple intro. @@ -213,6 +213,16 @@ impl GroupChat { Ok(groups) } + /// use in rpc when load account groups. + pub fn all_ok(db: &DStorage) -> Result> { + let matrix = db.query("SELECT id, owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime FROM groups WHERE is_closed = false ORDER BY last_datetime DESC")?; + let mut groups = vec![]; + for values in matrix { + groups.push(GroupChat::from_values(values, false)); + } + Ok(groups) + } + pub fn get(db: &DStorage, gid: &GroupId) -> Result> { let sql = format!("SELECT id, owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime FROM groups WHERE gcd = '{}' AND is_deleted = false", gid.to_hex()); let mut matrix = db.query(&sql)?; @@ -223,6 +233,11 @@ impl GroupChat { Ok(None) } + pub fn get_height(&self, db: &DStorage) -> Result { + // TODO + Ok(0) + } + pub fn insert(&mut self, db: &DStorage) -> Result<()> { let sql = format!("INSERT INTO groups (owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime, is_deleted) VALUES ('{}', '{}', {}, '{}', '{}', '{}', {}, {}, {}, {}, '{}', {}, '{}', {}, {}, false)", self.owner.to_hex(), diff --git a/src/apps/group_chat/rpc.rs b/src/apps/group_chat/rpc.rs index 3869a3b..0343186 100644 --- a/src/apps/group_chat/rpc.rs +++ b/src/apps/group_chat/rpc.rs @@ -59,7 +59,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-chat-list", |gid: GroupId, _params: Vec, state: Arc| async move { - let groups = state.layer.read().await.all_groups_with_online(&gid)?; + let layer_lock = state.layer.read().await; + let db = group_chat_db(&layer_lock.base, &gid)?; + let mut groups = GroupChat::all(&db)?; + drop(db); + + let gids: Vec<&GroupId> = groups.iter().map(|g| &g.g_id).collect(); + let onlines = layer_lock.merge_online(&gid, gids)?; + for (index, online) in onlines.iter().enumerate() { + groups[index].online = *online; + } + Ok(HandleResult::rpc(group_list(groups))) }, ); diff --git a/src/event.rs b/src/event.rs index 1177764..b2b438e 100644 --- a/src/event.rs +++ b/src/event.rs @@ -285,15 +285,6 @@ impl InnerEvent { write_avatar_sync(group.base(), &gid, &request.gid, avatar)?; } let friend = Friend::from_request(&db, request)?; - let layer_lock = layer.clone(); - let rfid = friend.id; - let ggid = gid.clone(); - tdn::smol::spawn(async move { - if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { - running.add_permissioned(rgid, rfid); - } - }) - .detach(); results .rpcs .push(chat_rpc::request_agree(gid, rid, &friend)); @@ -400,7 +391,7 @@ impl InnerEvent { let ggid = gid.clone(); let sender = group.sender(); tdn::smol::spawn(async move { - let online = layer_lock.write().await.remove_friend(&ggid, &f.gid); + let online = layer_lock.write().await.remove_online(&ggid, &f.gid); if let Some(faddr) = online { let mut addrs: HashMap = HashMap::new(); addrs.insert(faddr, f.gid); @@ -430,7 +421,7 @@ impl InnerEvent { let ggid = gid.clone(); let sender = group.sender(); tdn::smol::spawn(async move { - let online = layer_lock.write().await.remove_friend(&ggid, &f.gid); + let online = layer_lock.write().await.remove_online(&ggid, &f.gid); if let Some(faddr) = online { let mut addrs: HashMap = HashMap::new(); addrs.insert(faddr, f.gid); @@ -503,10 +494,11 @@ impl StatusEvent { .push(chat_rpc::friend_online(gid, f.id, f.addr)); let layer_lock = layer.clone(); let rgid = f.gid; + let fid = f.id; let ggid = gid.clone(); tdn::smol::spawn(async move { if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { - let _ = running.check_add_online(rgid, Online::Relay(addr)); + let _ = running.check_add_online(rgid, Online::Relay(addr), fid); } }) .detach(); @@ -795,15 +787,6 @@ impl SyncEvent { write_avatar_sync(&base, &gid, &request.gid, avatar)?; } let friend = Friend::from_request(&session_db, request)?; - let layer_lock = layer.clone(); - let rfid = friend.id; - let ggid = gid.clone(); - tdn::smol::spawn(async move { - if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { - running.add_permissioned(rgid, rfid); - } - }) - .detach(); results .rpcs .push(chat_rpc::request_agree(gid, rid, &friend)); @@ -854,7 +837,7 @@ impl SyncEvent { let fgid = friend.gid; let sender = group.sender(); tdn::smol::spawn(async move { - let online = layer_lock.write().await.remove_friend(&ggid, &fgid); + let online = layer_lock.write().await.remove_online(&ggid, &fgid); if let Some(faddr) = online { let mut addrs: HashMap = HashMap::new(); addrs.insert(faddr, fgid); diff --git a/src/layer.rs b/src/layer.rs index 4c88e83..91607a0 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1,3 +1,4 @@ +use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -5,18 +6,29 @@ use tdn::{ smol::lock::RwLock, types::{ group::GroupId, - message::{RecvType, SendType}, - primitive::{new_io_error, HandleResult, PeerAddr, Result}, + message::SendType, + primitive::{new_io_error, PeerAddr, Result}, }, }; use tdn_did::user::User; -use crate::apps::chat::conn_req_message; -use crate::apps::chat::Friend; -use crate::apps::group_chat::GroupChat; +use crate::apps::chat::{chat_conn, Friend}; +use crate::apps::group_chat::{group_chat_conn, GroupChat, GROUP_ID}; use crate::group::Group; use crate::storage::{group_chat_db, session_db, write_avatar_sync}; +/// ESSE app's BaseLayerEvent. +/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS. +#[derive(Serialize, Deserialize)] +pub(crate) enum LayerEvent { + /// receiver gid, sender gid. + OnlinePing, + /// receiver gid, sender gid. + OnlinePong, + /// receiver gid, sender gid. + Offline, +} + /// ESSE layers. pub(crate) struct Layer { /// account_gid => running_account. @@ -56,8 +68,7 @@ impl Layer { pub fn add_running(&mut self, gid: &GroupId) -> Result<()> { if !self.runnings.contains_key(gid) { - self.runnings - .insert(*gid, RunningAccount::init(&self.base, gid)?); + self.runnings.insert(*gid, RunningAccount::init()); } Ok(()) @@ -97,90 +108,47 @@ impl Layer { addrs } - pub fn get_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result { - self.running(mgid)?.get_permissioned(fgid) + pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result { + self.running(mgid)?.get_online_id(fgid) } - pub fn all_friends(&self, gid: &GroupId) -> Result> { - let db = session_db(&self.base, &gid)?; - let friends = Friend::all_ok(&db)?; - drop(db); - Ok(friends) + pub fn merge_online(&self, mgid: &GroupId, gids: Vec<&GroupId>) -> Result> { + let runnings = self.running(mgid)?; + Ok(gids.iter().map(|g| runnings.is_online(g)).collect()) } - pub fn all_friends_with_online(&self, gid: &GroupId) -> Result> { - let db = session_db(&self.base, &gid)?; - let mut friends = Friend::all(&db)?; - drop(db); - - let keys: HashMap = friends - .iter() - .enumerate() - .map(|(i, f)| (f.gid, i)) - .collect(); - - for fgid in self.running(gid)?.online_groups() { - if keys.contains_key(fgid) { - friends[keys[fgid]].online = true; // safe vec index. - } - } - - Ok(friends) + pub fn remove_online(&mut self, gid: &GroupId, fgid: &GroupId) -> Option { + self.running_mut(gid).ok()?.remove_online(fgid) } - pub fn all_groups_with_online(&self, gid: &GroupId) -> Result> { - let db = group_chat_db(&self.base, &gid)?; - let mut groups = GroupChat::all(&db)?; - drop(db); - - let keys: HashMap = groups - .iter() - .enumerate() - .map(|(i, g)| (g.g_id, i)) - .collect(); + pub async fn all_layer_conns(&self) -> Result>> { + let mut conns = HashMap::new(); + let group_lock = self.group.read().await; + for mgid in self.runnings.keys() { + let mut vecs = vec![]; - for fgid in self.running(gid)?.online_groups() { - if keys.contains_key(fgid) { - groups[keys[fgid]].online = true; + // load friend chat. + let db = session_db(&self.base, mgid)?; + for friend in Friend::all_ok(&db)? { + let proof = group_lock.prove_addr(mgid, &friend.addr).unwrap(); + vecs.push((friend.gid, chat_conn(proof, friend.addr))); } - } - - Ok(groups) - } - - pub fn update_friend(&self, gid: &GroupId, fid: i64, remote: User) -> Result { - let db = session_db(&self.base, &gid)?; - if let Some(mut friend) = Friend::get_id(&db, fid)? { - friend.name = remote.name; - friend.addr = remote.addr; - friend.remote_update(&db)?; drop(db); - write_avatar_sync(&self.base, gid, &remote.id, remote.avatar)?; - Ok(friend) - } else { - drop(db); - Err(new_io_error("missing friend id")) - } - } - - pub fn remove_friend(&mut self, gid: &GroupId, fgid: &GroupId) -> Option { - self.running_mut(gid).ok()?.remove_permissioned(fgid) - } - pub async fn all_friend_conns(&self) -> HashMap> { - let mut conns = HashMap::new(); - for mgid in self.runnings.keys() { - if let Ok(friends) = self.all_friends(mgid) { - let mut vecs = vec![]; - for friend in friends { - if let Ok(msg) = conn_req_message(self, &friend.gid, friend.addr).await { - vecs.push((friend.gid, msg)); - } - } - conns.insert(*mgid, vecs); + // load group chat. + let db = group_chat_db(&self.base, mgid)?; + let groups = GroupChat::all_ok(&db)?; + for g in groups { + let height = g.get_height(&db)? as u64; + let proof = group_lock.prove_addr(mgid, &g.g_addr)?; + vecs.push((GROUP_ID, group_chat_conn(proof, g.g_addr, g.g_id, height))); } + drop(db); + + conns.insert(*mgid, vecs); } - conns + + Ok(conns) } pub fn is_online(&self, faddr: &PeerAddr) -> bool { @@ -208,46 +176,39 @@ impl Online { } pub(crate) struct RunningAccount { - permissioned: HashMap, - /// online group (friends/services) => group's address. - onlines: HashMap, + /// online group (friends/services) => (group's address, group's db id) + onlines: HashMap, } impl RunningAccount { - pub fn init(base: &PathBuf, gid: &GroupId) -> Result { - let mut permissioned = HashMap::new(); - - // load friends to cache. - let db = session_db(base, gid)?; - let friends = Friend::all_id(&db)?; - for (fgid, db_id) in friends { - permissioned.insert(fgid, db_id); + pub fn init() -> Self { + RunningAccount { + onlines: HashMap::new(), } + } - // TODO load services to cache. - - // TODO load permissioned - Ok(RunningAccount { - permissioned, - onlines: HashMap::new(), - }) + pub fn get_online_id(&self, gid: &GroupId) -> Result { + self.onlines + .get(gid) + .map(|(_, id)| *id) + .ok_or(new_io_error("remote not online")) } /// get all onlines's groupid - pub fn online_groups(&self) -> Vec<&GroupId> { - self.onlines.keys().map(|k| k).collect() + pub fn is_online(&self, gid: &GroupId) -> bool { + self.onlines.contains_key(gid) } /// get online peer's addr. pub fn online(&self, gid: &GroupId) -> Result { self.onlines .get(gid) - .map(|online| *online.addr()) + .map(|(online, _)| *online.addr()) .ok_or(new_io_error("remote not online")) } pub fn online_direct(&self, gid: &GroupId) -> Result { - if let Some(online) = self.onlines.get(gid) { + if let Some((online, _)) = self.onlines.get(gid) { match online { Online::Direct(addr) => return Ok(*addr), _ => {} @@ -260,29 +221,29 @@ impl RunningAccount { pub fn onlines(&self) -> Vec<(&GroupId, &PeerAddr)> { self.onlines .iter() - .map(|(fgid, online)| (fgid, online.addr())) + .map(|(fgid, (online, _))| (fgid, online.addr())) .collect() } /// check add online. - pub fn check_add_online(&mut self, gid: GroupId, online: Online) -> Result<()> { - if let Some(o) = self.onlines.get(&gid) { + pub fn check_add_online(&mut self, gid: GroupId, online: Online, id: i64) -> Result<()> { + if let Some((o, _)) = self.onlines.get(&gid) { match (o, &online) { (Online::Relay(..), Online::Direct(..)) => { - self.onlines.insert(gid, online); + self.onlines.insert(gid, (online, id)); Ok(()) } _ => Err(new_io_error("remote had online")), } } else { - self.onlines.insert(gid, online); + self.onlines.insert(gid, (online, id)); Ok(()) } } /// check offline, and return is direct. pub fn check_offline(&mut self, gid: &GroupId, addr: &PeerAddr) -> bool { - if let Some(online) = self.onlines.remove(gid) { + if let Some((online, _)) = self.onlines.remove(gid) { if online.addr() != addr { return false; } @@ -297,10 +258,14 @@ impl RunningAccount { false } + pub fn remove_online(&mut self, gid: &GroupId) -> Option { + self.onlines.remove(gid).map(|(online, _)| *online.addr()) + } + /// remove all onlines peer. pub fn remove_onlines(self) -> Vec<(PeerAddr, GroupId)> { let mut peers = vec![]; - for (fgid, online) in self.onlines { + for (fgid, (online, _)) in self.onlines { match online { Online::Direct(addr) => peers.push((addr, fgid)), _ => {} @@ -311,7 +276,7 @@ impl RunningAccount { /// check if addr is online. pub fn check_addr_online(&self, addr: &PeerAddr) -> bool { - for (_, online) in &self.onlines { + for (_, (online, _)) in &self.onlines { if online.addr() == addr { return true; } @@ -322,11 +287,9 @@ impl RunningAccount { /// peer leave, remove online peer. pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<(GroupId, i64)> { let mut peers = vec![]; - for (fgid, online) in &self.onlines { + for (fgid, (online, id)) in &self.onlines { if online.addr() == addr { - if let Some(i) = self.permissioned.get(fgid) { - peers.push((*fgid, *i)) - } + peers.push((*fgid, *id)) } } @@ -336,30 +299,11 @@ impl RunningAccount { peers } - /// add the permissioned group. - pub fn add_permissioned(&mut self, gid: GroupId, id: i64) { - self.permissioned.insert(gid, id); - } - - /// remove the permissioned group. - pub fn remove_permissioned(&mut self, gid: &GroupId) -> Option { - self.permissioned.remove(gid); - self.onlines.remove(gid).and_then(|o| match o { - Online::Direct(addr) => Some(addr), - _ => None, - }) - } - - /// check the group is permissioned. - pub fn get_permissioned(&self, gid: &GroupId) -> Result { - self.permissioned - .get(gid) - .cloned() - .ok_or(new_io_error("remote missing")) - } - /// list all onlines groups. pub fn _list_onlines(&self) -> Vec<(&GroupId, &PeerAddr)> { - self.onlines.iter().map(|(k, v)| (k, v.addr())).collect() + self.onlines + .iter() + .map(|(k, (v, _))| (k, v.addr())) + .collect() } } diff --git a/src/rpc.rs b/src/rpc.rs index fb2cad8..3df9494 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -13,10 +13,12 @@ use tdn::{ }; use crate::apps::app_rpc_inject; -use crate::apps::chat::{conn_req_message, LayerEvent}; +use crate::apps::chat::{chat_conn, Friend}; +use crate::apps::group_chat::{add_layer, group_chat_conn, GroupChat}; use crate::event::InnerEvent; use crate::group::Group; -use crate::layer::Layer; +use crate::layer::{Layer, LayerEvent}; +use crate::storage::{group_chat_db, session_db}; pub(crate) fn init_rpc( addr: PeerAddr, @@ -360,13 +362,28 @@ fn new_rpc_handler( let mut results = HandleResult::new(); - let layer_lock = state.layer.read().await; - let friends = layer_lock.all_friends(&gid)?; - for friend in friends { - let msg = conn_req_message(&layer_lock, &gid, friend.addr).await?; - results.layers.push((gid, friend.gid, msg)); + let group_lock = state.group.read().await; + let db = session_db(group_lock.base(), &gid)?; + let friends = Friend::all_ok(&db)?; + drop(db); + for f in friends { + let proof = group_lock.prove_addr(&gid, &f.addr)?; + results.layers.push((gid, f.gid, chat_conn(proof, f.addr))); } - drop(layer_lock); + + let db = group_chat_db(group_lock.base(), &gid)?; + let groups = GroupChat::all_ok(&db)?; + for g in groups { + let height = g.get_height(&db)? as u64; + let proof = group_lock.prove_addr(&gid, &g.g_addr)?; + add_layer( + &mut results, + gid, + group_chat_conn(proof, g.g_addr, g.g_id, height), + ); + } + drop(db); + drop(group_lock); let devices = state.group.read().await.distribute_conns(&gid); for device in devices { diff --git a/src/server.rs b/src/server.rs index c0ed383..97ef005 100644 --- a/src/server.rs +++ b/src/server.rs @@ -116,7 +116,12 @@ pub async fn start(db_path: String) -> Result<()> { .expect("TDN channel closed"); let t_sender = sender.clone(); let g_conns = group.read().await.all_distribute_conns(); - let l_conns = layer.read().await.all_friend_conns().await; + let l_conns = layer + .read() + .await + .all_layer_conns() + .await + .unwrap_or(HashMap::new()); tdn::smol::spawn(sleep_waiting_reboot(t_sender, g_conns, l_conns)).detach(); } }