Browse Source

refactor rpc to different apps

pull/6/head
Sun 5 years ago
parent
commit
cc00d5498f
  1. BIN
      assets/logo/logo_40.jpg
  2. 0
      src/account.rs
  3. 38
      src/apps.rs
  4. 0
      src/apps/assistant/layer.rs
  5. 0
      src/apps/assistant/migrate.rs
  6. 12
      src/apps/assistant/mod.rs
  7. 0
      src/apps/assistant/models.rs
  8. 0
      src/apps/assistant/rpc.rs
  9. 5
      src/apps/chat/mod.rs
  10. 0
      src/apps/chat/models.rs
  11. 493
      src/apps/chat/rpc.rs
  12. 5
      src/apps/device/mod.rs
  13. 0
      src/apps/device/models.rs
  14. 138
      src/apps/device/rpc.rs
  15. 12
      src/apps/domain/mod.rs
  16. 5
      src/apps/file/mod.rs
  17. 0
      src/apps/file/models.rs
  18. 22
      src/apps/file/rpc.rs
  19. 0
      src/consensus.rs
  20. 4
      src/daemon.rs
  21. 63
      src/event.rs
  22. 25
      src/group.rs
  23. 2
      src/group/running.rs
  24. 5
      src/layer.rs
  25. 2
      src/layer/running.rs
  26. 4
      src/lib.rs
  27. 6
      src/models.rs
  28. 666
      src/rpc.rs
  29. 6
      src/server.rs

BIN
assets/logo/logo_40.jpg

Binary file not shown.

Before

Width:  |  Height:  |  Size: 13 KiB

After

Width:  |  Height:  |  Size: 16 KiB

0
src/models/account.rs → src/account.rs

38
src/apps.rs

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
use tdn::types::{
group::GroupId,
primitive::{HandleResult, PeerAddr, Result},
rpc::RpcHandler,
};
use crate::rpc::RpcState;
pub(crate) mod assistant;
pub(crate) mod chat;
pub(crate) mod device;
pub(crate) mod domain;
pub(crate) mod file;
pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<RpcState>) {
device::new_rpc_handler(handler);
chat::new_rpc_handler(handler);
assistant::new_rpc_handler(handler);
domain::new_rpc_handler(handler);
file::new_rpc_handler(handler);
}
pub(crate) fn _app_layer_handle(
_gid: GroupId,
_fgid: GroupId,
_addr: PeerAddr,
_data: Vec<u8>,
) -> Result<HandleResult> {
todo!()
}
pub(crate) fn _app_group_handle() -> Result<HandleResult> {
todo!()
}
pub(crate) fn _app_migrate() -> Result<()> {
todo!()
}

0
src/models/service.rs → src/apps/assistant/layer.rs

0
src/apps/assistant/migrate.rs

12
src/apps/assistant/mod.rs

@ -0,0 +1,12 @@ @@ -0,0 +1,12 @@
use tdn::types::{
primitive::HandleResult,
rpc::{json, RpcHandler},
};
use crate::rpc::RpcState;
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("assistant-echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
}

0
src/apps/assistant/models.rs

0
src/apps/assistant/rpc.rs

5
src/apps/chat/mod.rs

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
mod models;
pub(crate) mod rpc;
pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request};
pub(crate) use rpc::new_rpc_handler;

0
src/models/session.rs → src/apps/chat/models.rs

493
src/apps/chat/rpc.rs

@ -0,0 +1,493 @@ @@ -0,0 +1,493 @@
use std::collections::HashMap;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::SendType,
primitive::{HandleResult, PeerAddr},
rpc::{json, rpc_response, RpcHandler, RpcParam},
};
use tdn_did::user::User;
use crate::event::InnerEvent;
use crate::layer::LayerEvent;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::rpc::{sleep_waiting_close_stable, RpcState};
use crate::storage::{delete_avatar, session_db};
use super::{Friend, Message, MessageType, Request};
#[inline]
pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam {
rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid)
}
#[inline]
pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-offline", json!([fid]), mgid)
}
#[inline]
pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam {
rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid)
}
#[inline]
pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam {
rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid)
}
#[inline]
pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-close", json!([fid]), mgid)
}
#[inline]
pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-delete", json!([fid]), mgid)
}
#[inline]
pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam {
rpc_response(0, "request-create", json!(req.to_rpc()), mgid)
}
#[inline]
pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam {
rpc_response(0, "request-delivery", json!([id, is_d]), mgid)
}
#[inline]
pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam {
rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid)
}
#[inline]
pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "request-reject", json!([id]), mgid)
}
#[inline]
pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "request-delete", json!([id]), mgid)
}
#[inline]
pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam {
rpc_response(0, "message-create", json!(msg.to_rpc()), mgid)
}
#[inline]
pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam {
rpc_response(0, "message-delivery", json!([id, is_d]), mgid)
}
#[inline]
pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "message-delete", json!([id]), mgid)
}
#[inline]
fn friend_list(friends: Vec<Friend>) -> RpcParam {
let mut results = vec![];
for friend in friends {
results.push(friend.to_rpc());
}
json!(results)
}
#[inline]
fn request_list(requests: Vec<Request>) -> RpcParam {
let mut results = vec![];
for request in requests {
results.push(request.to_rpc());
}
json!(results)
}
#[inline]
fn message_list(messages: Vec<Message>) -> RpcParam {
let mut results = vec![];
for msg in messages {
results.push(msg.to_rpc());
}
json!(results)
}
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("chat-echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
handler.add_method(
"friend-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let friends = state.layer.read().await.all_friends_with_online(&gid)?;
Ok(HandleResult::rpc(friend_list(friends)))
},
);
handler.add_method(
"friend-update",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let remark = params[1].as_str()?;
let is_top = params[2].as_bool()?;
let mut results = HandleResult::new();
let db = session_db(state.layer.read().await.base(), &gid)?;
let f = if let Some(mut f) = Friend::get_id(&db, id)? {
f.is_top = is_top;
f.remark = remark.to_owned();
f.me_update(&db)?;
f
} else {
return Ok(results);
};
drop(db);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark),
FRIEND_TABLE_PATH,
f.id,
&mut results,
)?;
Ok(results)
},
);
handler.add_method(
"friend-readed",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let db = session_db(state.layer.read().await.base(), &gid)?;
Friend::readed(&db, fid)?;
drop(db);
Ok(HandleResult::new())
},
);
handler.add_method(
"friend-close",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let friend = Friend::get_id(&db, id)??;
friend.close(&db)?;
drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid);
drop(layer_lock);
if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, friend.gid);
let sender = state.group.read().await.sender();
tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs))
.detach();
}
let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]);
let msg = SendType::Event(0, friend.addr, data);
results.layers.push((gid, friend.gid, msg));
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendClose(friend.gid),
FRIEND_TABLE_PATH,
friend.id,
&mut results,
)?;
Ok(results)
},
);
handler.add_method(
"friend-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let friend = Friend::get_id(&db, id)??;
friend.delete(&db)?;
drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid);
delete_avatar(layer_lock.base(), &gid, &friend.gid).await?;
drop(layer_lock);
if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, friend.gid);
let sender = state.group.read().await.sender();
tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs))
.detach();
}
let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]);
let msg = SendType::Event(0, friend.addr, data);
results.layers.push((gid, friend.gid, msg));
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendDelete(friend.gid),
FRIEND_TABLE_PATH,
friend.id,
&mut results,
)?;
Ok(results)
},
);
handler.add_method(
"request-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
drop(layer_lock);
let requests = Request::all(&db)?;
drop(db);
Ok(HandleResult::rpc(request_list(requests)))
},
);
handler.add_method(
"request-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let remote_gid = GroupId::from_hex(params[0].as_str()?)?;
let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?;
let remote_name = params[2].as_str()?.to_string();
let remark = params[3].as_str()?.to_string();
let mut request = Request::new(
remote_gid,
remote_addr,
remote_name.clone(),
remark.clone(),
true,
false,
);
let mut results = HandleResult::rpc(Default::default());
let me = state.group.read().await.clone_user(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
if Friend::is_friend(&db, &request.gid)? {
debug!("had friend.");
drop(layer_lock);
return Ok(results);
}
if let Some(req) = Request::get(&db, &request.gid)? {
println!("Had this request.");
req.delete(&db)?;
}
request.insert(&db)?;
drop(db);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestCreate(
true,
User::new(remote_gid, remote_addr, remote_name, vec![])?,
remark,
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results
.layers
.push((gid, remote_gid, layer_lock.req_message(me, request)));
drop(layer_lock);
Ok(results)
},
);
handler.add_method(
"request-agree",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut group_lock = state.group.write().await;
let me = group_lock.clone_user(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let mut results = HandleResult::new();
if let Some(mut request) = Request::get_id(&db, id)? {
group_lock.broadcast(
&gid,
InnerEvent::SessionRequestHandle(request.gid, true, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_ok = true;
request.is_over = true;
request.update(&db)?;
let f = Friend::from_request(&db, request)?;
layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id);
results.rpcs.push(json!([id, f.to_rpc()]));
let proof = group_lock.prove_addr(&gid, &f.addr)?;
let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?;
results.layers.push((gid, f.gid, msg));
}
db.close()?;
drop(group_lock);
drop(layer_lock);
Ok(results)
},
);
handler.add_method(
"request-reject",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let mut req = Request::get_id(&db, id)??;
req.is_ok = false;
req.is_over = true;
req.update(&db)?;
drop(db);
let msg = layer_lock.reject_message(id, req.addr, gid);
drop(layer_lock);
let mut results = HandleResult::layer(gid, req.gid, msg);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestHandle(req.gid, false, vec![]),
REQUEST_TABLE_PATH,
req.id,
&mut results,
)?;
Ok(results)
},
);
handler.add_method(
"request-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
let base = layer_lock.base().clone();
drop(layer_lock);
let req = Request::get_id(&db, id)??;
req.delete(&db)?;
// delete avatar. check had friend.
if Friend::get(&db, &req.gid)?.is_none() {
delete_avatar(&base, &gid, &req.gid).await?;
}
drop(db);
let mut results = HandleResult::new();
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestDelete(req.gid),
REQUEST_TABLE_PATH,
req.id,
&mut results,
)?;
Ok(results)
},
);
handler.add_method(
"message-list",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
drop(layer_lock);
Friend::readed(&db, fid)?;
let messages = Message::get(&db, &fid)?;
drop(db);
Ok(HandleResult::rpc(message_list(messages)))
},
);
handler.add_method(
"message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let fgid = GroupId::from_hex(params[1].as_str()?)?;
let m_type = MessageType::from_int(params[2].as_i64()?);
let content = params[3].as_str()?.to_string();
let mut layer_lock = state.layer.write().await;
let base = layer_lock.base();
let faddr = layer_lock.running(&gid)?.online(&fgid)?;
let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?;
let event = LayerEvent::Message(msg.hash, nw);
let s = layer_lock.event_message(msg.id, gid, faddr, &event);
drop(layer_lock);
let mut results = HandleResult::rpc(json!(msg.to_rpc()));
results.layers.push((gid, fgid, s));
match event {
LayerEvent::Message(hash, nw) => {
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionMessageCreate(fgid, true, hash, nw),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
}
_ => {}
}
Ok(results)
},
);
handler.add_method(
"message-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(&layer_lock.base(), &gid)?;
drop(layer_lock);
let msg = Message::get_id(&db, id)??;
msg.delete(&db)?;
drop(db);
let mut results = HandleResult::new();
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionMessageDelete(msg.hash),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
Ok(results)
},
);
}

5
src/apps/device/mod.rs

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
mod models;
pub(crate) mod rpc;
pub(crate) use models::Device;
pub(crate) use rpc::new_rpc_handler;

0
src/models/device.rs → src/apps/device/models.rs

138
src/apps/device/rpc.rs

@ -0,0 +1,138 @@ @@ -0,0 +1,138 @@
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::{new_io_error, HandleResult, PeerAddr},
rpc::{json, rpc_response, RpcHandler, RpcParam},
};
use crate::group::GroupEvent;
use crate::rpc::RpcState;
use crate::storage::consensus_db;
use crate::utils::device_status::device_status as local_device_status;
use super::Device;
#[inline]
pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam {
rpc_response(0, "device-create", json!(device.to_rpc()), mgid)
}
#[inline]
pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-remove", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-online", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-offline", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_status(
mgid: GroupId,
cpu: u32,
memory: u32,
swap: u32,
disk: u32,
cpu_p: u16,
memory_p: u16,
swap_p: u16,
disk_p: u16,
uptime: u32,
) -> RpcParam {
rpc_response(
0,
"device-status",
json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]),
mgid,
)
}
#[inline]
fn device_list(devices: Vec<Device>) -> RpcParam {
let mut results = vec![];
for device in devices {
results.push(device.to_rpc());
}
json!(results)
}
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("device-echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
handler.add_method(
"device-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = consensus_db(state.layer.read().await.base(), &gid)?;
let devices = Device::all(&db)?;
drop(db);
let online_devices = state.group.read().await.online_devices(&gid, devices);
Ok(HandleResult::rpc(device_list(online_devices)))
},
);
handler.add_method(
"device-status",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let group_lock = state.group.read().await;
if &addr == group_lock.addr() {
let uptime = group_lock.uptime(&gid)?;
let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) =
local_device_status();
return Ok(HandleResult::rpc(json!([
cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime
])));
}
drop(group_lock);
let msg = state
.group
.write()
.await
.event_message(addr, &GroupEvent::StatusRequest)?;
Ok(HandleResult::group(gid, msg))
},
);
handler.add_method(
"device-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let msg = state.group.read().await.create_message(&gid, addr)?;
Ok(HandleResult::group(gid, msg))
},
);
handler.add_method(
"device-connect",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let msg = state.group.read().await.connect_message(&gid, addr)?;
Ok(HandleResult::group(gid, msg))
},
);
handler.add_method(
"device-delete",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
let _id = params[0].as_i64()?;
// TODO delete a device.
Ok(HandleResult::new())
},
);
}

12
src/apps/domain/mod.rs

@ -0,0 +1,12 @@ @@ -0,0 +1,12 @@
use tdn::types::{
primitive::HandleResult,
rpc::{json, RpcHandler},
};
use crate::rpc::RpcState;
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("domain-echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
}

5
src/apps/file/mod.rs

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
mod models;
mod rpc;
pub(crate) use models::{FileId, FileType};
pub(crate) use rpc::new_rpc_handler;

0
src/models/file.rs → src/apps/file/models.rs

22
src/apps/file/rpc.rs

@ -0,0 +1,22 @@ @@ -0,0 +1,22 @@
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::HandleResult,
rpc::{json, RpcHandler, RpcParam},
};
use crate::rpc::RpcState;
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method("files-echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
handler.add_method(
"files-folder",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
let _path = params[0].as_str()?;
Ok(HandleResult::new())
},
);
}

0
src/models/consensus.rs → src/consensus.rs

4
src/daemon.rs

@ -4,11 +4,13 @@ extern crate log; @@ -4,11 +4,13 @@ extern crate log;
use std::env::args;
use tdn::smol::{self, io::Result};
mod account;
mod apps;
mod consensus;
mod event;
mod group;
mod layer;
mod migrate;
mod models;
mod primitives;
mod rpc;
mod server;

63
src/event.rs

@ -12,16 +12,18 @@ use tdn::types::{ @@ -12,16 +12,18 @@ use tdn::types::{
use tdn_did::user::User;
use tdn_storage::local::DStorage;
use crate::account::Account;
use crate::consensus::Event;
use crate::group::{Group, GroupEvent};
use crate::layer::running::Online;
use crate::layer::{Layer, LayerEvent};
use crate::migrate::consensus::{
ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH,
};
use crate::models::account::Account;
use crate::models::consensus::Event;
use crate::models::file::{FileId, FileType};
use crate::models::session::{Friend, Message, NetworkMessage, Request};
use crate::apps::chat::rpc as chat_rpc;
use crate::apps::chat::{Friend, Message, NetworkMessage, Request};
use crate::apps::file::{FileId, FileType};
use crate::rpc;
use crate::storage::{
account_db, consensus_db, delete_avatar_sync, read_avatar_sync, session_db, write_avatar_sync,
@ -255,7 +257,7 @@ impl InnerEvent { @@ -255,7 +257,7 @@ impl InnerEvent {
}
if let Some(req) = Request::get(&db, &remote.id)? {
req.delete(&db)?; // delete the old request.
results.rpcs.push(rpc::request_delete(gid, req.id));
results.rpcs.push(chat_rpc::request_delete(gid, req.id));
}
let mut request =
Request::new(remote.id, remote.addr, remote.name, remark, is_me, true);
@ -264,7 +266,7 @@ impl InnerEvent { @@ -264,7 +266,7 @@ impl InnerEvent {
drop(db);
// save the avatar.
write_avatar_sync(group.base(), &gid, &remote.id, remote.avatar)?;
results.rpcs.push(rpc::request_create(gid, &request));
results.rpcs.push(chat_rpc::request_create(gid, &request));
(REQUEST_TABLE_PATH, request.id)
}
InnerEvent::SessionRequestHandle(rgid, is_ok, avatar) => {
@ -292,9 +294,11 @@ impl InnerEvent { @@ -292,9 +294,11 @@ impl InnerEvent {
}
})
.detach();
results.rpcs.push(rpc::request_agree(gid, rid, &friend));
results
.rpcs
.push(chat_rpc::request_agree(gid, rid, &friend));
} else {
results.rpcs.push(rpc::request_reject(gid, rid));
results.rpcs.push(chat_rpc::request_reject(gid, rid));
}
(REQUEST_TABLE_PATH, rid)
} else {
@ -310,7 +314,7 @@ impl InnerEvent { @@ -310,7 +314,7 @@ impl InnerEvent {
if Friend::get(&db, &request.gid)?.is_none() {
delete_avatar_sync(group.base(), &gid, &request.gid)?;
}
results.rpcs.push(rpc::request_delete(gid, rid));
results.rpcs.push(chat_rpc::request_delete(gid, rid));
(REQUEST_TABLE_PATH, rid)
} else {
return Ok(());
@ -340,7 +344,7 @@ impl InnerEvent { @@ -340,7 +344,7 @@ impl InnerEvent {
}
let msg = m.handle(is_me, gid, group.base(), &db, f.id, hash)?;
results.rpcs.push(rpc::message_create(gid, &msg));
results.rpcs.push(chat_rpc::message_create(gid, &msg));
(MESSAGE_TABLE_PATH, msg.id)
} else {
return Ok(());
@ -350,7 +354,7 @@ impl InnerEvent { @@ -350,7 +354,7 @@ impl InnerEvent {
let db = session_db(group.base(), &gid)?;
if let Some(m) = Message::get_it(&db, &hash)? {
m.delete(&db)?;
results.rpcs.push(rpc::message_delete(gid, m.id));
results.rpcs.push(chat_rpc::message_delete(gid, m.id));
(MESSAGE_TABLE_PATH, m.id)
} else {
return Ok(());
@ -365,7 +369,7 @@ impl InnerEvent { @@ -365,7 +369,7 @@ impl InnerEvent {
if ravatar.len() > 0 {
write_avatar_sync(group.base(), &gid, &rgid, ravatar)?;
}
results.rpcs.push(rpc::friend_info(gid, &f));
results.rpcs.push(chat_rpc::friend_info(gid, &f));
(FRIEND_TABLE_PATH, f.id)
} else {
return Ok(());
@ -379,7 +383,7 @@ impl InnerEvent { @@ -379,7 +383,7 @@ impl InnerEvent {
f.me_update(&db)?;
results
.rpcs
.push(rpc::friend_update(gid, f.id, is_top, &f.remark));
.push(chat_rpc::friend_update(gid, f.id, is_top, &f.remark));
(FRIEND_TABLE_PATH, f.id)
} else {
return Ok(());
@ -389,7 +393,7 @@ impl InnerEvent { @@ -389,7 +393,7 @@ impl InnerEvent {
let db = session_db(group.base(), &gid)?;
if let Some(f) = Friend::get_it(&db, &rgid)? {
f.close(&db)?;
results.rpcs.push(rpc::friend_close(gid, f.id));
results.rpcs.push(chat_rpc::friend_close(gid, f.id));
let rfid = f.id;
let layer_lock = layer.clone();
@ -418,7 +422,7 @@ impl InnerEvent { @@ -418,7 +422,7 @@ impl InnerEvent {
let db = session_db(group.base(), &gid)?;
if let Some(f) = Friend::get_it(&db, &rgid)? {
f.delete(&db)?;
results.rpcs.push(rpc::friend_delete(gid, f.id));
results.rpcs.push(chat_rpc::friend_delete(gid, f.id));
delete_avatar_sync(group.base(), &gid, &f.gid)?;
let rfid = f.id;
@ -494,7 +498,9 @@ impl StatusEvent { @@ -494,7 +498,9 @@ impl StatusEvent {
StatusEvent::SessionFriendOnline(rgid) => {
let db = session_db(group.base(), &gid)?;
if let Some(f) = Friend::get_it(&db, &rgid)? {
results.rpcs.push(rpc::friend_online(gid, f.id, f.addr));
results
.rpcs
.push(chat_rpc::friend_online(gid, f.id, f.addr));
let layer_lock = layer.clone();
let rgid = f.gid;
let ggid = gid.clone();
@ -517,8 +523,11 @@ impl StatusEvent { @@ -517,8 +523,11 @@ impl StatusEvent {
tdn::smol::spawn(async move {
if let Ok(running) = layer_lock.write().await.running_mut(&ggid) {
if running.check_offline(&rgid, &addr) {
let msg =
SendMessage::Rpc(uid, rpc::friend_offline(ggid, rid), true);
let msg = SendMessage::Rpc(
uid,
chat_rpc::friend_offline(ggid, rid),
true,
);
let _ = sender.send(msg).await;
}
}
@ -752,7 +761,7 @@ impl SyncEvent { @@ -752,7 +761,7 @@ impl SyncEvent {
if Friend::get(&session_db, &rgid)?.is_none() {
delete_avatar_sync(&base, &gid, &rgid)?;
}
results.rpcs.push(rpc::request_delete(gid, req.id));
results.rpcs.push(chat_rpc::request_delete(gid, req.id));
}
req.is_ok = is_ok;
@ -768,13 +777,13 @@ impl SyncEvent { @@ -768,13 +777,13 @@ impl SyncEvent {
// save to db.
request.insert(&session_db)?;
let rid = request.id;
results.rpcs.push(rpc::request_create(gid, &request));
results.rpcs.push(chat_rpc::request_create(gid, &request));
if is_delete {
if Friend::get(&session_db, &rgid)?.is_none() {
delete_avatar_sync(&base, &gid, &rgid)?;
}
results.rpcs.push(rpc::request_delete(gid, rid));
results.rpcs.push(chat_rpc::request_delete(gid, rid));
}
request
@ -795,9 +804,11 @@ impl SyncEvent { @@ -795,9 +804,11 @@ impl SyncEvent {
}
})
.detach();
results.rpcs.push(rpc::request_agree(gid, rid, &friend));
results
.rpcs
.push(chat_rpc::request_agree(gid, rid, &friend));
} else {
results.rpcs.push(rpc::request_reject(gid, rid));
results.rpcs.push(chat_rpc::request_reject(gid, rid));
}
session_db.close()?;
@ -859,9 +870,9 @@ impl SyncEvent { @@ -859,9 +870,9 @@ impl SyncEvent {
}
if friend.is_deleted {
results.rpcs.push(rpc::friend_delete(gid, friend.id));
results.rpcs.push(chat_rpc::friend_delete(gid, friend.id));
} else {
results.rpcs.push(rpc::friend_info(gid, &friend));
results.rpcs.push(chat_rpc::friend_info(gid, &friend));
}
friend.id
@ -887,7 +898,7 @@ impl SyncEvent { @@ -887,7 +898,7 @@ impl SyncEvent {
let id = if let Some(f) = Friend::get_it(&session_db, &fgid)? {
let msg = m.handle(is_me, gid, &base, &session_db, f.id, eid)?;
results.rpcs.push(rpc::message_create(gid, &msg));
results.rpcs.push(chat_rpc::message_create(gid, &msg));
msg.id
} else {
-1

25
src/group.rs

@ -12,9 +12,12 @@ use tdn::{ @@ -12,9 +12,12 @@ use tdn::{
};
use tdn_did::{user::User, Proof};
use crate::account::Account;
use crate::apps::device::rpc as device_rpc;
use crate::apps::device::Device;
use crate::consensus::Event;
use crate::event::{InnerEvent, StatusEvent, SyncEvent};
use crate::layer::Layer;
use crate::models::{account::Account, consensus::Event, device::Device};
use crate::rpc;
use crate::storage::{account_db, account_init, consensus_db};
use crate::utils::device_status::device_status as local_device_status;
@ -97,7 +100,7 @@ impl Group { @@ -97,7 +100,7 @@ impl Group {
for (_, account) in &mut self.runnings {
if let Some(device) = account.distributes.get_mut(&addr) {
device.1 = false;
results.rpcs.push(rpc::device_offline(gid, device.0));
results.rpcs.push(device_rpc::device_offline(gid, device.0));
}
}
}
@ -182,7 +185,7 @@ impl Group { @@ -182,7 +185,7 @@ impl Group {
if let Some(v) = running.distributes.get_mut(&addr) {
v.1 = true;
results.rpcs.push(rpc::device_online(*gid, v.0));
results.rpcs.push(device_rpc::device_online(*gid, v.0));
(remote_height, remote_event, new_addrs)
} else {
let mut device = Device::new(device_name, device_info, addr);
@ -190,8 +193,10 @@ impl Group { @@ -190,8 +193,10 @@ impl Group {
device.insert(&db)?;
db.close()?;
running.distributes.insert(addr, (device.id, true));
results.rpcs.push(rpc::device_create(*gid, &device));
results.rpcs.push(rpc::device_online(*gid, device.id));
results.rpcs.push(device_rpc::device_create(*gid, &device));
results
.rpcs
.push(device_rpc::device_online(*gid, device.id));
(remote_height, remote_event, new_addrs)
}
}
@ -215,7 +220,7 @@ impl Group { @@ -215,7 +220,7 @@ impl Group {
let v = self.running_mut(gid)?;
let did = v.add_online(&addr)?;
results.rpcs.push(rpc::device_online(*gid, did));
results.rpcs.push(device_rpc::device_online(*gid, did));
(remote_height, remote_event, vec![])
}
};
@ -575,7 +580,7 @@ impl Group { @@ -575,7 +580,7 @@ impl Group {
let (ancestors, hashes, is_min) = if to >= from {
let (ancestors, is_min) = Self::ancestor(from, to);
let db = consensus_db(&self.base, gid)?;
let hashes = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?;
let hashes = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
db.close()?;
(ancestors, hashes, is_min)
} else {
@ -665,7 +670,7 @@ impl GroupEvent { @@ -665,7 +670,7 @@ impl GroupEvent {
GroupEvent::DeviceOffline => {
let v = group.running_mut(&gid)?;
let did = v.offline(&addr)?;
results.rpcs.push(rpc::device_offline(gid, did));
results.rpcs.push(device_rpc::device_offline(gid, did));
}
GroupEvent::StatusRequest => {
let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) =
@ -700,7 +705,7 @@ impl GroupEvent { @@ -700,7 +705,7 @@ impl GroupEvent {
swap_p,
disk_p,
uptime,
) => results.rpcs.push(rpc::device_status(
) => results.rpcs.push(device_rpc::device_status(
gid, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime,
)),
GroupEvent::Event(eheight, eid, pre, inner_event) => {
@ -726,7 +731,7 @@ impl GroupEvent { @@ -726,7 +731,7 @@ impl GroupEvent {
if account.height != remote_height || account.event != remote_event {
// check ancestor and merge.
let db = consensus_db(&group.base, &gid)?;
let ours = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?;
let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
drop(db);
if ours.len() == 0 {

2
src/group/running.rs

@ -8,7 +8,7 @@ use tdn::types::{ @@ -8,7 +8,7 @@ use tdn::types::{
use tdn_did::Keypair;
use crate::models::device::Device;
use crate::apps::device::Device;
use crate::storage::consensus_db;
pub(crate) struct RunningAccount {

5
src/layer.rs

@ -12,11 +12,12 @@ use tdn::{ @@ -12,11 +12,12 @@ use tdn::{
};
use tdn_did::{user::User, Proof};
use crate::apps::chat::{Friend, Message, MessageType, NetworkMessage, Request};
use crate::event::{InnerEvent, StatusEvent};
use crate::group::Group;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::models::session::{Friend, Message, MessageType, NetworkMessage, Request};
use crate::rpc;
use crate::apps::chat::rpc;
use crate::storage::{
read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image,
};

2
src/layer/running.rs

@ -5,7 +5,7 @@ use tdn::types::{ @@ -5,7 +5,7 @@ use tdn::types::{
primitive::{new_io_error, PeerAddr, Result},
};
use crate::models::session::Friend;
use crate::apps::chat::Friend;
use crate::storage::session_db;
/// online info.

4
src/lib.rs

@ -5,11 +5,13 @@ use std::ffi::CStr; @@ -5,11 +5,13 @@ use std::ffi::CStr;
use std::os::raw::c_char;
use tdn::smol;
mod account;
mod apps;
mod consensus;
mod event;
mod group;
mod layer;
mod migrate;
mod models;
mod primitives;
mod rpc;
mod server;

6
src/models.rs

@ -1,6 +0,0 @@ @@ -1,6 +0,0 @@
pub(crate) mod account;
pub(crate) mod consensus;
pub(crate) mod device;
pub(crate) mod file;
pub(crate) mod service;
pub(crate) mod session;

666
src/rpc.rs

@ -7,22 +7,30 @@ use tdn::{ @@ -7,22 +7,30 @@ use tdn::{
types::{
group::GroupId,
message::{NetworkType, SendMessage, SendType, StateRequest, StateResponse},
primitive::{new_io_error, HandleResult, PeerAddr},
primitive::{new_io_error, HandleResult, PeerAddr, Result},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
},
};
use tdn_did::user::User;
use crate::apps::app_rpc_inject;
use crate::event::InnerEvent;
use crate::group::{Group, GroupEvent};
use crate::group::Group;
use crate::layer::{Layer, LayerEvent};
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::models::{
device::Device,
session::{Friend, Message, MessageType, Request},
};
use crate::storage::{consensus_db, delete_avatar, session_db};
use crate::utils::device_status::device_status as local_device_status;
pub(crate) fn init_rpc(
addr: PeerAddr,
group: Arc<RwLock<Group>>,
layer: Arc<RwLock<Layer>>,
) -> RpcHandler<RpcState> {
let mut handler = new_rpc_handler(addr, group, layer);
app_rpc_inject(&mut handler);
handler
}
pub(crate) struct RpcState {
pub group: Arc<RwLock<Group>>,
pub layer: Arc<RwLock<Layer>>,
}
#[inline]
pub(crate) fn network_stable(peers: Vec<(PeerAddr, bool)>) -> RpcParam {
@ -52,159 +60,11 @@ pub(crate) fn network_seed(peers: Vec<SocketAddr>) -> RpcParam { @@ -52,159 +60,11 @@ pub(crate) fn network_seed(peers: Vec<SocketAddr>) -> RpcParam {
rpc_response(0, "network-seed", json!(s_peers), GroupId::default())
}
#[inline]
pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam {
rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid)
}
#[inline]
pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-offline", json!([fid]), mgid)
}
#[inline]
pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam {
rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid)
}
#[inline]
pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam {
rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid)
}
#[inline]
pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-close", json!([fid]), mgid)
}
#[inline]
pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "friend-delete", json!([fid]), mgid)
}
#[inline]
pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam {
rpc_response(0, "request-create", json!(req.to_rpc()), mgid)
}
#[inline]
pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam {
rpc_response(0, "request-delivery", json!([id, is_d]), mgid)
}
#[inline]
pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam {
rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid)
}
#[inline]
pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "request-reject", json!([id]), mgid)
}
#[inline]
pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "request-delete", json!([id]), mgid)
}
#[inline]
pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam {
rpc_response(0, "message-create", json!(msg.to_rpc()), mgid)
}
#[inline]
pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam {
rpc_response(0, "message-delivery", json!([id, is_d]), mgid)
}
#[inline]
pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "message-delete", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam {
rpc_response(0, "device-create", json!(device.to_rpc()), mgid)
}
#[inline]
pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-remove", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-online", json!([id]), mgid)
}
#[inline]
pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "device-offline", json!([id]), mgid)
}
#[inline]
pub(crate) fn account_update(mgid: GroupId, name: &str, avatar: String) -> RpcParam {
rpc_response(0, "account-update", json!([name, avatar]), mgid)
}
#[inline]
pub(crate) fn device_status(
mgid: GroupId,
cpu: u32,
memory: u32,
swap: u32,
disk: u32,
cpu_p: u16,
memory_p: u16,
swap_p: u16,
disk_p: u16,
uptime: u32,
) -> RpcParam {
rpc_response(
0,
"device-status",
json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]),
mgid,
)
}
#[inline]
fn friend_list(friends: Vec<Friend>) -> RpcParam {
let mut results = vec![];
for friend in friends {
results.push(friend.to_rpc());
}
json!(results)
}
#[inline]
fn request_list(requests: Vec<Request>) -> RpcParam {
let mut results = vec![];
for request in requests {
results.push(request.to_rpc());
}
json!(results)
}
#[inline]
fn message_list(messages: Vec<Message>) -> RpcParam {
let mut results = vec![];
for msg in messages {
results.push(msg.to_rpc());
}
json!(results)
}
#[inline]
fn device_list(devices: Vec<Device>) -> RpcParam {
let mut results = vec![];
for device in devices {
results.push(device.to_rpc());
}
json!(results)
}
#[inline]
pub(crate) async fn sleep_waiting_close_stable(
sender: Sender<SendMessage>,
@ -239,7 +99,7 @@ pub(crate) async fn inner_rpc( @@ -239,7 +99,7 @@ pub(crate) async fn inner_rpc(
uid: u64,
method: &str,
sender: &async_channel::Sender<SendMessage>,
) -> Result<(), std::io::Error> {
) -> Result<()> {
// Inner network default rpc method. only use in http-rpc.
if method == "network-stable" || method == "network-dht" || method == "network-seed" {
let req = match method {
@ -275,28 +135,22 @@ pub(crate) async fn inner_rpc( @@ -275,28 +135,22 @@ pub(crate) async fn inner_rpc(
Err(new_io_error("not found"))
}
pub(crate) struct RpcState {
group: Arc<RwLock<Group>>,
layer: Arc<RwLock<Layer>>,
}
#[inline]
pub(crate) fn new_rpc_handler(
fn new_rpc_handler(
addr: PeerAddr,
group: Arc<RwLock<Group>>,
layer: Arc<RwLock<Layer>>,
) -> RpcHandler<RpcState> {
let mut rpc_handler = RpcHandler::new(RpcState { group, layer });
let mut handler = RpcHandler::new(RpcState { group, layer });
rpc_handler.add_method("echo", |_, params, _| async move {
handler.add_method("echo", |_, params, _| async move {
Ok(HandleResult::rpc(json!(params)))
});
rpc_handler.add_method("system-info", move |_, _, _| async move {
handler.add_method("system-info", move |_, _, _| async move {
Ok(HandleResult::rpc(json!(vec![addr.to_hex()])))
});
rpc_handler.add_method(
handler.add_method(
"add-bootstrap",
|_gid, params: Vec<RpcParam>, _| async move {
let socket = params[0].as_str()?;
@ -308,7 +162,7 @@ pub(crate) fn new_rpc_handler( @@ -308,7 +162,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-list",
|_gid, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let mut users: Vec<Vec<String>> = vec![];
@ -327,7 +181,7 @@ pub(crate) fn new_rpc_handler( @@ -327,7 +181,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-create",
|_gid, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let name = params[0].as_str()?;
@ -353,7 +207,7 @@ pub(crate) fn new_rpc_handler( @@ -353,7 +207,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-restore",
|_gid, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let name = params[0].as_str()?;
@ -390,76 +244,7 @@ pub(crate) fn new_rpc_handler( @@ -390,76 +244,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
"device-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = consensus_db(state.layer.read().await.base(), &gid)?;
let devices = Device::all(&db)?;
drop(db);
let online_devices = state.group.read().await.online_devices(&gid, devices);
Ok(HandleResult::rpc(device_list(online_devices)))
},
);
rpc_handler.add_method(
"device-status",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let group_lock = state.group.read().await;
if &addr == group_lock.addr() {
let uptime = group_lock.uptime(&gid)?;
let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) =
local_device_status();
return Ok(HandleResult::rpc(json!([
cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime
])));
}
drop(group_lock);
let msg = state
.group
.write()
.await
.event_message(addr, &GroupEvent::StatusRequest)?;
Ok(HandleResult::group(gid, msg))
},
);
rpc_handler.add_method(
"device-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let msg = state.group.read().await.create_message(&gid, addr)?;
Ok(HandleResult::group(gid, msg))
},
);
rpc_handler.add_method(
"device-connect",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let addr = PeerAddr::from_hex(params[0].as_str()?)
.map_err(|_e| new_io_error("PeerAddr invalid!"))?;
let msg = state.group.read().await.connect_message(&gid, addr)?;
Ok(HandleResult::group(gid, msg))
},
);
rpc_handler.add_method(
"device-delete",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
let _id = params[0].as_i64()?;
// TODO delete a device.
Ok(HandleResult::new())
},
);
rpc_handler.add_method(
handler.add_method(
"account-update",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let name = params[0].as_str()?;
@ -484,7 +269,7 @@ pub(crate) fn new_rpc_handler( @@ -484,7 +269,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-pin",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let old = params[0].as_str()?;
@ -495,7 +280,7 @@ pub(crate) fn new_rpc_handler( @@ -495,7 +280,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-mnemonic",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let lock = params[0].as_str()?;
@ -505,7 +290,7 @@ pub(crate) fn new_rpc_handler( @@ -505,7 +290,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-login",
|_gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gid = GroupId::from_hex(params[0].as_str()?)?;
@ -524,7 +309,7 @@ pub(crate) fn new_rpc_handler( @@ -524,7 +309,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-logout",
|_gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let mut results = HandleResult::new();
@ -562,7 +347,7 @@ pub(crate) fn new_rpc_handler( @@ -562,7 +347,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-online",
|_gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gid = GroupId::from_hex(params[0].as_str()?)?;
@ -588,7 +373,7 @@ pub(crate) fn new_rpc_handler( @@ -588,7 +373,7 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
handler.add_method(
"account-offline",
|_gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gid = GroupId::from_hex(params[0].as_str()?)?;
@ -618,386 +403,5 @@ pub(crate) fn new_rpc_handler( @@ -618,386 +403,5 @@ pub(crate) fn new_rpc_handler(
},
);
rpc_handler.add_method(
"friend-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let friends = state.layer.read().await.all_friends_with_online(&gid)?;
Ok(HandleResult::rpc(friend_list(friends)))
},
);
rpc_handler.add_method(
"friend-update",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let remark = params[1].as_str()?;
let is_top = params[2].as_bool()?;
let mut results = HandleResult::new();
let db = session_db(state.layer.read().await.base(), &gid)?;
let f = if let Some(mut f) = Friend::get_id(&db, id)? {
f.is_top = is_top;
f.remark = remark.to_owned();
f.me_update(&db)?;
f
} else {
return Ok(results);
};
drop(db);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark),
FRIEND_TABLE_PATH,
f.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"friend-readed",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let db = session_db(state.layer.read().await.base(), &gid)?;
Friend::readed(&db, fid)?;
drop(db);
Ok(HandleResult::new())
},
);
rpc_handler.add_method(
"friend-close",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let friend = Friend::get_id(&db, id)??;
friend.close(&db)?;
drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid);
drop(layer_lock);
if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, friend.gid);
let sender = state.group.read().await.sender();
tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs))
.detach();
}
let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]);
let msg = SendType::Event(0, friend.addr, data);
results.layers.push((gid, friend.gid, msg));
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendClose(friend.gid),
FRIEND_TABLE_PATH,
friend.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"friend-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let friend = Friend::get_id(&db, id)??;
friend.delete(&db)?;
drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid);
delete_avatar(layer_lock.base(), &gid, &friend.gid).await?;
drop(layer_lock);
if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, friend.gid);
let sender = state.group.read().await.sender();
tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs))
.detach();
}
let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]);
let msg = SendType::Event(0, friend.addr, data);
results.layers.push((gid, friend.gid, msg));
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionFriendDelete(friend.gid),
FRIEND_TABLE_PATH,
friend.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"request-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
drop(layer_lock);
let requests = Request::all(&db)?;
drop(db);
Ok(HandleResult::rpc(request_list(requests)))
},
);
rpc_handler.add_method(
"request-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let remote_gid = GroupId::from_hex(params[0].as_str()?)?;
let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?;
let remote_name = params[2].as_str()?.to_string();
let remark = params[3].as_str()?.to_string();
let mut request = Request::new(
remote_gid,
remote_addr,
remote_name.clone(),
remark.clone(),
true,
false,
);
let mut results = HandleResult::rpc(Default::default());
let me = state.group.read().await.clone_user(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
if Friend::is_friend(&db, &request.gid)? {
debug!("had friend.");
drop(layer_lock);
return Ok(results);
}
if let Some(req) = Request::get(&db, &request.gid)? {
println!("Had this request.");
req.delete(&db)?;
}
request.insert(&db)?;
drop(db);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestCreate(
true,
User::new(remote_gid, remote_addr, remote_name, vec![])?,
remark,
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results
.layers
.push((gid, remote_gid, layer_lock.req_message(me, request)));
drop(layer_lock);
Ok(results)
},
);
rpc_handler.add_method(
"request-agree",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut group_lock = state.group.write().await;
let me = group_lock.clone_user(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let mut results = HandleResult::new();
if let Some(mut request) = Request::get_id(&db, id)? {
group_lock.broadcast(
&gid,
InnerEvent::SessionRequestHandle(request.gid, true, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_ok = true;
request.is_over = true;
request.update(&db)?;
let f = Friend::from_request(&db, request)?;
layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id);
results.rpcs.push(json!([id, f.to_rpc()]));
let proof = group_lock.prove_addr(&gid, &f.addr)?;
let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?;
results.layers.push((gid, f.gid, msg));
}
db.close()?;
drop(group_lock);
drop(layer_lock);
Ok(results)
},
);
rpc_handler.add_method(
"request-reject",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let mut layer_lock = state.layer.write().await;
let db = session_db(layer_lock.base(), &gid)?;
let mut req = Request::get_id(&db, id)??;
req.is_ok = false;
req.is_over = true;
req.update(&db)?;
drop(db);
let msg = layer_lock.reject_message(id, req.addr, gid);
drop(layer_lock);
let mut results = HandleResult::layer(gid, req.gid, msg);
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestHandle(req.gid, false, vec![]),
REQUEST_TABLE_PATH,
req.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"request-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
let base = layer_lock.base().clone();
drop(layer_lock);
let req = Request::get_id(&db, id)??;
req.delete(&db)?;
// delete avatar. check had friend.
if Friend::get(&db, &req.gid)?.is_none() {
delete_avatar(&base, &gid, &req.gid).await?;
}
drop(db);
let mut results = HandleResult::new();
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionRequestDelete(req.gid),
REQUEST_TABLE_PATH,
req.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"message-list",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(layer_lock.base(), &gid)?;
drop(layer_lock);
Friend::readed(&db, fid)?;
let messages = Message::get(&db, &fid)?;
drop(db);
Ok(HandleResult::rpc(message_list(messages)))
},
);
rpc_handler.add_method(
"message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let fgid = GroupId::from_hex(params[1].as_str()?)?;
let m_type = MessageType::from_int(params[2].as_i64()?);
let content = params[3].as_str()?.to_string();
let mut layer_lock = state.layer.write().await;
let base = layer_lock.base();
let faddr = layer_lock.running(&gid)?.online(&fgid)?;
let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?;
let event = LayerEvent::Message(msg.hash, nw);
let s = layer_lock.event_message(msg.id, gid, faddr, &event);
drop(layer_lock);
let mut results = HandleResult::rpc(json!(msg.to_rpc()));
results.layers.push((gid, fgid, s));
match event {
LayerEvent::Message(hash, nw) => {
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionMessageCreate(fgid, true, hash, nw),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
}
_ => {}
}
Ok(results)
},
);
rpc_handler.add_method(
"message-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let layer_lock = state.layer.read().await;
let db = session_db(&layer_lock.base(), &gid)?;
drop(layer_lock);
let msg = Message::get_id(&db, id)??;
msg.delete(&db)?;
drop(db);
let mut results = HandleResult::new();
state.group.write().await.broadcast(
&gid,
InnerEvent::SessionMessageDelete(msg.hash),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
Ok(results)
},
);
rpc_handler.add_method(
"files-folder",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move {
let _path = params[0].as_str()?;
Ok(HandleResult::new())
},
);
rpc_handler
handler
}

6
src/server.rs

@ -12,12 +12,12 @@ use tdn::{ @@ -12,12 +12,12 @@ use tdn::{
types::primitive::HandleResult,
};
use crate::account::Account;
use crate::group::Group;
use crate::layer::Layer;
use crate::migrate::main_migrate;
use crate::models::account::Account;
use crate::primitives::network_seeds;
use crate::rpc::{inner_rpc, new_rpc_handler};
use crate::rpc::{init_rpc, inner_rpc};
use crate::storage::account_db;
pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:8080";
@ -67,7 +67,7 @@ pub async fn start(db_path: String) -> Result<()> { @@ -67,7 +67,7 @@ pub async fn start(db_path: String) -> Result<()> {
Layer::init(db_path, peer_id, group.clone()).await?,
));
let rpc = new_rpc_handler(peer_id, group.clone(), layer.clone());
let rpc = init_rpc(peer_id, group.clone(), layer.clone());
//let mut group_rpcs: HashMap<u64, GroupId> = HashMap::new();
let mut now_rpc_uid = 0;

Loading…
Cancel
Save