From 0c9ff69bd2cd6de751c023cb3ee29417c64659ae Mon Sep 17 00:00:00 2001 From: Sun Date: Wed, 8 Sep 2021 13:42:24 +0800 Subject: [PATCH] add group chat server side --- src/apps.rs | 5 +- src/apps/group_chat/layer.rs | 266 +++++++++++++++++++++++++++++++--- src/apps/group_chat/mod.rs | 6 + src/apps/group_chat/models.rs | 7 +- src/group.rs | 14 +- src/layer.rs | 28 +++- src/rpc.rs | 16 +- 7 files changed, 299 insertions(+), 43 deletions(-) diff --git a/src/apps.rs b/src/apps.rs index 01e83ca..ae0bf66 100644 --- a/src/apps.rs +++ b/src/apps.rs @@ -32,8 +32,9 @@ pub(crate) async fn app_layer_handle( mgid: GroupId, msg: RecvType, ) -> Result { - match fgid { - group_chat::GROUP_ID => group_chat::layer_handle(layer, mgid, msg).await, + match (fgid, mgid) { + (group_chat::GROUP_ID, _) => group_chat::layer_handle(layer, fgid, mgid, false, msg).await, + (_, group_chat::GROUP_ID) => group_chat::layer_handle(layer, fgid, mgid, true, msg).await, _ => chat::layer_handle(layer, fgid, mgid, msg).await, } } diff --git a/src/apps/group_chat/layer.rs b/src/apps/group_chat/layer.rs index 791640e..e25e09e 100644 --- a/src/apps/group_chat/layer.rs +++ b/src/apps/group_chat/layer.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ group::GroupId, message::{RecvType, SendType}, @@ -8,7 +9,7 @@ use tdn::types::{ use tokio::sync::RwLock; use group_chat_types::{ - ConnectProof, Event, JoinProof, LayerConnect, LayerEvent, LayerResult, PackedEvent, + ConnectProof, Event, GroupType, JoinProof, LayerConnect, LayerEvent, LayerResult, PackedEvent, }; use tdn_did::Proof; use tdn_storage::local::DStorage; @@ -17,23 +18,82 @@ use crate::apps::chat::Friend; use crate::layer::{Layer, Online}; use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; use crate::session::{connect_session, Session, SessionType}; -use crate::storage::{chat_db, delete_avatar, group_chat_db, session_db, write_avatar_sync}; +use crate::storage::{ + chat_db, delete_avatar, group_chat_db, read_avatar, session_db, write_avatar, write_avatar_sync, +}; use super::models::{from_network_message, GroupChat, Member, Request}; -use super::{add_layer, rpc}; +use super::{add_layer, add_server_layer, rpc}; pub(crate) async fn handle( layer: &Arc>, - mgid: GroupId, + fgid: GroupId, // when as client, `fgid` is GROUP_ID + mgid: GroupId, // when as server, `mgid` is GROUP_ID + is_server: bool, msg: RecvType, ) -> Result { let mut results = HandleResult::new(); match msg { - RecvType::Connect(..) => {} // Never to here. - RecvType::Leave(..) => {} // Never to here. handled in chat. + RecvType::Connect(addr, data) => { + // only server handle it. + if !is_server { + let s = SendType::Result(0, addr, false, false, vec![]); + add_server_layer(&mut results, fgid, s); + return Ok(results); + } + + let LayerConnect(gcd, connect) = bincode::deserialize(&data) + .map_err(|_e| new_io_error("deserialize group chat connect failure"))?; + + let (ogid, height, id) = layer.read().await.running(&gcd)?.owner_height_id(); + + match connect { + ConnectProof::Common(_proof) => { + // check is member. + let db = group_chat_db(&layer.read().await.base, &ogid)?; + + if let Ok(mid) = Member::get_id(&db, &id, &fgid) { + let res = LayerResult(gcd, height); + let data = bincode::serialize(&res).unwrap_or(vec![]); + let s = SendType::Result(0, addr, true, false, data); + add_server_layer(&mut results, fgid, s); + + layer.write().await.running_mut(&gcd)?.check_add_online( + mgid, + Online::Direct(addr), + id, + mid, + )?; + + let _ = Member::addr_update(&db, &id, &fgid, &addr); + results.rpcs.push(rpc::member_online(mgid, id, fgid, addr)); + + let new_data = + bincode::serialize(&LayerEvent::MemberOnline(gcd, fgid, addr)) + .map_err(|_| new_io_error("serialize event error."))?; + + for (mid, maddr) in layer.read().await.running(&gcd)?.onlines() { + let s = SendType::Event(0, *maddr, new_data.clone()); + add_server_layer(&mut results, *mid, s); + } + } else { + let s = SendType::Result(0, addr, false, false, vec![]); + add_server_layer(&mut results, fgid, s); + } + } + ConnectProof::Zkp(_proof) => { + // + } + } + } + RecvType::Leave(_addr) => { + // only server handle it. + // TODO + } RecvType::Result(addr, is_ok, data) => { - if is_ok { + // only client handle it. + if !is_server && is_ok { let mut layer_lock = layer.write().await; handle_connect(mgid, addr, data, &mut layer_lock, &mut results)?; } else { @@ -42,6 +102,12 @@ pub(crate) async fn handle( } } RecvType::ResultConnect(addr, data) => { + // only client handle it. + if is_server { + let msg = SendType::Result(0, addr, false, false, vec![]); + add_layer(&mut results, mgid, msg); + } + let mut layer_lock = layer.write().await; if handle_connect(mgid, addr, data, &mut layer_lock, &mut results)? { let msg = SendType::Result(0, addr, true, false, vec![]); @@ -49,9 +115,10 @@ pub(crate) async fn handle( } } RecvType::Event(addr, bytes) => { + // server & client handle it. let event: LayerEvent = bincode::deserialize(&bytes).map_err(|_| new_io_error("serialize event error."))?; - handle_event(mgid, addr, event, layer, &mut results).await?; + handle_event(fgid, mgid, is_server, addr, event, layer, &mut results).await?; } RecvType::Stream(_uid, _stream, _bytes) => { // TODO stream @@ -113,7 +180,9 @@ fn handle_connect( } async fn handle_event( - mgid: GroupId, + fgid: GroupId, // server use fgid is remote account. + mgid: GroupId, // client user mgid is my account. + is_server: bool, addr: PeerAddr, event: LayerEvent, layer: &Arc>, @@ -122,11 +191,31 @@ async fn handle_event( println!("Got event......."); match event { LayerEvent::Offline(gcd) => { - let mut layer_lock = layer.write().await; - let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?; - layer_lock.running_mut(&mgid)?.check_offline(&gcd, &addr); - drop(layer_lock); - results.rpcs.push(session_lost(mgid, &sid)); + if is_server { + // 1. check member online. + if layer.write().await.remove_online(&gcd, &fgid).is_none() { + return Ok(()); + } + + // 2. offline this member. + let (ogid, _, id) = layer.read().await.running(&gcd)?.owner_height_id(); + results.rpcs.push(rpc::member_offline(ogid, id, fgid)); + + // 3. broadcast offline event. + let new_data = bincode::serialize(&LayerEvent::MemberOffline(gcd, fgid)) + .map_err(|_| new_io_error("serialize event error."))?; + + for (mid, maddr) in layer.read().await.running(&gcd)?.onlines() { + let s = SendType::Event(0, *maddr, new_data.clone()); + add_layer(results, *mid, s); + } + } else { + let mut layer_lock = layer.write().await; + let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?; + layer_lock.running_mut(&mgid)?.check_offline(&gcd, &addr); + drop(layer_lock); + results.rpcs.push(session_lost(mgid, &sid)); + } } LayerEvent::Suspend(gcd) => { let mut layer_lock = layer.write().await; @@ -330,11 +419,117 @@ async fn handle_event( let id = Request::over_rid(&db, &gcd, &rid, ok)?; results.rpcs.push(rpc::request_handle(mgid, id, ok, false)); } - LayerEvent::MemberOnlineSync(..) => {} // nerver here. - LayerEvent::Request(..) => {} // nerver here. - LayerEvent::Check => {} // nerver here. - LayerEvent::Create(..) => {} // nerver here. - LayerEvent::SyncReq(..) => {} // Nerver here. + LayerEvent::MemberOnlineSync(..) => { + // TODO + } + LayerEvent::Request(gcd, join_proof) => { + let (ogid, height, id) = layer.read().await.running(&gcd)?.owner_height_id(); + let base = layer.read().await.base.clone(); + let db = group_chat_db(&base, &ogid)?; + let group = GroupChat::get_id(&db, &id)?.ok_or(new_io_error("missing group"))?; + + // 1. check account is online, if not online, nothing. + match join_proof { + JoinProof::Open(mname, mavatar) => { + // check is member. + if let Ok(mid) = Member::get_id(&db, &id, &fgid) { + let gavatar = read_avatar(&base, &ogid, &gcd).await?; + let group_info = group.to_group_info("".to_owned(), gavatar, vec![]); + let res = LayerEvent::Agree(gcd, group_info); + let d = bincode::serialize(&res).unwrap_or(vec![]); + let s = SendType::Event(0, addr, d); + add_server_layer(results, fgid, s); + + return Ok(()); + } + + if group.g_type == GroupType::Open { + let start = SystemTime::now(); + let datetime = start + .duration_since(UNIX_EPOCH) + .map(|s| s.as_secs()) + .unwrap_or(0) as i64; // safe for all life. + + let mut m = Member::new(id, fgid, addr, mname, false, datetime); + m.insert(&db)?; + + // save avatar. + let _ = write_avatar(&base, &ogid, &fgid, &mavatar).await; + + // add_height consensus. + + // self.broadcast_join(&gcd, m, mavatar, results).await?; + + // return join result. + let gavatar = read_avatar(&base, &ogid, &gcd).await?; + let group_info = group.to_group_info("".to_owned(), gavatar, vec![]); + let res = LayerEvent::Agree(gcd, group_info); + let d = bincode::serialize(&res).unwrap_or(vec![]); + let s = SendType::Event(0, addr, d); + add_server_layer(results, fgid, s); + } else { + // Self::reject(gcd, fmid, addr, false, results); + } + } + JoinProof::Invite(invite_gid, proof, mname, mavatar) => { + // check is member. + if let Ok(mid) = Member::get_id(&db, &id, &fgid) { + let gavatar = read_avatar(&base, &ogid, &gcd).await?; + let group_info = group.to_group_info("".to_owned(), gavatar, vec![]); + let res = LayerEvent::Agree(gcd, group_info); + let d = bincode::serialize(&res).unwrap_or(vec![]); + let s = SendType::Event(0, addr, d); + add_server_layer(results, fgid, s); + return Ok(()); + } + + // TODO check if request had or is blocked by manager. + + // check if inviter is member. + if Member::get_id(&db, &id, &invite_gid).is_err() { + //Self::reject(gcd, fmid, addr, true, results); + return Ok(()); + } + + // TODO check proof. + // proof.verify(&invite_gid, &addr, &layer.addr)?; + + // if group.is_need_agree { + // if !Member::is_manager(fid, &invite_gid).await? { + // let mut request = Request::new(); + // request.insert().await?; + // self.broadcast_request( + // &gcd, + // request, + // JoinProof::Invite(invite_gid, proof, mname, mavatar), + // results, + // ); + // return Ok(()); + // } + // } + + //let mut m = Member::new(*fid, fmid, addr, mname, false); + //m.insert().await?; + + // save avatar. + //let _ = write_avatar(&self.base, &gcd, &m.m_id, &mavatar).await; + + //self.add_member(&gcd, fmid, addr); + //self.broadcast_join(&gcd, m, mavatar, results).await?; + + // return join result. + //self.agree(gcd, fmid, addr, group, results).await?; + } + JoinProof::Zkp(_proof) => { + // TOOD zkp join. + } + } + } + LayerEvent::SyncReq(..) => { + // TODO + } + LayerEvent::Check => {} // nerver here. + LayerEvent::Create(..) => {} // nerver here. } Ok(()) @@ -363,6 +558,39 @@ fn sync_online(gcd: GroupId, addr: PeerAddr) -> SendType { SendType::Event(0, addr, data) } +// fn broadcast_join( +// gcd: &GroupId, +// member: Member, +// avatar: Vec, +// results: &mut HandleResult, +// ) -> Result<()> { +// println!("start broadcast join..."); +// let height = self +// .add_height(gcd, &member.id, ConsensusType::MemberJoin) +// .await?; + +// let datetime = member.datetime; +// let event = Event::MemberJoin( +// member.m_id, +// member.m_addr, +// member.m_name, +// avatar, +// member.datetime, +// ); + +// let new_data = bincode::serialize(&LayerEvent::Sync(*gcd, height, event)).unwrap_or(vec![]); + +// if let Some((members, _, _)) = self.groups.get(gcd) { +// for (mid, maddr, _) in members { +// let s = SendType::Event(0, *maddr, new_data.clone()); +// add_layer(results, *mid, s); +// } +// } +// println!("over broadcast join..."); + +// Ok(()) +// } + fn handle_sync( mgid: GroupId, fid: i64, diff --git a/src/apps/group_chat/mod.rs b/src/apps/group_chat/mod.rs index 4e3e468..9774fef 100644 --- a/src/apps/group_chat/mod.rs +++ b/src/apps/group_chat/mod.rs @@ -11,6 +11,12 @@ pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) results.layers.push((gid, GROUP_ID, msg)); } +/// Group chat server to ESSE. +#[inline] +pub fn add_server_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { + results.layers.push((GROUP_ID, gid, msg)); +} + pub(crate) mod rpc; pub(crate) use layer::group_chat_conn; pub(crate) use layer::handle as layer_handle; diff --git a/src/apps/group_chat/models.rs b/src/apps/group_chat/models.rs index a2d5a5e..d3ea9ef 100644 --- a/src/apps/group_chat/models.rs +++ b/src/apps/group_chat/models.rs @@ -300,14 +300,15 @@ impl GroupChat { } /// list all local group chat as running layer. - pub fn all_local(db: &DStorage, owner: &GroupId) -> Result> { - let matrix = db.query(&format!("SELECT gcd, height FROM groups WHERE owner = '{}' and is_remote = false and is_closed = false", owner.to_hex()))?; + pub fn all_local(db: &DStorage, owner: &GroupId) -> Result> { + let matrix = db.query(&format!("SELECT id, gcd, height FROM groups WHERE owner = '{}' and is_remote = false and is_closed = false", owner.to_hex()))?; let mut groups = vec![]; for mut values in matrix { let height = values.pop().unwrap().as_i64(); let gcd = GroupId::from_hex(values.pop().unwrap().as_string()).unwrap_or(Default::default()); - groups.push((gcd, height)); + let id = values.pop().unwrap().as_i64(); + groups.push((id, gcd, height)); } Ok(groups) } diff --git a/src/group.rs b/src/group.rs index 9640f35..b0b875c 100644 --- a/src/group.rs +++ b/src/group.rs @@ -395,17 +395,18 @@ impl Group { addrs } - pub fn add_running(&mut self, gid: &GroupId, lock: &str) -> Result<()> { + pub fn add_running(&mut self, gid: &GroupId, lock: &str) -> Result { if let Some(u) = self.accounts.get(gid) { let keypair = u.secret(&self.secret, lock)?; if !self.runnings.contains_key(gid) { // load devices to runnings. let running = RunningAccount::init(keypair, &self.base, gid)?; self.runnings.insert(gid.clone(), running); + return Ok(u.id); } } - Ok(()) + Err(new_io_error("user missing.")) } pub fn clone_user(&self, gid: &GroupId) -> Result { @@ -428,14 +429,14 @@ impl Group { avatar_bytes: Vec, device_name: &str, device_info: &str, - ) -> Result { + ) -> Result<(i64, GroupId)> { let (mut account, sk) = Account::generate(&self.secret, name, seed, lock, avatar_bytes)?; let account_id = account.gid; - if self.accounts.contains_key(&account_id) { + if let Some(u) = self.accounts.get(&account_id) { let running = RunningAccount::init(sk, &self.base, &account_id)?; self.runnings.insert(account_id, running); - return Ok(account_id); + return Ok((u.id, account_id)); } account_init(&self.base, &account.gid).await?; @@ -443,6 +444,7 @@ impl Group { let account_db = account_db(&self.base)?; account.insert(&account_db)?; account_db.close()?; + let account_did = account.id; let _ = write_avatar(&self.base, &account_id, &account_id, &account.avatar).await; self.accounts.insert(account.gid, account); @@ -456,7 +458,7 @@ impl Group { RunningAccount::init(sk, &self.base, &account_id)?, ); - Ok(account_id) + Ok((account_did, account_id)) } pub fn update_account(&mut self, gid: GroupId, name: &str, avatar: Vec) -> Result<()> { diff --git a/src/layer.rs b/src/layer.rs index 9ae6f94..c1df79a 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -64,9 +64,16 @@ impl Layer { self.runnings.get_mut(gid).ok_or(new_io_error("not online")) } - pub fn add_running(&mut self, gid: &GroupId, consensus: i64) -> Result<()> { + pub fn add_running( + &mut self, + gid: &GroupId, + owner: GroupId, + id: i64, + consensus: i64, + ) -> Result<()> { if !self.runnings.contains_key(gid) { - self.runnings.insert(*gid, RunningLayer::init(consensus)); + self.runnings + .insert(*gid, RunningLayer::init(owner, id, consensus)); } Ok(()) @@ -170,7 +177,9 @@ impl Online { pub(crate) struct OnlineSession { pub online: Online, + /// session database id. pub db_id: i64, + /// session ref's service(friend/group) database id. pub db_fid: i64, pub suspend_me: bool, pub suspend_remote: bool, @@ -204,20 +213,29 @@ impl OnlineSession { } pub(crate) struct RunningLayer { - /// online group (friends/services) => (group's address, group's db id) - sessions: HashMap, + owner: GroupId, // if is service it has owner account. + /// layer current database id. + id: i64, /// layer current consensus height. consensus: i64, + /// online group (friends/services) => (group's address, group's db id) + sessions: HashMap, } impl RunningLayer { - pub fn init(consensus: i64) -> Self { + pub fn init(owner: GroupId, id: i64, consensus: i64) -> Self { RunningLayer { + owner, + id, consensus, sessions: HashMap::new(), } } + pub fn owner_height_id(&self) -> (GroupId, i64, i64) { + (self.owner, self.consensus, self.id) + } + pub fn increased(&mut self) -> i64 { self.consensus += 1; self.consensus diff --git a/src/rpc.rs b/src/rpc.rs index 63a0f39..d83b9ad 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -255,13 +255,13 @@ fn new_rpc_handler( let device_info = params[5].as_str().ok_or(RpcError::ParseError)?; let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]); - let gid = state + let (id, gid) = state .group .write() .await .add_account(name, seed, lock, avatar_bytes, device_name, device_info) .await?; - state.layer.write().await.add_running(&gid, 0)?; + state.layer.write().await.add_running(&gid, gid, id, 0)?; let mut results = HandleResult::rpc(json!(vec![gid.to_hex()])); results.networks.push(NetworkType::AddGroup(gid)); // add AddGroup to TDN. @@ -281,13 +281,13 @@ fn new_rpc_handler( let device_name = params[4].as_str().ok_or(RpcError::ParseError)?; let device_info = params[5].as_str().ok_or(RpcError::ParseError)?; - let gid = state + let (id, gid) = state .group .write() .await .add_account(name, seed, lock, vec![], device_name, device_info) .await?; - state.layer.write().await.add_running(&gid, 0)?; + state.layer.write().await.add_running(&gid, gid, id, 0)?; let mut results = HandleResult::rpc(json!(vec![gid.to_hex()])); results.networks.push(NetworkType::AddGroup(gid)); // add AddGroup to TDN. @@ -361,19 +361,19 @@ fn new_rpc_handler( let mut results = HandleResult::rpc(json!([gid.to_hex()])); - state.group.write().await.add_running(&gid, me_lock)?; + let id = state.group.write().await.add_running(&gid, me_lock)?; // add AddGroup to TDN. results.networks.push(NetworkType::AddGroup(gid)); let mut layer_lock = state.layer.write().await; - layer_lock.add_running(&gid, 0)?; // TODO account current state height. + layer_lock.add_running(&gid, gid, id, 0)?; // TODO account current state height. // load all services layer created by this account. // 1. group chat. let group_db = group_chat_db(&layer_lock.base, &gid)?; let group_chats = GroupChat::all_local(&group_db, &gid)?; - for (gcd, gheight) in group_chats { - layer_lock.add_running(&gcd, gheight)?; + for (id, gcd, gheight) in group_chats { + layer_lock.add_running(&gcd, gid, id, gheight)?; results.networks.push(NetworkType::AddGroup(gcd)); } drop(layer_lock);