From 3803d313e3d1eb1cbc42f57fee735e38aed890cc Mon Sep 17 00:00:00 2001 From: Sun Date: Sat, 5 Mar 2022 13:28:02 +0800 Subject: [PATCH] upgrade group chat --- lib/apps/group/detail.dart | 3 +- lib/apps/group/models.dart | 6 +- src/apps/group/layer.rs | 712 ++++++++++++++----------------- src/apps/group/models/group.rs | 8 +- src/apps/group/models/message.rs | 8 +- src/apps/group/rpc.rs | 7 +- src/layer.rs | 41 +- src/rpc.rs | 90 +--- types/group/src/lib.rs | 26 +- 9 files changed, 394 insertions(+), 507 deletions(-) diff --git a/lib/apps/group/detail.dart b/lib/apps/group/detail.dart index 5c28b63..15fd86d 100644 --- a/lib/apps/group/detail.dart +++ b/lib/apps/group/detail.dart @@ -85,10 +85,9 @@ class _GroupChatDetailState extends State { } } - // [group_id, member_id, member_addr] + // [group_id, member_id] _memberOnline(List params) { if (_group.id == params[0] && this._members.containsKey(params[1])) { - this._members[params[1]]!.addr = params[2]; this._members[params[1]]!.online = true; setState(() {}); } diff --git a/lib/apps/group/models.dart b/lib/apps/group/models.dart index 4ae9acc..ff0d834 100644 --- a/lib/apps/group/models.dart +++ b/lib/apps/group/models.dart @@ -35,7 +35,6 @@ class Member { int id = 0; int fid = 0; String mid = ''; - String addr = ''; String name = ''; bool leave = false; bool online = false; @@ -44,9 +43,8 @@ class Member { this.id = params[0]; this.fid = params[1]; this.mid = params[2]; - this.addr = params[3]; - this.name = params[4]; - this.leave = params[5]; + this.name = params[3]; + this.leave = params[4]; if (this.mid == Global.pid) { this.online = true; } diff --git a/src/apps/group/layer.rs b/src/apps/group/layer.rs index 4bddfaf..f68fb98 100644 --- a/src/apps/group/layer.rs +++ b/src/apps/group/layer.rs @@ -9,13 +9,12 @@ use tdn_storage::local::DStorage; use crate::apps::chat::Friend; use crate::global::Global; -use crate::layer::Layer; use crate::rpc::{ session_close, session_connect, session_last, session_lost, session_suspend, session_update_name, }; use crate::session::{connect_session, Session, SessionType}; -use crate::storage::{delete_avatar, group_db, session_db, write_avatar_sync}; +use crate::storage::{chat_db, delete_avatar, group_db, session_db, write_avatar_sync}; use super::models::{handle_network_message, GroupChat, Member, Message}; use super::rpc; @@ -56,7 +55,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) -> Result {} RecvType::Event(addr, bytes) => { + // PEER & SERVER let event: LayerEvent = bincode::deserialize(&bytes)?; - debug!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT"); - //handle_server_event(fgid, addr, event, layer, &mut results).await?; - debug!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT"); - - debug!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT"); - //handle_peer_event(ogid, addr, event, layer, &mut results).await?; - debug!("----------- DEBUG GROUP CHAT: PEER OVER LAYER EVENT"); + handle_event(addr, event, global, &mut results).await?; } + RecvType::Delivery(..) => {} RecvType::Stream(_uid, _stream, _bytes) => { // TODO stream } - - RecvType::Delivery(..) => {} + RecvType::Leave(..) => {} // nerver here. } Ok(results) @@ -98,7 +91,7 @@ async fn handle_connect( gid: GroupChatId, results: &mut HandleResult, ) -> Result<()> { - let (height, sid, id) = global.layer.read().await.group(&gid)?.info(); + let (height, _, id, _) = global.layer.read().await.group(&gid)?.info(); let pid = global.pid().await; let db_key = global.group.read().await.db_key(&pid)?; @@ -117,7 +110,7 @@ async fn handle_connect( results.rpcs.push(rpc::member_online(id, mid)); let data = LayerEvent::MemberOnline(gid, peer.id); - broadcast(&gid, global, &data, results).await; + broadcast(&gid, global, &data, results).await?; Ok(()) } @@ -182,384 +175,310 @@ async fn handle_result( Ok(()) } -// async fn handle_server_event( -// fgid: GroupId, -// addr: PeerId, -// event: LayerEvent, -// layer: &Arc>, -// results: &mut HandleResult, -// ) -> Result<()> { -// let gcd = event.gcd(); -// let base = layer.read().await.base().clone(); -// let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id(); -// let db = layer.read().await.group.read().await.group_db(&ogid)?; - -// match event { -// LayerEvent::Offline(gcd) => { -// // 1. check member online. -// if layer.write().await.remove_online(&gcd, &fgid).is_none() { -// return Ok(()); -// } - -// // 2. UI: offline the member. -// if let Ok(mid) = Member::get_id(&db, &id, &fgid) { -// results.rpcs.push(rpc::member_offline(ogid, id, mid)); -// } - -// // 3. broadcast offline event. -// broadcast(&LayerEvent::MemberOffline(gcd, fgid), layer, &gcd, results).await?; -// } -// LayerEvent::GroupName(gcd, name) => { -// // 1. update group name -// let _ = GroupChat::update_name(&db, &id, &name)?; -// // 2. UI: update -// results.rpcs.push(rpc::group_name(ogid, &id, &name)); -// if let Ok(sid) = Session::update_name_by_id( -// &layer.read().await.group.read().await.session_db(&ogid)?, -// &id, -// &SessionType::Group, -// &name, -// ) { -// results.rpcs.push(session_update_name(ogid, &sid, &name)); -// } -// // 3. broadcast -// broadcast(&LayerEvent::GroupName(gcd, name), layer, &gcd, results).await?; -// } -// LayerEvent::Sync(gcd, _, event) => { -// match event { -// Event::MemberJoin(mgid, maddr, mname, mavatar) => { -// let mdid_res = Member::get_id(&db, &id, &mgid); -// let h = layer.write().await.running_mut(&gcd)?.increased(); -// let new_e = Event::MemberJoin(mgid, maddr, mname.clone(), mavatar.clone()); - -// if let Ok(mdid) = mdid_res { -// Member::update(&db, &h, &mdid, &maddr, &mname)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// let mem = Member::info(mdid, id, mgid, maddr, mname); -// results.rpcs.push(rpc::member_join(ogid, &mem)); -// } else { -// let mut member = Member::new(h, id, mgid, maddr, mname); -// member.insert(&db)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// results.rpcs.push(rpc::member_join(ogid, &member)); -// } - -// // broadcast -// GroupChat::add_height(&db, id, h)?; -// broadcast(&LayerEvent::Sync(gcd, h, new_e), layer, &gcd, results).await?; -// } -// Event::MemberLeave(mgid) => { -// let mdid = Member::get_id(&db, &id, &mgid)?; -// let h = layer.write().await.running_mut(&gcd)?.increased(); -// Member::leave(&db, &mdid, &h)?; - -// // check mid is my chat friend. if not, delete avatar. -// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; -// if Friend::get_id(&s_db, &mgid).is_err() { -// let _ = delete_avatar(&base, &ogid, &mgid).await; -// } -// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); - -// // broadcast -// GroupChat::add_height(&db, id, h)?; -// broadcast(&LayerEvent::Sync(gcd, h, event), layer, &gcd, results).await?; -// } -// Event::MessageCreate(mgid, nmsg, mtime) => { -// debug!("Sync: create message start"); -// let _mdid = Member::get_id(&db, &id, &mgid)?; - -// let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime); -// let new_h = layer.write().await.running_mut(&gcd)?.increased(); -// broadcast(&LayerEvent::Sync(gcd, new_h, new_e), layer, &gcd, results).await?; -// GroupChat::add_height(&db, id, new_h)?; - -// let msg = handle_network_message( -// &layer.read().await.group, -// new_h, -// id, -// mgid, -// &ogid, -// nmsg, -// mtime, -// &base, -// results, -// ) -// .await?; -// results.rpcs.push(rpc::message_create(ogid, &msg)); -// debug!("Sync: create message ok"); - -// // UPDATE SESSION. -// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { -// update_session(&s_db, &ogid, &id, &msg, results); -// } -// } -// } -// } -// LayerEvent::MemberOnlineSync(gcd) => { -// let onlines = layer -// .read() -// .await -// .running(&gcd)? -// .onlines() -// .iter() -// .map(|(g, a)| (**g, **a)) -// .collect(); -// let event = LayerEvent::MemberOnlineSyncResult(gcd, onlines); -// let data = bincode::serialize(&event).unwrap_or(vec![]); -// let s = SendType::Event(0, addr, data); -// add_server_layer(results, fgid, s); -// } -// LayerEvent::SyncReq(gcd, from) => { -// debug!("Got sync request. height: {} from: {}", height, from); - -// if height >= from { -// let to = if height - from > 20 { -// from + 20 -// } else { -// height -// }; - -// let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &from, &to).await?; -// let messages = Message::sync(&base, &ogid, &db, &id, &from, &to).await?; -// let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages); -// let data = bincode::serialize(&event).unwrap_or(vec![]); -// let s = SendType::Event(0, addr, data); -// add_server_layer(results, fgid, s); -// debug!("Sended sync request results. from: {}, to: {}", from, to); -// } -// } -// LayerEvent::Suspend(..) => {} -// LayerEvent::Actived(..) => {} -// _ => error!("group server handle event nerver here"), -// } - -// Ok(()) -// } - -// async fn handle_peer_event( -// ogid: GroupId, -// addr: PeerId, -// event: LayerEvent, -// layer: &Arc>, -// results: &mut HandleResult, -// ) -> Result<()> { -// let base = layer.read().await.base().clone(); -// let gcd = event.gcd(); -// let (sid, id) = layer.read().await.get_running_remote_id(&ogid, gcd)?; -// let db = layer.read().await.group.read().await.group_db(&ogid)?; - -// match event { -// LayerEvent::Offline(gcd) => { -// // 1. offline group chat. -// layer -// .write() -// .await -// .running_mut(&ogid)? -// .check_offline(&gcd, &addr); - -// // 2. UI: offline the session. -// results.rpcs.push(session_lost(ogid, &sid)); -// } -// LayerEvent::Suspend(gcd) => { -// if layer -// .write() -// .await -// .running_mut(&ogid)? -// .suspend(&gcd, false, true)? -// { -// results.rpcs.push(session_suspend(ogid, &sid)); -// } -// } -// LayerEvent::Actived(gcd) => { -// let _ = layer.write().await.running_mut(&ogid)?.active(&gcd, false); -// results.rpcs.push(session_connect(ogid, &sid, &addr)); -// } -// LayerEvent::MemberOnline(_gcd, mgid, maddr) => { -// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { -// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); -// } -// } -// LayerEvent::MemberOffline(_gcd, mgid) => { -// if let Ok(mid) = Member::get_id(&db, &id, &mgid) { -// results.rpcs.push(rpc::member_offline(ogid, id, mid)); -// } -// } -// LayerEvent::MemberOnlineSyncResult(_gcd, onlines) => { -// for (mgid, maddr) in onlines { -// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { -// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); -// } -// } -// } -// LayerEvent::GroupName(_gcd, name) => { -// let _ = GroupChat::update_name(&db, &id, &name)?; -// results.rpcs.push(rpc::group_name(ogid, &id, &name)); -// let _ = Session::update_name( -// &layer.read().await.group.read().await.session_db(&ogid)?, -// &sid, -// &name, -// ); -// results.rpcs.push(session_update_name(ogid, &sid, &name)); -// } -// LayerEvent::GroupClose(_gcd) => { -// let group = GroupChat::close(&db, &gcd)?; -// let sid = Session::close( -// &layer.read().await.group.read().await.session_db(&ogid)?, -// &group.id, -// &SessionType::Group, -// )?; -// results.rpcs.push(session_close(ogid, &sid)); -// } -// LayerEvent::Sync(_gcd, height, event) => { -// debug!("Sync: handle height: {}", height); - -// match event { -// Event::MemberJoin(mgid, maddr, mname, mavatar) => { -// let mdid_res = Member::get_id(&db, &id, &mgid); -// if let Ok(mdid) = mdid_res { -// Member::update(&db, &height, &mdid, &maddr, &mname)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// let mem = Member::info(mdid, id, mgid, maddr, mname); -// results.rpcs.push(rpc::member_join(ogid, &mem)); -// } else { -// let mut member = Member::new(height, id, mgid, maddr, mname); -// member.insert(&db)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// results.rpcs.push(rpc::member_join(ogid, &member)); -// } - -// // save consensus. -// GroupChat::add_height(&db, id, height)?; -// } -// Event::MemberLeave(mgid) => { -// let mdid = Member::get_id(&db, &id, &mgid)?; -// Member::leave(&db, &height, &mdid)?; - -// // check mid is my chat friend. if not, delete avatar. -// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; -// if Friend::get_id(&s_db, &mgid).is_err() { -// let _ = delete_avatar(&base, &ogid, &mgid).await; -// } -// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); - -// // save consensus. -// GroupChat::add_height(&db, id, height)?; -// } -// Event::MessageCreate(mgid, nmsg, mtime) => { -// debug!("Sync: create message start"); -// let _mdid = Member::get_id(&db, &id, &mgid)?; - -// let msg = handle_network_message( -// &layer.read().await.group, -// height, -// id, -// mgid, -// &ogid, -// nmsg, -// mtime, -// &base, -// results, -// ) -// .await?; -// results.rpcs.push(rpc::message_create(ogid, &msg)); - -// GroupChat::add_height(&db, id, height)?; -// debug!("Sync: create message ok"); - -// // UPDATE SESSION. -// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { -// update_session(&s_db, &ogid, &id, &msg, results); -// } -// } -// } -// } -// LayerEvent::SyncRes(gcd, height, from, to, adds, leaves, messages) => { -// if to >= height { -// // when last packed sync, start sync online members. -// add_layer(results, ogid, sync_online(gcd, addr)); -// } - -// debug!("Start handle sync packed... {}, {}, {}", height, from, to); -// let mut last_message = None; - -// for (height, mgid, maddr, mname, mavatar) in adds { -// let mdid_res = Member::get_id(&db, &id, &mgid); -// if let Ok(mdid) = mdid_res { -// Member::update(&db, &height, &mdid, &maddr, &mname)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// let mem = Member::info(mdid, id, mgid, maddr, mname); -// results.rpcs.push(rpc::member_join(ogid, &mem)); -// } else { -// let mut member = Member::new(height, id, mgid, maddr, mname); -// member.insert(&db)?; -// if mavatar.len() > 0 { -// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; -// } -// results.rpcs.push(rpc::member_join(ogid, &member)); -// } -// } - -// for (height, mgid) in leaves { -// if let Ok(mdid) = Member::get_id(&db, &id, &mgid) { -// Member::leave(&db, &height, &mdid)?; -// // check mid is my chat friend. if not, delete avatar. -// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; -// if Friend::get_id(&s_db, &mgid).is_err() { -// let _ = delete_avatar(&base, &ogid, &mgid).await; -// } -// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); -// } -// } - -// for (height, mgid, nm, time) in messages { -// if let Ok(msg) = handle_network_message( -// &layer.read().await.group, -// height, -// id, -// mgid, -// &ogid, -// nm, -// time, -// &base, -// results, -// ) -// .await -// { -// results.rpcs.push(rpc::message_create(ogid, &msg)); -// last_message = Some(msg); -// } -// } - -// if to < height { -// add_layer(results, ogid, sync(gcd, addr, to + 1)); -// } - -// // update group chat height. -// GroupChat::add_height(&db, id, to)?; - -// // UPDATE SESSION. -// if let Some(msg) = last_message { -// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { -// update_session(&s_db, &ogid, &id, &msg, results); -// } -// } -// debug!("Over handle sync packed... {}, {}, {}", height, from, to); -// } -// _ => error!("group peer handle event nerver here"), -// } - -// Ok(()) -// } +async fn handle_event( + addr: PeerId, + event: LayerEvent, + global: &Arc, + results: &mut HandleResult, +) -> Result<()> { + let gid = event.gid(); + let (height, sid, id, gaddr) = global.layer.read().await.group(&gid)?.info(); + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = group_db(&global.base, &pid, &db_key)?; + let is_server = gaddr == pid; + if !is_server && gaddr != addr { + warn!("INVALID EVENT NOT FROM THE SERVER."); + return Err(anyhow!("NOT THE SERVER EVENT")); + } + + match event { + LayerEvent::Offline(gid) => { + // SERVER & PEER + if is_server { + // 1. check member online. + if !global.layer.write().await.group_del_online(&gid, &addr) { + return Ok(()); + } + + // 2. UI: offline the member. + if let Ok(mid) = Member::get_id(&db, &id, &addr) { + results.rpcs.push(rpc::member_offline(id, mid)); + } + + // 3. broadcast offline event. + broadcast(&gid, global, &LayerEvent::MemberOffline(gid, addr), results).await?; + } else { + // 1. offline group chat. + global.layer.write().await.group_del(&gid); + + // 2. UI: offline the session. + results.rpcs.push(session_lost(&sid)); + } + } + LayerEvent::Suspend(gid) => { + // PEER + if global + .layer + .write() + .await + .group_mut(&gid)? + .suspend(false, true) + .is_some() + { + results.rpcs.push(session_suspend(&sid)); + } + } + LayerEvent::Actived(gid) => { + // PEER + let _ = global.layer.write().await.group_mut(&gid)?.active(false); + results.rpcs.push(session_connect(&sid, &addr)); + } + LayerEvent::MemberOnline(_gid, mpid) => { + // PEER + if let Ok(mid) = Member::get_id(&db, &id, &mpid) { + results.rpcs.push(rpc::member_online(id, mid)); + } + } + LayerEvent::MemberOffline(_gid, mpid) => { + // PEER + if let Ok(mid) = Member::get_id(&db, &id, &mpid) { + results.rpcs.push(rpc::member_offline(id, mid)); + } + } + LayerEvent::MemberOnlineSync(gid) => { + // SERVER + let onlines = global.layer.read().await.group(&gid)?.addrs.clone(); + let event = LayerEvent::MemberOnlineSyncResult(gid, onlines); + let data = bincode::serialize(&event).unwrap_or(vec![]); + let msg = SendType::Event(0, addr, data); + results.layers.push((GROUP_CHAT_ID, msg)); + } + LayerEvent::MemberOnlineSyncResult(_gid, onlines) => { + // PEER + for mpid in onlines { + if let Ok(mid) = Member::get_id(&db, &id, &mpid) { + results.rpcs.push(rpc::member_online(id, mid)); + } + } + } + LayerEvent::GroupName(gid, name) => { + // SERVER & PEER + // 1. update group name + let _ = GroupChat::update_name(&db, &id, &name)?; + + // 2. UI: update + results.rpcs.push(rpc::group_name(&id, &name)); + let s_db = session_db(&global.base, &pid, &db_key)?; + let _ = Session::update_name(&s_db, &sid, &name); + results.rpcs.push(session_update_name(&sid, &name)); + + if is_server { + // 3. broadcast + broadcast(&gid, global, &LayerEvent::GroupName(gid, name), results).await?; + } + } + LayerEvent::GroupClose(gid) => { + // PEER + let group = GroupChat::close(&db, &id)?; + let s_db = session_db(&global.base, &pid, &db_key)?; + let sid = Session::close(&s_db, &group.id, &SessionType::Group)?; + results.rpcs.push(session_close(&sid)); + } + LayerEvent::Sync(gid, height, event) => { + // SERVER & PEER + debug!("Sync: handle is_server: {} height: {} ", is_server, height); + match event { + Event::MemberJoin(mpid, mname, mavatar) => { + let mid_res = Member::get_id(&db, &id, &mpid); + let h = if is_server { + global.layer.write().await.group_mut(&gid)?.increased() + } else { + height + }; + + if let Ok(mid) = mid_res { + Member::update(&db, &h, &mid, &mname)?; + if mavatar.len() > 0 { + write_avatar_sync(&global.base, &pid, &mpid, mavatar.clone())?; + } + let mem = Member::info(mid, id, mpid, mname.clone()); + results.rpcs.push(rpc::member_join(&mem)); + } else { + let mut member = Member::new(h, id, mpid, mname.clone()); + member.insert(&db)?; + if mavatar.len() > 0 { + write_avatar_sync(&global.base, &pid, &mpid, mavatar.clone())?; + } + results.rpcs.push(rpc::member_join(&member)); + } + + GroupChat::add_height(&db, id, h)?; + if is_server { + // broadcast + let new_e = Event::MemberJoin(mpid, mname, mavatar); + broadcast(&gid, global, &LayerEvent::Sync(gid, h, new_e), results).await?; + } + } + Event::MemberLeave(mpid) => { + let mid = Member::get_id(&db, &id, &mpid)?; + let h = if is_server { + global.layer.write().await.group_mut(&gid)?.increased() + } else { + height + }; + Member::leave(&db, &mid, &h)?; + + // check mid is my chat friend. if not, delete avatar. + let c_db = chat_db(&global.base, &pid, &db_key)?; + if Friend::get_id(&c_db, &mpid).is_err() { + let _ = delete_avatar(&global.base, &pid, &mpid).await; + } + results.rpcs.push(rpc::member_leave(id, mid)); + + // broadcast + GroupChat::add_height(&db, id, h)?; + if is_server { + broadcast(&gid, global, &LayerEvent::Sync(gid, h, event), results).await?; + } + } + Event::MessageCreate(mpid, nmsg, mtime) => { + debug!("Sync: create message start"); + let _mid = Member::get_id(&db, &id, &mpid)?; + let h = if is_server { + global.layer.write().await.group_mut(&gid)?.increased() + } else { + height + }; + + let msg = handle_network_message( + &pid, + &global.base, + &db_key, + h, + id, + mpid, + nmsg.clone(), + mtime, + results, + ) + .await?; + results.rpcs.push(rpc::message_create(&msg)); + debug!("Sync: create message ok"); + + // UPDATE SESSION. + let s_db = session_db(&global.base, &pid, &db_key)?; + update_session(&s_db, &id, &msg, results); + + GroupChat::add_height(&db, id, h)?; + if is_server { + let new_e = Event::MessageCreate(mpid, nmsg, mtime); + broadcast(&gid, global, &LayerEvent::Sync(gid, h, new_e), results).await?; + } + } + } + } + LayerEvent::SyncReq(gid, from) => { + // SERVER + debug!("Got sync request. height: {} from: {}", height, from); + + if height >= from { + let to = if height - from > 20 { + from + 20 + } else { + height + }; + + let (members, leaves) = + Member::sync(&global.base, &pid, &db, &id, &from, &to).await?; + let messages = Message::sync(&global.base, &pid, &db, &id, &from, &to).await?; + let event = LayerEvent::SyncRes(gid, height, from, to, members, leaves, messages); + let data = bincode::serialize(&event).unwrap_or(vec![]); + let s = SendType::Event(0, addr, data); + results.layers.push((GROUP_CHAT_ID, s)); + debug!("Sended sync request results. from: {}, to: {}", from, to); + } + } + LayerEvent::SyncRes(gid, height, from, to, adds, leaves, messages) => { + // PEER + if to >= height { + results.layers.push((GROUP_CHAT_ID, sync_online(gid, addr))); + // when last packed sync, start sync online members. + } + + debug!("Start handle sync packed... {}, {}, {}", height, from, to); + let mut last_message = None; + + for (height, mpid, mname, mavatar) in adds { + let mid_res = Member::get_id(&db, &id, &mpid); + if let Ok(mid) = mid_res { + Member::update(&db, &height, &mid, &mname)?; + if mavatar.len() > 0 { + write_avatar_sync(&global.base, &pid, &mpid, mavatar)?; + } + let mem = Member::info(mid, id, mpid, mname); + results.rpcs.push(rpc::member_join(&mem)); + } else { + let mut member = Member::new(height, id, mpid, mname); + member.insert(&db)?; + if mavatar.len() > 0 { + write_avatar_sync(&global.base, &pid, &mpid, mavatar)?; + } + results.rpcs.push(rpc::member_join(&member)); + } + } + + let c_db = chat_db(&global.base, &pid, &db_key)?; + for (height, mpid) in leaves { + if let Ok(mid) = Member::get_id(&db, &id, &mpid) { + Member::leave(&db, &height, &mid)?; + // check mid is my chat friend. if not, delete avatar. + if Friend::get_id(&c_db, &mpid).is_err() { + let _ = delete_avatar(&global.base, &pid, &mpid).await; + } + results.rpcs.push(rpc::member_leave(id, mid)); + } + } + + for (height, mpid, nm, time) in messages { + if let Ok(msg) = handle_network_message( + &pid, + &global.base, + &db_key, + height, + id, + mpid, + nm, + time, + results, + ) + .await + { + results.rpcs.push(rpc::message_create(&msg)); + last_message = Some(msg); + } + } + + if to < height { + results + .layers + .push((GROUP_CHAT_ID, sync(gid, addr, to + 1))); + } + + // update group chat height. + GroupChat::add_height(&db, id, to)?; + + // UPDATE SESSION. + if let Some(msg) = last_message { + let s_db = session_db(&global.base, &pid, &db_key)?; + update_session(&s_db, &id, &msg, results); + } + debug!("Over handle sync packed... {}, {}, {}", height, from, to); + } + } + + Ok(()) +} pub(crate) async fn broadcast( gid: &GroupChatId, @@ -569,7 +488,7 @@ pub(crate) async fn broadcast( ) -> Result<()> { let new_data = bincode::serialize(event)?; - for mpid in global.layer.read().await.group(gid)?.addrs.iter() { + for mpid in global.layer.read().await.group(gid)?.addrs.iter().skip(1) { let s = SendType::Event(0, *mpid, new_data.clone()); results.layers.push((GROUP_CHAT_ID, s)); debug!("--- DEBUG broadcast to: {:?}", mpid); @@ -601,9 +520,10 @@ pub(crate) fn update_session(s_db: &DStorage, id: &i64, msg: &Message, results: } } -pub(crate) fn group_conn(addr: Peer, gid: GroupChatId) -> SendType { +pub(crate) fn group_conn(addr: PeerId, gid: GroupChatId, results: &mut HandleResult) { let data = bincode::serialize(&LayerConnect(gid)).unwrap_or(vec![]); - SendType::Connect(0, addr, data) + let msg = SendType::Connect(0, Peer::peer(addr), data); + results.layers.push((GROUP_CHAT_ID, msg)); } fn sync(gid: GroupChatId, addr: PeerId, height: i64) -> SendType { diff --git a/src/apps/group/models/group.rs b/src/apps/group/models/group.rs index fb36820..97e8a79 100644 --- a/src/apps/group/models/group.rs +++ b/src/apps/group/models/group.rs @@ -177,7 +177,13 @@ impl GroupChat { db.update(&sql) } - pub fn close(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result { + pub fn close(db: &DStorage, id: &i64) -> Result { + let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", id); + db.update(&sql)?; + Self::get(db, id) + } + + pub fn close_id(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result { let group = Self::get_id(db, gid, addr)?; let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", group.id); db.update(&sql)?; diff --git a/src/apps/group/models/message.rs b/src/apps/group/models/message.rs index 921d118..99b49fe 100644 --- a/src/apps/group/models/message.rs +++ b/src/apps/group/models/message.rs @@ -138,8 +138,8 @@ impl Message { } pub async fn sync( - own: &PeerId, base: &PathBuf, + own: &PeerId, db: &DStorage, fid: &i64, from: &i64, @@ -191,17 +191,17 @@ pub(crate) async fn handle_network_message( base: &PathBuf, db_key: &str, height: i64, - gdid: i64, + id: i64, mid: PeerId, msg: NetworkMessage, datetime: i64, results: &mut HandleResult, ) -> Result { let db = group_db(base, own, db_key)?; - let mdid = Member::get_id(&db, &gdid, &mid)?; + let mdid = Member::get_id(&db, &id, &mid)?; let is_me = &mid == own; let (m_type, raw) = from_network_message(own, base, db_key, msg, results).await?; - let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime); + let mut msg = Message::new_with_time(height, id, mdid, is_me, m_type, raw, datetime); msg.insert(&db)?; Ok(msg) } diff --git a/src/apps/group/rpc.rs b/src/apps/group/rpc.rs index 12096d3..3cef2a7 100644 --- a/src/apps/group/rpc.rs +++ b/src/apps/group/rpc.rs @@ -124,7 +124,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut m = Member::new(gh, id, pid, me.name); m.insert(&db)?; - let mid = m.id; let _ = write_avatar(&state.base, &pid, &pid, &me.avatar).await; // Add new session. @@ -192,7 +191,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { if g.local { // local save. - let new_h = state.layer.write().await.group_increased(&gid)?; + let new_h = state.layer.write().await.group_mut(&gid)?.increased(); let mut mem = Member::new(new_h, g.id, f.pid, f.name); mem.insert(&group_db)?; @@ -236,7 +235,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { if group.local { // local save. - let new_h = state.layer.write().await.group_increased(&gid)?; + let new_h = state.layer.write().await.group_mut(&gid)?.increased(); let mut msg = Message::new_with_time(new_h, id, mid, true, m_type, raw, datetime); msg.insert(&db)?; @@ -311,7 +310,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { if g.local { // dissolve group. let data = bincode::serialize(&LayerEvent::GroupClose(g.gid))?; - if let Some(addrs) = state.layer.write().await.group_rm_online(&g.gid) { + if let Some(addrs) = state.layer.write().await.group_del(&g.gid) { for addr in addrs { let s = SendType::Event(0, addr, data.clone()); results.layers.push((GROUP_CHAT_ID, s)); diff --git a/src/layer.rs b/src/layer.rs index 18af6a3..6640243 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -119,12 +119,6 @@ impl Layer { } } - pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) { - if !self.groups.contains_key(&gid) { - self.groups.insert(gid, LayerSession::new(pid, sid, fid, h)); - } - } - pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> { if let Some(session) = self.groups.get(gid) { Ok(session) @@ -133,18 +127,24 @@ impl Layer { } } - pub fn group_rm_online(&mut self, gid: &GroupChatId) -> Option> { - self.groups.remove(gid).map(|session| session.addrs) - } - - pub fn group_increased(&mut self, gid: &GroupChatId) -> Result { + pub fn group_mut(&mut self, gid: &GroupChatId) -> Result<&mut LayerSession> { if let Some(session) = self.groups.get_mut(gid) { - Ok(session.increased()) + Ok(session) } else { Err(anyhow!("session missing!")) } } + pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) { + if !self.groups.contains_key(&gid) { + self.groups.insert(gid, LayerSession::new(pid, sid, fid, h)); + } + } + + pub fn group_del(&mut self, gid: &GroupChatId) -> Option> { + self.groups.remove(gid).map(|session| session.addrs) + } + pub fn group_add_member(&mut self, gid: &GroupChatId, addr: PeerId) { if let Some(session) = self.groups.get_mut(gid) { session.addrs.push(addr); @@ -157,6 +157,16 @@ impl Layer { } } + pub fn group_del_online(&mut self, gid: &GroupChatId, addr: &PeerId) -> bool { + if let Some(session) = self.groups.get_mut(gid) { + if let Some(pos) = session.addrs.iter().position(|x| x == addr) { + session.addrs.remove(pos); + return true; + } + } + false + } + // pub fn remove_running(&mut self, gid: &GroupId) -> HashMap { // // check close the stable connection. // let mut addrs: HashMap = HashMap::new(); @@ -248,7 +258,6 @@ impl Layer { // } pub fn broadcast(&self, user: User, results: &mut HandleResult) { - let gid = user.id; let info = ChatLayerEvent::InfoRes(user); let data = bincode::serialize(&info).unwrap_or(vec![]); @@ -256,6 +265,8 @@ impl Layer { let msg = SendType::Event(0, *fpid, data.clone()); results.layers.push((CHAT_ID, msg)); } + + // TODO GROUPS } } @@ -326,8 +337,8 @@ impl LayerSession { } } - pub fn info(&self) -> (i64, i64, i64) { - (self.height, self.s_id, self.db_id) + pub fn info(&self) -> (i64, i64, i64, PeerId) { + (self.height, self.s_id, self.db_id, self.addrs[0]) } pub fn increased(&mut self) -> i64 { diff --git a/src/rpc.rs b/src/rpc.rs index 9368ba7..299c433 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,14 +1,11 @@ use chat_types::CHAT_ID; use esse_primitives::{id_from_str, id_to_str}; -use group_types::GroupChatId; -use group_types::GROUP_CHAT_ID; -use std::collections::HashMap; +use group_types::{GroupChatId, LayerEvent as GroupLayerEvent, GROUP_CHAT_ID}; use std::net::SocketAddr; use std::sync::Arc; use tdn::{ prelude::{new_send_channel, start_main}, types::{ - group::GroupId, message::{ NetworkType, RpcSendMessage, SendMessage, SendType, StateRequest, StateResponse, }, @@ -17,21 +14,15 @@ use tdn::{ }, }; use tdn_did::{generate_mnemonic, Count}; -use tokio::sync::{ - mpsc::{self, error::SendError, Sender}, - RwLock, -}; use crate::account::lang_from_i64; use crate::apps::app_rpc_inject; use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; +use crate::apps::group::{group_conn, GroupChat}; use crate::global::Global; -//use crate::apps::group::{add_layer, group_conn, GroupChat}; //use crate::event::InnerEvent; -use crate::group::Group; -use crate::layer::Layer; use crate::session::{connect_session, Session, SessionType}; -use crate::storage::session_db; +use crate::storage::{group_db, session_db}; pub(crate) fn init_rpc(global: Arc) -> RpcHandler { let mut handler = new_rpc_handler(global); @@ -129,28 +120,6 @@ fn session_list(sessions: Vec) -> RpcParam { json!(results) } -#[inline] -pub(crate) async fn sleep_waiting_close_stable( - sender: Sender, - groups: HashMap, - layers: HashMap, -) -> std::result::Result<(), SendError> { - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - for (addr, _) in groups { - sender - .send(SendMessage::Group(SendType::Disconnect(addr))) - .await?; - } - - for (faddr, fgid) in layers { - sender - .send(SendMessage::Layer(fgid, SendType::Disconnect(faddr))) - .await?; - } - - Ok(()) -} - #[inline] pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc) -> Result<()> { // Inner network default rpc method. only use in http-rpc. @@ -161,7 +130,7 @@ pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc) -> R _ => return Ok(()), }; - let (s, mut r) = mpsc::channel::(128); + let (s, mut r) = tokio::sync::mpsc::channel::(128); let _ = global .send(SendMessage::Network(NetworkType::NetworkState(req, s))) .await?; @@ -371,7 +340,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let pid = id_from_str(params[0].as_str().ok_or(RpcError::ParseError)?)?; let me_lock = params[1].as_str().ok_or(RpcError::ParseError)?; - let mut results = HandleResult::rpc(json!([id_to_str(&pid)])); + let results = HandleResult::rpc(json!([id_to_str(&pid)])); let (tdn_send, tdn_recv) = new_send_channel(); let running = state.reset(&pid, me_lock, tdn_send).await?; @@ -379,31 +348,20 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { return Ok(results); } - // TODO load all local services created by this account. + // load all local services created by this account. + let db_key = state.group.read().await.db_key(&pid)?; + let group_db = group_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; // 1. group chat. - // let self_addr = layer_lock.addr.clone(); - // let group_lock = state.group.read().await; - // let group_db = group_lock.group_db(&ogid)?; - // let s_db = group_lock.session_db(&ogid)?; - // drop(group_lock); - // let group_chats = GroupChat::local(&group_db)?; - // for g in group_chats { - // layer_lock.add_running(&g.g_id, ogid, g.id, g.height)?; - // results.networks.push(NetworkType::AddGroup(g.g_id)); - - // // 2. online group to self group onlines. - // if let Some(session) = - // connect_session(&s_db, &SessionType::Group, &g.id, &self_addr)? - // { - // layer_lock.running_mut(&ogid)?.check_add_online( - // g.g_id, - // Online::Direct(self_addr), - // session.id, - // g.id, - // )?; - // } - // } - // drop(layer_lock); + let group_chats = GroupChat::local(&group_db)?; + let mut layer = state.layer.write().await; + for g in group_chats { + // 2. online group to self group onlines. + if let Some(s) = connect_session(&s_db, &SessionType::Group, &g.id, &pid)? { + layer.group_add(g.gid, g.addr, s.id, g.id, g.height); + } + } + drop(layer); let key = state.group.read().await.keypair(); let peer_id = start_main( @@ -475,11 +433,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { if let Some(addr) = online { return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)]))); } - // add_layer( - // &mut results, - // gid, - // group_conn(proof, Peer::peer(s.addr), s.gid), - // ); + group_conn(s.addr, remote_gid, &mut results); } _ => {} } @@ -519,9 +473,9 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() { results.rpcs.push(json!([id])); } - //let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?; - //let msg = SendType::Event(0, s.addr, data); - //results.layers.push((GROUP_CHAT_ID, msg)); + let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?; + let msg = SendType::Event(0, s.addr, data); + results.layers.push((GROUP_CHAT_ID, msg)); } _ => { return Ok(HandleResult::new()); // others has no online. diff --git a/types/group/src/lib.rs b/types/group/src/lib.rs index e6fa28d..050370b 100644 --- a/types/group/src/lib.rs +++ b/types/group/src/lib.rs @@ -62,20 +62,20 @@ pub enum LayerEvent { impl LayerEvent { /// get event's group id. - pub fn gcd(&self) -> &GroupChatId { + pub fn gid(&self) -> &GroupChatId { match self { - Self::Offline(gcd) => gcd, - Self::Suspend(gcd) => gcd, - Self::Actived(gcd) => gcd, - Self::MemberOnline(gcd, ..) => gcd, - Self::MemberOffline(gcd, ..) => gcd, - Self::MemberOnlineSync(gcd) => gcd, - Self::MemberOnlineSyncResult(gcd, ..) => gcd, - Self::GroupName(gcd, ..) => gcd, - Self::GroupClose(gcd) => gcd, - Self::Sync(gcd, ..) => gcd, - Self::SyncReq(gcd, ..) => gcd, - Self::SyncRes(gcd, ..) => gcd, + Self::Offline(gid) => gid, + Self::Suspend(gid) => gid, + Self::Actived(gid) => gid, + Self::MemberOnline(gid, ..) => gid, + Self::MemberOffline(gid, ..) => gid, + Self::MemberOnlineSync(gid) => gid, + Self::MemberOnlineSyncResult(gid, ..) => gid, + Self::GroupName(gid, ..) => gid, + Self::GroupClose(gid) => gid, + Self::Sync(gid, ..) => gid, + Self::SyncReq(gid, ..) => gid, + Self::SyncRes(gid, ..) => gid, } } }