@ -1,15 +1,15 @@
@@ -1,15 +1,15 @@
use async_lock ::RwLock ;
use std ::collections ::HashMap ;
use std ::net ::SocketAddr ;
use std ::sync ::Arc ;
use tdn ::{
smol ::channel ::{ SendError , Sender } ,
types ::{
group ::GroupId ,
message ::{ NetworkType , SendMessage , SendType , StateRequest , StateResponse } ,
primitive ::{ new_io_error , HandleResult , PeerAddr , Result } ,
rpc ::{ json , rpc_response , RpcError , RpcHandler , RpcParam } ,
} ,
use tdn ::types ::{
group ::GroupId ,
message ::{ NetworkType , SendMessage , SendType , StateRequest , StateResponse } ,
primitive ::{ new_io_error , HandleResult , PeerAddr , Result } ,
rpc ::{ json , rpc_response , RpcError , RpcHandler , RpcParam } ,
} ;
use tokio ::sync ::{
mpsc ::{ self , error ::SendError , Sender } ,
RwLock ,
} ;
use crate ::apps ::app_rpc_inject ;
@ -140,7 +140,7 @@ pub(crate) async fn sleep_waiting_close_stable(
@@ -140,7 +140,7 @@ pub(crate) async fn sleep_waiting_close_stable(
groups : HashMap < PeerAddr , ( ) > ,
layers : HashMap < PeerAddr , GroupId > ,
) -> std ::result ::Result < ( ) , SendError < SendMessage > > {
tdn ::smol ::Timer ::after ( std ::time ::Duration ::from_secs ( 10 ) ) . await ;
tokio ::time ::sleep ( std ::time ::Duration ::from_secs ( 10 ) ) . await ;
for ( addr , _ ) in groups {
sender
. send ( SendMessage ::Group (
@ -164,11 +164,7 @@ pub(crate) async fn sleep_waiting_close_stable(
@@ -164,11 +164,7 @@ pub(crate) async fn sleep_waiting_close_stable(
}
#[ inline ]
pub ( crate ) async fn inner_rpc (
uid : u64 ,
method : & str ,
sender : & async_channel ::Sender < SendMessage > ,
) -> Result < ( ) > {
pub ( crate ) async fn inner_rpc ( uid : u64 , method : & str , sender : & Sender < SendMessage > ) -> Result < ( ) > {
// Inner network default rpc method. only use in http-rpc.
if method = = "network-stable" | | method = = "network-dht" {
let req = match method {
@ -177,16 +173,16 @@ pub(crate) async fn inner_rpc(
@@ -177,16 +173,16 @@ pub(crate) async fn inner_rpc(
_ = > return Ok ( ( ) ) ,
} ;
let ( s , r ) = async_channel ::unbounded ::< StateResponse > ( ) ;
let ( s , mut r ) = mpsc ::channel ::< StateResponse > ( 128 ) ;
let _ = sender
. send ( SendMessage ::Network ( NetworkType ::NetworkState ( req , s ) ) )
. await
. expect ( "TDN channel closed" ) ;
let param = match r . recv ( ) . await {
Ok ( StateResponse ::Stable ( peers ) ) = > network_stable ( peers ) ,
Ok ( StateResponse ::DHT ( peers ) ) = > network_dht ( peers ) ,
Ok ( _ ) | Err ( _ ) = > {
Some ( StateResponse ::Stable ( peers ) ) = > network_stable ( peers ) ,
Some ( StateResponse ::DHT ( peers ) ) = > network_dht ( peers ) ,
Some ( _ ) | None = > {
return Ok ( ( ) ) ;
}
} ;
@ -301,11 +297,10 @@ fn new_rpc_handler(
@@ -301,11 +297,10 @@ fn new_rpc_handler(
let sender = group_lock . sender ( ) ;
let msg = group_lock . create_message ( & gid , addr ) ? ;
drop ( group_lock ) ;
tdn ::smol ::spawn ( async move {
tdn ::smol ::Timer ::after ( std ::time ::Duration ::from_secs ( 2 ) ) . await ;
tokio ::spawn ( async move {
tokio ::time ::sleep ( std ::time ::Duration ::from_secs ( 2 ) ) . await ;
let _ = sender . send ( SendMessage ::Group ( gid , msg ) ) . await ;
} )
. detach ( ) ;
} ) ;
}
Ok ( results )
@ -409,7 +404,7 @@ fn new_rpc_handler(
@@ -409,7 +404,7 @@ fn new_rpc_handler(
let groups = group_lock . remove_all_running ( ) ;
let sender = group_lock . sender ( ) ;
drop ( group_lock ) ;
tdn ::smol ::spawn ( sleep_waiting_close_stable ( sender , groups , layers ) ) . detach ( ) ;
tokio ::spawn ( sleep_waiting_close_stable ( sender , groups , layers ) ) ;
Ok ( results )
} ,
@ -455,7 +450,7 @@ fn new_rpc_handler(
@@ -455,7 +450,7 @@ fn new_rpc_handler(
let groups = group_lock . remove_running ( & gid ) ;
let sender = group_lock . sender ( ) ;
drop ( group_lock ) ;
tdn ::smol ::spawn ( sleep_waiting_close_stable ( sender , groups , layers ) ) . detach ( ) ;
tokio ::spawn ( sleep_waiting_close_stable ( sender , groups , layers ) ) ;
debug ! ( "Account Offline: {}." , gid . to_hex ( ) ) ;
// add Remove Group to TDN.