@ -4,7 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
@@ -4,7 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tdn ::types ::{
group ::GroupId ,
message ::{ RecvType , SendType } ,
primitive ::{ new_io_error , HandleResult , PeerAddr , Result } ,
primitive ::{ HandleResult , PeerAddr , Result } ,
} ;
use tokio ::sync ::RwLock ;
@ -43,9 +43,7 @@ pub(crate) async fn handle(
@@ -43,9 +43,7 @@ pub(crate) async fn handle(
return Ok ( results ) ;
}
let LayerConnect ( gcd , connect ) = bincode ::deserialize ( & data )
. map_err ( | _e | new_io_error ( "deserialize group chat connect failure" ) ) ? ;
let LayerConnect ( gcd , connect ) = bincode ::deserialize ( & data ) ? ;
let ( ogid , height , id ) = layer . read ( ) . await . running ( & gcd ) ? . owner_height_id ( ) ;
match connect {
@ -70,8 +68,7 @@ pub(crate) async fn handle(
@@ -70,8 +68,7 @@ pub(crate) async fn handle(
results . rpcs . push ( rpc ::member_online ( mgid , id , fgid , addr ) ) ;
let new_data =
bincode ::serialize ( & LayerEvent ::MemberOnline ( gcd , fgid , addr ) )
. map_err ( | _ | new_io_error ( "serialize event error." ) ) ? ;
bincode ::serialize ( & LayerEvent ::MemberOnline ( gcd , fgid , addr ) ) ? ;
for ( mid , maddr ) in layer . read ( ) . await . running ( & gcd ) ? . onlines ( ) {
let s = SendType ::Event ( 0 , * maddr , new_data . clone ( ) ) ;
@ -116,8 +113,7 @@ pub(crate) async fn handle(
@@ -116,8 +113,7 @@ pub(crate) async fn handle(
}
RecvType ::Event ( addr , bytes ) = > {
// server & client handle it.
let event : LayerEvent =
bincode ::deserialize ( & bytes ) . map_err ( | _ | new_io_error ( "serialize event error." ) ) ? ;
let event : LayerEvent = bincode ::deserialize ( & bytes ) ? ;
handle_event ( fgid , mgid , is_server , addr , event , layer , & mut results ) . await ? ;
}
RecvType ::Stream ( _uid , _stream , _bytes ) = > {
@ -139,11 +135,11 @@ fn handle_connect(
@@ -139,11 +135,11 @@ fn handle_connect(
results : & mut HandleResult ,
) -> Result < bool > {
// 0. deserialize result.
let LayerResult ( gcd , height ) =
bincode ::deserialize ( & data ) . map_err ( | _e | new_io_error ( "Deseralize result failure" ) ) ? ;
let LayerResult ( gcd , height ) = bincode ::deserialize ( & data ) ? ;
// 1. check group.
if let Some ( group ) = load_group ( layer . base ( ) , & mgid , & gcd ) ? {
let db = group_chat_db ( layer . base ( ) , & mgid ) ? ;
if let Some ( group ) = GroupChat ::get ( & db , & gcd ) ? {
// 1.0 check address.
if group . g_addr ! = addr {
return Ok ( false ) ;
@ -202,8 +198,7 @@ async fn handle_event(
@@ -202,8 +198,7 @@ async fn handle_event(
results . rpcs . push ( rpc ::member_offline ( ogid , id , fgid ) ) ;
// 3. broadcast offline event.
let new_data = bincode ::serialize ( & LayerEvent ::MemberOffline ( gcd , fgid ) )
. map_err ( | _ | new_io_error ( "serialize event error." ) ) ? ;
let new_data = bincode ::serialize ( & LayerEvent ::MemberOffline ( gcd , fgid ) ) ? ;
for ( mid , maddr ) in layer . read ( ) . await . running ( & gcd ) ? . onlines ( ) {
let s = SendType ::Event ( 0 , * maddr , new_data . clone ( ) ) ;
@ -426,20 +421,15 @@ async fn handle_event(
@@ -426,20 +421,15 @@ async fn handle_event(
let ( ogid , height , id ) = layer . read ( ) . await . running ( & gcd ) ? . owner_height_id ( ) ;
let base = layer . read ( ) . await . base . clone ( ) ;
let db = group_chat_db ( & base , & ogid ) ? ;
let group = GroupChat ::get_id ( & db , & id ) ? . ok_or ( new_io_error ( "missing group" ) ) ? ;
let group = GroupChat ::get_id ( & db , & id ) ? . ok_or ( anyhow ! ( "missing group" ) ) ? ;
// 1. check account is online, if not online, nothing.
match join_proof {
JoinProof ::Open ( mname , mavatar ) = > {
// check is member.
if let Ok ( mid ) = Member ::get_id ( & db , & id , & fgid ) {
let gavatar = read_avatar ( & base , & ogid , & gcd ) . await ? ;
let group_info = group . to_group_info ( "" . to_owned ( ) , gavatar , vec! [ ] ) ;
let res = LayerEvent ::Agree ( gcd , group_info ) ;
let d = bincode ::serialize ( & res ) . unwrap_or ( vec! [ ] ) ;
let s = SendType ::Event ( 0 , addr , d ) ;
if Member ::get_id ( & db , & id , & fgid ) . is_ok ( ) {
let s = agree ( & base , & ogid , & gcd , group , addr ) . await ;
add_server_layer ( results , fgid , s ) ;
return Ok ( ( ) ) ;
}
@ -461,24 +451,16 @@ async fn handle_event(
@@ -461,24 +451,16 @@ async fn handle_event(
// self.broadcast_join(&gcd, m, mavatar, results).await?;
// return join result.
let gavatar = read_avatar ( & base , & ogid , & gcd ) . await ? ;
let group_info = group . to_group_info ( "" . to_owned ( ) , gavatar , vec! [ ] ) ;
let res = LayerEvent ::Agree ( gcd , group_info ) ;
let d = bincode ::serialize ( & res ) . unwrap_or ( vec! [ ] ) ;
let s = SendType ::Event ( 0 , addr , d ) ;
let s = agree ( & base , & ogid , & gcd , group , addr ) . await ;
add_server_layer ( results , fgid , s ) ;
} else {
// Self::reject(gcd, fmid, addr, false, results);
add_server_layer ( results , fgid , reject ( gcd , addr , false ) ) ;
}
}
JoinProof ::Invite ( invite_gid , proof , mname , mavatar ) = > {
// check is member.
if let Ok ( mid ) = Member ::get_id ( & db , & id , & fgid ) {
let gavatar = read_avatar ( & base , & ogid , & gcd ) . await ? ;
let group_info = group . to_group_info ( "" . to_owned ( ) , gavatar , vec! [ ] ) ;
let res = LayerEvent ::Agree ( gcd , group_info ) ;
let d = bincode ::serialize ( & res ) . unwrap_or ( vec! [ ] ) ;
let s = SendType ::Event ( 0 , addr , d ) ;
if Member ::get_id ( & db , & id , & fgid ) . is_ok ( ) {
let s = agree ( & base , & ogid , & gcd , group , addr ) . await ;
add_server_layer ( results , fgid , s ) ;
return Ok ( ( ) ) ;
}
@ -487,7 +469,7 @@ async fn handle_event(
@@ -487,7 +469,7 @@ async fn handle_event(
// check if inviter is member.
if Member ::get_id ( & db , & id , & invite_gid ) . is_err ( ) {
//Self::reject(gcd, fmid, addr, true, results);
add_server_layer ( results , fgid , reject ( gcd , addr , false ) ) ;
return Ok ( ( ) ) ;
}
@ -518,7 +500,8 @@ async fn handle_event(
@@ -518,7 +500,8 @@ async fn handle_event(
//self.broadcast_join(&gcd, m, mavatar, results).await?;
// return join result.
//self.agree(gcd, fmid, addr, group, results).await?;
let s = agree ( & base , & ogid , & gcd , group , addr ) . await ;
add_server_layer ( results , fgid , s ) ;
}
JoinProof ::Zkp ( _proof ) = > {
// TOOD zkp join.
@ -535,12 +518,6 @@ async fn handle_event(
@@ -535,12 +518,6 @@ async fn handle_event(
Ok ( ( ) )
}
#[ inline ]
fn load_group ( base : & PathBuf , mgid : & GroupId , gcd : & GroupId ) -> Result < Option < GroupChat > > {
let db = group_chat_db ( base , mgid ) ? ;
GroupChat ::get ( & db , gcd )
}
pub ( crate ) fn group_chat_conn ( proof : Proof , addr : PeerAddr , gid : GroupId ) -> SendType {
let data =
bincode ::serialize ( & LayerConnect ( gid , ConnectProof ::Common ( proof ) ) ) . unwrap_or ( vec! [ ] ) ;
@ -558,6 +535,25 @@ fn sync_online(gcd: GroupId, addr: PeerAddr) -> SendType {
@@ -558,6 +535,25 @@ fn sync_online(gcd: GroupId, addr: PeerAddr) -> SendType {
SendType ::Event ( 0 , addr , data )
}
async fn agree (
base : & PathBuf ,
ogid : & GroupId ,
gcd : & GroupId ,
group : GroupChat ,
addr : PeerAddr ,
) -> SendType {
let gavatar = read_avatar ( base , ogid , gcd ) . await . unwrap_or ( vec! [ ] ) ;
let group_info = group . to_group_info ( "" . to_owned ( ) , gavatar , vec! [ ] ) ;
let res = LayerEvent ::Agree ( * gcd , group_info ) ;
let d = bincode ::serialize ( & res ) . unwrap_or ( vec! [ ] ) ;
SendType ::Event ( 0 , addr , d )
}
fn reject ( gcd : GroupId , addr : PeerAddr , lost : bool ) -> SendType {
let d = bincode ::serialize ( & LayerEvent ::Reject ( gcd , lost ) ) . unwrap_or ( vec! [ ] ) ;
SendType ::Event ( 0 , addr , d )
}
// fn broadcast_join(
// gcd: &GroupId,
// member: Member,