Browse Source

upgrade device & files

pull/18/head
Sun 3 years ago
parent
commit
c3b1ce83fc
  1. 92
      lib/apps/device/page.dart
  2. 2
      lib/apps/device/provider.dart
  3. 22
      src/apps.rs
  4. 17
      src/apps/cloud/layer.rs
  5. 24
      src/apps/cloud/mod.rs
  6. 9
      src/apps/cloud/rpc.rs
  7. 4
      src/apps/device/mod.rs
  8. 26
      src/apps/device/models.rs
  9. 89
      src/apps/device/rpc.rs
  10. 54
      src/apps/domain/layer.rs
  11. 24
      src/apps/domain/mod.rs
  12. 2
      src/apps/domain/models.rs
  13. 124
      src/apps/domain/rpc.rs
  14. 2
      src/apps/file/models.rs
  15. 77
      src/apps/file/rpc.rs
  16. 60
      src/group.rs

92
lib/apps/device/page.dart

@ -275,21 +275,22 @@ class DeviceListenPage extends StatefulWidget { @@ -275,21 +275,22 @@ class DeviceListenPage extends StatefulWidget {
class _DeviceListenPageState extends State<DeviceListenPage> {
Widget percentWidget(double cpu_p, String cpu_u, double radius, Color color) {
return Container(
width: radius + 10,
margin: const EdgeInsets.symmetric(vertical: 10.0),
width: radius * 2,
alignment: Alignment.center,
child: CircularPercentIndicator(
radius: radius,
lineWidth: 16.0,
animation: true,
percent: cpu_p/100,
center: Text("${cpu_p}%",
style: TextStyle(fontWeight: FontWeight.bold, fontSize: 20.0),
),
footer: Padding(
padding: const EdgeInsets.only(top: 8.0, bottom: 32.0),
child: Text(cpu_u,
style: TextStyle(fontWeight: FontWeight.bold, fontSize: 17.0),
),
center: Column(
mainAxisAlignment: MainAxisAlignment.center,
crossAxisAlignment: CrossAxisAlignment.center,
children: [
Text("${cpu_p}%", style: TextStyle(fontWeight: FontWeight.bold, fontSize: 20.0)),
const SizedBox(height: 4.0),
Text(cpu_u)
]
),
circularStrokeCap: CircularStrokeCap.round,
progressColor: color,
@ -305,17 +306,28 @@ class _DeviceListenPageState extends State<DeviceListenPage> { @@ -305,17 +306,28 @@ class _DeviceListenPageState extends State<DeviceListenPage> {
final status = context.watch<DeviceProvider>().status;
final uptimes = status.uptime.uptime();
double radius = MediaQuery.of(context).size.width / 2 - 40;
if (radius > 150) {
radius = 150;
double radius = MediaQuery.of(context).size.width / 4 - 10;
if (radius > 100) {
radius = 100;
} else {
radius = MediaQuery.of(context).size.width / 2 - 50;
if (radius > 99) {
radius = 99;
}
}
double height = MediaQuery.of(context).size.height / 2 - radius - 60;
if (height < 16) {
height = 16;
} else if (!isDesktop) {
height = 32;
}
final w1 = percentWidget(
status.cpu_p(), "CPU: ${status.cpu_u()} cores", radius, Color(0xFF6174FF),
);
final w2 = percentWidget(
status.memory_p(), "${lang.memory}: ${status.memory_u()}", radius, Colors.blue,
);
final w3 = percentWidget(
status.swap_p(), "${lang.swap}: ${status.memory_u()}", radius, Colors.green,
);
final w4 = percentWidget(
status.disk_p(), "${lang.disk}: ${status.disk_u()}", radius, Colors.purple,
);
return Scaffold(
body: SafeArea(
@ -350,47 +362,21 @@ class _DeviceListenPageState extends State<DeviceListenPage> { @@ -350,47 +362,21 @@ class _DeviceListenPageState extends State<DeviceListenPage> {
const SizedBox(height: 10.0),
]
),
SizedBox(height: height),
const SizedBox(height: 20.0),
Expanded(
child: SingleChildScrollView(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Row(
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: [
percentWidget(
status.cpu_p(),
"CPU: ${status.cpu_u()} cores",
radius,
Color(0xFF6174FF),
),
percentWidget(
status.memory_p(),
"${lang.memory}: ${status.memory_u()}",
radius,
Colors.blue,
),
]
children:
radius == 100 ? [
Row(mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: [w1, w2]
),
Row(
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: [
percentWidget(
status.swap_p(),
"${lang.swap}: ${status.memory_u()}",
radius,
Colors.green,
),
percentWidget(
status.disk_p(),
"${lang.disk}: ${status.disk_u()}",
radius,
Colors.purple,
),
]
const SizedBox(height: 40.0),
Row(mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: [w3, w4]
),
]
] : [w1, w2, w3, w4]
),
)
)

2
lib/apps/device/provider.dart

@ -32,7 +32,7 @@ class DeviceProvider extends ChangeNotifier { @@ -32,7 +32,7 @@ class DeviceProvider extends ChangeNotifier {
this.clear();
// load status.
rpc.send('device-status', [this.devices[id]!.addr]);
rpc.send('device-status', [id]);
}
connect(String addr) {

22
src/apps.rs

@ -17,25 +17,25 @@ use crate::rpc::session_lost; @@ -17,25 +17,25 @@ use crate::rpc::session_lost;
use crate::storage::group_db;
pub(crate) mod chat;
//pub(crate) mod cloud;
pub(crate) mod cloud;
pub(crate) mod device;
//pub(crate) mod domain;
//pub(crate) mod file;
pub(crate) mod domain;
pub(crate) mod file;
pub(crate) mod group;
pub(crate) mod jarvis;
//pub(crate) mod dao;
pub(crate) mod wallet;
//pub(crate) mod dao;
pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) {
//device::new_rpc_handler(handler);
device::new_rpc_handler(handler);
chat::new_rpc_handler(handler);
jarvis::new_rpc_handler(handler);
//domain::new_rpc_handler(handler);
//file::new_rpc_handler(handler);
domain::new_rpc_handler(handler);
file::new_rpc_handler(handler);
group::new_rpc_handler(handler);
wallet::new_rpc_handler(handler);
cloud::new_rpc_handler(handler);
//dao::new_rpc_handler(handler);
//cloud::new_rpc_handler(handler);
}
pub(crate) async fn app_layer_handle(
@ -48,9 +48,9 @@ pub(crate) async fn app_layer_handle( @@ -48,9 +48,9 @@ pub(crate) async fn app_layer_handle(
match (fgid, tgid) {
(CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await,
(GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await,
(DAO_ID, 0) => chat::handle(msg, global).await,
(DOMAIN_ID, 0) => chat::handle(msg, global).await,
(CLOUD_ID, 0) => chat::handle(msg, global).await,
(DOMAIN_ID, 0) | (0, DOMAIN_ID) => domain::handle(msg, global).await,
(CLOUD_ID, 0) | (0, CLOUD_ID) => cloud::handle(msg, global).await,
(DAO_ID, 0) | (0, DAO_ID) => chat::handle(msg, global).await,
_ => match msg {
RecvType::Leave(peer) => {
debug!("Peer leaved: {}", peer.id.to_hex());

17
src/apps/cloud/layer.rs

@ -1,20 +1,13 @@ @@ -1,20 +1,13 @@
use cloud_types::LayerServerEvent;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::RecvType,
primitive::{HandleResult, Result},
primitives::{HandleResult, Result},
};
use tokio::sync::RwLock;
use cloud_types::LayerServerEvent;
use crate::layer::Layer;
use crate::global::Global;
pub(crate) async fn handle(
_layer: &Arc<RwLock<Layer>>,
_ogid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
let results = HandleResult::new();
match msg {
@ -26,7 +19,7 @@ pub(crate) async fn handle( @@ -26,7 +19,7 @@ pub(crate) async fn handle(
info!("cloud message nerver to here.")
}
RecvType::Event(_addr, bytes) => {
let LayerServerEvent(_event, _proof) = bincode::deserialize(&bytes)?;
let LayerServerEvent(_event) = bincode::deserialize(&bytes)?;
}
RecvType::Delivery(_t, _tid, _is_ok) => {
// MAYBE

24
src/apps/cloud/mod.rs

@ -1,30 +1,6 @@ @@ -1,30 +1,6 @@
mod layer;
mod models;
pub use cloud_types::CLOUD_ID as GROUP_ID;
use cloud_types::{LayerPeerEvent, PeerEvent};
use tdn::types::{
group::GroupId,
message::SendType,
primitive::{HandleResult, PeerId, Result},
};
use tdn_did::Proof;
/// Send to domain service.
#[inline]
pub(crate) fn add_layer(
results: &mut HandleResult,
addr: PeerId,
event: PeerEvent,
ogid: GroupId,
) -> Result<()> {
let proof = Proof::default();
let data = bincode::serialize(&LayerPeerEvent(event, proof))?;
let s = SendType::Event(0, addr, data);
results.layers.push((ogid, GROUP_ID, s));
Ok(())
}
pub(crate) mod rpc;
pub(crate) use layer::handle;
pub(crate) use rpc::new_rpc_handler;

9
src/apps/cloud/rpc.rs

@ -1,16 +1,15 @@ @@ -1,16 +1,15 @@
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::HandleResult,
primitives::HandleResult,
rpc::{json, RpcHandler, RpcParam},
};
use crate::rpc::RpcState;
use crate::global::Global;
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method(
"cloud-echo",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, _state: Arc<Global>| async move {
Ok(HandleResult::rpc(json!(params)))
},
);

4
src/apps/device/mod.rs

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
mod models;
//pub(crate) mod rpc;
pub(crate) mod rpc;
pub(crate) use models::Device;
//pub(crate) use rpc::new_rpc_handler;
pub(crate) use rpc::new_rpc_handler;

26
src/apps/device/models.rs

@ -63,32 +63,6 @@ impl Device { @@ -63,32 +63,6 @@ impl Device {
Ok(devices)
}
pub fn distributes(db: &DStorage) -> Result<Vec<(Peer, i64, bool)>> {
let matrix = db.query("SELECT id, peer FROM devices")?;
let mut devices = vec![];
for mut v in matrix {
if v.len() == 3 {
let peer = Peer::from_string(v.pop().unwrap().as_str()).unwrap_or(Peer::default());
let id = v.pop().unwrap().as_i64();
devices.push((peer, id, false));
}
}
Ok(devices)
}
pub fn device_info(db: &DStorage) -> Result<(String, String)> {
let mut matrix = db.query("SELECT name, info FROM devices ORDER BY id LIMIT 1")?;
if matrix.len() > 0 {
let mut values = matrix.pop().unwrap(); // safe unwrap()
if values.len() == 2 {
let info = values.pop().unwrap().as_string();
let name = values.pop().unwrap().as_string();
return Ok((name, info));
}
}
Ok((String::new(), String::new()))
}
pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let sql = format!(
"INSERT INTO devices (name, info, peer, lasttime) VALUES ('{}', '{}', '{}', {})",

89
src/apps/device/rpc.rs

@ -1,39 +1,37 @@ @@ -1,39 +1,37 @@
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::{HandleResult, Peer, PeerId},
primitives::HandleResult,
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
};
use crate::group::GroupEvent;
use crate::rpc::RpcState;
use crate::global::Global;
//use crate::group::GroupEvent;
use crate::utils::device_status::device_status as local_device_status;
use super::Device;
#[inline]
pub(crate) fn device_create(mgid: PeerId, device: &Device) -> RpcParam {
rpc_response(0, "device-create", json!(device.to_rpc()), mgid)
pub(crate) fn device_create(device: &Device) -> RpcParam {
rpc_response(0, "device-create", json!(device.to_rpc()))
}
#[inline]
pub(crate) fn _device_remove(mgid: PeerId, id: i64) -> RpcParam {
rpc_response(0, "device-remove", json!([id]), mgid)
pub(crate) fn _device_remove(id: i64) -> RpcParam {
rpc_response(0, "device-remove", json!([id]))
}
#[inline]
pub(crate) fn device_online(mgid: PeerId, id: i64) -> RpcParam {
rpc_response(0, "device-online", json!([id]), mgid)
pub(crate) fn device_online(id: i64) -> RpcParam {
rpc_response(0, "device-online", json!([id]))
}
#[inline]
pub(crate) fn device_offline(mgid: PeerId, id: i64) -> RpcParam {
rpc_response(0, "device-offline", json!([id]), mgid)
pub(crate) fn device_offline(id: i64) -> RpcParam {
rpc_response(0, "device-offline", json!([id]))
}
#[inline]
pub(crate) fn device_status(
mgid: PeerId,
cpu: u32,
memory: u32,
swap: u32,
@ -48,12 +46,11 @@ pub(crate) fn device_status( @@ -48,12 +46,11 @@ pub(crate) fn device_status(
0,
"device-status",
json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]),
mgid,
)
}
#[inline]
fn device_list(devices: Vec<Device>) -> RpcParam {
fn device_list(devices: &[Device]) -> RpcParam {
let mut results = vec![];
for device in devices {
results.push(device.to_rpc());
@ -61,30 +58,27 @@ fn device_list(devices: Vec<Device>) -> RpcParam { @@ -61,30 +58,27 @@ fn device_list(devices: Vec<Device>) -> RpcParam {
json!(results)
}
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("device-echo", |_, params, _| async move {
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method("device-echo", |params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
handler.add_method(
"device-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = state.group.read().await.consensus_db(&gid)?;
let devices = Device::list(&db)?;
drop(db);
let online_devices = state.group.read().await.online_devices(&gid, devices);
Ok(HandleResult::rpc(device_list(online_devices)))
|_params: Vec<RpcParam>, state: Arc<Global>| async move {
let devices = &state.group.read().await.distributes;
Ok(HandleResult::rpc(device_list(devices)))
},
);
handler.add_method(
"device-status",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let group_lock = state.group.read().await;
if &addr == group_lock.addr() {
let uptime = group_lock.uptime(&gid)?;
if id == group_lock.device()?.id {
let uptime = group_lock.uptime;
let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) =
local_device_status();
return Ok(HandleResult::rpc(json!([
@ -93,47 +87,24 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -93,47 +87,24 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
}
drop(group_lock);
let msg = state
.group
.write()
.await
.event_message(addr, &GroupEvent::StatusRequest)?;
Ok(HandleResult::group(gid, msg))
},
);
handler.add_method(
"device-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let msg = state
.group
.read()
.await
.create_message(&gid, Peer::peer(addr))?;
Ok(HandleResult::group(gid, msg))
//let msg = state.group.write().await.event_message(addr, &GroupEvent::StatusRequest)?;
//Ok(HandleResult::group(msg))
Ok(HandleResult::new())
},
);
handler.add_method(
"device-connect",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let msg = state
.group
.read()
.await
.connect_message(&gid, Peer::peer(addr))?;
Ok(HandleResult::group(gid, msg))
"device-search",
|_params: Vec<RpcParam>, state: Arc<Global>| async move {
//let msg = state.group.read().await.create_message(&gid, Peer::peer(addr))?;
//Ok(HandleResult::group(gid, msg))
Ok(HandleResult::new())
},
);
handler.add_method(
"device-delete",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, _state: Arc<Global>| async move {
let _id = params[0].as_i64().ok_or(RpcError::ParseError)?;
// TODO delete a device.
Ok(HandleResult::new())

54
src/apps/domain/layer.rs

@ -1,23 +1,17 @@ @@ -1,23 +1,17 @@
use domain_types::LayerServerEvent;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::RecvType,
primitive::{HandleResult, Result},
primitives::{HandleResult, Result},
};
use tokio::sync::RwLock;
use domain_types::{LayerServerEvent, ServerEvent};
use crate::layer::Layer;
use crate::global::Global;
use crate::storage::domain_db;
use super::models::{Name, Provider};
use super::rpc;
pub(crate) async fn handle(
layer: &Arc<RwLock<Layer>>,
ogid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
let mut results = HandleResult::new();
match msg {
@ -30,17 +24,19 @@ pub(crate) async fn handle( @@ -30,17 +24,19 @@ pub(crate) async fn handle(
}
RecvType::Event(addr, bytes) => {
// server & client handle it.
let LayerServerEvent(event, _proof) = bincode::deserialize(&bytes)?;
let event: LayerServerEvent = bincode::deserialize(&bytes)?;
let db = layer.read().await.group.read().await.domain_db(&ogid)?;
let pid = global.pid().await;
let db_key = global.group.read().await.db_key(&pid)?;
let db = domain_db(&global.base, &pid, &db_key)?;
match event {
ServerEvent::Status(name, support_request) => {
LayerServerEvent::Status(name, support_request) => {
let mut provider = Provider::get_by_addr(&db, &addr)?;
provider.ok(&db, name, support_request)?;
results.rpcs.push(rpc::add_provider(ogid, &provider));
results.rpcs.push(rpc::add_provider(&provider));
}
ServerEvent::Result(name, is_ok) => {
LayerServerEvent::Result(name, is_ok) => {
let provider = Provider::get_by_addr(&db, &addr)?;
let mut user = Name::get_by_name_provider(&db, &name, &provider.id)?;
@ -48,39 +44,39 @@ pub(crate) async fn handle( @@ -48,39 +44,39 @@ pub(crate) async fn handle(
Name::active(&db, &user.id, true)?;
user.is_ok = true;
user.is_actived = true;
results.rpcs.push(rpc::register_success(ogid, &user));
results.rpcs.push(rpc::register_success(&user));
} else {
user.delete(&db)?;
results.rpcs.push(rpc::register_failure(ogid, &name));
results.rpcs.push(rpc::register_failure(&name));
}
}
ServerEvent::Info(uname, ugid, uaddr, ubio, uavatar) => {
results.rpcs.push(rpc::search_result(
ogid, &uname, &ugid, &uaddr, &ubio, &uavatar,
));
LayerServerEvent::Info(upid, uname, ubio, uavatar) => {
results
.rpcs
.push(rpc::search_result(&upid, &uname, &ubio, &uavatar));
}
ServerEvent::None(uname) => {
results.rpcs.push(rpc::search_none(ogid, &uname));
LayerServerEvent::None(uname) => {
results.rpcs.push(rpc::search_none(&uname));
}
ServerEvent::Actived(uname, is_actived) => {
LayerServerEvent::Actived(uname, is_actived) => {
let provider = Provider::get_by_addr(&db, &addr)?;
let name = Name::get_by_name_provider(&db, &uname, &provider.id)?;
Name::active(&db, &name.id, is_actived)?;
let ps = Provider::list(&db)?;
let names = Name::list(&db)?;
results.rpcs.push(rpc::domain_list(ogid, &ps, &names));
results.rpcs.push(rpc::domain_list(&ps, &names));
}
ServerEvent::Deleted(uname) => {
LayerServerEvent::Deleted(uname) => {
let provider = Provider::get_by_addr(&db, &addr)?;
let name = Name::get_by_name_provider(&db, &uname, &provider.id)?;
name.delete(&db)?;
let ps = Provider::list(&db)?;
let names = Name::list(&db)?;
results.rpcs.push(rpc::domain_list(ogid, &ps, &names));
results.rpcs.push(rpc::domain_list(&ps, &names));
}
ServerEvent::Response(_ugid, _uname, _is_ok) => {}
LayerServerEvent::Response(_ugid, _uname, _is_ok) => {}
}
}
RecvType::Delivery(_t, _tid, _is_ok) => {

24
src/apps/domain/mod.rs

@ -1,30 +1,6 @@ @@ -1,30 +1,6 @@
mod layer;
mod models;
pub use domain_types::DOMAIN_ID as GROUP_ID;
use domain_types::{LayerPeerEvent, PeerEvent};
use tdn::types::{
group::GroupId,
message::SendType,
primitive::{HandleResult, PeerId, Result},
};
use tdn_did::Proof;
/// Send to domain service.
#[inline]
pub(crate) fn add_layer(
results: &mut HandleResult,
addr: PeerId,
event: PeerEvent,
ogid: GroupId,
) -> Result<()> {
let proof = Proof::default();
let data = bincode::serialize(&LayerPeerEvent(event, proof))?;
let s = SendType::Event(0, addr, data);
results.layers.push((ogid, GROUP_ID, s));
Ok(())
}
pub(crate) mod rpc;
pub(crate) use layer::handle;
pub(crate) use rpc::new_rpc_handler;

2
src/apps/domain/models.rs

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
use tdn::types::{
primitive::{PeerId, Result},
primitives::{PeerId, Result},
rpc::{json, RpcParam},
};
use tdn_storage::local::{DStorage, DsValue};

124
src/apps/domain/rpc.rs

@ -1,56 +1,47 @@ @@ -1,56 +1,47 @@
use domain_types::{LayerPeerEvent, DOMAIN_ID};
use esse_primitives::id_to_str;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::{HandleResult, PeerId},
message::SendType,
primitives::{HandleResult, PeerId},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
};
use domain_types::PeerEvent;
use crate::global::Global;
use crate::storage::domain_db;
use super::{
add_layer,
models::{Name, Provider},
};
use crate::rpc::RpcState;
use super::models::{Name, Provider};
#[inline]
pub(crate) fn add_provider(mgid: GroupId, provider: &Provider) -> RpcParam {
rpc_response(0, "domain-provider-add", json!(provider.to_rpc()), mgid)
pub(crate) fn add_provider(provider: &Provider) -> RpcParam {
rpc_response(0, "domain-provider-add", json!(provider.to_rpc()))
}
#[inline]
pub(crate) fn register_success(mgid: GroupId, name: &Name) -> RpcParam {
rpc_response(0, "domain-register-success", json!(name.to_rpc()), mgid)
pub(crate) fn register_success(name: &Name) -> RpcParam {
rpc_response(0, "domain-register-success", json!(name.to_rpc()))
}
#[inline]
pub(crate) fn register_failure(mgid: GroupId, name: &str) -> RpcParam {
rpc_response(0, "domain-register-failure", json!([name]), mgid)
pub(crate) fn register_failure(name: &str) -> RpcParam {
rpc_response(0, "domain-register-failure", json!([name]))
}
#[inline]
pub(crate) fn domain_list(mgid: GroupId, providers: &[Provider], names: &[Name]) -> RpcParam {
pub(crate) fn domain_list(providers: &[Provider], names: &[Name]) -> RpcParam {
let providers: Vec<RpcParam> = providers.iter().map(|p| p.to_rpc()).collect();
let names: Vec<RpcParam> = names.iter().map(|p| p.to_rpc()).collect();
rpc_response(0, "domain-list", json!([providers, names]), mgid)
rpc_response(0, "domain-list", json!([providers, names]))
}
#[inline]
pub(crate) fn search_result(
mgid: GroupId,
name: &str,
gid: &GroupId,
addr: &PeerId,
bio: &str,
avatar: &Vec<u8>,
) -> RpcParam {
pub(crate) fn search_result(pid: &PeerId, name: &str, bio: &str, avatar: &Vec<u8>) -> RpcParam {
rpc_response(
0,
"domain-search",
json!([
name,
gid.to_hex(),
addr.to_hex(),
id_to_str(pid),
bio,
if avatar.len() > 0 {
base64::encode(avatar)
@ -58,20 +49,21 @@ pub(crate) fn search_result( @@ -58,20 +49,21 @@ pub(crate) fn search_result(
"".to_owned()
}
]),
mgid,
)
}
#[inline]
pub(crate) fn search_none(mgid: GroupId, name: &str) -> RpcParam {
rpc_response(0, "domain-search", json!([name]), mgid)
pub(crate) fn search_none(name: &str) -> RpcParam {
rpc_response(0, "domain-search", json!([name]))
}
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method(
"domain-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = state.group.read().await.domain_db(&gid)?;
|_params: Vec<RpcParam>, state: Arc<Global>| async move {
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = domain_db(&state.base, &pid, &db_key)?;
// list providers.
let providers: Vec<RpcParam> =
@ -86,25 +78,33 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -86,25 +78,33 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"domain-provider-add",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let provider = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let mut results = HandleResult::new();
let db = state.group.read().await.domain_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = domain_db(&state.base, &pid, &db_key)?;
let mut p = Provider::prepare(provider);
p.insert(&db)?;
add_layer(&mut results, provider, PeerEvent::Check, gid)?;
let data = bincode::serialize(&LayerPeerEvent::Check)?;
let msg = SendType::Event(0, provider, data);
results.layers.push((DOMAIN_ID, msg));
Ok(results)
},
);
handler.add_method(
"domain-provider-default",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.domain_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = domain_db(&state.base, &pid, &db_key)?;
let provider = Provider::get(&db, &id)?;
if let Ok(default) = Provider::get_default(&db) {
if default.id == provider.id {
@ -120,10 +120,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -120,10 +120,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"domain-provider-remove",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.domain_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = domain_db(&state.base, &pid, &db_key)?;
let names = Name::get_by_provider(&db, &id)?;
if names.len() == 0 {
Provider::delete(&db, &id)?;
@ -135,71 +138,80 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -135,71 +138,80 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"domain-register",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let provider = params[0].as_i64().ok_or(RpcError::ParseError)?;
let addr = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?;
let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_string();
let bio = params[3].as_str().ok_or(RpcError::ParseError)?.to_string();
let me = state.group.read().await.clone_user(&gid)?;
// save to db.
let mut results = HandleResult::new();
let db = state.group.read().await.domain_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = domain_db(&state.base, &pid, &db_key)?;
let me = state.group.read().await.clone_user(&pid)?;
let mut u = Name::prepare(name, bio, provider);
u.insert(&db)?;
// send to server.
let event = PeerEvent::Register(u.name, u.bio, me.avatar);
add_layer(&mut results, addr, event, gid)?;
let data = bincode::serialize(&LayerPeerEvent::Register(u.name, u.bio, me.avatar))?;
let msg = SendType::Event(0, addr, data);
results.layers.push((DOMAIN_ID, msg));
Ok(results)
},
);
handler.add_method(
"domain-active",
|gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, _state: Arc<Global>| async move {
let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned();
let provider = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?;
let active = params[2].as_bool().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let event = if active {
PeerEvent::Active(name)
LayerPeerEvent::Active(name)
} else {
PeerEvent::Suspend(name)
LayerPeerEvent::Suspend(name)
};
add_layer(&mut results, provider, event, gid)?;
let data = bincode::serialize(&event)?;
let msg = SendType::Event(0, provider, data);
results.layers.push((DOMAIN_ID, msg));
Ok(results)
},
);
handler.add_method(
"domain-remove",
|gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, _state: Arc<Global>| async move {
let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned();
let provider = PeerId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?;
let mut results = HandleResult::new();
let event = PeerEvent::Delete(name);
add_layer(&mut results, provider, event, gid)?;
let event = LayerPeerEvent::Delete(name);
let data = bincode::serialize(&event)?;
let msg = SendType::Event(0, provider, data);
results.layers.push((DOMAIN_ID, msg));
Ok(results)
},
);
handler.add_method(
"domain-search",
|gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, _state: Arc<Global>| async move {
let addr = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let name = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned();
let mut results = HandleResult::new();
// send to server.
let event = PeerEvent::Search(name);
add_layer(&mut results, addr, event, gid)?;
let event = LayerPeerEvent::Search(name);
let data = bincode::serialize(&event)?;
let msg = SendType::Event(0, addr, data);
results.layers.push((DOMAIN_ID, msg));
Ok(results)
},
);

2
src/apps/file/models.rs

@ -2,7 +2,7 @@ use rand::Rng; @@ -2,7 +2,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
primitive::Result,
primitives::Result,
rpc::{json, RpcParam},
};
use tdn_storage::local::{DStorage, DsValue};

77
src/apps/file/rpc.rs

@ -1,28 +1,30 @@ @@ -1,28 +1,30 @@
use std::path::PathBuf;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::HandleResult,
primitives::HandleResult,
rpc::{json, RpcError, RpcHandler, RpcParam},
};
use crate::rpc::RpcState;
use crate::storage::{copy_file, write_file};
use crate::global::Global;
use crate::storage::{copy_file, file_db, write_file};
use super::models::{File, RootDirectory};
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("dc-echo", |_, params, _| async move {
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method("dc-echo", |params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
handler.add_method(
"dc-list",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.file_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
let files: Vec<RpcParam> = File::list(&db, &root, &parent)?
.iter()
.map(|p| p.to_rpc())
@ -34,28 +36,28 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -34,28 +36,28 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-file-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let db = group_lock.file_db(&gid)?;
drop(group_lock);
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
// genereate new file.
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
// create file on disk.
let _ = write_file(&base, &gid, &file.storage_name(), &[]).await?;
let _ = write_file(&state.base, &pid, &file.storage_name(), &[]).await?;
Ok(HandleResult::rpc(file.to_rpc()))
},
);
handler.add_method(
"dc-file-upload",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let path = params[2].as_str().ok_or(RpcError::ParseError)?;
@ -68,13 +70,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -68,13 +70,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
.ok_or(RpcError::ParseError)?
.to_owned();
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let db = group_lock.file_db(&gid)?;
drop(group_lock);
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
copy_file(&file_path, &base, &gid, &file.storage_name()).await?;
copy_file(&file_path, &state.base, &pid, &file.storage_name()).await?;
Ok(HandleResult::rpc(file.to_rpc()))
},
@ -82,13 +84,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -82,13 +84,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-folder-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
// create new folder.
let db = state.group.read().await.file_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
@ -98,13 +103,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -98,13 +103,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-file-update",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let root = RootDirectory::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[2].as_i64().ok_or(RpcError::ParseError)?;
let name = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned();
let db = state.group.read().await.file_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
let mut file = File::get(&db, &id)?;
file.root = root;
file.parent = parent;
@ -117,11 +125,14 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -117,11 +125,14 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-file-star",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let starred = params[1].as_bool().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.file_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
File::star(&db, &id, starred)?;
Ok(HandleResult::new())
},
@ -129,12 +140,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -129,12 +140,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-file-trash",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
// TODO trash a directory.
let db = state.group.read().await.file_db(&gid)?;
File::trash(&db, &id)?;
Ok(HandleResult::new())
},
@ -142,12 +156,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -142,12 +156,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"dc-file-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
|params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = file_db(&state.base, &pid, &db_key)?;
// TODO deleted file & directory.
let db = state.group.read().await.file_db(&gid)?;
File::delete(&db, &id)?;
Ok(HandleResult::new())
},

60
src/group.rs

@ -30,12 +30,8 @@ pub(crate) struct Group { @@ -30,12 +30,8 @@ pub(crate) struct Group {
pub accounts: HashMap<PeerId, Account>,
/// current account secret keypair.
pub keypair: PeerKey,
/// current account device's name.
pub device_name: String,
/// current account device's info.
pub device_info: String,
/// current account distribute connected devices.
pub distributes: Vec<(Peer, i64, bool)>,
pub distributes: Vec<Device>,
/// current account uptime
pub uptime: u32,
}
@ -238,8 +234,6 @@ impl Group { @@ -238,8 +234,6 @@ impl Group {
Group {
accounts,
keypair: PeerKey::default(),
device_name: String::new(),
device_info: String::new(),
distributes: vec![],
uptime: 0,
}
@ -254,25 +248,25 @@ impl Group { @@ -254,25 +248,25 @@ impl Group {
Ok(self.account(pid)?.plainkey())
}
pub fn online(&mut self, peer: &Peer) -> Result<i64> {
for i in self.distributes.iter_mut() {
if &i.0 == peer {
i.2 = true;
return Ok(i.1);
}
}
Err(anyhow!("missing distribute device"))
}
pub fn offline(&mut self, peer: &Peer) -> Result<i64> {
for i in self.distributes.iter_mut() {
if &i.0 == peer {
i.2 = false;
return Ok(i.1);
}
}
Err(anyhow!("missing distribute device"))
}
// pub fn online(&mut self, peer: &Peer) -> Result<i64> {
// for i in self.distributes.iter_mut() {
// if &i.0 == peer {
// i.2 = true;
// return Ok(i.1);
// }
// }
// Err(anyhow!("missing distribute device"))
// }
// pub fn offline(&mut self, peer: &Peer) -> Result<i64> {
// for i in self.distributes.iter_mut() {
// if &i.0 == peer {
// i.2 = false;
// return Ok(i.1);
// }
// }
// Err(anyhow!("missing distribute device"))
// }
pub fn check_lock(&self, pid: &PeerId, lock: &str) -> bool {
if let Some(account) = self.accounts.get(pid) {
@ -427,12 +421,8 @@ impl Group { @@ -427,12 +421,8 @@ impl Group {
self.keypair = keypair;
let db = consensus_db(base, pid, &self.db_key(pid)?)?;
self.distributes = Device::distributes(&db)?;
let (device_name, device_info) = Device::device_info(&db)?;
self.distributes = Device::list(&db)?;
db.close()?;
self.device_name = device_name;
self.device_info = device_info;
let start = SystemTime::now();
self.uptime = start
@ -547,6 +537,14 @@ impl Group { @@ -547,6 +537,14 @@ impl Group {
account_db.close()
}
pub fn device(&self) -> Result<&Device> {
if self.distributes.len() > 0 {
Ok(&self.distributes[0])
} else {
Err(anyhow!("no devices"))
}
}
// pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let user = self.clone_user(pid)?;
// let account = self.account(pid)?;

Loading…
Cancel
Save