From c3b1ce83fc0028757aa5d288b5d47100a3a55fd0 Mon Sep 17 00:00:00 2001 From: Sun Date: Sat, 5 Mar 2022 15:41:03 +0800 Subject: [PATCH] upgrade device & files --- lib/apps/device/page.dart | 92 +++++++++++-------------- lib/apps/device/provider.dart | 2 +- src/apps.rs | 22 +++--- src/apps/cloud/layer.rs | 17 ++--- src/apps/cloud/mod.rs | 24 ------- src/apps/cloud/rpc.rs | 9 ++- src/apps/device/mod.rs | 4 +- src/apps/device/models.rs | 26 ------- src/apps/device/rpc.rs | 89 ++++++++---------------- src/apps/domain/layer.rs | 54 +++++++-------- src/apps/domain/mod.rs | 24 ------- src/apps/domain/models.rs | 2 +- src/apps/domain/rpc.rs | 124 +++++++++++++++++++--------------- src/apps/file/models.rs | 2 +- src/apps/file/rpc.rs | 77 +++++++++++++-------- src/group.rs | 60 ++++++++-------- 16 files changed, 263 insertions(+), 365 deletions(-) diff --git a/lib/apps/device/page.dart b/lib/apps/device/page.dart index d6fbc0f..f26d64d 100644 --- a/lib/apps/device/page.dart +++ b/lib/apps/device/page.dart @@ -275,21 +275,22 @@ class DeviceListenPage extends StatefulWidget { class _DeviceListenPageState extends State { Widget percentWidget(double cpu_p, String cpu_u, double radius, Color color) { return Container( - width: radius + 10, + margin: const EdgeInsets.symmetric(vertical: 10.0), + width: radius * 2, alignment: Alignment.center, child: CircularPercentIndicator( radius: radius, lineWidth: 16.0, animation: true, percent: cpu_p/100, - center: Text("${cpu_p}%", - style: TextStyle(fontWeight: FontWeight.bold, fontSize: 20.0), - ), - footer: Padding( - padding: const EdgeInsets.only(top: 8.0, bottom: 32.0), - child: Text(cpu_u, - style: TextStyle(fontWeight: FontWeight.bold, fontSize: 17.0), - ), + center: Column( + mainAxisAlignment: MainAxisAlignment.center, + crossAxisAlignment: CrossAxisAlignment.center, + children: [ + Text("${cpu_p}%", style: TextStyle(fontWeight: FontWeight.bold, fontSize: 20.0)), + const SizedBox(height: 4.0), + Text(cpu_u) + ] ), circularStrokeCap: CircularStrokeCap.round, progressColor: color, @@ -305,17 +306,28 @@ class _DeviceListenPageState extends State { final status = context.watch().status; final uptimes = status.uptime.uptime(); - double radius = MediaQuery.of(context).size.width / 2 - 40; - if (radius > 150) { - radius = 150; + double radius = MediaQuery.of(context).size.width / 4 - 10; + if (radius > 100) { + radius = 100; + } else { + radius = MediaQuery.of(context).size.width / 2 - 50; + if (radius > 99) { + radius = 99; + } } - double height = MediaQuery.of(context).size.height / 2 - radius - 60; - if (height < 16) { - height = 16; - } else if (!isDesktop) { - height = 32; - } + final w1 = percentWidget( + status.cpu_p(), "CPU: ${status.cpu_u()} cores", radius, Color(0xFF6174FF), + ); + final w2 = percentWidget( + status.memory_p(), "${lang.memory}: ${status.memory_u()}", radius, Colors.blue, + ); + final w3 = percentWidget( + status.swap_p(), "${lang.swap}: ${status.memory_u()}", radius, Colors.green, + ); + final w4 = percentWidget( + status.disk_p(), "${lang.disk}: ${status.disk_u()}", radius, Colors.purple, + ); return Scaffold( body: SafeArea( @@ -350,47 +362,21 @@ class _DeviceListenPageState extends State { const SizedBox(height: 10.0), ] ), - SizedBox(height: height), + const SizedBox(height: 20.0), Expanded( child: SingleChildScrollView( child: Column( mainAxisAlignment: MainAxisAlignment.center, - children: [ - Row( - mainAxisAlignment: MainAxisAlignment.spaceEvenly, - children: [ - percentWidget( - status.cpu_p(), - "CPU: ${status.cpu_u()} cores", - radius, - Color(0xFF6174FF), - ), - percentWidget( - status.memory_p(), - "${lang.memory}: ${status.memory_u()}", - radius, - Colors.blue, - ), - ] + children: + radius == 100 ? [ + Row(mainAxisAlignment: MainAxisAlignment.spaceEvenly, + children: [w1, w2] ), - Row( - mainAxisAlignment: MainAxisAlignment.spaceEvenly, - children: [ - percentWidget( - status.swap_p(), - "${lang.swap}: ${status.memory_u()}", - radius, - Colors.green, - ), - percentWidget( - status.disk_p(), - "${lang.disk}: ${status.disk_u()}", - radius, - Colors.purple, - ), - ] + const SizedBox(height: 40.0), + Row(mainAxisAlignment: MainAxisAlignment.spaceEvenly, + children: [w3, w4] ), - ] + ] : [w1, w2, w3, w4] ), ) ) diff --git a/lib/apps/device/provider.dart b/lib/apps/device/provider.dart index 0e67091..0e9f0f5 100644 --- a/lib/apps/device/provider.dart +++ b/lib/apps/device/provider.dart @@ -32,7 +32,7 @@ class DeviceProvider extends ChangeNotifier { this.clear(); // load status. - rpc.send('device-status', [this.devices[id]!.addr]); + rpc.send('device-status', [id]); } connect(String addr) { diff --git a/src/apps.rs b/src/apps.rs index 955605f..f4ed69a 100644 --- a/src/apps.rs +++ b/src/apps.rs @@ -17,25 +17,25 @@ use crate::rpc::session_lost; use crate::storage::group_db; pub(crate) mod chat; -//pub(crate) mod cloud; +pub(crate) mod cloud; pub(crate) mod device; -//pub(crate) mod domain; -//pub(crate) mod file; +pub(crate) mod domain; +pub(crate) mod file; pub(crate) mod group; pub(crate) mod jarvis; -//pub(crate) mod dao; pub(crate) mod wallet; +//pub(crate) mod dao; pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { - //device::new_rpc_handler(handler); + device::new_rpc_handler(handler); chat::new_rpc_handler(handler); jarvis::new_rpc_handler(handler); - //domain::new_rpc_handler(handler); - //file::new_rpc_handler(handler); + domain::new_rpc_handler(handler); + file::new_rpc_handler(handler); group::new_rpc_handler(handler); wallet::new_rpc_handler(handler); + cloud::new_rpc_handler(handler); //dao::new_rpc_handler(handler); - //cloud::new_rpc_handler(handler); } pub(crate) async fn app_layer_handle( @@ -48,9 +48,9 @@ pub(crate) async fn app_layer_handle( match (fgid, tgid) { (CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await, (GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await, - (DAO_ID, 0) => chat::handle(msg, global).await, - (DOMAIN_ID, 0) => chat::handle(msg, global).await, - (CLOUD_ID, 0) => chat::handle(msg, global).await, + (DOMAIN_ID, 0) | (0, DOMAIN_ID) => domain::handle(msg, global).await, + (CLOUD_ID, 0) | (0, CLOUD_ID) => cloud::handle(msg, global).await, + (DAO_ID, 0) | (0, DAO_ID) => chat::handle(msg, global).await, _ => match msg { RecvType::Leave(peer) => { debug!("Peer leaved: {}", peer.id.to_hex()); diff --git a/src/apps/cloud/layer.rs b/src/apps/cloud/layer.rs index 5330c33..771ff8c 100644 --- a/src/apps/cloud/layer.rs +++ b/src/apps/cloud/layer.rs @@ -1,20 +1,13 @@ +use cloud_types::LayerServerEvent; use std::sync::Arc; use tdn::types::{ - group::GroupId, message::RecvType, - primitive::{HandleResult, Result}, + primitives::{HandleResult, Result}, }; -use tokio::sync::RwLock; - -use cloud_types::LayerServerEvent; -use crate::layer::Layer; +use crate::global::Global; -pub(crate) async fn handle( - _layer: &Arc>, - _ogid: GroupId, - msg: RecvType, -) -> Result { +pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { let results = HandleResult::new(); match msg { @@ -26,7 +19,7 @@ pub(crate) async fn handle( info!("cloud message nerver to here.") } RecvType::Event(_addr, bytes) => { - let LayerServerEvent(_event, _proof) = bincode::deserialize(&bytes)?; + let LayerServerEvent(_event) = bincode::deserialize(&bytes)?; } RecvType::Delivery(_t, _tid, _is_ok) => { // MAYBE diff --git a/src/apps/cloud/mod.rs b/src/apps/cloud/mod.rs index 57f6e6b..3d318ee 100644 --- a/src/apps/cloud/mod.rs +++ b/src/apps/cloud/mod.rs @@ -1,30 +1,6 @@ mod layer; mod models; -pub use cloud_types::CLOUD_ID as GROUP_ID; -use cloud_types::{LayerPeerEvent, PeerEvent}; -use tdn::types::{ - group::GroupId, - message::SendType, - primitive::{HandleResult, PeerId, Result}, -}; -use tdn_did::Proof; - -/// Send to domain service. -#[inline] -pub(crate) fn add_layer( - results: &mut HandleResult, - addr: PeerId, - event: PeerEvent, - ogid: GroupId, -) -> Result<()> { - let proof = Proof::default(); - let data = bincode::serialize(&LayerPeerEvent(event, proof))?; - let s = SendType::Event(0, addr, data); - results.layers.push((ogid, GROUP_ID, s)); - Ok(()) -} - pub(crate) mod rpc; pub(crate) use layer::handle; pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/cloud/rpc.rs b/src/apps/cloud/rpc.rs index 8669e6f..b8aab6f 100644 --- a/src/apps/cloud/rpc.rs +++ b/src/apps/cloud/rpc.rs @@ -1,16 +1,15 @@ use std::sync::Arc; use tdn::types::{ - group::GroupId, - primitive::HandleResult, + primitives::HandleResult, rpc::{json, RpcHandler, RpcParam}, }; -use crate::rpc::RpcState; +use crate::global::Global; -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "cloud-echo", - |_gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { Ok(HandleResult::rpc(json!(params))) }, ); diff --git a/src/apps/device/mod.rs b/src/apps/device/mod.rs index d1076b0..32f135b 100644 --- a/src/apps/device/mod.rs +++ b/src/apps/device/mod.rs @@ -1,5 +1,5 @@ mod models; -//pub(crate) mod rpc; +pub(crate) mod rpc; pub(crate) use models::Device; -//pub(crate) use rpc::new_rpc_handler; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/device/models.rs b/src/apps/device/models.rs index 21705c2..69fe73e 100644 --- a/src/apps/device/models.rs +++ b/src/apps/device/models.rs @@ -63,32 +63,6 @@ impl Device { Ok(devices) } - pub fn distributes(db: &DStorage) -> Result> { - let matrix = db.query("SELECT id, peer FROM devices")?; - let mut devices = vec![]; - for mut v in matrix { - if v.len() == 3 { - let peer = Peer::from_string(v.pop().unwrap().as_str()).unwrap_or(Peer::default()); - let id = v.pop().unwrap().as_i64(); - devices.push((peer, id, false)); - } - } - Ok(devices) - } - - pub fn device_info(db: &DStorage) -> Result<(String, String)> { - let mut matrix = db.query("SELECT name, info FROM devices ORDER BY id LIMIT 1")?; - if matrix.len() > 0 { - let mut values = matrix.pop().unwrap(); // safe unwrap() - if values.len() == 2 { - let info = values.pop().unwrap().as_string(); - let name = values.pop().unwrap().as_string(); - return Ok((name, info)); - } - } - Ok((String::new(), String::new())) - } - pub fn insert(&mut self, db: &DStorage) -> Result<()> { let sql = format!( "INSERT INTO devices (name, info, peer, lasttime) VALUES ('{}', '{}', '{}', {})", diff --git a/src/apps/device/rpc.rs b/src/apps/device/rpc.rs index a1f30d5..eb3bd67 100644 --- a/src/apps/device/rpc.rs +++ b/src/apps/device/rpc.rs @@ -1,39 +1,37 @@ use std::sync::Arc; use tdn::types::{ - group::GroupId, - primitive::{HandleResult, Peer, PeerId}, + primitives::HandleResult, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }; -use crate::group::GroupEvent; -use crate::rpc::RpcState; +use crate::global::Global; +//use crate::group::GroupEvent; use crate::utils::device_status::device_status as local_device_status; use super::Device; #[inline] -pub(crate) fn device_create(mgid: PeerId, device: &Device) -> RpcParam { - rpc_response(0, "device-create", json!(device.to_rpc()), mgid) +pub(crate) fn device_create(device: &Device) -> RpcParam { + rpc_response(0, "device-create", json!(device.to_rpc())) } #[inline] -pub(crate) fn _device_remove(mgid: PeerId, id: i64) -> RpcParam { - rpc_response(0, "device-remove", json!([id]), mgid) +pub(crate) fn _device_remove(id: i64) -> RpcParam { + rpc_response(0, "device-remove", json!([id])) } #[inline] -pub(crate) fn device_online(mgid: PeerId, id: i64) -> RpcParam { - rpc_response(0, "device-online", json!([id]), mgid) +pub(crate) fn device_online(id: i64) -> RpcParam { + rpc_response(0, "device-online", json!([id])) } #[inline] -pub(crate) fn device_offline(mgid: PeerId, id: i64) -> RpcParam { - rpc_response(0, "device-offline", json!([id]), mgid) +pub(crate) fn device_offline(id: i64) -> RpcParam { + rpc_response(0, "device-offline", json!([id])) } #[inline] pub(crate) fn device_status( - mgid: PeerId, cpu: u32, memory: u32, swap: u32, @@ -48,12 +46,11 @@ pub(crate) fn device_status( 0, "device-status", json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]), - mgid, ) } #[inline] -fn device_list(devices: Vec) -> RpcParam { +fn device_list(devices: &[Device]) -> RpcParam { let mut results = vec![]; for device in devices { results.push(device.to_rpc()); @@ -61,30 +58,27 @@ fn device_list(devices: Vec) -> RpcParam { json!(results) } -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { - handler.add_method("device-echo", |_, params, _| async move { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("device-echo", |params, _| async move { Ok(HandleResult::rpc(json!(params))) }); handler.add_method( "device-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = state.group.read().await.consensus_db(&gid)?; - let devices = Device::list(&db)?; - drop(db); - let online_devices = state.group.read().await.online_devices(&gid, devices); - Ok(HandleResult::rpc(device_list(online_devices))) + |_params: Vec, state: Arc| async move { + let devices = &state.group.read().await.distributes; + Ok(HandleResult::rpc(device_list(devices))) }, ); handler.add_method( "device-status", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?; + |params: Vec, state: Arc| async move { + let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let group_lock = state.group.read().await; - if &addr == group_lock.addr() { - let uptime = group_lock.uptime(&gid)?; + if id == group_lock.device()?.id { + let uptime = group_lock.uptime; let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = local_device_status(); return Ok(HandleResult::rpc(json!([ @@ -93,47 +87,24 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { } drop(group_lock); - let msg = state - .group - .write() - .await - .event_message(addr, &GroupEvent::StatusRequest)?; - - Ok(HandleResult::group(gid, msg)) - }, - ); - - handler.add_method( - "device-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?; - - let msg = state - .group - .read() - .await - .create_message(&gid, Peer::peer(addr))?; - Ok(HandleResult::group(gid, msg)) + //let msg = state.group.write().await.event_message(addr, &GroupEvent::StatusRequest)?; + //Ok(HandleResult::group(msg)) + Ok(HandleResult::new()) }, ); handler.add_method( - "device-connect", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?; - - let msg = state - .group - .read() - .await - .connect_message(&gid, Peer::peer(addr))?; - Ok(HandleResult::group(gid, msg)) + "device-search", + |_params: Vec, state: Arc| async move { + //let msg = state.group.read().await.create_message(&gid, Peer::peer(addr))?; + //Ok(HandleResult::group(gid, msg)) + Ok(HandleResult::new()) }, ); handler.add_method( "device-delete", - |_gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { let _id = params[0].as_i64().ok_or(RpcError::ParseError)?; // TODO delete a device. Ok(HandleResult::new()) diff --git a/src/apps/domain/layer.rs b/src/apps/domain/layer.rs index e4d845d..77c4667 100644 --- a/src/apps/domain/layer.rs +++ b/src/apps/domain/layer.rs @@ -1,23 +1,17 @@ +use domain_types::LayerServerEvent; use std::sync::Arc; use tdn::types::{ - group::GroupId, message::RecvType, - primitive::{HandleResult, Result}, + primitives::{HandleResult, Result}, }; -use tokio::sync::RwLock; -use domain_types::{LayerServerEvent, ServerEvent}; - -use crate::layer::Layer; +use crate::global::Global; +use crate::storage::domain_db; use super::models::{Name, Provider}; use super::rpc; -pub(crate) async fn handle( - layer: &Arc>, - ogid: GroupId, - msg: RecvType, -) -> Result { +pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { let mut results = HandleResult::new(); match msg { @@ -30,17 +24,19 @@ pub(crate) async fn handle( } RecvType::Event(addr, bytes) => { // server & client handle it. - let LayerServerEvent(event, _proof) = bincode::deserialize(&bytes)?; + let event: LayerServerEvent = bincode::deserialize(&bytes)?; - let db = layer.read().await.group.read().await.domain_db(&ogid)?; + let pid = global.pid().await; + let db_key = global.group.read().await.db_key(&pid)?; + let db = domain_db(&global.base, &pid, &db_key)?; match event { - ServerEvent::Status(name, support_request) => { + LayerServerEvent::Status(name, support_request) => { let mut provider = Provider::get_by_addr(&db, &addr)?; provider.ok(&db, name, support_request)?; - results.rpcs.push(rpc::add_provider(ogid, &provider)); + results.rpcs.push(rpc::add_provider(&provider)); } - ServerEvent::Result(name, is_ok) => { + LayerServerEvent::Result(name, is_ok) => { let provider = Provider::get_by_addr(&db, &addr)?; let mut user = Name::get_by_name_provider(&db, &name, &provider.id)?; @@ -48,39 +44,39 @@ pub(crate) async fn handle( Name::active(&db, &user.id, true)?; user.is_ok = true; user.is_actived = true; - results.rpcs.push(rpc::register_success(ogid, &user)); + results.rpcs.push(rpc::register_success(&user)); } else { user.delete(&db)?; - results.rpcs.push(rpc::register_failure(ogid, &name)); + results.rpcs.push(rpc::register_failure(&name)); } } - ServerEvent::Info(uname, ugid, uaddr, ubio, uavatar) => { - results.rpcs.push(rpc::search_result( - ogid, &uname, &ugid, &uaddr, &ubio, &uavatar, - )); + LayerServerEvent::Info(upid, uname, ubio, uavatar) => { + results + .rpcs + .push(rpc::search_result(&upid, &uname, &ubio, &uavatar)); } - ServerEvent::None(uname) => { - results.rpcs.push(rpc::search_none(ogid, &uname)); + LayerServerEvent::None(uname) => { + results.rpcs.push(rpc::search_none(&uname)); } - ServerEvent::Actived(uname, is_actived) => { + LayerServerEvent::Actived(uname, is_actived) => { let provider = Provider::get_by_addr(&db, &addr)?; let name = Name::get_by_name_provider(&db, &uname, &provider.id)?; Name::active(&db, &name.id, is_actived)?; let ps = Provider::list(&db)?; let names = Name::list(&db)?; - results.rpcs.push(rpc::domain_list(ogid, &ps, &names)); + results.rpcs.push(rpc::domain_list(&ps, &names)); } - ServerEvent::Deleted(uname) => { + LayerServerEvent::Deleted(uname) => { let provider = Provider::get_by_addr(&db, &addr)?; let name = Name::get_by_name_provider(&db, &uname, &provider.id)?; name.delete(&db)?; let ps = Provider::list(&db)?; let names = Name::list(&db)?; - results.rpcs.push(rpc::domain_list(ogid, &ps, &names)); + results.rpcs.push(rpc::domain_list(&ps, &names)); } - ServerEvent::Response(_ugid, _uname, _is_ok) => {} + LayerServerEvent::Response(_ugid, _uname, _is_ok) => {} } } RecvType::Delivery(_t, _tid, _is_ok) => { diff --git a/src/apps/domain/mod.rs b/src/apps/domain/mod.rs index b4b5ee5..3d318ee 100644 --- a/src/apps/domain/mod.rs +++ b/src/apps/domain/mod.rs @@ -1,30 +1,6 @@ mod layer; mod models; -pub use domain_types::DOMAIN_ID as GROUP_ID; -use domain_types::{LayerPeerEvent, PeerEvent}; -use tdn::types::{ - group::GroupId, - message::SendType, - primitive::{HandleResult, PeerId, Result}, -}; -use tdn_did::Proof; - -/// Send to domain service. -#[inline] -pub(crate) fn add_layer( - results: &mut HandleResult, - addr: PeerId, - event: PeerEvent, - ogid: GroupId, -) -> Result<()> { - let proof = Proof::default(); - let data = bincode::serialize(&LayerPeerEvent(event, proof))?; - let s = SendType::Event(0, addr, data); - results.layers.push((ogid, GROUP_ID, s)); - Ok(()) -} - pub(crate) mod rpc; pub(crate) use layer::handle; pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/domain/models.rs b/src/apps/domain/models.rs index a7684ec..3a79ed0 100644 --- a/src/apps/domain/models.rs +++ b/src/apps/domain/models.rs @@ -1,5 +1,5 @@ use tdn::types::{ - primitive::{PeerId, Result}, + primitives::{PeerId, Result}, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; diff --git a/src/apps/domain/rpc.rs b/src/apps/domain/rpc.rs index b6b7791..62dc6b8 100644 --- a/src/apps/domain/rpc.rs +++ b/src/apps/domain/rpc.rs @@ -1,56 +1,47 @@ +use domain_types::{LayerPeerEvent, DOMAIN_ID}; +use esse_primitives::id_to_str; use std::sync::Arc; use tdn::types::{ - group::GroupId, - primitive::{HandleResult, PeerId}, + message::SendType, + primitives::{HandleResult, PeerId}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }; -use domain_types::PeerEvent; +use crate::global::Global; +use crate::storage::domain_db; -use super::{ - add_layer, - models::{Name, Provider}, -}; -use crate::rpc::RpcState; +use super::models::{Name, Provider}; #[inline] -pub(crate) fn add_provider(mgid: GroupId, provider: &Provider) -> RpcParam { - rpc_response(0, "domain-provider-add", json!(provider.to_rpc()), mgid) +pub(crate) fn add_provider(provider: &Provider) -> RpcParam { + rpc_response(0, "domain-provider-add", json!(provider.to_rpc())) } #[inline] -pub(crate) fn register_success(mgid: GroupId, name: &Name) -> RpcParam { - rpc_response(0, "domain-register-success", json!(name.to_rpc()), mgid) +pub(crate) fn register_success(name: &Name) -> RpcParam { + rpc_response(0, "domain-register-success", json!(name.to_rpc())) } #[inline] -pub(crate) fn register_failure(mgid: GroupId, name: &str) -> RpcParam { - rpc_response(0, "domain-register-failure", json!([name]), mgid) +pub(crate) fn register_failure(name: &str) -> RpcParam { + rpc_response(0, "domain-register-failure", json!([name])) } #[inline] -pub(crate) fn domain_list(mgid: GroupId, providers: &[Provider], names: &[Name]) -> RpcParam { +pub(crate) fn domain_list(providers: &[Provider], names: &[Name]) -> RpcParam { let providers: Vec = providers.iter().map(|p| p.to_rpc()).collect(); let names: Vec = names.iter().map(|p| p.to_rpc()).collect(); - rpc_response(0, "domain-list", json!([providers, names]), mgid) + rpc_response(0, "domain-list", json!([providers, names])) } #[inline] -pub(crate) fn search_result( - mgid: GroupId, - name: &str, - gid: &GroupId, - addr: &PeerId, - bio: &str, - avatar: &Vec, -) -> RpcParam { +pub(crate) fn search_result(pid: &PeerId, name: &str, bio: &str, avatar: &Vec) -> RpcParam { rpc_response( 0, "domain-search", json!([ name, - gid.to_hex(), - addr.to_hex(), + id_to_str(pid), bio, if avatar.len() > 0 { base64::encode(avatar) @@ -58,20 +49,21 @@ pub(crate) fn search_result( "".to_owned() } ]), - mgid, ) } #[inline] -pub(crate) fn search_none(mgid: GroupId, name: &str) -> RpcParam { - rpc_response(0, "domain-search", json!([name]), mgid) +pub(crate) fn search_none(name: &str) -> RpcParam { + rpc_response(0, "domain-search", json!([name])) } -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "domain-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = state.group.read().await.domain_db(&gid)?; + |_params: Vec, state: Arc| async move { + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = domain_db(&state.base, &pid, &db_key)?; // list providers. let providers: Vec = @@ -86,25 +78,33 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "domain-provider-add", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let provider = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?; let mut results = HandleResult::new(); - let db = state.group.read().await.domain_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = domain_db(&state.base, &pid, &db_key)?; + let mut p = Provider::prepare(provider); p.insert(&db)?; - add_layer(&mut results, provider, PeerEvent::Check, gid)?; + let data = bincode::serialize(&LayerPeerEvent::Check)?; + let msg = SendType::Event(0, provider, data); + results.layers.push((DOMAIN_ID, msg)); Ok(results) }, ); handler.add_method( "domain-provider-default", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.domain_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = domain_db(&state.base, &pid, &db_key)?; + let provider = Provider::get(&db, &id)?; if let Ok(default) = Provider::get_default(&db) { if default.id == provider.id { @@ -120,10 +120,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "domain-provider-remove", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.domain_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = domain_db(&state.base, &pid, &db_key)?; + let names = Name::get_by_provider(&db, &id)?; if names.len() == 0 { Provider::delete(&db, &id)?; @@ -135,71 +138,80 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "domain-register", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let provider = params[0].as_i64().ok_or(RpcError::ParseError)?; let addr = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?; let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_string(); let bio = params[3].as_str().ok_or(RpcError::ParseError)?.to_string(); - let me = state.group.read().await.clone_user(&gid)?; - // save to db. let mut results = HandleResult::new(); - let db = state.group.read().await.domain_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = domain_db(&state.base, &pid, &db_key)?; + + let me = state.group.read().await.clone_user(&pid)?; + let mut u = Name::prepare(name, bio, provider); u.insert(&db)?; // send to server. - let event = PeerEvent::Register(u.name, u.bio, me.avatar); - add_layer(&mut results, addr, event, gid)?; + let data = bincode::serialize(&LayerPeerEvent::Register(u.name, u.bio, me.avatar))?; + let msg = SendType::Event(0, addr, data); + results.layers.push((DOMAIN_ID, msg)); Ok(results) }, ); handler.add_method( "domain-active", - |gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let provider = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?; let active = params[2].as_bool().ok_or(RpcError::ParseError)?; let mut results = HandleResult::new(); let event = if active { - PeerEvent::Active(name) + LayerPeerEvent::Active(name) } else { - PeerEvent::Suspend(name) + LayerPeerEvent::Suspend(name) }; - add_layer(&mut results, provider, event, gid)?; + let data = bincode::serialize(&event)?; + let msg = SendType::Event(0, provider, data); + results.layers.push((DOMAIN_ID, msg)); Ok(results) }, ); handler.add_method( "domain-remove", - |gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let provider = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?; let mut results = HandleResult::new(); - let event = PeerEvent::Delete(name); - add_layer(&mut results, provider, event, gid)?; - + let event = LayerPeerEvent::Delete(name); + let data = bincode::serialize(&event)?; + let msg = SendType::Event(0, provider, data); + results.layers.push((DOMAIN_ID, msg)); Ok(results) }, ); handler.add_method( "domain-search", - |gid: GroupId, params: Vec, _state: Arc| async move { + |params: Vec, _state: Arc| async move { let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?; let name = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned(); let mut results = HandleResult::new(); // send to server. - let event = PeerEvent::Search(name); - add_layer(&mut results, addr, event, gid)?; + let event = LayerPeerEvent::Search(name); + let data = bincode::serialize(&event)?; + let msg = SendType::Event(0, addr, data); + results.layers.push((DOMAIN_ID, msg)); Ok(results) }, ); diff --git a/src/apps/file/models.rs b/src/apps/file/models.rs index 4c974c8..f2f6fb0 100644 --- a/src/apps/file/models.rs +++ b/src/apps/file/models.rs @@ -2,7 +2,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ - primitive::Result, + primitives::Result, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; diff --git a/src/apps/file/rpc.rs b/src/apps/file/rpc.rs index 4cebc68..15024e6 100644 --- a/src/apps/file/rpc.rs +++ b/src/apps/file/rpc.rs @@ -1,28 +1,30 @@ use std::path::PathBuf; use std::sync::Arc; use tdn::types::{ - group::GroupId, - primitive::HandleResult, + primitives::HandleResult, rpc::{json, RpcError, RpcHandler, RpcParam}, }; -use crate::rpc::RpcState; -use crate::storage::{copy_file, write_file}; +use crate::global::Global; +use crate::storage::{copy_file, file_db, write_file}; use super::models::{File, RootDirectory}; -pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { - handler.add_method("dc-echo", |_, params, _| async move { +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("dc-echo", |params, _| async move { Ok(HandleResult::rpc(json!(params))) }); handler.add_method( "dc-list", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let parent = params[1].as_i64().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.file_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + let files: Vec = File::list(&db, &root, &parent)? .iter() .map(|p| p.to_rpc()) @@ -34,28 +36,28 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-file-create", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let parent = params[1].as_i64().ok_or(RpcError::ParseError)?; let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let group_lock = state.group.read().await; - let base = group_lock.base().clone(); - let db = group_lock.file_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + // genereate new file. let mut file = File::generate(root, parent, name); file.insert(&db)?; // create file on disk. - let _ = write_file(&base, &gid, &file.storage_name(), &[]).await?; + let _ = write_file(&state.base, &pid, &file.storage_name(), &[]).await?; Ok(HandleResult::rpc(file.to_rpc())) }, ); handler.add_method( "dc-file-upload", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let parent = params[1].as_i64().ok_or(RpcError::ParseError)?; let path = params[2].as_str().ok_or(RpcError::ParseError)?; @@ -68,13 +70,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .ok_or(RpcError::ParseError)? .to_owned(); - let group_lock = state.group.read().await; - let base = group_lock.base().clone(); - let db = group_lock.file_db(&gid)?; - drop(group_lock); + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + let mut file = File::generate(root, parent, name); file.insert(&db)?; - copy_file(&file_path, &base, &gid, &file.storage_name()).await?; + copy_file(&file_path, &state.base, &pid, &file.storage_name()).await?; Ok(HandleResult::rpc(file.to_rpc())) }, @@ -82,13 +84,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-folder-create", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let parent = params[1].as_i64().ok_or(RpcError::ParseError)?; let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); // create new folder. - let db = state.group.read().await.file_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + let mut file = File::generate(root, parent, name); file.insert(&db)?; @@ -98,13 +103,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-file-update", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let root = RootDirectory::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let parent = params[2].as_i64().ok_or(RpcError::ParseError)?; let name = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); - let db = state.group.read().await.file_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + let mut file = File::get(&db, &id)?; file.root = root; file.parent = parent; @@ -117,11 +125,14 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-file-star", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let starred = params[1].as_bool().ok_or(RpcError::ParseError)?; - let db = state.group.read().await.file_db(&gid)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + File::star(&db, &id, starred)?; Ok(HandleResult::new()) }, @@ -129,12 +140,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-file-trash", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + // TODO trash a directory. - let db = state.group.read().await.file_db(&gid)?; File::trash(&db, &id)?; Ok(HandleResult::new()) }, @@ -142,12 +156,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "dc-file-delete", - |gid: GroupId, params: Vec, state: Arc| async move { + |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; + let pid = state.pid().await; + let db_key = state.group.read().await.db_key(&pid)?; + let db = file_db(&state.base, &pid, &db_key)?; + // TODO deleted file & directory. - let db = state.group.read().await.file_db(&gid)?; File::delete(&db, &id)?; Ok(HandleResult::new()) }, diff --git a/src/group.rs b/src/group.rs index 1999252..2cb9c91 100644 --- a/src/group.rs +++ b/src/group.rs @@ -30,12 +30,8 @@ pub(crate) struct Group { pub accounts: HashMap, /// current account secret keypair. pub keypair: PeerKey, - /// current account device's name. - pub device_name: String, - /// current account device's info. - pub device_info: String, /// current account distribute connected devices. - pub distributes: Vec<(Peer, i64, bool)>, + pub distributes: Vec, /// current account uptime pub uptime: u32, } @@ -238,8 +234,6 @@ impl Group { Group { accounts, keypair: PeerKey::default(), - device_name: String::new(), - device_info: String::new(), distributes: vec![], uptime: 0, } @@ -254,25 +248,25 @@ impl Group { Ok(self.account(pid)?.plainkey()) } - pub fn online(&mut self, peer: &Peer) -> Result { - for i in self.distributes.iter_mut() { - if &i.0 == peer { - i.2 = true; - return Ok(i.1); - } - } - Err(anyhow!("missing distribute device")) - } - - pub fn offline(&mut self, peer: &Peer) -> Result { - for i in self.distributes.iter_mut() { - if &i.0 == peer { - i.2 = false; - return Ok(i.1); - } - } - Err(anyhow!("missing distribute device")) - } + // pub fn online(&mut self, peer: &Peer) -> Result { + // for i in self.distributes.iter_mut() { + // if &i.0 == peer { + // i.2 = true; + // return Ok(i.1); + // } + // } + // Err(anyhow!("missing distribute device")) + // } + + // pub fn offline(&mut self, peer: &Peer) -> Result { + // for i in self.distributes.iter_mut() { + // if &i.0 == peer { + // i.2 = false; + // return Ok(i.1); + // } + // } + // Err(anyhow!("missing distribute device")) + // } pub fn check_lock(&self, pid: &PeerId, lock: &str) -> bool { if let Some(account) = self.accounts.get(pid) { @@ -427,12 +421,8 @@ impl Group { self.keypair = keypair; let db = consensus_db(base, pid, &self.db_key(pid)?)?; - self.distributes = Device::distributes(&db)?; - - let (device_name, device_info) = Device::device_info(&db)?; + self.distributes = Device::list(&db)?; db.close()?; - self.device_name = device_name; - self.device_info = device_info; let start = SystemTime::now(); self.uptime = start @@ -547,6 +537,14 @@ impl Group { account_db.close() } + pub fn device(&self) -> Result<&Device> { + if self.distributes.len() > 0 { + Ok(&self.distributes[0]) + } else { + Err(anyhow!("no devices")) + } + } + // pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result { // let user = self.clone_user(pid)?; // let account = self.account(pid)?;