diff --git a/src/apps/chat/mod.rs b/src/apps/chat/mod.rs deleted file mode 100644 index 43adf82..0000000 --- a/src/apps/chat/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -mod layer; -mod models; - -pub(crate) mod rpc; -pub(crate) use layer::{chat_conn, handle, update_session, LayerEvent}; -pub(crate) use models::{ - from_model, from_network_message, raw_to_network_message, to_network_message, Friend, - InviteType, Message, Request, -}; -pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/dao/layer.rs b/src/apps/dao/layer.rs index 333a72c..a784fc1 100644 --- a/src/apps/dao/layer.rs +++ b/src/apps/dao/layer.rs @@ -14,7 +14,7 @@ use group_types::{ use tdn_did::Proof; use tdn_storage::local::DStorage; -use crate::apps::chat::Friend; +use crate::group::Friend; use crate::layer::{Layer, Online}; use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; use crate::session::{connect_session, Session, SessionType}; diff --git a/src/apps/dao/models/message.rs b/src/apps/dao/models/message.rs index 8adb554..b444f34 100644 --- a/src/apps/dao/models/message.rs +++ b/src/apps/dao/models/message.rs @@ -9,7 +9,7 @@ use tdn_storage::local::{DStorage, DsValue}; use group_types::NetworkMessage; -use crate::apps::chat::{Friend, MessageType}; +use crate::group::{Friend, MessageType}; use crate::storage::{ chat_db, group_db, read_avatar, read_file, read_record, write_avatar_sync, write_file_sync, write_image_sync, write_record_sync, diff --git a/src/apps/dao/rpc.rs b/src/apps/dao/rpc.rs index 058d64a..605abb3 100644 --- a/src/apps/dao/rpc.rs +++ b/src/apps/dao/rpc.rs @@ -9,7 +9,7 @@ use tdn_did::Proof; use group_types::{Event, GroupLocation, GroupType, JoinProof, LayerEvent}; -use crate::apps::chat::{Friend, MessageType}; +use crate::group::{Friend, MessageType}; use crate::layer::Online; use crate::rpc::{session_close, session_create, session_delete, session_last, RpcState}; use crate::session::{Session, SessionType}; @@ -424,8 +424,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .filter_map(|v| v.as_i64()) .collect(); - let group_lock = state.own.read().await; - let base = group_lock.base().clone(); + let own_lock = state.own.read().await; + let base = own_lock.base().clone(); let chat = chat_db(&base, &gid)?; let group_db = group_db(&base, &gid)?; @@ -434,7 +434,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { for fid in ids { let friend = Friend::get_id(&chat, fid)?.ok_or(RpcError::ParseError)?; if Member::get_id(&group_db, &id, &friend.gid).is_err() { - let proof = group_lock.prove_addr(&gid, &friend.gid.into())?; + let proof = own_lock.prove_addr(&gid, &friend.gid.into())?; invites.push((friend.id, friend.gid, friend.addr, proof)); } } @@ -467,7 +467,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { } } - let (msg, nw, sc) = crate::apps::chat::LayerEvent::from_message( + let (msg, nw, sc) = crate::group::GroupEvent::from_message( &base, gid, fid, @@ -475,9 +475,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { contact_values, ) .await?; - let event = crate::apps::chat::LayerEvent::Message(msg.hash, nw); - let s = - crate::apps::chat::event_message(&mut layer_lock, msg.id, gid, faddr, &event); + let event = crate::group::GroupEvent::Message(msg.hash, nw); + let s = crate::group::event_message(&mut layer_lock, msg.id, gid, faddr, &event); results.layers.push((gid, fgid, s)); if let Ok(id) = diff --git a/src/apps/device/rpc.rs b/src/apps/device/rpc.rs index ef01317..8f368e2 100644 --- a/src/apps/device/rpc.rs +++ b/src/apps/device/rpc.rs @@ -76,16 +76,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let group_lock = state.own.read().await; - if id == group_lock.device()?.id { - let uptime = group_lock.uptime; + let own_lock = state.own.read().await; + if id == own_lock.device()?.id { + let uptime = own_lock.uptime; let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = local_device_status(); return Ok(HandleResult::rpc(json!([ cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime ]))); } - drop(group_lock); + drop(own_lock); //let msg = state.own.write().await.event_message(addr, &OwnEvent::StatusRequest)?; //Ok(HandleResult::group(msg)) diff --git a/src/apps/group/layer.rs b/src/apps/group/layer.rs index 37158c5..a0a3f9c 100644 --- a/src/apps/group/layer.rs +++ b/src/apps/group/layer.rs @@ -7,8 +7,8 @@ use tdn::types::{ }; use tdn_storage::local::DStorage; -use crate::apps::chat::Friend; use crate::global::Global; +use crate::group::Friend; use crate::rpc::{ session_close, session_connect, session_last, session_lost, session_suspend, session_update_name, diff --git a/src/apps/group/models/message.rs b/src/apps/group/models/message.rs index 65cc6dc..0ab5935 100644 --- a/src/apps/group/models/message.rs +++ b/src/apps/group/models/message.rs @@ -8,7 +8,7 @@ use tdn::types::{ }; use tdn_storage::local::{DStorage, DsValue}; -use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm}; +use crate::group::{from_network_message, raw_to_network_message, to_network_message as tnm}; use crate::storage::group_db; use super::Member; diff --git a/src/apps/group/rpc.rs b/src/apps/group/rpc.rs index fa4aba5..55f3c18 100644 --- a/src/apps/group/rpc.rs +++ b/src/apps/group/rpc.rs @@ -1,4 +1,4 @@ -use esse_primitives::{MessageType, ESSE_ID}; +use esse_primitives::MessageType; use group_types::{Event, LayerEvent, GROUP_CHAT_ID}; use std::sync::Arc; use tdn::types::{ @@ -7,8 +7,8 @@ use tdn::types::{ rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }; -use crate::apps::chat::{raw_to_network_message, Friend, InviteType}; use crate::global::Global; +use crate::group::{raw_to_network_message, Friend, InviteType}; use crate::rpc::{session_create, session_delete, session_update_name}; use crate::session::{Session, SessionType}; use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar}; @@ -104,10 +104,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let group_lock = state.own.read().await; - let db_key = group_lock.db_key(&pid)?; - let me = group_lock.clone_user(&pid)?; - drop(group_lock); + let own_lock = state.own.read().await; + let db_key = own_lock.db_key(&pid)?; + let me = own_lock.clone_user(&pid)?; + drop(own_lock); let db = group_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; @@ -172,16 +172,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let m_type = MessageType::Invite; let (nm, raw) = raw_to_network_message(&pid, &state.base, &db_key, &m_type, &contact).await?; - let mut msg = crate::apps::chat::Message::new(&pid, f.id, true, m_type, raw, false); + let mut msg = crate::group::Message::new(&pid, f.id, true, m_type, raw, false); msg.insert(&chat_db)?; - let event = crate::apps::chat::LayerEvent::Message(msg.hash, nm); + let event = crate::group::GroupEvent::Message(msg.hash, nm); let tid = state.layer.write().await.delivery(msg.id); let data = bincode::serialize(&event).unwrap_or(vec![]); - let lmsg = SendType::Event(tid, f.pid, data); - results.layers.push((ESSE_ID, lmsg)); + results.groups.push(SendType::Event(tid, f.pid, data)); // update session. - crate::apps::chat::update_session(&s_db, &id, &msg, &mut results); + crate::group::update_session(&s_db, &id, &msg, &mut results); // handle group member let avatar = read_avatar(&state.base, &pid, &f.pid) diff --git a/src/apps/jarvis/rpc.rs b/src/apps/jarvis/rpc.rs index c63268c..f386ea4 100644 --- a/src/apps/jarvis/rpc.rs +++ b/src/apps/jarvis/rpc.rs @@ -11,8 +11,8 @@ use tdn_storage::local::DStorage; use tokio::sync::mpsc::Sender; use crate::account::lang_from_i64; -use crate::apps::chat::raw_to_network_message; use crate::global::Global; +use crate::group::raw_to_network_message; use crate::storage::jarvis_db; use crate::utils::answer::load_answer; diff --git a/src/apps.rs b/src/apps/mod.rs similarity index 89% rename from src/apps.rs rename to src/apps/mod.rs index c87e69a..888ff12 100644 --- a/src/apps.rs +++ b/src/apps/mod.rs @@ -1,7 +1,6 @@ use cloud_types::CLOUD_ID; use dao_types::DAO_ID; use domain_types::DOMAIN_ID; -use esse_primitives::ESSE_ID; use group_types::{GroupChatId, GROUP_CHAT_ID}; use std::collections::HashMap; use std::sync::Arc; @@ -16,7 +15,6 @@ use crate::global::Global; use crate::rpc::session_lost; use crate::storage::group_db; -pub(crate) mod chat; pub(crate) mod cloud; pub(crate) mod device; pub(crate) mod domain; @@ -28,7 +26,6 @@ pub(crate) mod wallet; pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { device::new_rpc_handler(handler); - chat::new_rpc_handler(handler); jarvis::new_rpc_handler(handler); domain::new_rpc_handler(handler); file::new_rpc_handler(handler); @@ -46,21 +43,16 @@ pub(crate) async fn app_layer_handle( ) -> Result { debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid); match (fgid, tgid) { - (ESSE_ID, 0) | (0, ESSE_ID) => chat::handle(msg, global).await, (GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await, (DOMAIN_ID, 0) | (0, DOMAIN_ID) => domain::handle(msg, global).await, (CLOUD_ID, 0) | (0, CLOUD_ID) => cloud::handle(msg, global).await, - (DAO_ID, 0) | (0, DAO_ID) => chat::handle(msg, global).await, + (DAO_ID, 0) | (0, DAO_ID) => cloud::handle(msg, global).await, // TODO DAO _ => match msg { RecvType::Leave(peer) => { debug!("Peer leaved: {}", peer.id.to_hex()); let mut results = HandleResult::new(); let mut layer = global.layer.write().await; - if let Some(session) = layer.chats.remove(&peer.id) { - results.rpcs.push(session_lost(&session.s_id)); - } - let mut delete: HashMap> = HashMap::new(); let pid = global.pid().await; let db_key = global.own.read().await.db_key(&pid)?; diff --git a/src/apps/wallet/rpc.rs b/src/apps/wallet/rpc.rs index 529ab54..2e40ac9 100644 --- a/src/apps/wallet/rpc.rs +++ b/src/apps/wallet/rpc.rs @@ -338,13 +338,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; - let group_lock = state.own.read().await; - let mnemonic = group_lock.mnemonic(&pid, lock, &state.secret)?; - let account = group_lock.account(&pid)?; + let own_lock = state.own.read().await; + let mnemonic = own_lock.mnemonic(&pid, lock, &state.secret)?; + let account = own_lock.account(&pid)?; let lang = account.lang(); let pass = account.pass.to_string(); let account_index = account.index as u32; - drop(group_lock); + drop(own_lock); let mut results = HandleResult::new(); @@ -371,16 +371,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { results.rpcs.push(address.to_rpc()); if address.main { let a_db = account_db(&state.base, &state.secret)?; - let mut group_lock = state.own.write().await; - let account = group_lock.account_mut(&pid)?; + let mut own_lock = state.own.write().await; + let account = own_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; account.update_info(&a_db)?; - let user = group_lock.clone_user(&pid)?; - drop(group_lock); + let user = own_lock.clone_user(&pid)?; + drop(own_lock); // broadcast to all friends. - state.layer.read().await.broadcast(user, &mut results); + state.group.read().await.broadcast(user, &mut results); } Ok(results) }, @@ -398,11 +398,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let pid = state.pid().await; - let group_lock = state.own.read().await; - let ckey = &group_lock.account(&pid)?.encrypt; - let db_key = group_lock.db_key(&pid)?; + let own_lock = state.own.read().await; + let ckey = &own_lock.account(&pid)?.encrypt; + let db_key = own_lock.db_key(&pid)?; let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?; - drop(group_lock); + drop(own_lock); let db = wallet_db(&state.base, &pid, &db_key)?; @@ -480,26 +480,26 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let lock = params[6].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let group_lock = state.own.read().await; - if !group_lock.check_lock(&pid, &lock) { + let own_lock = state.own.read().await; + if !own_lock.check_lock(&pid, &lock) { return Err(RpcError::Custom("Lock is invalid!".to_owned())); } - let db_key = group_lock.db_key(&pid)?; + let db_key = own_lock.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let address = Address::get(&db, &from)?; let (mnemonic, pbytes) = if address.is_gen() { - (group_lock.mnemonic(&pid, lock, &state.secret)?, vec![]) + (own_lock.mnemonic(&pid, lock, &state.secret)?, vec![]) } else { - let ckey = &group_lock.account(&pid)?.encrypt; + let ckey = &own_lock.account(&pid)?.encrypt; let pbytes = decrypt(&state.secret, lock, ckey, address.secret.as_ref())?; (String::new(), pbytes) }; - let account = group_lock.account(&pid)?; + let account = own_lock.account(&pid)?; let lang = account.lang(); let pass = account.pass.to_string(); let account_index = account.index as u32; - drop(group_lock); + drop(own_lock); let pass = if pass.len() > 0 { Some(pass.as_ref()) } else { @@ -617,16 +617,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); - let mut group_lock = state.own.write().await; - let account = group_lock.account_mut(&pid)?; + let mut own_lock = state.own.write().await; + let account = own_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; account.update_info(&a_db)?; - let user = group_lock.clone_user(&pid)?; - drop(group_lock); + let user = own_lock.clone_user(&pid)?; + drop(own_lock); // broadcast all friends. - state.layer.read().await.broadcast(user, &mut results); + state.group.read().await.broadcast(user, &mut results); Ok(HandleResult::new()) }, diff --git a/src/daemon.rs b/src/daemon.rs index 26f3ce5..8e958ed 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,6 +11,7 @@ mod apps; //mod consensus; //mod event; mod global; +mod group; mod layer; mod migrate; mod own; diff --git a/src/event.rs b/src/event.rs index 1d6a593..2ba39d5 100644 --- a/src/event.rs +++ b/src/event.rs @@ -13,17 +13,17 @@ use tdn_storage::local::DStorage; use tokio::sync::{mpsc::Sender, RwLock}; use crate::account::{Account, User}; -use crate::apps::chat::LayerEvent; use crate::consensus::Event as OldEvent; +use crate::group::GroupEvent; use crate::layer::Layer; use crate::migrate::consensus::{ ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH, }; use crate::own::{Own, OwnEvent}; -use crate::apps::chat::rpc as chat_rpc; -use crate::apps::chat::{from_model, Friend, Message, Request}; use crate::apps::file::{FileDid, RootDirectory}; +use crate::group::rpc as chat_rpc; +use crate::group::{from_model, Friend, Message, Request}; use crate::rpc; use crate::storage::{delete_avatar_sync, read_avatar_sync, write_avatar_sync}; @@ -156,7 +156,7 @@ impl InnerEvent { layer: Arc>, gid: GroupId, fgid: GroupId, - event: LayerEvent, + event: GroupEvent, ) -> Result<()> { let addr = layer.read().await.running(&gid)?.online_direct(&fgid)?; let data = bincode::serialize(&event).unwrap_or(vec![]); @@ -342,7 +342,7 @@ impl InnerEvent { let ggid = gid.clone(); let fgid = f.gid; let sender = group.sender(); - let layer_event = LayerEvent::Message(hash, m.clone()); + let layer_event = GroupEvent::Message(hash, m.clone()); tokio::spawn(InnerEvent::direct_layer_session( sender, layer_lock, diff --git a/src/global.rs b/src/global.rs index 66af2ff..1089011 100644 --- a/src/global.rs +++ b/src/global.rs @@ -7,6 +7,7 @@ use tdn::{ use tokio::{sync::mpsc::Sender, sync::RwLock}; use crate::account::Account; +use crate::group::Group; use crate::layer::Layer; use crate::own::Own; @@ -20,6 +21,8 @@ pub(crate) struct Global { pub peer_own_height: RwLock, /// current own. pub own: RwLock, + /// current group. + pub group: RwLock, /// current layer. pub layer: RwLock, /// message delivery tracking. uuid, me_gid, db_id. @@ -62,6 +65,7 @@ impl Global { peer_pub_height: RwLock::new(0), peer_own_height: RwLock::new(0), own: RwLock::new(Own::init(accounts)), + group: RwLock::new(Group::init()), layer: RwLock::new(Layer::init()), p2p_send: RwLock::new(None), _delivery: RwLock::new(HashMap::new()), @@ -90,6 +94,7 @@ impl Global { pub async fn clear(&self) { *self.peer_id.write().await = PeerId::default(); + self.group.write().await.clear(); self.layer.write().await.clear(); } @@ -108,6 +113,7 @@ impl Global { .write() .await .reset(pid, lock, &self.base, &self.secret)?; + self.group.write().await.clear(); self.layer.write().await.clear(); *self.p2p_send.write().await = Some(send); diff --git a/src/apps/chat/layer.rs b/src/group/handle.rs similarity index 73% rename from src/apps/chat/layer.rs rename to src/group/handle.rs index 0f10c17..1b22bb0 100644 --- a/src/apps/chat/layer.rs +++ b/src/group/handle.rs @@ -1,5 +1,6 @@ -use esse_primitives::{MessageType, NetworkMessage, ESSE_ID}; +use esse_primitives::{MessageType, NetworkMessage}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; use tdn::types::{ group::EventId, @@ -17,74 +18,44 @@ use crate::rpc::{ use crate::session::{connect_session, Session, SessionType}; use crate::storage::{account_db, chat_db, session_db, write_avatar_sync}; -use super::models::{handle_nmsg, Friend, Message, Request}; use super::rpc; +use super::{ + from_model, from_network_message, handle_nmsg, Friend, GroupEvent, InviteType, Message, Request, +}; -/// Chat connect data structure. -/// params: Friend about me height -//#[derive(Serialize, Deserialize)] -//pub struct LayerConnect(pub i64); - -/// ESSE chat layer Event. -#[derive(Serialize, Deserialize)] -pub(crate) enum LayerEvent { - /// offline. extend BaseLayerEvent. - Offline, - /// suspend. extend BaseLayerEvent. - Suspend, - /// actived. extend BaseLayerEvent. - Actived, - /// make friendship request. - /// params is name, remark. - Request(String, String), - /// agree friendship request. - /// params is gid. - Agree, - /// reject friendship request. - Reject, - /// receiver gid, sender gid, message. - Message(EventId, NetworkMessage), - /// request user info. - InfoReq(u64), - /// user full info. - InfoRes(User), - /// close friendship. - Close, -} - -pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { - debug!("---------DEBUG--------- GOT CHAT EVENT"); +pub(crate) async fn group_handle(msg: RecvType, global: &Arc) -> Result { + debug!("---------DEBUG--------- GOT GROUP MESSAGE"); let mut results = HandleResult::new(); let pid = global.pid().await; match msg { RecvType::Connect(peer, _) | RecvType::ResultConnect(peer, _) => { - // ESSE chat layer connect date structure. + // ESSE group connect date structure. if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await { let peer_id = peer.id; let msg = SendType::Result(0, peer, true, false, vec![]); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); - let info = LayerEvent::InfoReq(height); + let info = GroupEvent::InfoReq(height); let data = bincode::serialize(&info).unwrap_or(vec![]); let msg = SendType::Event(0, peer_id, data); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } else { let msg = SendType::Result(0, peer, false, false, vec![]); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } } RecvType::Result(peer, is_ok, _) => { - // ESSE chat layer result date structure. + // ESSE group result date structure. if is_ok { if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await { - let info = LayerEvent::InfoReq(height); + let info = GroupEvent::InfoReq(height); let data = bincode::serialize(&info).unwrap_or(vec![]); let msg = SendType::Event(0, peer.id, data); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } else { let msg = SendType::Result(0, peer, false, false, vec![]); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } } else { let db_key = global.own.read().await.db_key(&pid)?; @@ -95,12 +66,12 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { - return LayerEvent::handle(pid, fpid, global, bytes).await; + return GroupEvent::handle(pid, fpid, global, bytes).await; } RecvType::Delivery(t, tid, is_ok) => { - let mut layer = global.layer.write().await; - let id = layer.delivery.remove(&tid).ok_or(anyhow!("delivery err"))?; - drop(layer); + let mut group = global.group.write().await; + let id = group.delivery.remove(&tid).ok_or(anyhow!("delivery err"))?; + drop(group); let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; let resp = match t { @@ -125,7 +96,15 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { // TODO stream } - RecvType::Leave(..) => {} // nerver here. + RecvType::Leave(peer) => { + debug!("Peer leaved: {}", peer.id.to_hex()); + let mut group_lock = global.group.write().await; + if let Ok((sid, _fid)) = group_lock.get(&peer.id) { + results.rpcs.push(session_lost(&sid)); + } + group_lock.rm_online(&peer.id); + drop(group_lock); + } } Ok(results) @@ -159,41 +138,41 @@ async fn handle_connect( results.rpcs.push(session_connect(&sid, &peer.id)); // 4. active this session. - global.layer.write().await.chat_add(peer.id, sid, f.id, 0); + global.group.write().await.add(peer.id, sid, f.id, 0); Ok(f.height as u64) } -impl LayerEvent { +impl GroupEvent { pub async fn handle( pid: PeerId, fpid: PeerId, global: &Arc, bytes: Vec, ) -> Result { - let event: LayerEvent = bincode::deserialize(&bytes)?; + let event: GroupEvent = bincode::deserialize(&bytes)?; let mut results = HandleResult::new(); match event { - LayerEvent::Offline => { - let mut layer = global.layer.write().await; - let (sid, _fid) = layer.chat_session(&fpid)?; - let _ = layer.chat_rm_online(&fpid); + GroupEvent::Offline => { + let mut group = global.group.write().await; + let (sid, _fid) = group.get(&fpid)?; + group.rm_online(&fpid); results.rpcs.push(session_lost(&sid)); } - LayerEvent::Suspend => { - let mut layer = global.layer.write().await; - let (sid, _fid) = layer.chat_session(&fpid)?; - let _ = layer.chat_suspend(&fpid, false, false)?; + GroupEvent::Suspend => { + let mut group = global.group.write().await; + let (sid, _fid) = group.get(&fpid)?; + group.suspend(&fpid, false, false)?; results.rpcs.push(session_suspend(&sid)); } - LayerEvent::Actived => { - let mut layer = global.layer.write().await; - let (sid, _fid) = layer.chat_session(&fpid)?; - let _ = layer.chat_active(&fpid, false); + GroupEvent::Actived => { + let mut group = global.group.write().await; + let (sid, _fid) = group.get(&fpid)?; + group.active(&fpid, false)?; results.rpcs.push(session_connect(&sid, &fpid)); } - LayerEvent::Request(name, remark) => { + GroupEvent::Request(name, remark) => { let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -212,12 +191,12 @@ impl LayerEvent { results.rpcs.push(notice_menu(&SessionType::Chat)); return Ok(results); } else { - let data = bincode::serialize(&LayerEvent::Agree).unwrap_or(vec![]); + let data = bincode::serialize(&GroupEvent::Agree).unwrap_or(vec![]); let msg = SendType::Event(0, fpid, data); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } } - LayerEvent::Agree => { + GroupEvent::Agree => { let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -247,7 +226,7 @@ impl LayerEvent { drop(db); } } - LayerEvent::Reject => { + GroupEvent::Reject => { let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -258,8 +237,8 @@ impl LayerEvent { results.rpcs.push(rpc::request_reject(request.id)); } } - LayerEvent::Message(hash, m) => { - let (_sid, fid) = global.layer.read().await.chat_session(&fpid)?; + GroupEvent::Message(hash, m) => { + let (_sid, fid) = global.group.read().await.get(&fpid)?; let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -283,12 +262,12 @@ impl LayerEvent { update_session(&s_db, &fid, &msg, &mut results); } } - LayerEvent::InfoReq(height) => { + GroupEvent::InfoReq(height) => { // check sync remote height. let a_db = account_db(&global.base, &global.secret)?; let account = Account::get(&a_db, &pid)?; if account.pub_height > height { - let info = LayerEvent::InfoRes(User::info( + let info = GroupEvent::InfoRes(User::info( account.pub_height, account.name, account.wallet, @@ -298,11 +277,11 @@ impl LayerEvent { )); let data = bincode::serialize(&info).unwrap_or(vec![]); let msg = SendType::Event(0, fpid, data); - results.layers.push((ESSE_ID, msg)); + results.groups.push(msg); } } - LayerEvent::InfoRes(remote) => { - let (sid, fid) = global.layer.read().await.chat_session(&fpid)?; + GroupEvent::InfoRes(remote) => { + let (sid, fid) = global.group.read().await.get(&fpid)?; let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -322,12 +301,12 @@ impl LayerEvent { let _ = Session::update_name(&s_db, &sid, &name); results.rpcs.push(session_update_name(&sid, &name)); } - LayerEvent::Close => { - let mut layer = global.layer.write().await; - let _ = layer.chat_rm_online(&fpid); - let (sid, fid) = global.layer.read().await.chat_session(&fpid)?; - let keep = layer.is_addr_online(&fpid); - drop(layer); + GroupEvent::Close => { + let mut group = global.group.write().await; + group.rm_online(&fpid); + let (sid, fid) = group.get(&fpid)?; + let keep = group.is_online(&fpid); + drop(group); let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; @@ -336,7 +315,7 @@ impl LayerEvent { drop(db); results.rpcs.push(rpc::friend_close(fid)); if !keep { - results.layers.push((ESSE_ID, SendType::Disconnect(fpid))) + results.groups.push(SendType::Disconnect(fpid)) } // TODO close session } @@ -346,10 +325,10 @@ impl LayerEvent { } } -pub(crate) fn chat_conn(pid: PeerId, results: &mut HandleResult) { +pub(crate) fn group_conn(pid: PeerId, results: &mut HandleResult) { results - .layers - .push((ESSE_ID, SendType::Connect(0, Peer::peer(pid), vec![]))); + .groups + .push(SendType::Connect(0, Peer::peer(pid), vec![])); } // UPDATE SESSION. diff --git a/src/group/mod.rs b/src/group/mod.rs new file mode 100644 index 0000000..a769965 --- /dev/null +++ b/src/group/mod.rs @@ -0,0 +1,212 @@ +use esse_primitives::{MessageType, NetworkMessage}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tdn::types::{ + group::EventId, + message::{RecvType, SendType}, + primitives::{DeliveryType, HandleResult, Peer, PeerId, Result}, +}; +use tdn_storage::local::DStorage; + +use crate::account::{Account, User}; +use crate::global::Global; +use crate::rpc::{ + notice_menu, session_connect, session_create, session_last, session_lost, session_suspend, + session_update_name, +}; +use crate::session::{connect_session, Session, SessionType}; +use crate::storage::{account_db, chat_db, session_db, write_avatar_sync}; + +mod handle; +mod models; +mod rpc; + +pub(crate) use handle::{group_conn, group_handle, update_session}; +pub(crate) use models::{ + from_model, from_network_message, handle_nmsg, raw_to_network_message, to_network_message, + Friend, InviteType, Message, Request, +}; +pub(crate) use rpc::group_rpc; + +/// ESSE groups. +pub(crate) struct Group { + /// friend pid => Session + pub sessions: HashMap, + /// delivery feedback. + pub delivery: HashMap, + /// delivery counter. + delivery_count: usize, +} + +/// online connected layer session. +pub(crate) struct GroupSession { + /// consensus height. + pub height: i64, + /// session database id. + pub sid: i64, + /// friend database id. + pub fid: i64, + /// if session is suspend by me. + pub suspend_me: bool, + /// if session is suspend by remote. + pub suspend_remote: bool, + /// keep alive remain minutes. + pub remain: u16, +} + +/// ESSE group Event (Chat). +#[derive(Serialize, Deserialize)] +pub(crate) enum GroupEvent { + /// offline. extend BaseGroupEvent. + Offline, + /// suspend. extend BaseGroupEvent. + Suspend, + /// actived. extend BaseGroupEvent. + Actived, + /// make friendship request. + /// params is name, remark. + Request(String, String), + /// agree friendship request. + /// params is gid. + Agree, + /// reject friendship request. + Reject, + /// receiver gid, sender gid, message. + Message(EventId, NetworkMessage), + /// request user info. + InfoReq(u64), + /// user full info. + InfoRes(User), + /// close friendship. + Close, +} + +impl Group { + pub fn init() -> Group { + Group { + sessions: HashMap::new(), + delivery: HashMap::new(), + delivery_count: 0, + } + } + + pub fn delivery(&mut self, db_id: i64) -> u64 { + let next = self.delivery_count as u64; + self.delivery.insert(next, db_id); + self.delivery_count += 1; + next + } + + pub fn clear(&mut self) { + self.sessions.clear(); + self.delivery.clear(); + self.delivery_count = 0; + } + + pub fn add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) { + self.sessions + .entry(pid) + .and_modify(|s| { + s.sid = sid; + s.fid = fid; + s.height = h; + }) + .or_insert(GroupSession::new(sid, fid, h)); + } + + pub fn get(&self, pid: &PeerId) -> Result<(i64, i64)> { + if let Some(session) = self.sessions.get(pid) { + Ok((session.sid, session.fid)) + } else { + Err(anyhow!("session missing!")) + } + } + + pub fn is_online(&self, pid: &PeerId) -> bool { + self.sessions.contains_key(pid) + } + + pub fn rm_online(&mut self, pid: &PeerId) -> bool { + if self.sessions.contains_key(pid) { + self.sessions.remove(pid); + true + } else { + false + } + } + + pub fn active(&mut self, pid: &PeerId, is_me: bool) -> Result<()> { + if let Some(session) = self.sessions.get_mut(pid) { + Ok(session.active(is_me)) + } else { + Err(anyhow!("session missing!")) + } + } + + pub fn suspend(&mut self, pid: &PeerId, me: bool, m: bool) -> Result<()> { + if let Some(session) = self.sessions.get_mut(pid) { + Ok(session.suspend(me, m)) + } else { + Err(anyhow!("session missing!")) + } + } + + pub fn broadcast(&self, user: User, results: &mut HandleResult) { + let info = GroupEvent::InfoRes(user); + let data = bincode::serialize(&info).unwrap_or(vec![]); + + for fpid in self.sessions.keys() { + let msg = SendType::Event(0, *fpid, data.clone()); + results.groups.push(msg); + } + } +} + +impl GroupSession { + fn new(sid: i64, fid: i64, height: i64) -> Self { + Self { + sid, + fid, + height, + suspend_me: false, + suspend_remote: false, + remain: 0, + } + } + + pub fn info(&self) -> (i64, i64, i64) { + (self.height, self.sid, self.fid) + } + + pub fn increased(&mut self) -> i64 { + self.height += 1; + self.height + } + + pub fn active(&mut self, is_me: bool) { + if is_me { + self.suspend_me = false; + } else { + self.suspend_remote = false; + } + self.remain = 0; + } + + pub fn suspend(&mut self, is_me: bool, must: bool) { + if must { + self.suspend_me = true; + self.suspend_remote = true; + } + + if is_me { + self.suspend_me = true; + } else { + self.suspend_remote = true; + } + + if self.suspend_remote && self.suspend_me { + self.remain = 6; // keep-alive 10~11 minutes 120s/time + } + } +} diff --git a/src/apps/chat/models.rs b/src/group/models.rs similarity index 100% rename from src/apps/chat/models.rs rename to src/group/models.rs diff --git a/src/apps/chat/models/friend.rs b/src/group/models/friend.rs similarity index 100% rename from src/apps/chat/models/friend.rs rename to src/group/models/friend.rs diff --git a/src/apps/chat/models/message.rs b/src/group/models/message.rs similarity index 100% rename from src/apps/chat/models/message.rs rename to src/group/models/message.rs diff --git a/src/apps/chat/models/request.rs b/src/group/models/request.rs similarity index 100% rename from src/apps/chat/models/request.rs rename to src/group/models/request.rs diff --git a/src/apps/chat/rpc.rs b/src/group/rpc.rs similarity index 87% rename from src/apps/chat/rpc.rs rename to src/group/rpc.rs index 8b05bcf..03a6c18 100644 --- a/src/apps/chat/rpc.rs +++ b/src/group/rpc.rs @@ -1,4 +1,4 @@ -use esse_primitives::{id_from_str, MessageType, ESSE_ID}; +use esse_primitives::{id_from_str, MessageType}; use std::sync::Arc; use tdn::types::{ message::SendType, @@ -11,8 +11,7 @@ use crate::global::Global; use crate::rpc::session_create; use crate::storage::{chat_db, delete_avatar, session_db}; -use super::layer::{update_session, LayerEvent}; -use super::{raw_to_network_message, Friend, Message, Request}; +use super::{raw_to_network_message, update_session, Friend, GroupEvent, Message, Request}; #[inline] pub(crate) fn friend_info(friend: &Friend) -> RpcParam { @@ -101,7 +100,7 @@ fn detail_list(friend: Friend, messages: Vec) -> RpcParam { json!([friend.to_rpc(), message_results]) } -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { +pub(crate) fn group_rpc(handler: &mut RpcHandler) { handler.add_method("chat-echo", |params, _| async move { Ok(HandleResult::rpc(json!(params))) }); @@ -118,10 +117,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let friends = Friend::list(&db)?; let mut results = vec![]; - let layer_lock = state.layer.read().await; + let group_lock = state.group.read().await; if need_online { for friend in friends { - let online = layer_lock.chat_is_online(&friend.pid); + let online = group_lock.is_online(&friend.pid); results.push(friend.to_rpc_online(online)); } } else { @@ -129,7 +128,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { results.push(friend.to_rpc()); } } - drop(layer_lock); + drop(group_lock); Ok(HandleResult::rpc(json!(results))) }, @@ -177,16 +176,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { friend.close(&db)?; drop(db); - let online = state.layer.write().await.chat_rm_online(&friend.pid); - if let Some(faddr) = online { - let data = bincode::serialize(&LayerEvent::Close)?; - results - .layers - .push((ESSE_ID, SendType::Event(0, friend.pid, data))); - - results - .layers - .push((ESSE_ID, SendType::Disconnect(friend.pid))); + let online = state.group.write().await.rm_online(&friend.pid); + if online { + let data = bincode::serialize(&GroupEvent::Close)?; + results.groups.push(SendType::Event(0, friend.pid, data)); + results.groups.push(SendType::Disconnect(friend.pid)); } // state.own.write().await.broadcast( @@ -215,18 +209,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { Friend::delete(&db, &id)?; drop(db); - let online = state.layer.write().await.chat_rm_online(&friend.pid); + let online = state.group.write().await.rm_online(&friend.pid); delete_avatar(&state.base, &pid, &friend.pid).await?; - if let Some(faddr) = online { - let data = bincode::serialize(&LayerEvent::Close)?; - results - .layers - .push((ESSE_ID, SendType::Event(0, friend.pid, data))); - - results - .layers - .push((ESSE_ID, SendType::Disconnect(friend.pid))); + if online { + let data = bincode::serialize(&GroupEvent::Close)?; + results.groups.push(SendType::Event(0, friend.pid, data)); + results.groups.push(SendType::Disconnect(friend.pid)); } // state.own.write().await.broadcast( @@ -282,10 +271,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::rpc(json!(request.to_rpc())); let name = state.own.read().await.account(&pid)?.name.clone(); - let req = LayerEvent::Request(name, request.remark); + let req = GroupEvent::Request(name, request.remark); let data = bincode::serialize(&req).unwrap_or(vec![]); - let msg = SendType::Event(0, request.pid, data); - results.layers.push((ESSE_ID, msg)); + results.groups.push(SendType::Event(0, request.pid, data)); Ok(results) }, @@ -330,9 +318,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { session.insert(&s_db)?; results.rpcs.push(session_create(&session)); - let data = bincode::serialize(&LayerEvent::Agree).unwrap_or(vec![]); - let msg = SendType::Event(0, friend.pid, data); - results.layers.push((ESSE_ID, msg)); + let data = bincode::serialize(&GroupEvent::Agree).unwrap_or(vec![]); + results.groups.push(SendType::Event(0, friend.pid, data)); Ok(results) }, @@ -353,9 +340,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { req.update(&db)?; drop(db); - let data = bincode::serialize(&LayerEvent::Reject).unwrap_or(vec![]); + let data = bincode::serialize(&GroupEvent::Reject).unwrap_or(vec![]); let msg = SendType::Event(0, req.pid, data); - let mut results = HandleResult::layer(ESSE_ID, msg); + let mut results = HandleResult::group(msg); // state.own.write().await.broadcast( // &gid, @@ -449,12 +436,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::rpc(json!(msg.to_rpc())); - let tid = state.layer.write().await.delivery(msg.id); - let event = LayerEvent::Message(msg.hash, nm); + let tid = state.group.write().await.delivery(msg.id); + let event = GroupEvent::Message(msg.hash, nm); let data = bincode::serialize(&event).unwrap_or(vec![]); - results - .layers - .push((ESSE_ID, SendType::Event(tid, fpid, data))); + results.groups.push(SendType::Event(tid, fpid, data)); // UPDATE SESSION. let s_db = session_db(&state.base, &pid, &db_key)?; diff --git a/src/layer.rs b/src/layer.rs index 5985338..c328a2e 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1,4 +1,4 @@ -use esse_primitives::{id_to_str, ESSE_ID}; +use esse_primitives::id_to_str; use group_types::GroupChatId; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -11,15 +11,13 @@ use tdn::types::{ use tokio::sync::RwLock; use crate::account::User; -use crate::apps::chat::LayerEvent as ChatLayerEvent; +use crate::group::GroupEvent; //use crate::apps::group::{group_conn, GROUP_ID}; use crate::own::Own; use crate::session::{Session, SessionType}; /// ESSE layers. pub(crate) struct Layer { - /// friend pid => Session - pub chats: HashMap, /// group chat id => Session pub groups: HashMap, /// delivery feedback. @@ -31,7 +29,6 @@ pub(crate) struct Layer { impl Layer { pub fn init() -> Layer { Layer { - chats: HashMap::new(), groups: HashMap::new(), delivery: HashMap::new(), delivery_count: 0, @@ -46,30 +43,18 @@ impl Layer { } pub fn clear(&mut self) { - self.chats.clear(); self.groups.clear(); self.delivery.clear(); } pub fn is_addr_online(&self, addr: &PeerId) -> bool { - if self.chats.contains_key(addr) { - return true; - } else { - for (_, session) in &self.groups { - if session.addrs.contains(addr) { - return true; - } + for (_, session) in &self.groups { + if session.addrs.contains(addr) { + return true; } } - false - } - pub fn chat_active(&mut self, pid: &PeerId, is_me: bool) -> Option { - if let Some(session) = self.chats.get_mut(pid) { - Some(session.active(is_me)) - } else { - None - } + false } pub fn group_active(&mut self, gid: &GroupChatId, is_me: bool) -> Option { @@ -80,14 +65,6 @@ impl Layer { } } - pub fn chat_suspend(&mut self, pid: &PeerId, me: bool, m: bool) -> Result> { - if let Some(session) = self.chats.get_mut(pid) { - Ok(session.suspend(me, m)) - } else { - Err(anyhow!("session missing!")) - } - } - pub fn group_suspend(&mut self, g: &GroupChatId, me: bool, m: bool) -> Result> { if let Some(session) = self.groups.get_mut(g) { Ok(session.suspend(me, m)) @@ -96,28 +73,6 @@ impl Layer { } } - pub fn chat_is_online(&self, pid: &PeerId) -> bool { - self.chats.contains_key(pid) - } - - pub fn chat_rm_online(&mut self, pid: &PeerId) -> Option { - self.chats.remove(pid).map(|session| session.addrs[0]) - } - - pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) { - if !self.chats.contains_key(&pid) { - self.chats.insert(pid, LayerSession::new(pid, sid, fid, h)); - } - } - - pub fn chat_session(&self, pid: &PeerId) -> Result<(i64, i64)> { - if let Some(session) = self.chats.get(pid) { - Ok((session.s_id, session.db_id)) - } else { - Err(anyhow!("session missing!")) - } - } - pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> { if let Some(session) = self.groups.get(gid) { Ok(session) @@ -211,22 +166,22 @@ impl Layer { // pub async fn all_layer_conns(&self) -> Result>> { // let mut conns = HashMap::new(); - // let group_lock = self.group.read().await; + // let own_lock = self.group.read().await; // for mgid in self.runnings.keys() { // let mut vecs = vec![]; - // let db = group_lock.session_db(&mgid)?; + // let db = own_lock.session_db(&mgid)?; // let sessions = Session::list(&db)?; // drop(db); // for s in sessions { // match s.s_type { // SessionType::Chat => { - // let proof = group_lock.prove_addr(mgid, &s.addr)?; + // let proof = own_lock.prove_addr(mgid, &s.addr)?; // vecs.push((s.gid, chat_conn(proof, Peer::peer(s.addr)))); // } // SessionType::Group => { - // let proof = group_lock.prove_addr(mgid, &s.addr)?; + // let proof = own_lock.prove_addr(mgid, &s.addr)?; // vecs.push((GROUP_ID, group_conn(proof, Peer::peer(s.addr), s.gid))); // } // _ => {} @@ -256,17 +211,12 @@ impl Layer { // } // } - pub fn broadcast(&self, user: User, results: &mut HandleResult) { - let info = ChatLayerEvent::InfoRes(user); - let data = bincode::serialize(&info).unwrap_or(vec![]); + // pub fn broadcast(&self, user: User, results: &mut HandleResult) { + // let info = GroupEvent::InfoRes(user); + // let data = bincode::serialize(&info).unwrap_or(vec![]); - for fpid in self.chats.keys() { - let msg = SendType::Event(0, *fpid, data.clone()); - results.layers.push((ESSE_ID, msg)); - } - - // TODO GROUPS - } + // // TODO GROUPS + // } } // pub(crate) struct OnlineSession { diff --git a/src/lib.rs b/src/lib.rs index 545edda..69a33b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ mod apps; //mod consensus; //mod event; mod global; +mod group; mod layer; mod migrate; mod own; diff --git a/src/migrate.rs b/src/migrate/mod.rs similarity index 100% rename from src/migrate.rs rename to src/migrate/mod.rs diff --git a/src/own.rs b/src/own/mod.rs similarity index 100% rename from src/own.rs rename to src/own/mod.rs diff --git a/src/rpc.rs b/src/rpc.rs index c99b3f1..935a751 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,4 +1,4 @@ -use esse_primitives::{id_from_str, id_to_str, ESSE_ID}; +use esse_primitives::{id_from_str, id_to_str}; use group_types::{GroupChatId, LayerEvent as GroupLayerEvent, GROUP_CHAT_ID}; use std::net::SocketAddr; use std::sync::Arc; @@ -16,15 +16,20 @@ use tdn_did::{generate_mnemonic, Count}; use crate::account::lang_from_i64; use crate::apps::app_rpc_inject; -use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; -use crate::apps::group::{group_conn, GroupChat}; +use crate::apps::group::{group_conn as group_chat_conn, GroupChat}; use crate::global::Global; +use crate::group::{group_conn, group_rpc, GroupEvent}; //use crate::event::InnerEvent; use crate::session::{connect_session, Session, SessionType}; use crate::storage::{group_db, session_db}; pub(crate) fn init_rpc(global: Arc) -> RpcHandler { let mut handler = new_rpc_handler(global); + + // inject group rpcs + group_rpc(&mut handler); + + // inject layers rpcs app_rpc_inject(&mut handler); handler } @@ -174,15 +179,15 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { handler.add_method("account-list", |_, state: Arc| async move { let mut accounts: Vec> = vec![]; - let group_lock = state.own.read().await; - for (pid, account) in group_lock.list_accounts().iter() { + let own_lock = state.own.read().await; + for (pid, account) in own_lock.list_accounts().iter() { accounts.push(vec![ id_to_str(pid), account.name.clone(), base64::encode(&account.avatar), ]); } - drop(group_lock); + drop(own_lock); Ok(HandleResult::rpc(json!(accounts))) }); @@ -270,21 +275,15 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]); let pid = state.pid().await; - let mut group_lock = state.own.write().await; - group_lock.update_account( - pid, - name, - avatar_bytes.clone(), - &state.base, - &state.secret, - )?; - drop(group_lock); + let mut own_lock = state.own.write().await; + own_lock.update_account(pid, name, avatar_bytes.clone(), &state.base, &state.secret)?; + drop(own_lock); let results = HandleResult::new(); // TODO broadcast to all devices. - //let user = group_lock.clone_user(&pid)?; - //group_lock.broadcast(&pid, &mut results)?; + //let user = own_lock.clone_user(&pid)?; + //own_lock.broadcast(&pid, &mut results)?; // TODO broadcast to all layers. //state.layer.read().await.broadcast(user, &mut results); @@ -409,26 +408,23 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let s = Session::get(&db, &id)?; drop(db); - let mut layer_lock = state.layer.write().await; - let mut results = HandleResult::new(); match s.s_type { SessionType::Chat => { let remote_pid = id_from_str(remote)?; - let online = layer_lock.chat_active(&remote_pid, true); - if let Some(addr) = online { - return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)]))); + if state.group.write().await.active(&remote_pid, true).is_ok() { + return Ok(HandleResult::rpc(json!([id, id_to_str(&remote_pid)]))); } - chat_conn(remote_pid, &mut results); + group_conn(remote_pid, &mut results); } SessionType::Group => { let remote_gid: GroupChatId = remote.parse().map_err(|_| RpcError::ParseError)?; - let online = layer_lock.group_active(&remote_gid, true); + let online = state.layer.write().await.group_active(&remote_gid, true); if let Some(addr) = online { return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)]))); } - group_conn(s.addr, remote_gid, &mut results); + group_chat_conn(s.addr, remote_gid, &mut results); } _ => {} } @@ -451,23 +447,23 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { drop(db); let mut results = HandleResult::new(); - let mut layer_lock = state.layer.write().await; match s.s_type { SessionType::Chat => { - let remote_id = id_from_str(remote)?; - if layer_lock.chat_suspend(&remote_id, true, must)?.is_some() { + let rid = id_from_str(remote)?; + if state.group.write().await.suspend(&rid, true, must).is_ok() { results.rpcs.push(json!([id])); } - let data = bincode::serialize(&ChatLayerEvent::Suspend)?; - let msg = SendType::Event(0, remote_id, data); - results.layers.push((ESSE_ID, msg)); + let data = bincode::serialize(&GroupEvent::Suspend)?; + results.groups.push(SendType::Event(0, rid, data)); } SessionType::Group => { let remote_gid: GroupChatId = remote.parse().map_err(|_| RpcError::ParseError)?; + let mut layer_lock = state.layer.write().await; if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() { results.rpcs.push(json!([id])); } + drop(layer_lock); let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?; let msg = SendType::Event(0, s.addr, data); results.layers.push((GROUP_CHAT_ID, msg)); diff --git a/src/server.rs b/src/server.rs index c809c2b..67be828 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,6 +20,7 @@ use tdn_storage::local::DStorage; use crate::account::Account; use crate::apps::app_layer_handle; use crate::global::Global; +use crate::group::group_handle; use crate::layer::Layer; use crate::migrate::{main_migrate, ACCOUNT_DB}; use crate::own::{handle as own_handle, Own}; @@ -94,8 +95,10 @@ pub async fn start(db_path: String) -> Result<()> { handle(handle_result, now_rpc_uid, true, &global).await; } } - ReceiveMessage::Group(_) => { - warn!("ESSE has no Group Message!"); + ReceiveMessage::Group(g_msg) => { + if let Ok(handle_result) = group_handle(g_msg, &global).await { + handle(handle_result, now_rpc_uid, true, &global).await; + } } ReceiveMessage::Layer(fgid, tgid, l_msg) => { if let Ok(handle_result) = app_layer_handle(fgid, tgid, l_msg, &global).await { diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 100% rename from src/utils.rs rename to src/utils/mod.rs