diff --git a/lib/widgets/user_info.dart b/lib/widgets/user_info.dart index 31eacea..e2e2d45 100644 --- a/lib/widgets/user_info.dart +++ b/lib/widgets/user_info.dart @@ -106,9 +106,6 @@ class _UserInfoState extends State { ), if (widget.title != null) Text(widget.title!, style: TextStyle(fontSize: 16.0, fontStyle: FontStyle.italic)), - const SizedBox(height: 10), - const Divider(height: 1.0, color: Color(0x40ADB0BB)), - const SizedBox(height: 20), InkWell( onTap: () { Clipboard.setData(ClipboardData(text: pidText(widget.id))); @@ -117,11 +114,12 @@ class _UserInfoState extends State { }); }, child: Container( - width: 250.0, + padding: const EdgeInsets.symmetric(vertical: 10.0), child: Row( children: [ Expanded( child: Text(pidText(widget.id, widget.pre), + textAlign: TextAlign.center, style: TextStyle(fontSize: 14, color: idColor))), Padding( padding: const EdgeInsets.symmetric(horizontal: 10.0), @@ -132,6 +130,7 @@ class _UserInfoState extends State { ), ) ), + const Divider(height: 1.0, color: Color(0x40ADB0BB)), const SizedBox(height: 16), if (widget.remark != null) Container( diff --git a/src/apps.rs b/src/apps.rs index ba7f8e2..955605f 100644 --- a/src/apps.rs +++ b/src/apps.rs @@ -2,7 +2,8 @@ use chat_types::CHAT_ID; use cloud_types::CLOUD_ID; use dao_types::DAO_ID; use domain_types::DOMAIN_ID; -use group_types::GROUP_CHAT_ID; +use group_types::{GroupChatId, GROUP_CHAT_ID}; +use std::collections::HashMap; use std::sync::Arc; use tdn::types::{ group::GroupId, @@ -13,16 +14,17 @@ use tdn::types::{ use crate::global::Global; use crate::rpc::session_lost; +use crate::storage::group_db; pub(crate) mod chat; //pub(crate) mod cloud; pub(crate) mod device; //pub(crate) mod domain; //pub(crate) mod file; -//pub(crate) mod group; +pub(crate) mod group; pub(crate) mod jarvis; //pub(crate) mod dao; -//pub(crate) mod wallet; +pub(crate) mod wallet; pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { //device::new_rpc_handler(handler); @@ -30,8 +32,8 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { jarvis::new_rpc_handler(handler); //domain::new_rpc_handler(handler); //file::new_rpc_handler(handler); - //group::new_rpc_handler(handler); - //wallet::new_rpc_handler(handler); + group::new_rpc_handler(handler); + wallet::new_rpc_handler(handler); //dao::new_rpc_handler(handler); //cloud::new_rpc_handler(handler); } @@ -45,7 +47,7 @@ pub(crate) async fn app_layer_handle( debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid); match (fgid, tgid) { (CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await, - (GROUP_CHAT_ID, 0) => chat::handle(msg, global).await, + (GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await, (DAO_ID, 0) => chat::handle(msg, global).await, (DOMAIN_ID, 0) => chat::handle(msg, global).await, (CLOUD_ID, 0) => chat::handle(msg, global).await, @@ -59,16 +61,40 @@ pub(crate) async fn app_layer_handle( results.rpcs.push(session_lost(&session.s_id)); } - let mut delete = vec![]; + let mut delete: HashMap> = HashMap::new(); + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = group_db(&global.base, &pid, &db_key)?; + for (gid, session) in &layer.groups { - if session.addr == peer.id { - delete.push(*gid); - results.rpcs.push(session_lost(&session.s_id)); + for (index, addr) in session.addrs.iter().enumerate() { + if addr == &peer.id { + delete + .entry(*gid) + .and_modify(|f| f.push(index)) + .or_insert(vec![index]); + if index == 0 { + results.rpcs.push(session_lost(&session.s_id)); + } else { + if let Ok(mid) = group::Member::get_id(&db, &session.db_id, addr) { + results + .rpcs + .push(group::rpc::member_offline(session.db_id, mid)); + } + } + } } } - for gid in delete { - let _ = layer.groups.remove(&gid); + for (gid, mut indexs) in delete { + if indexs[0] == 0 { + let _ = layer.groups.remove(&gid); + } else { + indexs.reverse(); + for i in indexs { + let _ = layer.group_del_member(&gid, i); + } + } } Ok(results) diff --git a/src/apps/chat/layer.rs b/src/apps/chat/layer.rs index da00a51..8f1f078 100644 --- a/src/apps/chat/layer.rs +++ b/src/apps/chat/layer.rs @@ -159,7 +159,7 @@ async fn handle_connect( results.rpcs.push(session_connect(&sid, &peer.id)); // 4. active this session. - global.layer.write().await.chat_add(peer.id, sid, f.id); + global.layer.write().await.chat_add(peer.id, sid, f.id, 0); Ok(f.height as u64) } diff --git a/src/apps/group/layer.rs b/src/apps/group/layer.rs index d42626e..4bddfaf 100644 --- a/src/apps/group/layer.rs +++ b/src/apps/group/layer.rs @@ -1,646 +1,585 @@ +use chat_types::MessageType; +use group_types::{Event, GroupChatId, LayerConnect, LayerEvent, LayerResult, GROUP_CHAT_ID}; use std::sync::Arc; use tdn::types::{ - group::GroupId, message::{RecvType, SendType}, - primitive::{HandleResult, Peer, PeerId, Result}, + primitives::{HandleResult, Peer, PeerId, Result}, }; -use tokio::sync::RwLock; - -use chat_types::MessageType; -use group_types::{Event, LayerConnect, LayerEvent, LayerResult}; -use tdn_did::Proof; use tdn_storage::local::DStorage; use crate::apps::chat::Friend; -use crate::layer::{Layer, Online}; +use crate::global::Global; +use crate::layer::Layer; use crate::rpc::{ session_close, session_connect, session_last, session_lost, session_suspend, session_update_name, }; use crate::session::{connect_session, Session, SessionType}; -use crate::storage::{delete_avatar, write_avatar_sync}; +use crate::storage::{delete_avatar, group_db, session_db, write_avatar_sync}; use super::models::{handle_network_message, GroupChat, Member, Message}; -use super::{add_layer, add_server_layer, rpc}; +use super::rpc; // variable statement: -// gcd: Group Chat ID. -// fgid: where is event come from. -// ogid: my account ID. if server is group owner. if client is my. -// mgid: member account ID. +// gid: Group Chat ID. +// pid: my account ID. +// mpid: member account ID. // id: Group Chat database Id. +// sid: Group Chat Session Id. // mid: member database Id. -pub(crate) async fn handle_server( - layer: &Arc>, - fgid: GroupId, - msg: RecvType, -) -> Result { +pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { let mut results = HandleResult::new(); match msg { - RecvType::Connect(addr, data) => { - let LayerConnect(gcd, _proof) = bincode::deserialize(&data)?; - - if handle_server_connect(layer, gcd, fgid, &addr, &mut results) + RecvType::Connect(peer, data) => { + // SERVER + let LayerConnect(gid) = bincode::deserialize(&data)?; + if handle_connect(global, &peer, gid, &mut results) .await .is_err() { - let data = bincode::serialize(&gcd)?; - let s = SendType::Result(0, addr, false, false, data); - add_server_layer(&mut results, fgid, s); + let data = bincode::serialize(&gid)?; + let msg = SendType::Result(0, peer, false, false, data); + results.layers.push((GROUP_CHAT_ID, msg)); } } - RecvType::Leave(_addr) => { - // only server handle it. IMPORTANT !!! fgid IS mgid. - // TODO - } - RecvType::Event(addr, bytes) => { - debug!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT"); - let event: LayerEvent = bincode::deserialize(&bytes)?; - handle_server_event(fgid, addr, event, layer, &mut results).await?; - debug!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT"); - } - RecvType::Stream(_uid, _stream, _bytes) => { - // TODO stream - } - RecvType::Result(..) => {} - RecvType::ResultConnect(..) => {} - RecvType::Delivery(..) => {} - } - - Ok(results) -} - -async fn handle_server_connect( - layer: &Arc>, - gcd: GroupId, - fgid: GroupId, - addr: &Peer, - results: &mut HandleResult, -) -> Result<()> { - let (ogid, height, id) = layer.read().await.running(&gcd)?.owner_height_id(); - // check is member. - let db = layer.read().await.group.read().await.group_db(&ogid)?; - let g = GroupChat::get(&db, &id)?; - let mdid = Member::get_id(&db, &id, &fgid)?; - - let res = LayerResult(gcd, g.g_name, height); - let data = bincode::serialize(&res).unwrap_or(vec![]); - let s = SendType::Result(0, addr.clone(), true, false, data); - add_server_layer(results, fgid, s); - - layer.write().await.running_mut(&gcd)?.check_add_online( - fgid, - Online::Direct(addr.id), - id, - mdid, - )?; - - let _ = Member::addr_update(&db, &id, &fgid, &addr.id); - results - .rpcs - .push(rpc::member_online(ogid, id, mdid, &addr.id)); - - let new_data = bincode::serialize(&LayerEvent::MemberOnline(gcd, fgid, addr.id))?; - - for (mid, maddr) in layer.read().await.running(&gcd)?.onlines() { - let s = SendType::Event(0, *maddr, new_data.clone()); - add_server_layer(results, *mid, s); - } - Ok(()) -} - -pub(crate) async fn handle_peer( - layer: &Arc>, - ogid: GroupId, - msg: RecvType, -) -> Result { - let mut results = HandleResult::new(); - - match msg { - RecvType::Result(addr, is_ok, data) => { + RecvType::Result(peer, is_ok, data) => { + // PEER if is_ok { - let mut layer_lock = layer.write().await; - handle_connect(ogid, &addr, data, &mut layer_lock, &mut results).await?; + handle_result(global, &peer, data, &mut results).await?; } else { // close the group chat. - let gcd: GroupId = bincode::deserialize(&data)?; + let gid: GroupChatId = bincode::deserialize(&data)?; - let layer_lock = layer.read().await; - let group_lock = layer_lock.group.read().await; - let db = group_lock.group_db(&ogid)?; - let s_db = group_lock.session_db(&ogid)?; - drop(group_lock); - drop(layer_lock); + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = group_db(&global.base, &pid, &db_key)?; + let s_db = session_db(&global.base, &pid, &db_key)?; - let group = GroupChat::close(&db, &gcd)?; + let group = GroupChat::close(&db, &gid, &peer.id)?; let sid = Session::close(&s_db, &group.id, &SessionType::Group)?; - results.rpcs.push(session_close(ogid, &sid)); + results.rpcs.push(session_close(&sid)); } } - RecvType::ResultConnect(addr, data) => { - let mut layer_lock = layer.write().await; - if handle_connect(ogid, &addr, data, &mut layer_lock, &mut results) + RecvType::ResultConnect(peer, data) => { + // PEER + if handle_result(global, &peer, data, &mut results) .await .is_err() { - let msg = SendType::Result(0, addr, true, false, vec![]); - add_layer(&mut results, ogid, msg); + let msg = SendType::Result(0, peer, true, false, vec![]); + results.layers.push((GROUP_CHAT_ID, msg)); } } + RecvType::Leave(..) => {} RecvType::Event(addr, bytes) => { - debug!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT"); let event: LayerEvent = bincode::deserialize(&bytes)?; - handle_peer_event(ogid, addr, event, layer, &mut results).await?; + debug!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT"); + //handle_server_event(fgid, addr, event, layer, &mut results).await?; + debug!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT"); + + debug!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT"); + //handle_peer_event(ogid, addr, event, layer, &mut results).await?; debug!("----------- DEBUG GROUP CHAT: PEER OVER LAYER EVENT"); } RecvType::Stream(_uid, _stream, _bytes) => { // TODO stream } - RecvType::Delivery(_t, _tid, _is_ok) => { - // TODO - } - _ => { - error!("group chat peer handle layer nerver here") - } + + RecvType::Delivery(..) => {} } Ok(results) } async fn handle_connect( - ogid: GroupId, - addr: &Peer, + global: &Arc, + peer: &Peer, + gid: GroupChatId, + results: &mut HandleResult, +) -> Result<()> { + let (height, sid, id) = global.layer.read().await.group(&gid)?.info(); + + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = group_db(&global.base, &pid, &db_key)?; + + // check is member. + let g = GroupChat::get(&db, &id)?; + let mid = Member::get_id(&db, &id, &peer.id)?; + + let res = LayerResult(gid, g.name, height); + let data = bincode::serialize(&res).unwrap_or(vec![]); + let s = SendType::Result(0, peer.clone(), true, false, data); + results.layers.push((GROUP_CHAT_ID, s)); + + global.layer.write().await.group_add_member(&gid, peer.id); + results.rpcs.push(rpc::member_online(id, mid)); + + let data = LayerEvent::MemberOnline(gid, peer.id); + broadcast(&gid, global, &data, results).await; + Ok(()) +} + +async fn handle_result( + global: &Arc, + peer: &Peer, data: Vec, - layer: &mut Layer, results: &mut HandleResult, ) -> Result<()> { // 0. deserialize result. - let LayerResult(gcd, gname, height) = bincode::deserialize(&data)?; + let LayerResult(gid, name, height) = bincode::deserialize(&data)?; + + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = group_db(&global.base, &pid, &db_key)?; + let s_db = session_db(&global.base, &pid, &db_key)?; // 1. check group. - let db = layer.group.read().await.group_db(&ogid)?; - let group = GroupChat::get_id(&db, &gcd)?; + let group = GroupChat::get_id(&db, &gid, &peer.id)?; // 1.0 check address. - if group.g_addr != addr.id { + if group.addr != peer.id { return Err(anyhow!("invalid group chat address.")); } - let _ = GroupChat::update_name(&db, &group.id, &gname); - results.rpcs.push(rpc::group_name(ogid, &group.id, &gname)); + let _ = GroupChat::update_name(&db, &group.id, &name); + results.rpcs.push(rpc::group_name(&group.id, &name)); // 1.1 get session. - let s_db = layer.group.read().await.session_db(&ogid)?; - let session_some = connect_session(&s_db, &SessionType::Group, &group.id, &addr.id)?; + let session_some = connect_session(&s_db, &SessionType::Group, &group.id, &peer.id)?; if session_some.is_none() { return Err(anyhow!("invalid group chat address.")); } let sid = session_some.unwrap().id; - let _ = Session::update_name(&s_db, &sid, &gname); - results.rpcs.push(session_update_name(ogid, &sid, &gname)); + let _ = Session::update_name(&s_db, &sid, &name); + results.rpcs.push(session_update_name(&sid, &name)); // 1.2 online this group. - layer - .running_mut(&ogid)? - .check_add_online(gcd, Online::Direct(addr.id), sid, group.id)?; + global + .layer + .write() + .await + .group_add(gid, peer.id, sid, group.id, height); // 1.3 online to UI. - results.rpcs.push(session_connect(ogid, &sid, &addr.id)); + results.rpcs.push(session_connect(&sid, &peer.id)); debug!("will sync remote: {}, my: {}", height, group.height); // 1.4 sync group height. if group.height < height { - add_layer(results, ogid, sync(gcd, addr.id, group.height)); + results + .layers + .push((GROUP_CHAT_ID, sync(gid, peer.id, group.height))); } else { // sync online members. - add_layer(results, ogid, sync_online(gcd, addr.id)); - } - - Ok(()) -} - -// variable statement: -// gcd: Group Chat ID. -// fgid: where is event come from. -// ogid: my account ID. if server is group owner. if client is my. -// mgid: member account ID. -// id: Group Chat database Id. -// mdid: member database Id. -// sid: session Id. -async fn handle_server_event( - fgid: GroupId, - addr: PeerId, - event: LayerEvent, - layer: &Arc>, - results: &mut HandleResult, -) -> Result<()> { - let gcd = event.gcd(); - let base = layer.read().await.base().clone(); - let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id(); - let db = layer.read().await.group.read().await.group_db(&ogid)?; - - match event { - LayerEvent::Offline(gcd) => { - // 1. check member online. - if layer.write().await.remove_online(&gcd, &fgid).is_none() { - return Ok(()); - } - - // 2. UI: offline the member. - if let Ok(mid) = Member::get_id(&db, &id, &fgid) { - results.rpcs.push(rpc::member_offline(ogid, id, mid)); - } - - // 3. broadcast offline event. - broadcast(&LayerEvent::MemberOffline(gcd, fgid), layer, &gcd, results).await?; - } - LayerEvent::GroupName(gcd, name) => { - // 1. update group name - let _ = GroupChat::update_name(&db, &id, &name)?; - // 2. UI: update - results.rpcs.push(rpc::group_name(ogid, &id, &name)); - if let Ok(sid) = Session::update_name_by_id( - &layer.read().await.group.read().await.session_db(&ogid)?, - &id, - &SessionType::Group, - &name, - ) { - results.rpcs.push(session_update_name(ogid, &sid, &name)); - } - // 3. broadcast - broadcast(&LayerEvent::GroupName(gcd, name), layer, &gcd, results).await?; - } - LayerEvent::Sync(gcd, _, event) => { - match event { - Event::MemberJoin(mgid, maddr, mname, mavatar) => { - let mdid_res = Member::get_id(&db, &id, &mgid); - let h = layer.write().await.running_mut(&gcd)?.increased(); - let new_e = Event::MemberJoin(mgid, maddr, mname.clone(), mavatar.clone()); - - if let Ok(mdid) = mdid_res { - Member::update(&db, &h, &mdid, &maddr, &mname)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - let mem = Member::info(mdid, id, mgid, maddr, mname); - results.rpcs.push(rpc::member_join(ogid, &mem)); - } else { - let mut member = Member::new(h, id, mgid, maddr, mname); - member.insert(&db)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - results.rpcs.push(rpc::member_join(ogid, &member)); - } - - // broadcast - GroupChat::add_height(&db, id, h)?; - broadcast(&LayerEvent::Sync(gcd, h, new_e), layer, &gcd, results).await?; - } - Event::MemberLeave(mgid) => { - let mdid = Member::get_id(&db, &id, &mgid)?; - let h = layer.write().await.running_mut(&gcd)?.increased(); - Member::leave(&db, &mdid, &h)?; - - // check mid is my chat friend. if not, delete avatar. - let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; - if Friend::get_id(&s_db, &mgid).is_err() { - let _ = delete_avatar(&base, &ogid, &mgid).await; - } - results.rpcs.push(rpc::member_leave(ogid, id, mdid)); - - // broadcast - GroupChat::add_height(&db, id, h)?; - broadcast(&LayerEvent::Sync(gcd, h, event), layer, &gcd, results).await?; - } - Event::MessageCreate(mgid, nmsg, mtime) => { - debug!("Sync: create message start"); - let _mdid = Member::get_id(&db, &id, &mgid)?; - - let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime); - let new_h = layer.write().await.running_mut(&gcd)?.increased(); - broadcast(&LayerEvent::Sync(gcd, new_h, new_e), layer, &gcd, results).await?; - GroupChat::add_height(&db, id, new_h)?; - - let msg = handle_network_message( - &layer.read().await.group, - new_h, - id, - mgid, - &ogid, - nmsg, - mtime, - &base, - results, - ) - .await?; - results.rpcs.push(rpc::message_create(ogid, &msg)); - debug!("Sync: create message ok"); - - // UPDATE SESSION. - if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { - update_session(&s_db, &ogid, &id, &msg, results); - } - } - } - } - LayerEvent::MemberOnlineSync(gcd) => { - let onlines = layer - .read() - .await - .running(&gcd)? - .onlines() - .iter() - .map(|(g, a)| (**g, **a)) - .collect(); - let event = LayerEvent::MemberOnlineSyncResult(gcd, onlines); - let data = bincode::serialize(&event).unwrap_or(vec![]); - let s = SendType::Event(0, addr, data); - add_server_layer(results, fgid, s); - } - LayerEvent::SyncReq(gcd, from) => { - debug!("Got sync request. height: {} from: {}", height, from); - - if height >= from { - let to = if height - from > 20 { - from + 20 - } else { - height - }; - - let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &from, &to).await?; - let messages = Message::sync(&base, &ogid, &db, &id, &from, &to).await?; - let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages); - let data = bincode::serialize(&event).unwrap_or(vec![]); - let s = SendType::Event(0, addr, data); - add_server_layer(results, fgid, s); - debug!("Sended sync request results. from: {}, to: {}", from, to); - } - } - LayerEvent::Suspend(..) => {} - LayerEvent::Actived(..) => {} - _ => error!("group server handle event nerver here"), + results + .layers + .push((GROUP_CHAT_ID, sync_online(gid, peer.id))); } Ok(()) } -// variable statement: -// gcd: Group Chat ID. -// fgid: where is event come from. -// ogid: my account ID. if server is group owner. if client is my. -// mgid: member account ID. -// id: Group Chat database Id. -// mdid: member database Id. -// sid: session Id. -async fn handle_peer_event( - ogid: GroupId, - addr: PeerId, - event: LayerEvent, - layer: &Arc>, - results: &mut HandleResult, -) -> Result<()> { - let base = layer.read().await.base().clone(); - let gcd = event.gcd(); - let (sid, id) = layer.read().await.get_running_remote_id(&ogid, gcd)?; - let db = layer.read().await.group.read().await.group_db(&ogid)?; - - match event { - LayerEvent::Offline(gcd) => { - // 1. offline group chat. - layer - .write() - .await - .running_mut(&ogid)? - .check_offline(&gcd, &addr); - - // 2. UI: offline the session. - results.rpcs.push(session_lost(ogid, &sid)); - } - LayerEvent::Suspend(gcd) => { - if layer - .write() - .await - .running_mut(&ogid)? - .suspend(&gcd, false, true)? - { - results.rpcs.push(session_suspend(ogid, &sid)); - } - } - LayerEvent::Actived(gcd) => { - let _ = layer.write().await.running_mut(&ogid)?.active(&gcd, false); - results.rpcs.push(session_connect(ogid, &sid, &addr)); - } - LayerEvent::MemberOnline(_gcd, mgid, maddr) => { - if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { - results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); - } - } - LayerEvent::MemberOffline(_gcd, mgid) => { - if let Ok(mid) = Member::get_id(&db, &id, &mgid) { - results.rpcs.push(rpc::member_offline(ogid, id, mid)); - } - } - LayerEvent::MemberOnlineSyncResult(_gcd, onlines) => { - for (mgid, maddr) in onlines { - if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { - results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); - } - } - } - LayerEvent::GroupName(_gcd, name) => { - let _ = GroupChat::update_name(&db, &id, &name)?; - results.rpcs.push(rpc::group_name(ogid, &id, &name)); - let _ = Session::update_name( - &layer.read().await.group.read().await.session_db(&ogid)?, - &sid, - &name, - ); - results.rpcs.push(session_update_name(ogid, &sid, &name)); - } - LayerEvent::GroupClose(_gcd) => { - let group = GroupChat::close(&db, &gcd)?; - let sid = Session::close( - &layer.read().await.group.read().await.session_db(&ogid)?, - &group.id, - &SessionType::Group, - )?; - results.rpcs.push(session_close(ogid, &sid)); - } - LayerEvent::Sync(_gcd, height, event) => { - debug!("Sync: handle height: {}", height); - - match event { - Event::MemberJoin(mgid, maddr, mname, mavatar) => { - let mdid_res = Member::get_id(&db, &id, &mgid); - if let Ok(mdid) = mdid_res { - Member::update(&db, &height, &mdid, &maddr, &mname)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - let mem = Member::info(mdid, id, mgid, maddr, mname); - results.rpcs.push(rpc::member_join(ogid, &mem)); - } else { - let mut member = Member::new(height, id, mgid, maddr, mname); - member.insert(&db)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - results.rpcs.push(rpc::member_join(ogid, &member)); - } - - // save consensus. - GroupChat::add_height(&db, id, height)?; - } - Event::MemberLeave(mgid) => { - let mdid = Member::get_id(&db, &id, &mgid)?; - Member::leave(&db, &height, &mdid)?; - - // check mid is my chat friend. if not, delete avatar. - let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; - if Friend::get_id(&s_db, &mgid).is_err() { - let _ = delete_avatar(&base, &ogid, &mgid).await; - } - results.rpcs.push(rpc::member_leave(ogid, id, mdid)); - - // save consensus. - GroupChat::add_height(&db, id, height)?; - } - Event::MessageCreate(mgid, nmsg, mtime) => { - debug!("Sync: create message start"); - let _mdid = Member::get_id(&db, &id, &mgid)?; - - let msg = handle_network_message( - &layer.read().await.group, - height, - id, - mgid, - &ogid, - nmsg, - mtime, - &base, - results, - ) - .await?; - results.rpcs.push(rpc::message_create(ogid, &msg)); - - GroupChat::add_height(&db, id, height)?; - debug!("Sync: create message ok"); - - // UPDATE SESSION. - if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { - update_session(&s_db, &ogid, &id, &msg, results); - } - } - } - } - LayerEvent::SyncRes(gcd, height, from, to, adds, leaves, messages) => { - if to >= height { - // when last packed sync, start sync online members. - add_layer(results, ogid, sync_online(gcd, addr)); - } - - debug!("Start handle sync packed... {}, {}, {}", height, from, to); - let mut last_message = None; - - for (height, mgid, maddr, mname, mavatar) in adds { - let mdid_res = Member::get_id(&db, &id, &mgid); - if let Ok(mdid) = mdid_res { - Member::update(&db, &height, &mdid, &maddr, &mname)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - let mem = Member::info(mdid, id, mgid, maddr, mname); - results.rpcs.push(rpc::member_join(ogid, &mem)); - } else { - let mut member = Member::new(height, id, mgid, maddr, mname); - member.insert(&db)?; - if mavatar.len() > 0 { - write_avatar_sync(&base, &ogid, &mgid, mavatar)?; - } - results.rpcs.push(rpc::member_join(ogid, &member)); - } - } - - for (height, mgid) in leaves { - if let Ok(mdid) = Member::get_id(&db, &id, &mgid) { - Member::leave(&db, &height, &mdid)?; - // check mid is my chat friend. if not, delete avatar. - let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; - if Friend::get_id(&s_db, &mgid).is_err() { - let _ = delete_avatar(&base, &ogid, &mgid).await; - } - results.rpcs.push(rpc::member_leave(ogid, id, mdid)); - } - } - - for (height, mgid, nm, time) in messages { - if let Ok(msg) = handle_network_message( - &layer.read().await.group, - height, - id, - mgid, - &ogid, - nm, - time, - &base, - results, - ) - .await - { - results.rpcs.push(rpc::message_create(ogid, &msg)); - last_message = Some(msg); - } - } - - if to < height { - add_layer(results, ogid, sync(gcd, addr, to + 1)); - } - - // update group chat height. - GroupChat::add_height(&db, id, to)?; - - // UPDATE SESSION. - if let Some(msg) = last_message { - if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { - update_session(&s_db, &ogid, &id, &msg, results); - } - } - debug!("Over handle sync packed... {}, {}, {}", height, from, to); - } - _ => error!("group peer handle event nerver here"), - } - - Ok(()) -} +// async fn handle_server_event( +// fgid: GroupId, +// addr: PeerId, +// event: LayerEvent, +// layer: &Arc>, +// results: &mut HandleResult, +// ) -> Result<()> { +// let gcd = event.gcd(); +// let base = layer.read().await.base().clone(); +// let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id(); +// let db = layer.read().await.group.read().await.group_db(&ogid)?; + +// match event { +// LayerEvent::Offline(gcd) => { +// // 1. check member online. +// if layer.write().await.remove_online(&gcd, &fgid).is_none() { +// return Ok(()); +// } + +// // 2. UI: offline the member. +// if let Ok(mid) = Member::get_id(&db, &id, &fgid) { +// results.rpcs.push(rpc::member_offline(ogid, id, mid)); +// } + +// // 3. broadcast offline event. +// broadcast(&LayerEvent::MemberOffline(gcd, fgid), layer, &gcd, results).await?; +// } +// LayerEvent::GroupName(gcd, name) => { +// // 1. update group name +// let _ = GroupChat::update_name(&db, &id, &name)?; +// // 2. UI: update +// results.rpcs.push(rpc::group_name(ogid, &id, &name)); +// if let Ok(sid) = Session::update_name_by_id( +// &layer.read().await.group.read().await.session_db(&ogid)?, +// &id, +// &SessionType::Group, +// &name, +// ) { +// results.rpcs.push(session_update_name(ogid, &sid, &name)); +// } +// // 3. broadcast +// broadcast(&LayerEvent::GroupName(gcd, name), layer, &gcd, results).await?; +// } +// LayerEvent::Sync(gcd, _, event) => { +// match event { +// Event::MemberJoin(mgid, maddr, mname, mavatar) => { +// let mdid_res = Member::get_id(&db, &id, &mgid); +// let h = layer.write().await.running_mut(&gcd)?.increased(); +// let new_e = Event::MemberJoin(mgid, maddr, mname.clone(), mavatar.clone()); + +// if let Ok(mdid) = mdid_res { +// Member::update(&db, &h, &mdid, &maddr, &mname)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// let mem = Member::info(mdid, id, mgid, maddr, mname); +// results.rpcs.push(rpc::member_join(ogid, &mem)); +// } else { +// let mut member = Member::new(h, id, mgid, maddr, mname); +// member.insert(&db)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// results.rpcs.push(rpc::member_join(ogid, &member)); +// } + +// // broadcast +// GroupChat::add_height(&db, id, h)?; +// broadcast(&LayerEvent::Sync(gcd, h, new_e), layer, &gcd, results).await?; +// } +// Event::MemberLeave(mgid) => { +// let mdid = Member::get_id(&db, &id, &mgid)?; +// let h = layer.write().await.running_mut(&gcd)?.increased(); +// Member::leave(&db, &mdid, &h)?; + +// // check mid is my chat friend. if not, delete avatar. +// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; +// if Friend::get_id(&s_db, &mgid).is_err() { +// let _ = delete_avatar(&base, &ogid, &mgid).await; +// } +// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); + +// // broadcast +// GroupChat::add_height(&db, id, h)?; +// broadcast(&LayerEvent::Sync(gcd, h, event), layer, &gcd, results).await?; +// } +// Event::MessageCreate(mgid, nmsg, mtime) => { +// debug!("Sync: create message start"); +// let _mdid = Member::get_id(&db, &id, &mgid)?; + +// let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime); +// let new_h = layer.write().await.running_mut(&gcd)?.increased(); +// broadcast(&LayerEvent::Sync(gcd, new_h, new_e), layer, &gcd, results).await?; +// GroupChat::add_height(&db, id, new_h)?; + +// let msg = handle_network_message( +// &layer.read().await.group, +// new_h, +// id, +// mgid, +// &ogid, +// nmsg, +// mtime, +// &base, +// results, +// ) +// .await?; +// results.rpcs.push(rpc::message_create(ogid, &msg)); +// debug!("Sync: create message ok"); + +// // UPDATE SESSION. +// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { +// update_session(&s_db, &ogid, &id, &msg, results); +// } +// } +// } +// } +// LayerEvent::MemberOnlineSync(gcd) => { +// let onlines = layer +// .read() +// .await +// .running(&gcd)? +// .onlines() +// .iter() +// .map(|(g, a)| (**g, **a)) +// .collect(); +// let event = LayerEvent::MemberOnlineSyncResult(gcd, onlines); +// let data = bincode::serialize(&event).unwrap_or(vec![]); +// let s = SendType::Event(0, addr, data); +// add_server_layer(results, fgid, s); +// } +// LayerEvent::SyncReq(gcd, from) => { +// debug!("Got sync request. height: {} from: {}", height, from); + +// if height >= from { +// let to = if height - from > 20 { +// from + 20 +// } else { +// height +// }; + +// let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &from, &to).await?; +// let messages = Message::sync(&base, &ogid, &db, &id, &from, &to).await?; +// let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages); +// let data = bincode::serialize(&event).unwrap_or(vec![]); +// let s = SendType::Event(0, addr, data); +// add_server_layer(results, fgid, s); +// debug!("Sended sync request results. from: {}, to: {}", from, to); +// } +// } +// LayerEvent::Suspend(..) => {} +// LayerEvent::Actived(..) => {} +// _ => error!("group server handle event nerver here"), +// } + +// Ok(()) +// } + +// async fn handle_peer_event( +// ogid: GroupId, +// addr: PeerId, +// event: LayerEvent, +// layer: &Arc>, +// results: &mut HandleResult, +// ) -> Result<()> { +// let base = layer.read().await.base().clone(); +// let gcd = event.gcd(); +// let (sid, id) = layer.read().await.get_running_remote_id(&ogid, gcd)?; +// let db = layer.read().await.group.read().await.group_db(&ogid)?; + +// match event { +// LayerEvent::Offline(gcd) => { +// // 1. offline group chat. +// layer +// .write() +// .await +// .running_mut(&ogid)? +// .check_offline(&gcd, &addr); + +// // 2. UI: offline the session. +// results.rpcs.push(session_lost(ogid, &sid)); +// } +// LayerEvent::Suspend(gcd) => { +// if layer +// .write() +// .await +// .running_mut(&ogid)? +// .suspend(&gcd, false, true)? +// { +// results.rpcs.push(session_suspend(ogid, &sid)); +// } +// } +// LayerEvent::Actived(gcd) => { +// let _ = layer.write().await.running_mut(&ogid)?.active(&gcd, false); +// results.rpcs.push(session_connect(ogid, &sid, &addr)); +// } +// LayerEvent::MemberOnline(_gcd, mgid, maddr) => { +// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { +// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); +// } +// } +// LayerEvent::MemberOffline(_gcd, mgid) => { +// if let Ok(mid) = Member::get_id(&db, &id, &mgid) { +// results.rpcs.push(rpc::member_offline(ogid, id, mid)); +// } +// } +// LayerEvent::MemberOnlineSyncResult(_gcd, onlines) => { +// for (mgid, maddr) in onlines { +// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) { +// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr)); +// } +// } +// } +// LayerEvent::GroupName(_gcd, name) => { +// let _ = GroupChat::update_name(&db, &id, &name)?; +// results.rpcs.push(rpc::group_name(ogid, &id, &name)); +// let _ = Session::update_name( +// &layer.read().await.group.read().await.session_db(&ogid)?, +// &sid, +// &name, +// ); +// results.rpcs.push(session_update_name(ogid, &sid, &name)); +// } +// LayerEvent::GroupClose(_gcd) => { +// let group = GroupChat::close(&db, &gcd)?; +// let sid = Session::close( +// &layer.read().await.group.read().await.session_db(&ogid)?, +// &group.id, +// &SessionType::Group, +// )?; +// results.rpcs.push(session_close(ogid, &sid)); +// } +// LayerEvent::Sync(_gcd, height, event) => { +// debug!("Sync: handle height: {}", height); + +// match event { +// Event::MemberJoin(mgid, maddr, mname, mavatar) => { +// let mdid_res = Member::get_id(&db, &id, &mgid); +// if let Ok(mdid) = mdid_res { +// Member::update(&db, &height, &mdid, &maddr, &mname)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// let mem = Member::info(mdid, id, mgid, maddr, mname); +// results.rpcs.push(rpc::member_join(ogid, &mem)); +// } else { +// let mut member = Member::new(height, id, mgid, maddr, mname); +// member.insert(&db)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// results.rpcs.push(rpc::member_join(ogid, &member)); +// } + +// // save consensus. +// GroupChat::add_height(&db, id, height)?; +// } +// Event::MemberLeave(mgid) => { +// let mdid = Member::get_id(&db, &id, &mgid)?; +// Member::leave(&db, &height, &mdid)?; + +// // check mid is my chat friend. if not, delete avatar. +// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; +// if Friend::get_id(&s_db, &mgid).is_err() { +// let _ = delete_avatar(&base, &ogid, &mgid).await; +// } +// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); + +// // save consensus. +// GroupChat::add_height(&db, id, height)?; +// } +// Event::MessageCreate(mgid, nmsg, mtime) => { +// debug!("Sync: create message start"); +// let _mdid = Member::get_id(&db, &id, &mgid)?; + +// let msg = handle_network_message( +// &layer.read().await.group, +// height, +// id, +// mgid, +// &ogid, +// nmsg, +// mtime, +// &base, +// results, +// ) +// .await?; +// results.rpcs.push(rpc::message_create(ogid, &msg)); + +// GroupChat::add_height(&db, id, height)?; +// debug!("Sync: create message ok"); + +// // UPDATE SESSION. +// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { +// update_session(&s_db, &ogid, &id, &msg, results); +// } +// } +// } +// } +// LayerEvent::SyncRes(gcd, height, from, to, adds, leaves, messages) => { +// if to >= height { +// // when last packed sync, start sync online members. +// add_layer(results, ogid, sync_online(gcd, addr)); +// } + +// debug!("Start handle sync packed... {}, {}, {}", height, from, to); +// let mut last_message = None; + +// for (height, mgid, maddr, mname, mavatar) in adds { +// let mdid_res = Member::get_id(&db, &id, &mgid); +// if let Ok(mdid) = mdid_res { +// Member::update(&db, &height, &mdid, &maddr, &mname)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// let mem = Member::info(mdid, id, mgid, maddr, mname); +// results.rpcs.push(rpc::member_join(ogid, &mem)); +// } else { +// let mut member = Member::new(height, id, mgid, maddr, mname); +// member.insert(&db)?; +// if mavatar.len() > 0 { +// write_avatar_sync(&base, &ogid, &mgid, mavatar)?; +// } +// results.rpcs.push(rpc::member_join(ogid, &member)); +// } +// } + +// for (height, mgid) in leaves { +// if let Ok(mdid) = Member::get_id(&db, &id, &mgid) { +// Member::leave(&db, &height, &mdid)?; +// // check mid is my chat friend. if not, delete avatar. +// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?; +// if Friend::get_id(&s_db, &mgid).is_err() { +// let _ = delete_avatar(&base, &ogid, &mgid).await; +// } +// results.rpcs.push(rpc::member_leave(ogid, id, mdid)); +// } +// } + +// for (height, mgid, nm, time) in messages { +// if let Ok(msg) = handle_network_message( +// &layer.read().await.group, +// height, +// id, +// mgid, +// &ogid, +// nm, +// time, +// &base, +// results, +// ) +// .await +// { +// results.rpcs.push(rpc::message_create(ogid, &msg)); +// last_message = Some(msg); +// } +// } + +// if to < height { +// add_layer(results, ogid, sync(gcd, addr, to + 1)); +// } + +// // update group chat height. +// GroupChat::add_height(&db, id, to)?; + +// // UPDATE SESSION. +// if let Some(msg) = last_message { +// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) { +// update_session(&s_db, &ogid, &id, &msg, results); +// } +// } +// debug!("Over handle sync packed... {}, {}, {}", height, from, to); +// } +// _ => error!("group peer handle event nerver here"), +// } + +// Ok(()) +// } pub(crate) async fn broadcast( + gid: &GroupChatId, + global: &Arc, event: &LayerEvent, - layer: &Arc>, - gcd: &GroupId, results: &mut HandleResult, ) -> Result<()> { - let new_data = bincode::serialize(&event)?; + let new_data = bincode::serialize(event)?; - for (mgid, maddr) in layer.read().await.running(&gcd)?.onlines() { - let s = SendType::Event(0, *maddr, new_data.clone()); - add_server_layer(results, *mgid, s); - debug!("--- DEBUG broadcast to: {:?}", mgid); + for mpid in global.layer.read().await.group(gid)?.addrs.iter() { + let s = SendType::Event(0, *mpid, new_data.clone()); + results.layers.push((GROUP_CHAT_ID, s)); + debug!("--- DEBUG broadcast to: {:?}", mpid); } Ok(()) } // UPDATE SESSION. -pub(crate) fn update_session( - s_db: &DStorage, - gid: &GroupId, - id: &i64, - msg: &Message, - results: &mut HandleResult, -) { +pub(crate) fn update_session(s_db: &DStorage, id: &i64, msg: &Message, results: &mut HandleResult) { let scontent = match msg.m_type { MessageType::String => { format!("{}:{}", msg.m_type.to_int(), msg.content) @@ -658,21 +597,21 @@ pub(crate) fn update_session( ) { results .rpcs - .push(session_last(*gid, &sid, &msg.datetime, &scontent, false)); + .push(session_last(&sid, &msg.datetime, &scontent, false)); } } -pub(crate) fn group_conn(proof: Proof, addr: Peer, gid: GroupId) -> SendType { - let data = bincode::serialize(&LayerConnect(gid, proof)).unwrap_or(vec![]); +pub(crate) fn group_conn(addr: Peer, gid: GroupChatId) -> SendType { + let data = bincode::serialize(&LayerConnect(gid)).unwrap_or(vec![]); SendType::Connect(0, addr, data) } -fn sync(gcd: GroupId, addr: PeerId, height: i64) -> SendType { - let data = bincode::serialize(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]); +fn sync(gid: GroupChatId, addr: PeerId, height: i64) -> SendType { + let data = bincode::serialize(&LayerEvent::SyncReq(gid, height + 1)).unwrap_or(vec![]); SendType::Event(0, addr, data) } -fn sync_online(gcd: GroupId, addr: PeerId) -> SendType { - let data = bincode::serialize(&LayerEvent::MemberOnlineSync(gcd)).unwrap_or(vec![]); +fn sync_online(gid: GroupChatId, addr: PeerId) -> SendType { + let data = bincode::serialize(&LayerEvent::MemberOnlineSync(gid)).unwrap_or(vec![]); SendType::Event(0, addr, data) } diff --git a/src/apps/group/mod.rs b/src/apps/group/mod.rs index 9d08e5f..460ce8b 100644 --- a/src/apps/group/mod.rs +++ b/src/apps/group/mod.rs @@ -1,22 +1,7 @@ mod layer; mod models; -pub use group_types::GROUP_CHAT_ID as GROUP_ID; -use tdn::types::{group::GroupId, message::SendType, primitive::HandleResult}; - -/// Send to group chat service. -#[inline] -pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { - results.layers.push((gid, GROUP_ID, msg)); -} - -/// Send to group chat member. -#[inline] -pub fn add_server_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { - results.layers.push((GROUP_ID, gid, msg)); -} - pub(crate) mod rpc; -pub(crate) use layer::{group_conn, handle_peer, handle_server}; -pub(crate) use models::GroupChat; +pub(crate) use layer::{group_conn, handle}; +pub(crate) use models::{GroupChat, Member}; pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/group/models/group.rs b/src/apps/group/models/group.rs index 0c48a2d..fb36820 100644 --- a/src/apps/group/models/group.rs +++ b/src/apps/group/models/group.rs @@ -1,8 +1,8 @@ +use group_types::GroupChatId; use rand::Rng; use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ - group::GroupId, - primitive::{PeerId, Result}, + primitives::{PeerId, Result}, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; @@ -18,11 +18,11 @@ pub(crate) struct GroupChat { /// consensus height. pub height: i64, /// group chat id. - pub g_id: GroupId, + pub gid: GroupChatId, /// group chat server addresse. - pub g_addr: PeerId, + pub addr: PeerId, /// group chat name. - pub g_name: String, + pub name: String, /// group is delete by owner. pub close: bool, /// group is in my device. @@ -30,13 +30,13 @@ pub(crate) struct GroupChat { } impl GroupChat { - pub fn new(g_addr: PeerId, g_name: String) -> Self { - let g_id = GroupId(rand::thread_rng().gen::<[u8; 32]>()); + pub fn new(addr: PeerId, name: String) -> Self { + let gid = rand::thread_rng().gen::(); Self { - g_id, - g_addr, - g_name, + gid, + addr, + name, id: 0, height: 0, close: false, @@ -44,11 +44,11 @@ impl GroupChat { } } - pub fn from(g_id: GroupId, height: i64, g_addr: PeerId, g_name: String) -> Self { + pub fn from(gid: GroupChatId, height: i64, addr: PeerId, name: String) -> Self { Self { - g_id, - g_addr, - g_name, + gid, + addr, + name, height, close: false, local: false, @@ -65,10 +65,10 @@ impl GroupChat { Session::new( self.id, - self.g_id, - self.g_addr, + self.gid.to_string(), + self.addr, SessionType::Group, - self.g_name.clone(), + self.name.clone(), datetime, ) } @@ -76,9 +76,9 @@ impl GroupChat { pub fn to_rpc(&self) -> RpcParam { json!([ self.id, - self.g_id.to_hex(), - self.g_addr.to_hex(), - self.g_name, + self.gid, + self.addr.to_hex(), + self.name, self.close, self.local, ]) @@ -88,9 +88,9 @@ impl GroupChat { Self { local: v.pop().unwrap().as_bool(), close: v.pop().unwrap().as_bool(), - g_name: v.pop().unwrap().as_string(), - g_addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), - g_id: GroupId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), + name: v.pop().unwrap().as_string(), + addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), + gid: v.pop().unwrap().as_i64() as GroupChatId, height: v.pop().unwrap().as_i64(), id: v.pop().unwrap().as_i64(), } @@ -98,7 +98,7 @@ impl GroupChat { pub fn local(db: &DStorage) -> Result> { let matrix = db.query( - "SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE is_local = true", + "SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE is_local = true", )?; let mut groups = vec![]; for values in matrix { @@ -109,7 +109,7 @@ impl GroupChat { pub fn all(db: &DStorage) -> Result> { let matrix = - db.query("SELECT id, height, gcd, addr, name, is_close, is_local FROM groups")?; + db.query("SELECT id, height, gid, addr, name, is_close, is_local FROM groups")?; let mut groups = vec![]; for values in matrix { groups.push(Self::from_values(values)); @@ -119,7 +119,7 @@ impl GroupChat { pub fn get(db: &DStorage, id: &i64) -> Result { let sql = format!( - "SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE id = {}", + "SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE id = {}", id ); let mut matrix = db.query(&sql)?; @@ -131,11 +131,8 @@ impl GroupChat { } } - pub fn get_id(db: &DStorage, gid: &GroupId) -> Result { - let sql = format!( - "SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE gcd = '{}'", - gid.to_hex() - ); + pub fn get_id(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result { + let sql = format!("SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE gid = {} AND addr = '{}'", gid, addr.to_hex()); let mut matrix = db.query(&sql)?; if matrix.len() > 0 { let values = matrix.pop().unwrap(); // safe unwrap() @@ -147,27 +144,20 @@ impl GroupChat { pub fn insert(&mut self, db: &DStorage) -> Result<()> { let mut unique_check = db.query(&format!( - "SELECT id from groups WHERE gcd = '{}'", - self.g_id.to_hex() + "SELECT id from groups WHERE gid = {} AND addr = '{}'", + self.gid, + self.addr.to_hex() ))?; if unique_check.len() > 0 { - let id = unique_check.pop().unwrap().pop().unwrap().as_i64(); - self.id = id; - let sql = format!( - "UPDATE groups SET height = {}, addr='{}', name = '{}' WHERE id = {}", - self.height, - self.g_addr.to_hex(), - self.g_name, - self.id - ); - db.update(&sql)?; + self.gid += 1; + return self.insert(db); } else { let sql = format!( - "INSERT INTO groups (height, gcd, addr, name, is_close, is_local) VALUES ({}, '{}', '{}', '{}', {}, {})", + "INSERT INTO groups (height, gid, addr, name, is_close, is_local) VALUES ({}, {}, '{}', '{}', {}, {})", self.height, - self.g_id.to_hex(), - self.g_addr.to_hex(), - self.g_name, + self.gid, + self.addr.to_hex(), + self.name, self.close, self.local, ); @@ -187,8 +177,8 @@ impl GroupChat { db.update(&sql) } - pub fn close(db: &DStorage, gcd: &GroupId) -> Result { - let group = Self::get_id(db, gcd)?; + pub fn close(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result { + let group = Self::get_id(db, gid, addr)?; let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", group.id); db.update(&sql)?; Ok(group) diff --git a/src/apps/group/models/member.rs b/src/apps/group/models/member.rs index 930f5d4..685076f 100644 --- a/src/apps/group/models/member.rs +++ b/src/apps/group/models/member.rs @@ -1,7 +1,7 @@ +use esse_primitives::{id_from_str, id_to_str}; use std::path::PathBuf; use tdn::types::{ - group::GroupId, - primitive::{PeerId, Result}, + primitives::{PeerId, Result}, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; @@ -16,36 +16,32 @@ pub(crate) struct Member { pub height: i64, /// group's db id. pub fid: i64, - /// member's Did(GroupId) - pub m_id: GroupId, - /// member's addresse. - pub m_addr: PeerId, + /// member's Did(PeerId) + pub pid: PeerId, /// member's name. - pub m_name: String, + pub name: String, /// if leave from group. pub leave: bool, } impl Member { - pub fn new(height: i64, fid: i64, m_id: GroupId, m_addr: PeerId, m_name: String) -> Self { + pub fn new(height: i64, fid: i64, pid: PeerId, name: String) -> Self { Self { height, fid, - m_id, - m_addr, - m_name, + pid, + name, leave: false, id: 0, } } - pub fn info(id: i64, fid: i64, m_id: GroupId, m_addr: PeerId, m_name: String) -> Self { + pub fn info(id: i64, fid: i64, pid: PeerId, name: String) -> Self { Self { id, fid, - m_id, - m_addr, - m_name, + pid, + name, leave: false, height: 0, } @@ -55,9 +51,8 @@ impl Member { json!([ self.id, self.fid, - self.m_id.to_hex(), - self.m_addr.to_hex(), - self.m_name, + id_to_str(&self.pid), + self.name, self.leave, ]) } @@ -65,9 +60,8 @@ impl Member { fn from_values(mut v: Vec) -> Self { Self { leave: v.pop().unwrap().as_bool(), - m_name: v.pop().unwrap().as_string(), - m_addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), - m_id: GroupId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), + name: v.pop().unwrap().as_string(), + pid: id_from_str(v.pop().unwrap().as_str()).unwrap_or(Default::default()), fid: v.pop().unwrap().as_i64(), height: v.pop().unwrap().as_i64(), id: v.pop().unwrap().as_i64(), @@ -76,7 +70,7 @@ impl Member { pub fn list(db: &DStorage, fid: &i64) -> Result> { let matrix = db.query(&format!( - "SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {}", + "SELECT id, height, fid, pid, name, leave FROM members WHERE fid = {}", fid ))?; let mut groups = vec![]; @@ -88,28 +82,24 @@ impl Member { pub fn insert(&mut self, db: &DStorage) -> Result<()> { let mut unique_check = db.query(&format!( - "SELECT id from members WHERE fid = {} AND mid = '{}'", + "SELECT id from members WHERE fid = {} AND pid = '{}'", self.fid, - self.m_id.to_hex() + id_to_str(&self.pid) ))?; if unique_check.len() > 0 { let id = unique_check.pop().unwrap().pop().unwrap().as_i64(); self.id = id; let sql = format!( - "UPDATE members SET height = {}, addr='{}', name = '{}', leave = false WHERE id = {}", - self.height, - self.m_addr.to_hex(), - self.m_name, - self.id, + "UPDATE members SET height = {}, name = '{}', leave = false WHERE id = {}", + self.height, self.name, self.id, ); db.update(&sql)?; } else { - let sql = format!("INSERT INTO members (height, fid, mid, addr, name, leave) VALUES ({}, {}, '{}', '{}', '{}', false)", + let sql = format!("INSERT INTO members (height, fid, pid, name, leave) VALUES ({}, {}, '{}', '{}', false)", self.height, self.fid, - self.m_id.to_hex(), - self.m_addr.to_hex(), - self.m_name, + id_to_str(&self.pid), + self.name, ); let id = db.insert(&sql)?; self.id = id; @@ -119,7 +109,7 @@ impl Member { pub fn _get(db: &DStorage, id: &i64) -> Result { let mut matrix = db.query(&format!( - "SELECT id, height, fid, mid, addr, name, leave FROM members WHERE id = {}", + "SELECT id, height, fid, pid, name, leave FROM members WHERE id = {}", id, ))?; if matrix.len() > 0 { @@ -129,11 +119,11 @@ impl Member { } } - pub fn get_id(db: &DStorage, fid: &i64, gid: &GroupId) -> Result { + pub fn get_id(db: &DStorage, fid: &i64, pid: &PeerId) -> Result { let mut matrix = db.query(&format!( - "SELECT id FROM members WHERE fid = {} AND mid = '{}'", + "SELECT id FROM members WHERE fid = {} AND pid = '{}'", fid, - gid.to_hex() + id_to_str(pid) ))?; if matrix.len() > 0 { Ok(matrix.pop().unwrap().pop().unwrap().as_i64()) // safe unwrap. @@ -142,31 +132,10 @@ impl Member { } } - pub fn addr_update(db: &DStorage, fid: &i64, mid: &GroupId, addr: &PeerId) -> Result { - let mdid = Self::get_id(db, fid, mid)?; + pub fn update(db: &DStorage, id: &i64, height: &i64, name: &str) -> Result { let sql = format!( - "UPDATE members SET addr='{}' WHERE fid = {} AND mid = '{}'", - addr.to_hex(), - fid, - mid.to_hex(), - ); - db.update(&sql)?; - Ok(mdid) - } - - pub fn update( - db: &DStorage, - id: &i64, - height: &i64, - addr: &PeerId, - name: &str, - ) -> Result { - let sql = format!( - "UPDATE members SET height = {}, addr='{}', name='{}' WHERE id = {}", - height, - addr.to_hex(), - name, - id, + "UPDATE members SET height = {}, name='{}' WHERE id = {}", + height, name, id, ); db.update(&sql) } @@ -186,26 +155,23 @@ impl Member { pub async fn sync( base: &PathBuf, - gid: &GroupId, + gid: &PeerId, db: &DStorage, fid: &i64, from: &i64, to: &i64, - ) -> Result<( - Vec<(i64, GroupId, PeerId, String, Vec)>, - Vec<(i64, GroupId)>, - )> { - let sql = format!("SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to); + ) -> Result<(Vec<(i64, PeerId, String, Vec)>, Vec<(i64, PeerId)>)> { + let sql = format!("SELECT id, height, fid, pid, name, leave FROM members WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to); let matrix = db.query(&sql)?; let mut adds = vec![]; let mut leaves = vec![]; for values in matrix { let m = Self::from_values(values); if m.leave { - leaves.push((m.height, m.m_id)); + leaves.push((m.height, m.pid)); } else { - let mavatar = read_avatar(base, gid, &m.m_id).await.unwrap_or(vec![]); - adds.push((m.height, m.m_id, m.m_addr, m.m_name, mavatar)) + let mavatar = read_avatar(base, gid, &m.pid).await.unwrap_or(vec![]); + adds.push((m.height, m.pid, m.name, mavatar)) } } Ok((adds, leaves)) diff --git a/src/apps/group/models/message.rs b/src/apps/group/models/message.rs index 963a64f..921d118 100644 --- a/src/apps/group/models/message.rs +++ b/src/apps/group/models/message.rs @@ -1,19 +1,17 @@ +use esse_primitives::id_from_str; use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ - group::GroupId, - primitive::{HandleResult, Result}, + primitives::{HandleResult, PeerId, Result}, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; -use tokio::sync::RwLock; use chat_types::{MessageType, NetworkMessage}; use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm}; -use crate::group::Group; +use crate::storage::group_db; use super::Member; @@ -140,19 +138,18 @@ impl Message { } pub async fn sync( + own: &PeerId, base: &PathBuf, - gid: &GroupId, db: &DStorage, fid: &i64, from: &i64, to: &i64, - ) -> Result> { - let sql = format!("SELECT id, mid FROM members WHERE fid = {}", fid); + ) -> Result> { + let sql = format!("SELECT id, pid FROM members WHERE fid = {}", fid); let m = db.query(&sql)?; let mut members = HashMap::new(); for mut v in m { - let m_s = v.pop().unwrap().as_string(); - let mid = GroupId::from_hex(m_s).unwrap_or(Default::default()); + let mid = id_from_str(v.pop().unwrap().as_str()).unwrap_or(Default::default()); let id = v.pop().unwrap().as_i64(); members.insert(id, mid); } @@ -162,8 +159,8 @@ impl Message { let mut messages = vec![]; for values in matrix { let msg = Message::from_values(values); - if let Ok(nmsg) = tnm(base, gid, msg.m_type, msg.content).await { - let mid = members.get(&msg.mid).cloned().unwrap_or(GroupId::default()); + if let Ok(nmsg) = tnm(own, base, msg.m_type, msg.content).await { + let mid = members.get(&msg.mid).cloned().unwrap_or(PeerId::default()); messages.push((msg.height, mid, nmsg, msg.datetime)) } } @@ -173,9 +170,9 @@ impl Message { } pub(crate) async fn to_network_message( - group: &Arc>, + own: &PeerId, base: &PathBuf, - gid: &GroupId, + db_key: &str, mtype: MessageType, content: &str, ) -> Result<(NetworkMessage, i64, String)> { @@ -185,25 +182,25 @@ pub(crate) async fn to_network_message( .map(|s| s.as_secs()) .unwrap_or(0) as i64; // safe for all life. - let (nmsg, raw) = raw_to_network_message(group, base, gid, &mtype, content).await?; + let (nmsg, raw) = raw_to_network_message(own, base, db_key, &mtype, content).await?; Ok((nmsg, datetime, raw)) } pub(crate) async fn handle_network_message( - group: &Arc>, + own: &PeerId, + base: &PathBuf, + db_key: &str, height: i64, gdid: i64, - mid: GroupId, - mgid: &GroupId, + mid: PeerId, msg: NetworkMessage, datetime: i64, - base: &PathBuf, results: &mut HandleResult, ) -> Result { - let db = group.read().await.group_db(mgid)?; + let db = group_db(base, own, db_key)?; let mdid = Member::get_id(&db, &gdid, &mid)?; - let is_me = &mid == mgid; - let (m_type, raw) = from_network_message(group, msg, base, mgid, results).await?; + let is_me = &mid == own; + let (m_type, raw) = from_network_message(own, base, db_key, msg, results).await?; let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime); msg.insert(&db)?; Ok(msg) diff --git a/src/apps/group/rpc.rs b/src/apps/group/rpc.rs index 812491c..12096d3 100644 --- a/src/apps/group/rpc.rs +++ b/src/apps/group/rpc.rs @@ -1,57 +1,49 @@ +use chat_types::{MessageType, CHAT_ID}; +use group_types::{Event, LayerEvent, GROUP_CHAT_ID}; use std::sync::Arc; use tdn::types::{ - group::GroupId, - message::{NetworkType, SendMessage, SendType}, - primitive::{HandleResult, PeerId}, + message::{RpcSendMessage, SendType}, + primitives::{HandleResult, PeerId}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }; -use chat_types::MessageType; -use group_types::{Event, LayerEvent}; - -use crate::apps::chat::{Friend, InviteType}; -use crate::layer::Online; -use crate::rpc::{session_create, session_delete, session_update_name, RpcState}; +use crate::apps::chat::{raw_to_network_message, Friend, InviteType}; +use crate::global::Global; +use crate::rpc::{session_create, session_delete, session_update_name}; use crate::session::{Session, SessionType}; -use crate::storage::{read_avatar, write_avatar}; +use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar}; use super::layer::{broadcast, update_session}; use super::models::{to_network_message, GroupChat, Member, Message}; -use super::{add_layer, add_server_layer}; #[inline] -pub(crate) fn member_join(mgid: GroupId, member: &Member) -> RpcParam { - rpc_response(0, "group-member-join", json!(member.to_rpc()), mgid) +pub(crate) fn member_join(member: &Member) -> RpcParam { + rpc_response(0, "group-member-join", json!(member.to_rpc())) } #[inline] -pub(crate) fn member_leave(mgid: GroupId, id: i64, mid: i64) -> RpcParam { - rpc_response(0, "group-member-leave", json!([id, mid]), mgid) +pub(crate) fn member_leave(id: i64, mid: i64) -> RpcParam { + rpc_response(0, "group-member-leave", json!([id, mid])) } #[inline] -pub(crate) fn member_online(mgid: GroupId, id: i64, mid: i64, maddr: &PeerId) -> RpcParam { - rpc_response( - 0, - "group-member-online", - json!([id, mid, maddr.to_hex()]), - mgid, - ) +pub(crate) fn member_online(id: i64, mid: i64) -> RpcParam { + rpc_response(0, "group-member-online", json!([id, mid])) } #[inline] -pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: i64) -> RpcParam { - rpc_response(0, "group-member-offline", json!([gid, mid]), mgid) +pub(crate) fn member_offline(id: i64, mid: i64) -> RpcParam { + rpc_response(0, "group-member-offline", json!([id, mid])) } #[inline] -pub(crate) fn group_name(mgid: GroupId, gid: &i64, name: &str) -> RpcParam { - rpc_response(0, "group-name", json!([gid, name]), mgid) +pub(crate) fn group_name(id: &i64, name: &str) -> RpcParam { + rpc_response(0, "group-name", json!([id, name])) } #[inline] -pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { - rpc_response(0, "group-message-create", json!(msg.to_rpc()), mgid) +pub(crate) fn message_create(msg: &Message) -> RpcParam { + rpc_response(0, "group-message-create", json!(msg.to_rpc())) } #[inline] @@ -79,20 +71,26 @@ fn detail_list(group: GroupChat, members: Vec, messages: Vec) - json!([group.to_rpc(), member_results, message_results]) } -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = state.group.read().await.group_db(&gid)?; + |_params: Vec, state: Arc| async move { + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = group_db(&state.base, &pid, &db_key)?; + Ok(HandleResult::rpc(group_list(GroupChat::all(&db)?))) }, ); handler.add_method( "group-detail", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.group_db(&gid)?; + + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = group_db(&state.base, &pid, &db_key)?; let group = GroupChat::get(&db, &id)?; let members = Member::list(&db, &id)?; let messages = Message::list(&db, &id)?; @@ -102,65 +100,52 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-create", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); + let pid = state.pid().await; let group_lock = state.group.read().await; - let base = group_lock.base().clone(); - let addr = group_lock.addr().clone(); - let sender = group_lock.sender(); - let me = group_lock.clone_user(&gid)?; - let db = group_lock.group_db(&gid)?; - let s_db = group_lock.session_db(&gid)?; + let db_key = group_lock.db_key(&pid)?; + let me = group_lock.clone_user(&pid)?; drop(group_lock); - let mut gc = GroupChat::new(addr, name); - let gcd = gc.g_id; - let gheight = gc.height + 1; // add first member. + let db = group_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; + + let mut gc = GroupChat::new(pid, name); + let gh = gc.height + 1; // add first member. // save db gc.insert(&db)?; - let gdid = gc.id; + let id = gc.id; + let gid = gc.gid; let mut results = HandleResult::new(); - let mut m = Member::new(gheight, gc.id, gid, me.addr, me.name); + let mut m = Member::new(gh, id, pid, me.name); m.insert(&db)?; let mid = m.id; - let _ = write_avatar(&base, &gid, &gid, &me.avatar).await; + let _ = write_avatar(&state.base, &pid, &pid, &me.avatar).await; // Add new session. let mut session = gc.to_session(); session.insert(&s_db)?; let sid = session.id; + let sender = state.rpc_send.clone(); tokio::spawn(async move { let _ = sender - .send(SendMessage::Rpc(0, session_create(gid, &session), true)) + .send(RpcSendMessage(0, session_create(&session), true)) .await; }); // add to rpcs. - results.rpcs.push(json!([sid, gdid])); + results.rpcs.push(json!([sid, id])); // Add frist member join. - let mut layer_lock = state.layer.write().await; - layer_lock.add_running(&gcd, gid, gdid, gheight)?; - - // Add online to layers. - layer_lock - .running_mut(&gcd)? - .check_add_online(gid, Online::Direct(addr), gdid, mid)?; - layer_lock - .running_mut(&gid)? - .check_add_online(gcd, Online::Direct(addr), sid, gdid)?; - - drop(layer_lock); + state.layer.write().await.group_add(gid, pid, sid, id, gh); // Update consensus. - GroupChat::add_height(&db, gdid, gheight)?; - - // Online local group. - results.networks.push(NetworkType::AddGroup(gcd)); + GroupChat::add_height(&db, id, gh)?; Ok(results) }, @@ -168,65 +153,60 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-member-join", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let fid = params[1].as_i64().ok_or(RpcError::ParseError)?; - let group_lock = state.group.read().await; - let base = group_lock.base().clone(); - let chat_db = group_lock.chat_db(&gid)?; - let group_db = group_lock.group_db(&gid)?; - let s_db = group_lock.session_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let group_db = group_db(&state.base, &pid, &db_key)?; + let chat_db = chat_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; + let f = Friend::get(&chat_db, &fid)?; let g = GroupChat::get(&group_db, &id)?; - let gcd = g.g_id; + let gid = g.gid; let mut results = HandleResult::new(); // handle invite message - let contact_values = InviteType::Group(gcd, g.g_addr, g.g_name).serialize(); - let (msg, nw) = crate::apps::chat::LayerEvent::from_message( - &state.group, - &base, - gid, - fid, - MessageType::Invite, - &contact_values, - ) - .await?; - let event = crate::apps::chat::LayerEvent::Message(msg.hash, nw); - let mut layer_lock = state.layer.write().await; - let s = crate::apps::chat::event_message(&mut layer_lock, msg.id, gid, f.addr, &event); - drop(layer_lock); - results.layers.push((gid, f.gid, s)); - crate::apps::chat::update_session(&s_db, &gid, &id, &msg, &mut results); + let contact = InviteType::Group(gid, g.addr, g.name).serialize(); + let m_type = MessageType::Invite; + let (nm, raw) = + raw_to_network_message(&pid, &state.base, &db_key, &m_type, &contact).await?; + let mut msg = crate::apps::chat::Message::new(&pid, f.id, true, m_type, raw, false); + msg.insert(&chat_db)?; + let event = crate::apps::chat::LayerEvent::Message(msg.hash, nm); + let tid = state.layer.write().await.delivery(msg.id); + let data = bincode::serialize(&event).unwrap_or(vec![]); + let lmsg = SendType::Event(tid, f.pid, data); + results.layers.push((CHAT_ID, lmsg)); + + // update session. + crate::apps::chat::update_session(&s_db, &id, &msg, &mut results); // handle group member - let avatar = read_avatar(&base, &gid, &f.gid).await.unwrap_or(vec![]); - let event = Event::MemberJoin(f.gid, f.addr, f.name.clone(), avatar); + let avatar = read_avatar(&state.base, &pid, &f.pid) + .await + .unwrap_or(vec![]); + let event = Event::MemberJoin(f.pid, f.name.clone(), avatar); if g.local { // local save. - let new_h = state.layer.write().await.running_mut(&gcd)?.increased(); + let new_h = state.layer.write().await.group_increased(&gid)?; - let mut mem = Member::new(new_h, g.id, f.gid, f.addr, f.name); + let mut mem = Member::new(new_h, g.id, f.pid, f.name); mem.insert(&group_db)?; results.rpcs.push(mem.to_rpc()); GroupChat::add_height(&group_db, id, new_h)?; // broadcast. - broadcast( - &LayerEvent::Sync(gcd, new_h, event), - &state.layer, - &gcd, - &mut results, - ) - .await?; + let data = LayerEvent::Sync(gid, new_h, event); + broadcast(&gid, &state, &data, &mut results).await?; } else { // send to server. - let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?; - let msg = SendType::Event(0, g.g_addr, data); - add_layer(&mut results, gid, msg); + let data = bincode::serialize(&LayerEvent::Sync(gid, 0, event))?; + let msg = SendType::Event(0, g.addr, data); + results.layers.push((GROUP_CHAT_ID, msg)); } Ok(results) @@ -235,29 +215,28 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-message-create", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?); let m_content = params[2].as_str().ok_or(RpcError::ParseError)?; - let group_lock = state.group.read().await; - let base = group_lock.base().clone(); - let db = group_lock.group_db(&gid)?; - let s_db = group_lock.session_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = group_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; let group = GroupChat::get(&db, &id)?; - let gcd = group.g_id; - let mid = Member::get_id(&db, &id, &gid)?; + let gid = group.gid; + let mid = Member::get_id(&db, &id, &pid)?; let mut results = HandleResult::new(); let (nmsg, datetime, raw) = - to_network_message(&state.group, &base, &gid, m_type, m_content).await?; - let event = Event::MessageCreate(gid, nmsg, datetime); + to_network_message(&pid, &state.base, &db_key, m_type, m_content).await?; + let event = Event::MessageCreate(pid, nmsg, datetime); if group.local { // local save. - let new_h = state.layer.write().await.running_mut(&gcd)?.increased(); + let new_h = state.layer.write().await.group_increased(&gid)?; let mut msg = Message::new_with_time(new_h, id, mid, true, m_type, raw, datetime); msg.insert(&db)?; @@ -265,21 +244,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { GroupChat::add_height(&db, id, new_h)?; // UPDATE SESSION. - update_session(&s_db, &gid, &id, &msg, &mut results); + update_session(&s_db, &id, &msg, &mut results); // broadcast. - broadcast( - &LayerEvent::Sync(gcd, new_h, event), - &state.layer, - &gcd, - &mut results, - ) - .await?; + let data = LayerEvent::Sync(gid, new_h, event); + broadcast(&gid, &state, &data, &mut results).await?; } else { // send to server. - let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?; - let msg = SendType::Event(0, group.g_addr, data); - add_layer(&mut results, gid, msg); + let data = bincode::serialize(&LayerEvent::Sync(gid, 0, event))?; + let msg = SendType::Event(0, group.addr, data); + results.layers.push((GROUP_CHAT_ID, msg)); } Ok(results) @@ -288,35 +262,31 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-name", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let name = params[1].as_str().ok_or(RpcError::ParseError)?; let mut results = HandleResult::new(); - let group_lock = state.group.read().await; - let db = group_lock.group_db(&gid)?; - let s_db = group_lock.session_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = group_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; let g = GroupChat::get(&db, &id)?; - let d = bincode::serialize(&LayerEvent::GroupName(g.g_id, name.to_owned()))?; + let data = LayerEvent::GroupName(g.gid, name.to_owned()); if g.local { if let Ok(sid) = Session::update_name_by_id(&s_db, &id, &SessionType::Group, &name) { - results.rpcs.push(session_update_name(gid, &sid, &name)); + results.rpcs.push(session_update_name(&sid, &name)); } results.rpcs.push(json!([id, name])); - // dissolve group. - for (mgid, maddr) in state.layer.read().await.running(&g.g_id)?.onlines() { - let s = SendType::Event(0, *maddr, d.clone()); - add_server_layer(&mut results, *mgid, s); - } + broadcast(&g.gid, &state, &data, &mut results).await?; } else { - // leave group. - let msg = SendType::Event(0, g.g_addr, d); - add_layer(&mut results, gid, msg); + let d = bincode::serialize(&data)?; + let msg = SendType::Event(0, g.addr, d); + results.layers.push((GROUP_CHAT_ID, msg)); } Ok(results) @@ -325,33 +295,33 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "group-delete", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let mut results = HandleResult::new(); - - let group_lock = state.group.read().await; - let db = group_lock.group_db(&gid)?; - let s_db = group_lock.session_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = group_db(&state.base, &pid, &db_key)?; + let s_db = session_db(&state.base, &pid, &db_key)?; let g = GroupChat::delete(&db, &id)?; - let sid = Session::delete(&s_db, &id, &SessionType::Group)?; - results.rpcs.push(session_delete(gid, &sid)); + results.rpcs.push(session_delete(&sid)); if g.local { // dissolve group. - let d = bincode::serialize(&LayerEvent::GroupClose(g.g_id))?; - for (mgid, maddr) in state.layer.read().await.running(&g.g_id)?.onlines() { - let s = SendType::Event(0, *maddr, d.clone()); - add_server_layer(&mut results, *mgid, s); + let data = bincode::serialize(&LayerEvent::GroupClose(g.gid))?; + if let Some(addrs) = state.layer.write().await.group_rm_online(&g.gid) { + for addr in addrs { + let s = SendType::Event(0, addr, data.clone()); + results.layers.push((GROUP_CHAT_ID, s)); + } } } else { // leave group. - let d = bincode::serialize(&LayerEvent::Sync(g.g_id, 0, Event::MemberLeave(gid)))?; - let msg = SendType::Event(0, g.g_addr, d); - add_layer(&mut results, gid, msg); + let d = bincode::serialize(&LayerEvent::Sync(g.gid, 0, Event::MemberLeave(pid)))?; + let msg = SendType::Event(0, g.addr, d); + results.layers.push((GROUP_CHAT_ID, msg)); } Ok(results) diff --git a/src/apps/wallet/models.rs b/src/apps/wallet/models.rs index 9c08903..e1cb894 100644 --- a/src/apps/wallet/models.rs +++ b/src/apps/wallet/models.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use tdn::types::{ - primitive::Result, + primitives::Result, rpc::{json, RpcParam}, }; diff --git a/src/apps/wallet/rpc.rs b/src/apps/wallet/rpc.rs index 1ae16b4..e269863 100644 --- a/src/apps/wallet/rpc.rs +++ b/src/apps/wallet/rpc.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use tdn::types::{ - group::GroupId, - message::SendMessage, - primitive::{HandleResult, Result}, + message::RpcSendMessage, + primitives::{HandleResult, Result}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }; use tdn_did::{generate_btc_account, generate_eth_account, secp256k1::SecretKey}; @@ -16,7 +15,9 @@ use web3::{ Web3, }; -use crate::rpc::RpcState; +use crate::global::Global; +use crate::storage::{account_db, wallet_db}; +use crate::utils::crypto::{decrypt, encrypt}; use super::{ models::{Address, Balance, ChainToken, Network, Token}, @@ -42,34 +43,25 @@ fn token_list(network: Network, tokens: Vec) -> RpcParam { } #[inline] -fn res_balance( - gid: GroupId, - address: &str, - network: &Network, - balance: &str, - token: Option<&Token>, -) -> RpcParam { +fn res_balance(address: &str, network: &Network, balance: &str, token: Option<&Token>) -> RpcParam { if let Some(t) = token { rpc_response( 0, "wallet-balance", json!([address, network.to_i64(), balance, t.to_rpc()]), - gid, ) } else { rpc_response( 0, "wallet-balance", json!([address, network.to_i64(), balance]), - gid, ) } } async fn loop_token( - sender: Sender, + sender: Sender, db: DStorage, - gid: GroupId, network: Network, address: String, c_token: Option, @@ -83,8 +75,8 @@ async fn loop_token( let transport = Http::new(node)?; let web3 = Web3::new(transport); let balance = token_balance(&web3, &token.contract, &address, &token.chain).await?; - let res = res_balance(gid, &address, &network, &balance, Some(&token)); - sender.send(SendMessage::Rpc(0, res, true)).await?; + let res = res_balance(&address, &network, &balance, Some(&token)); + sender.send(RpcSendMessage(0, res, true)).await?; } else { match chain { ChainToken::ETH => { @@ -93,19 +85,19 @@ async fn loop_token( let balance = web3.eth().balance(address.parse()?, None).await?; let balance = balance.to_string(); let _ = Address::update_balance(&db, &address, &network, &balance); - let res = res_balance(gid, &address, &network, &balance, None); - sender.send(SendMessage::Rpc(0, res, true)).await?; + let res = res_balance(&address, &network, &balance, None); + sender.send(RpcSendMessage(0, res, true)).await?; for token in tokens { //tokio::time::sleep(std::time::Duration::from_secs(1)).await; let balance = token_balance(&web3, &token.contract, &address, &token.chain).await?; - let res = res_balance(gid, &address, &network, &balance, Some(&token)); + let res = res_balance(&address, &network, &balance, Some(&token)); // update & clean balances. // TODO - sender.send(SendMessage::Rpc(0, res, true)).await?; + sender.send(RpcSendMessage(0, res, true)).await?; } } ChainToken::BTC => { @@ -119,9 +111,8 @@ async fn loop_token( } async fn token_check( - sender: Sender, + sender: Sender, db: DStorage, - gid: GroupId, chain: ChainToken, network: Network, address: String, @@ -160,8 +151,8 @@ async fn token_check( .query("balanceOf", (account,), None, Default::default(), None) .await?; let balance = balance.to_string(); - let res = res_balance(gid, &address, &network, &balance, Some(&token)); - sender.send(SendMessage::Rpc(0, res, true)).await?; + let res = res_balance(&address, &network, &balance, Some(&token)); + sender.send(RpcSendMessage(0, res, true)).await?; Ok(()) } @@ -320,15 +311,18 @@ async fn nft_check(node: &str, c_str: &str, hash: &str) -> Result { Ok(format!("{:?}", owner)) } -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { - handler.add_method("wallet-echo", |_, params, _| async move { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("wallet-echo", |params, _| async move { Ok(HandleResult::rpc(json!(params))) }); handler.add_method( "wallet-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = state.group.read().await.wallet_db(&gid)?; + |_params: Vec, state: Arc| async move { + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; + let addresses = Address::list(&db)?; Ok(HandleResult::rpc(wallet_list(addresses))) }, @@ -336,17 +330,20 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-generate", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let lock = params[1].as_str().ok_or(RpcError::ParseError)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; + let group_lock = state.group.read().await; - let mnemonic = group_lock.mnemonic(&gid, lock)?; - let account = group_lock.account(&gid)?; + let mnemonic = group_lock.mnemonic(&pid, lock, &state.secret)?; + let account = group_lock.account(&pid)?; let lang = account.lang(); let pass = account.pass.to_string(); let account_index = account.index as u32; - let db = group_lock.wallet_db(&gid)?; drop(group_lock); let mut results = HandleResult::new(); @@ -373,16 +370,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { address.insert(&db)?; results.rpcs.push(address.to_rpc()); if address.main { + let a_db = account_db(&state.base, &state.secret)?; let mut group_lock = state.group.write().await; - let a_db = group_lock.account_db()?; - let account = group_lock.account_mut(&gid)?; + let account = group_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; account.update_info(&a_db)?; - let user = group_lock.clone_user(&gid)?; + let user = group_lock.clone_user(&pid)?; drop(group_lock); - // broadcast all friends. + // broadcast to all friends. state.layer.read().await.broadcast(user, &mut results); } Ok(results) @@ -391,7 +388,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-import", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let secret = params[1].as_str().ok_or(RpcError::ParseError)?; let lock = params[2].as_str().ok_or(RpcError::ParseError)?; @@ -399,11 +396,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let sk: SecretKey = secret.parse().or(Err(RpcError::ParseError))?; let addr = format!("{:?}", (&sk).address()); + let pid = state.pid().await; + let group_lock = state.group.read().await; - let cbytes = group_lock.encrypt(&gid, lock, sk.as_ref())?; - let db = group_lock.wallet_db(&gid)?; + let ckey = &group_lock.account(&pid)?.encrypt; + let db_key = group_lock.db_key(&pid)?; + let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?; drop(group_lock); + let db = wallet_db(&state.base, &pid, &db_key)?; + let mut address = Address::import(chain, addr, cbytes); address.insert(&db)?; Ok(HandleResult::rpc(address.to_rpc())) @@ -412,14 +414,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-token", - |gid: GroupId, params: Vec, state: Arc| async move { - let network = Network::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); + |params: Vec, state: Arc| async move { + let net = Network::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let address = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let group_lock = state.group.read().await; - let db = group_lock.wallet_db(&gid)?; - let sender = group_lock.sender(); - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; let c_str = if params.len() == 4 { let cid = params[2].as_i64().ok_or(RpcError::ParseError)?; @@ -429,34 +430,32 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { None }; - let tokens = Token::list(&db, &network)?; - tokio::spawn(loop_token(sender, db, gid, network, address, c_str)); - Ok(HandleResult::rpc(token_list(network, tokens))) + let tokens = Token::list(&db, &net)?; + tokio::spawn(loop_token(state.rpc_send.clone(), db, net, address, c_str)); + Ok(HandleResult::rpc(token_list(net, tokens))) }, ); handler.add_method( "wallet-token-import", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); - let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); - let address = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let c_str = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); + let net = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); + let addr = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); + let c = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let group_lock = state.group.read().await; - let db = group_lock.wallet_db(&gid)?; - let sender = group_lock.sender(); - drop(group_lock); - - tokio::spawn(token_check(sender, db, gid, chain, network, address, c_str)); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; + tokio::spawn(token_check(state.rpc_send.clone(), db, chain, net, addr, c)); Ok(HandleResult::new()) }, ); handler.add_method( "wallet-gas-price", - |_gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let from = params[2].as_str().ok_or(RpcError::ParseError)?; @@ -471,7 +470,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-transfer", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let from = params[2].as_i64().ok_or(RpcError::ParseError)?; @@ -480,20 +479,23 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let c_str = params[5].as_str().ok_or(RpcError::ParseError)?; let lock = params[6].as_str().ok_or(RpcError::ParseError)?; + let pid = state.pid().await; let group_lock = state.group.read().await; - if !group_lock.check_lock(&gid, &lock) { + if !group_lock.check_lock(&pid, &lock) { return Err(RpcError::Custom("Lock is invalid!".to_owned())); } - let db = group_lock.wallet_db(&gid)?; + let db_key = group_lock.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; let address = Address::get(&db, &from)?; let (mnemonic, pbytes) = if address.is_gen() { - (group_lock.mnemonic(&gid, lock)?, vec![]) + (group_lock.mnemonic(&pid, lock, &state.secret)?, vec![]) } else { - let pbytes = group_lock.decrypt(&gid, lock, address.secret.as_ref())?; + let ckey = &group_lock.account(&pid)?.encrypt; + let pbytes = decrypt(&state.secret, lock, ckey, address.secret.as_ref())?; (String::new(), pbytes) }; - let account = group_lock.account(&gid)?; + let account = group_lock.account(&pid)?; let lang = account.lang(); let pass = account.pass.to_string(); let account_index = account.index as u32; @@ -553,11 +555,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-nft", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let address = params[0].as_i64().ok_or(RpcError::ParseError)?; let token = params[1].as_i64().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.wallet_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; let nfts = Balance::list(&db, &address, &token)?; let mut results = vec![]; @@ -570,12 +574,14 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-nft-add", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let address = params[0].as_i64().ok_or(RpcError::ParseError)?; let token = params[1].as_i64().ok_or(RpcError::ParseError)?; let hash = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let db = state.group.read().await.wallet_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; let t = Token::get(&db, &token)?; let a = Address::get(&db, &address)?; @@ -598,13 +604,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "wallet-main", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let group_lock = state.group.read().await; - let db = group_lock.wallet_db(&gid)?; - let a_db = group_lock.account_db()?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = wallet_db(&state.base, &pid, &db_key)?; + let a_db = account_db(&state.base, &state.secret)?; let address = Address::get(&db, &id)?; Address::main(&db, &id)?; @@ -612,11 +618,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let mut group_lock = state.group.write().await; - let account = group_lock.account_mut(&gid)?; + let account = group_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; account.update_info(&a_db)?; - let user = group_lock.clone_user(&gid)?; + let user = group_lock.clone_user(&pid)?; drop(group_lock); // broadcast all friends. diff --git a/src/group.rs b/src/group.rs index f0761cf..1999252 100644 --- a/src/group.rs +++ b/src/group.rs @@ -443,20 +443,19 @@ impl Group { Ok((pheight, oheight)) } - // pub fn clone_user(&self, pid: &PeerId) -> Result { - // if let Some(u) = self.accounts.get(pid) { - // Ok(User::new( - // u.pid, - // self.addr, - // u.name.clone(), - // u.avatar.clone(), - // u.wallet.clone(), - // u.pub_height, - // )) - // } else { - // Err(anyhow!("user missing.")) - // } - // } + pub fn clone_user(&self, pid: &PeerId) -> Result { + if let Some(u) = self.accounts.get(pid) { + Ok(User::new( + u.pid, + u.name.clone(), + u.avatar.clone(), + u.wallet.clone(), + u.pub_height, + )) + } else { + Err(anyhow!("user missing.")) + } + } pub fn list_accounts(&self) -> &HashMap { &self.accounts @@ -548,15 +547,6 @@ impl Group { account_db.close() } - // pub fn encrypt(&self, pid: &PeerId, lock: &str, bytes: &[u8]) -> Result> { - // let ckey = &self.account(pid)?.encrypt; - // encrypt(&self.secret, lock, ckey, bytes) - // } - // pub fn decrypt(&self, pid: &PeerId, lock: &str, bytes: &[u8]) -> Result> { - // let ckey = &self.account(pid)?.encrypt; - // decrypt(&self.secret, lock, ckey, bytes) - // } - // pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result { // let user = self.clone_user(pid)?; // let account = self.account(pid)?; diff --git a/src/layer.rs b/src/layer.rs index fcfdfea..18af6a3 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -1,3 +1,4 @@ +use chat_types::CHAT_ID; use esse_primitives::id_to_str; use group_types::GroupChatId; use serde::{Deserialize, Serialize}; @@ -11,7 +12,7 @@ use tdn::types::{ use tokio::sync::RwLock; use crate::account::User; -//use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; +use crate::apps::chat::LayerEvent as ChatLayerEvent; //use crate::apps::group::{group_conn, GROUP_ID}; use crate::group::Group; use crate::session::{Session, SessionType}; @@ -56,7 +57,7 @@ impl Layer { return true; } else { for (_, session) in &self.groups { - if session.addr == *addr { + if session.addrs.contains(addr) { return true; } } @@ -101,13 +102,12 @@ impl Layer { } pub fn chat_rm_online(&mut self, pid: &PeerId) -> Option { - self.chats.remove(pid).map(|session| session.addr) + self.chats.remove(pid).map(|session| session.addrs[0]) } - pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64) { + pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) { if !self.chats.contains_key(&pid) { - self.chats - .insert(pid, LayerSession::new(id_to_str(&pid), pid, sid, fid)); + self.chats.insert(pid, LayerSession::new(pid, sid, fid, h)); } } @@ -119,6 +119,44 @@ impl Layer { } } + pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) { + if !self.groups.contains_key(&gid) { + self.groups.insert(gid, LayerSession::new(pid, sid, fid, h)); + } + } + + pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> { + if let Some(session) = self.groups.get(gid) { + Ok(session) + } else { + Err(anyhow!("session missing!")) + } + } + + pub fn group_rm_online(&mut self, gid: &GroupChatId) -> Option> { + self.groups.remove(gid).map(|session| session.addrs) + } + + pub fn group_increased(&mut self, gid: &GroupChatId) -> Result { + if let Some(session) = self.groups.get_mut(gid) { + Ok(session.increased()) + } else { + Err(anyhow!("session missing!")) + } + } + + pub fn group_add_member(&mut self, gid: &GroupChatId, addr: PeerId) { + if let Some(session) = self.groups.get_mut(gid) { + session.addrs.push(addr); + } + } + + pub fn group_del_member(&mut self, gid: &GroupChatId, index: usize) { + if let Some(session) = self.groups.get_mut(gid) { + session.addrs.remove(index); + } + } + // pub fn remove_running(&mut self, gid: &GroupId) -> HashMap { // // check close the stable connection. // let mut addrs: HashMap = HashMap::new(); @@ -209,17 +247,16 @@ impl Layer { // } // } - // pub fn broadcast(&self, user: User, results: &mut HandleResult) { - // let gid = user.id; - // let info = ChatLayerEvent::InfoRes(user); - // let data = bincode::serialize(&info).unwrap_or(vec![]); - // if let Some(running) = self.runnings.get(&gid) { - // for (fgid, online) in &running.sessions { - // let msg = SendType::Event(0, *online.online.addr(), data.clone()); - // results.layers.push((gid, *fgid, msg)); - // } - // } - // } + pub fn broadcast(&self, user: User, results: &mut HandleResult) { + let gid = user.id; + let info = ChatLayerEvent::InfoRes(user); + let data = bincode::serialize(&info).unwrap_or(vec![]); + + for fpid in self.chats.keys() { + let msg = SendType::Event(0, *fpid, data.clone()); + results.layers.push((CHAT_ID, msg)); + } + } } // pub(crate) struct OnlineSession { @@ -261,10 +298,9 @@ impl Layer { /// online connected layer session. pub(crate) struct LayerSession { - /// session refs symbol id (Chat is friend's pid, Group is GroupChatId) - pub pid: String, + pub height: i64, /// session network addr. - pub addr: PeerId, + pub addrs: Vec, /// session database id. pub s_id: i64, /// layer service database id. @@ -278,18 +314,27 @@ pub(crate) struct LayerSession { } impl LayerSession { - fn new(pid: String, addr: PeerId, s_id: i64, db_id: i64) -> Self { + fn new(addr: PeerId, s_id: i64, db_id: i64, height: i64) -> Self { Self { - pid, - addr, s_id, db_id, + height, + addrs: vec![addr], suspend_me: false, suspend_remote: false, remain: 0, } } + pub fn info(&self) -> (i64, i64, i64) { + (self.height, self.s_id, self.db_id) + } + + pub fn increased(&mut self) -> i64 { + self.height += 1; + self.height + } + pub fn active(&mut self, is_me: bool) -> PeerId { if is_me { self.suspend_me = false; @@ -297,7 +342,7 @@ impl LayerSession { self.suspend_remote = false; } self.remain = 0; - self.addr + self.addrs[0] } pub fn suspend(&mut self, is_me: bool, must: bool) -> Option { @@ -314,7 +359,7 @@ impl LayerSession { if self.suspend_remote && self.suspend_me { self.remain = 6; // keep-alive 10~11 minutes 120s/time - Some(self.addr) + Some(self.addrs[0]) } else { None } diff --git a/src/migrate/group.rs b/src/migrate/group.rs index cff1d3e..8457f93 100644 --- a/src/migrate/group.rs +++ b/src/migrate/group.rs @@ -3,7 +3,7 @@ pub(super) const GROUP_VERSIONS: [&str; 3] = [ "CREATE TABLE IF NOT EXISTS groups( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, height INTEGER NOT NULL, - gcd TEXT NOT NULL, + gid INTEGER NOT NULL, addr TEXT NOT NULL, name TEXT NOT NULL, is_close INTEGER NOT NULL, @@ -12,8 +12,7 @@ pub(super) const GROUP_VERSIONS: [&str; 3] = [ id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, height INTEGER NOT NULL, fid INTEGER NOT NULL, - mid TEXT NOT NULL, - addr TEXT NOT NULL, + pid TEXT NOT NULL, name TEXT NOT NULL, leave INTEGER NOT NULL);", "CREATE TABLE IF NOT EXISTS messages(