From 41a71855dbc4e69a101c0175a739f5e78dfbccdf Mon Sep 17 00:00:00 2001 From: Sun Date: Fri, 28 May 2021 10:07:54 +0800 Subject: [PATCH] apply session to group_chat --- lib/global.dart | 8 +- src/apps/group_chat/layer.rs | 277 +++++++++++++++++----------------- src/apps/group_chat/models.rs | 24 ++- src/apps/group_chat/rpc.rs | 20 +-- 4 files changed, 171 insertions(+), 158 deletions(-) diff --git a/lib/global.dart b/lib/global.dart index ead40b7..075252c 100644 --- a/lib/global.dart +++ b/lib/global.dart @@ -1,9 +1,9 @@ class Global { static String gid = "0000000000000000000000000000000000000000000000000000000000000000"; - static String httpRpc = '127.0.0.1:8000'; - static String wsRpc = '127.0.0.1:8080'; - //static String httpRpc = '192.168.2.148:8001'; - //static String wsRpc = '192.168.2.148:8081'; + //static String httpRpc = '127.0.0.1:8000'; + //static String wsRpc = '127.0.0.1:8080'; + static String httpRpc = '192.168.2.148:8001'; + static String wsRpc = '192.168.2.148:8081'; //static String httpRpc = '192.168.50.250:8001'; //static String wsRpc = '192.168.50.250:8081'; static String optionCache = 'option'; diff --git a/src/apps/group_chat/layer.rs b/src/apps/group_chat/layer.rs index 2620309..c9a1048 100644 --- a/src/apps/group_chat/layer.rs +++ b/src/apps/group_chat/layer.rs @@ -9,13 +9,14 @@ use tdn::{ }, }; -use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent, PackedEvent}; +use group_chat_types::{ConnectProof, Event, LayerConnect, LayerEvent, LayerResult, PackedEvent}; use tdn_did::Proof; use tdn_storage::local::DStorage; use crate::layer::{Layer, Online}; -use crate::rpc::{session_connect, session_lost, session_suspend}; -use crate::storage::{group_chat_db, write_avatar_sync}; +use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; +use crate::session::{connect_session, SessionType}; +use crate::storage::{group_chat_db, session_db, write_avatar_sync}; use super::models::{from_network_message, GroupChat, Member, Request}; use super::{add_layer, rpc}; @@ -30,109 +31,21 @@ pub(crate) async fn handle( match msg { RecvType::Connect(..) => {} // Never to here. RecvType::Leave(..) => {} // Never to here. handled in chat. - RecvType::Result(addr, _is_ok, data) => { - let res: GroupResult = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize result failure"))?; - match res { - GroupResult::Check(ct, supported) => { - println!("check: {:?}, supported: {:?}", ct, supported); - results.rpcs.push(rpc::create_check(mgid, ct, supported)) - } - GroupResult::Create(gcd, ok) => { - println!("Create result: {}", ok); - if ok { - // get gc by gcd. - let db = group_chat_db(layer.read().await.base(), &mgid)?; - if let Some(mut gc) = GroupChat::get(&db, &gcd)? { - gc.ok(&db)?; - results.rpcs.push(rpc::create_result(mgid, gc.id, ok)); - - // 0. get session. TODO - let sid = 0; - - // online this group. - layer.write().await.running_mut(&mgid)?.check_add_online( - gcd, - Online::Direct(addr), - sid, - gc.id, - )?; - } - } - } - GroupResult::Join(gcd, ok, height) => { - println!("Got join result: {}", ok); - if ok { - let base = layer.read().await.base.clone(); - if let Some(group) = load_group(&base, &mgid, &gcd)? { - let mut layer_lock = layer.write().await; - // 1. check address. - if group.g_addr != addr { - return Ok(results); - } - - // 2. get group session. - let sid = 0; // TODO - - // 2. online this group. - layer_lock.running_mut(&mgid)?.check_add_online( - gcd, - Online::Direct(addr), - sid, - group.id, - )?; - // 3. online to UI. - results.rpcs.push(rpc::group_online(mgid, group.id)); - - // 4. online ping. - add_layer(&mut results, mgid, ping(gcd, addr)); - - println!("will sync remote: {}, my: {}", height, group.height); - - // 5. sync group height. - if group.height < height { - add_layer(&mut results, mgid, sync(gcd, addr, group.height)); - } - } else { - let msg = SendType::Result(0, addr, false, false, vec![]); - add_layer(&mut results, mgid, msg); - return Ok(results); - } - } - } - GroupResult::Waiting(_gcd) => { - // TODO waiting - } - GroupResult::Agree(gcd, info) => { - println!("Agree.........."); - let base = layer.read().await.base.clone(); - let db = group_chat_db(&base, &mgid)?; - let (rid, key) = Request::over(&db, &gcd, true)?; - - // 1. add group chat. - let mut group = GroupChat::from_info(key, info, 0, addr, base, &mgid)?; - group.insert(&db)?; - - // 2. update UI. - results.rpcs.push(rpc::group_agree(mgid, rid, group)); - - // 3. online ping. - add_layer(&mut results, mgid, ping(gcd, addr)); - - // 4. sync group height. - add_layer(&mut results, mgid, sync(gcd, addr, 0)); - } - GroupResult::Reject(gcd) => { - println!("Reject.........."); - let db = group_chat_db(layer.read().await.base(), &mgid)?; - let (rid, _key) = Request::over(&db, &gcd, true)?; - results.rpcs.push(rpc::group_reject(mgid, rid)); - } + RecvType::Result(addr, is_ok, data) => { + if is_ok { + let mut layer_lock = layer.write().await; + handle_connect(mgid, addr, data, &mut layer_lock, &mut results)?; + } else { + let msg = SendType::Result(0, addr, false, false, vec![]); + add_layer(&mut results, mgid, msg); } } - RecvType::ResultConnect(_addr, data) => { - let _res: GroupResult = postcard::from_bytes(&data) - .map_err(|_e| new_io_error("Deseralize result failure"))?; + RecvType::ResultConnect(addr, data) => { + let mut layer_lock = layer.write().await; + if handle_connect(mgid, addr, data, &mut layer_lock, &mut results)? { + let msg = SendType::Result(0, addr, true, false, vec![]); + add_layer(&mut results, mgid, msg); + } } RecvType::Event(addr, bytes) => { let event: LayerEvent = @@ -150,6 +63,51 @@ pub(crate) async fn handle( Ok(results) } +fn handle_connect( + mgid: GroupId, + addr: PeerAddr, + data: Vec, + layer: &mut Layer, + results: &mut HandleResult, +) -> Result { + // 0. deserialize result. + let LayerResult(gcd, height) = + postcard::from_bytes(&data).map_err(|_e| new_io_error("Deseralize result failure"))?; + + // 1. check group. + if let Some(group) = load_group(layer.base(), &mgid, &gcd)? { + // 1.0 check address. + if group.g_addr != addr { + return Ok(false); + } + + // 1.1 get session. + let session_some = + connect_session(layer.base(), &mgid, &SessionType::Group, &group.id, &addr)?; + if session_some.is_none() { + return Ok(false); + } + let sid = session_some.unwrap().id; + + // 1.2 online this group. + layer + .running_mut(&mgid)? + .check_add_online(gcd, Online::Direct(addr), sid, group.id)?; + + // 1.3 online to UI. + results.rpcs.push(session_connect(mgid, &sid, &addr)); + + println!("will sync remote: {}, my: {}", height, group.height); + // 1.4 sync group height. + if group.height < height { + add_layer(results, mgid, sync(gcd, addr, group.height)); + } + Ok(true) + } else { + Ok(false) + } +} + async fn handle_event( mgid: GroupId, addr: PeerAddr, @@ -158,28 +116,17 @@ async fn handle_event( results: &mut HandleResult, ) -> Result<()> { println!("Got event......."); - let (sid, gid) = match event { - LayerEvent::Offline(gcd) - | LayerEvent::Suspend(gcd) - | LayerEvent::Actived(gcd) - | LayerEvent::OnlinePing(gcd) - | LayerEvent::OnlinePong(gcd) - | LayerEvent::MemberOnline(gcd, ..) - | LayerEvent::MemberOffline(gcd, ..) - | LayerEvent::Sync(gcd, ..) - | LayerEvent::SyncReq(gcd, ..) - | LayerEvent::Packed(gcd, ..) => layer.read().await.get_running_remote_id(&mgid, &gcd)?, - }; - match event { LayerEvent::Offline(gcd) => { let mut layer_lock = layer.write().await; + let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?; layer_lock.running_mut(&mgid)?.check_offline(&gcd, &addr); drop(layer_lock); results.rpcs.push(session_lost(mgid, &sid)); } LayerEvent::Suspend(gcd) => { let mut layer_lock = layer.write().await; + let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?; if layer_lock.running_mut(&mgid)?.suspend(&gcd, false)? { results.rpcs.push(session_suspend(mgid, &sid)); } @@ -187,27 +134,78 @@ async fn handle_event( } LayerEvent::Actived(gcd) => { let mut layer_lock = layer.write().await; + let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?; let _ = layer_lock.running_mut(&mgid)?.active(&gcd, false); drop(layer_lock); results.rpcs.push(session_connect(mgid, &sid, &addr)); } - LayerEvent::OnlinePing(gcd) => { - results.rpcs.push(rpc::group_online(mgid, gid)); - let data = postcard::to_allocvec(&LayerEvent::OnlinePong(gcd)).unwrap_or(vec![]); - let msg = SendType::Event(0, addr, data); - add_layer(results, mgid, msg); + LayerEvent::CheckResult(ct, supported) => { + println!("check: {:?}, supported: {:?}", ct, supported); + results.rpcs.push(rpc::create_check(mgid, ct, supported)) } - - LayerEvent::OnlinePong(_) => { - results.rpcs.push(rpc::group_online(mgid, gid)); + LayerEvent::CreateResult(gcd, ok) => { + println!("Create result: {}", ok); + if ok { + // get gc by gcd. + let db = group_chat_db(layer.read().await.base(), &mgid)?; + if let Some(mut gc) = GroupChat::get(&db, &gcd)? { + gc.ok(&db)?; + results.rpcs.push(rpc::create_result(mgid, gc.id, ok)); + + // ADD NEW SESSION. + let s_db = session_db(layer.read().await.base(), &mgid)?; + let mut session = gc.to_session(); + session.insert(&s_db)?; + results.rpcs.push(session_create(mgid, &session)); + } + } + } + LayerEvent::Agree(gcd, info) => { + println!("Agree.........."); + let base = layer.read().await.base.clone(); + let db = group_chat_db(&base, &mgid)?; + let (rid, key) = Request::over(&db, &gcd, true)?; + + // 1. add group chat. + let mut group = GroupChat::from_info(key, info, 0, addr, &base, &mgid)?; + group.insert(&db)?; + + // 2. ADD NEW SESSION. + let s_db = session_db(&base, &mgid)?; + let mut session = group.to_session(); + session.insert(&s_db)?; + results.rpcs.push(session_create(mgid, &session)); + + // 3. update UI. + results.rpcs.push(rpc::group_agree(mgid, rid, group)); + + // 4. try connect. + let proof = layer + .read() + .await + .group + .read() + .await + .prove_addr(&mgid, &addr)?; + add_layer(results, mgid, group_chat_conn(proof, addr, gcd)); + } + LayerEvent::Reject(gcd) => { + println!("Reject.........."); + let db = group_chat_db(layer.read().await.base(), &mgid)?; + let (rid, _key) = Request::over(&db, &gcd, true)?; + results.rpcs.push(rpc::group_reject(mgid, rid)); } - LayerEvent::MemberOnline(_, mid, maddr) => { + LayerEvent::MemberOnline(gcd, mid, maddr) => { + let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?; results.rpcs.push(rpc::member_online(mgid, gid, mid, maddr)); } - LayerEvent::MemberOffline(_, mid, ma) => { + LayerEvent::MemberOffline(gcd, mid, ma) => { + let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?; results.rpcs.push(rpc::member_offline(mgid, gid, mid, ma)); } - LayerEvent::Sync(_, height, event) => { + LayerEvent::Sync(gcd, height, event) => { + let (sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?; + println!("Sync: height: {}", height); let base = layer.read().await.base().clone(); let db = group_chat_db(&base, &mgid)?; @@ -239,8 +237,11 @@ async fn handle_event( println!("Sync: create message start"); let base = layer.read().await.base.clone(); let msg = from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?; - results.rpcs.push(rpc::message_create(mgid, msg)); + results.rpcs.push(rpc::message_create(mgid, &msg)); println!("Sync: create message ok"); + results + .rpcs + .push(session_last(mgid, &sid, &msg.datetime, &msg.content, true)); } } @@ -248,12 +249,17 @@ async fn handle_event( GroupChat::add_height(&db, gid, height)?; } LayerEvent::Packed(gcd, height, from, to, events) => { + let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?; + println!("Start handle sync packed... {}, {}, {}", height, from, to); let base = layer.read().await.base().clone(); handle_sync( mgid, gid, gcd, addr, height, from, to, events, base, results, )?; } + LayerEvent::Check => {} // nerver here. + LayerEvent::Create(..) => {} // nerver here. + LayerEvent::Request(..) => {} // nerver here. LayerEvent::SyncReq(..) => {} // Never here. } @@ -268,15 +274,10 @@ fn load_group(base: &PathBuf, mgid: &GroupId, gcd: &GroupId) -> Result SendType { let data = - postcard::to_allocvec(&GroupConnect::Join(gid, JoinProof::Had(proof))).unwrap_or(vec![]); + postcard::to_allocvec(&LayerConnect(gid, ConnectProof::Common(proof))).unwrap_or(vec![]); SendType::Connect(0, addr, None, None, data) } -fn ping(gcd: GroupId, addr: PeerAddr) -> SendType { - let data = postcard::to_allocvec(&LayerEvent::OnlinePing(gcd)).unwrap_or(vec![]); - SendType::Event(0, addr, data) -} - fn sync(gcd: GroupId, addr: PeerAddr, height: i64) -> SendType { println!("Send sync request..."); let data = postcard::to_allocvec(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]); @@ -337,7 +338,7 @@ fn handle_sync_event( PackedEvent::GroupClose => { // TOOD } - PackedEvent::MemberInfo(mid, maddr, mname, mavatar) => { + PackedEvent::MemberInfo(_mid, _maddr, _mname, _mavatar) => { // TODO } PackedEvent::MemberJoin(mid, maddr, mname, mavatar, mtime) => { @@ -348,12 +349,12 @@ fn handle_sync_event( member.insert(&db)?; results.rpcs.push(rpc::member_join(*mgid, member)); } - PackedEvent::MemberLeave(mid) => { + PackedEvent::MemberLeave(_mid) => { // TODO } PackedEvent::MessageCreate(mid, nmsg, time) => { let msg = from_network_message(height, *fid, mid, mgid, nmsg, time, base)?; - results.rpcs.push(rpc::message_create(*mgid, msg)); + results.rpcs.push(rpc::message_create(*mgid, &msg)); } PackedEvent::None => {} } diff --git a/src/apps/group_chat/models.rs b/src/apps/group_chat/models.rs index 8bd44e2..980f104 100644 --- a/src/apps/group_chat/models.rs +++ b/src/apps/group_chat/models.rs @@ -11,6 +11,7 @@ use tdn_storage::local::{DStorage, DsValue}; use group_chat_types::{GroupInfo, GroupType, NetworkMessage}; use crate::apps::chat::MessageType; +use crate::session::{Session, SessionType}; use crate::storage::{ group_chat_db, write_avatar_sync, write_file_sync, write_image_sync, write_record_sync, }; @@ -68,7 +69,7 @@ pub(crate) struct GroupChat { /// group chat server addresse. pub g_addr: PeerAddr, /// group chat name. - g_name: String, + pub g_name: String, /// group chat simple intro. g_bio: String, /// group chat is created ok. @@ -162,12 +163,12 @@ impl GroupChat { info: GroupInfo, height: i64, addr: PeerAddr, - base: PathBuf, + base: &PathBuf, mgid: &GroupId, ) -> Result { match info { GroupInfo::Common(owner, _, g_id, g_type, agree, name, g_bio, avatar) => { - write_avatar_sync(&base, &mgid, &g_id, avatar)?; + write_avatar_sync(base, &mgid, &g_id, avatar)?; Ok(Self::new_from( g_id, height, owner, g_type, addr, name, g_bio, agree, key, )) @@ -179,7 +180,7 @@ impl GroupChat { let name = "".to_owned(); let bio = "".to_owned(); - write_avatar_sync(&base, &mgid, &g_id, avatar)?; + write_avatar_sync(base, &mgid, &g_id, avatar)?; Ok(Self::new_from( g_id, height, owner, g_type, addr, name, bio, agree, key, @@ -188,6 +189,17 @@ impl GroupChat { } } + pub fn to_session(&self) -> Session { + Session::new( + self.id, + self.g_id, + self.g_addr, + SessionType::Group, + self.g_name.clone(), + self.datetime, + ) + } + pub fn to_group_info(self, name: String, avatar: Vec) -> GroupInfo { match self.g_type { GroupType::Common | GroupType::Open => GroupInfo::Common( @@ -625,11 +637,11 @@ pub(crate) struct Message { /// message type. m_type: MessageType, /// message content. - content: String, + pub content: String, /// message is delivery. is_delivery: bool, /// message created time. - datetime: i64, + pub datetime: i64, /// message is deteled is_deleted: bool, } diff --git a/src/apps/group_chat/rpc.rs b/src/apps/group_chat/rpc.rs index 112f32a..3a28cda 100644 --- a/src/apps/group_chat/rpc.rs +++ b/src/apps/group_chat/rpc.rs @@ -7,7 +7,7 @@ use tdn::types::{ }; use tdn_did::Proof; -use group_chat_types::{CheckType, Event, GroupConnect, GroupType, JoinProof, LayerEvent}; +use group_chat_types::{CheckType, Event, GroupType, JoinProof, LayerEvent}; use crate::apps::chat::MessageType; use crate::rpc::RpcState; @@ -83,7 +83,7 @@ pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: GroupId, maddr: PeerA } #[inline] -pub(crate) fn message_create(mgid: GroupId, msg: Message) -> RpcParam { +pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { rpc_response(0, "group-chat-message-create", json!(msg.to_rpc()), mgid) } @@ -163,8 +163,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let addr = PeerAddr::from_hex(params[0].as_str()?)?; let mut results = HandleResult::new(); - let data = postcard::to_allocvec(&GroupConnect::Check).unwrap_or(vec![]); - let s = SendType::Connect(0, addr, None, None, data); + let data = postcard::to_allocvec(&LayerEvent::Check).unwrap_or(vec![]); + let s = SendType::Event(0, addr, data); add_layer(&mut results, gid, s); Ok(results) }, @@ -200,8 +200,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { // TODO create proof. let proof: Proof = Default::default(); - let data = postcard::to_allocvec(&GroupConnect::Create(info, proof)).unwrap_or(vec![]); - let s = SendType::Connect(0, addr, None, None, data); + let data = postcard::to_allocvec(&LayerEvent::Create(info, proof)).unwrap_or(vec![]); + let s = SendType::Event(0, addr, data); add_layer(&mut results, gid, s); Ok(results) }, @@ -225,8 +225,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { // TODO create proof. let proof: Proof = Default::default(); - let data = postcard::to_allocvec(&GroupConnect::Create(info, proof)).unwrap_or(vec![]); - let s = SendType::Connect(0, addr, None, None, data); + let data = postcard::to_allocvec(&LayerEvent::Create(info, proof)).unwrap_or(vec![]); + let s = SendType::Event(0, addr, data); let mut results = HandleResult::new(); add_layer(&mut results, gid, s); Ok(results) @@ -251,9 +251,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::rpc(request.to_rpc()); let me = state.group.read().await.clone_user(&gid)?; let join_proof = JoinProof::Open(me.name, me.avatar); - let data = postcard::to_allocvec(&GroupConnect::Join(request.gid, join_proof)) + let data = postcard::to_allocvec(&LayerEvent::Request(request.gid, join_proof)) .unwrap_or(vec![]); - let s = SendType::Connect(0, request.addr, None, None, data); + let s = SendType::Event(0, request.addr, data); add_layer(&mut results, gid, s); Ok(results) },