diff --git a/assets/logo/logo_40.jpg b/assets/logo/logo_40.jpg index 0f76537..58f028e 100644 Binary files a/assets/logo/logo_40.jpg and b/assets/logo/logo_40.jpg differ diff --git a/src/models/account.rs b/src/account.rs similarity index 100% rename from src/models/account.rs rename to src/account.rs diff --git a/src/apps.rs b/src/apps.rs new file mode 100644 index 0000000..6c7a30f --- /dev/null +++ b/src/apps.rs @@ -0,0 +1,38 @@ +use tdn::types::{ + group::GroupId, + primitive::{HandleResult, PeerAddr, Result}, + rpc::RpcHandler, +}; + +use crate::rpc::RpcState; + +pub(crate) mod assistant; +pub(crate) mod chat; +pub(crate) mod device; +pub(crate) mod domain; +pub(crate) mod file; + +pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { + device::new_rpc_handler(handler); + chat::new_rpc_handler(handler); + assistant::new_rpc_handler(handler); + domain::new_rpc_handler(handler); + file::new_rpc_handler(handler); +} + +pub(crate) fn _app_layer_handle( + _gid: GroupId, + _fgid: GroupId, + _addr: PeerAddr, + _data: Vec, +) -> Result { + todo!() +} + +pub(crate) fn _app_group_handle() -> Result { + todo!() +} + +pub(crate) fn _app_migrate() -> Result<()> { + todo!() +} diff --git a/src/models/service.rs b/src/apps/assistant/layer.rs similarity index 100% rename from src/models/service.rs rename to src/apps/assistant/layer.rs diff --git a/src/apps/assistant/migrate.rs b/src/apps/assistant/migrate.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/assistant/mod.rs b/src/apps/assistant/mod.rs new file mode 100644 index 0000000..eeb5c48 --- /dev/null +++ b/src/apps/assistant/mod.rs @@ -0,0 +1,12 @@ +use tdn::types::{ + primitive::HandleResult, + rpc::{json, RpcHandler}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("assistant-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); +} diff --git a/src/apps/assistant/models.rs b/src/apps/assistant/models.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/assistant/rpc.rs b/src/apps/assistant/rpc.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/chat/mod.rs b/src/apps/chat/mod.rs new file mode 100644 index 0000000..e646058 --- /dev/null +++ b/src/apps/chat/mod.rs @@ -0,0 +1,5 @@ +mod models; + +pub(crate) mod rpc; +pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request}; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/session.rs b/src/apps/chat/models.rs similarity index 100% rename from src/models/session.rs rename to src/apps/chat/models.rs diff --git a/src/apps/chat/rpc.rs b/src/apps/chat/rpc.rs new file mode 100644 index 0000000..ef46aaf --- /dev/null +++ b/src/apps/chat/rpc.rs @@ -0,0 +1,493 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + message::SendType, + primitive::{HandleResult, PeerAddr}, + rpc::{json, rpc_response, RpcHandler, RpcParam}, +}; +use tdn_did::user::User; + +use crate::event::InnerEvent; +use crate::layer::LayerEvent; +use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; +use crate::rpc::{sleep_waiting_close_stable, RpcState}; +use crate::storage::{delete_avatar, session_db}; + +use super::{Friend, Message, MessageType, Request}; + +#[inline] +pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam { + rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid) +} + +#[inline] +pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-offline", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam { + rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam { + rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid) +} + +#[inline] +pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-close", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-delete", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam { + rpc_response(0, "request-create", json!(req.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { + rpc_response(0, "request-delivery", json!([id, is_d]), mgid) +} + +#[inline] +pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam { + rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid) +} + +#[inline] +pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "request-reject", json!([id]), mgid) +} + +#[inline] +pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "request-delete", json!([id]), mgid) +} + +#[inline] +pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { + rpc_response(0, "message-create", json!(msg.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { + rpc_response(0, "message-delivery", json!([id, is_d]), mgid) +} + +#[inline] +pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "message-delete", json!([id]), mgid) +} + +#[inline] +fn friend_list(friends: Vec) -> RpcParam { + let mut results = vec![]; + for friend in friends { + results.push(friend.to_rpc()); + } + + json!(results) +} + +#[inline] +fn request_list(requests: Vec) -> RpcParam { + let mut results = vec![]; + for request in requests { + results.push(request.to_rpc()); + } + json!(results) +} + +#[inline] +fn message_list(messages: Vec) -> RpcParam { + let mut results = vec![]; + for msg in messages { + results.push(msg.to_rpc()); + } + json!(results) +} + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("chat-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "friend-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let friends = state.layer.read().await.all_friends_with_online(&gid)?; + Ok(HandleResult::rpc(friend_list(friends))) + }, + ); + + handler.add_method( + "friend-update", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + let remark = params[1].as_str()?; + let is_top = params[2].as_bool()?; + + let mut results = HandleResult::new(); + let db = session_db(state.layer.read().await.base(), &gid)?; + let f = if let Some(mut f) = Friend::get_id(&db, id)? { + f.is_top = is_top; + f.remark = remark.to_owned(); + f.me_update(&db)?; + f + } else { + return Ok(results); + }; + drop(db); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark), + FRIEND_TABLE_PATH, + f.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "friend-readed", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + + let db = session_db(state.layer.read().await.base(), &gid)?; + Friend::readed(&db, fid)?; + drop(db); + + Ok(HandleResult::new()) + }, + ); + + handler.add_method( + "friend-close", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut results = HandleResult::new(); + let mut layer_lock = state.layer.write().await; + + let db = session_db(layer_lock.base(), &gid)?; + let friend = Friend::get_id(&db, id)??; + friend.close(&db)?; + drop(db); + + let online = layer_lock.remove_friend(&gid, &friend.gid); + drop(layer_lock); + + if let Some(faddr) = online { + let mut addrs: HashMap = HashMap::new(); + addrs.insert(faddr, friend.gid); + let sender = state.group.read().await.sender(); + tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) + .detach(); + } + + let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); + let msg = SendType::Event(0, friend.addr, data); + results.layers.push((gid, friend.gid, msg)); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendClose(friend.gid), + FRIEND_TABLE_PATH, + friend.id, + &mut results, + )?; + + Ok(results) + }, + ); + + handler.add_method( + "friend-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut results = HandleResult::new(); + let mut layer_lock = state.layer.write().await; + + let db = session_db(layer_lock.base(), &gid)?; + let friend = Friend::get_id(&db, id)??; + friend.delete(&db)?; + drop(db); + + let online = layer_lock.remove_friend(&gid, &friend.gid); + delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; + drop(layer_lock); + + if let Some(faddr) = online { + let mut addrs: HashMap = HashMap::new(); + addrs.insert(faddr, friend.gid); + let sender = state.group.read().await.sender(); + tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) + .detach(); + } + + let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); + let msg = SendType::Event(0, friend.addr, data); + results.layers.push((gid, friend.gid, msg)); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendDelete(friend.gid), + FRIEND_TABLE_PATH, + friend.id, + &mut results, + )?; + + Ok(results) + }, + ); + + handler.add_method( + "request-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + drop(layer_lock); + let requests = Request::all(&db)?; + drop(db); + Ok(HandleResult::rpc(request_list(requests))) + }, + ); + + handler.add_method( + "request-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let remote_gid = GroupId::from_hex(params[0].as_str()?)?; + let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?; + let remote_name = params[2].as_str()?.to_string(); + let remark = params[3].as_str()?.to_string(); + + let mut request = Request::new( + remote_gid, + remote_addr, + remote_name.clone(), + remark.clone(), + true, + false, + ); + + let mut results = HandleResult::rpc(Default::default()); + let me = state.group.read().await.clone_user(&gid)?; + + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + if Friend::is_friend(&db, &request.gid)? { + debug!("had friend."); + drop(layer_lock); + return Ok(results); + } + + if let Some(req) = Request::get(&db, &request.gid)? { + println!("Had this request."); + req.delete(&db)?; + } + request.insert(&db)?; + drop(db); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestCreate( + true, + User::new(remote_gid, remote_addr, remote_name, vec![])?, + remark, + ), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + + results + .layers + .push((gid, remote_gid, layer_lock.req_message(me, request))); + + drop(layer_lock); + + Ok(results) + }, + ); + + handler.add_method( + "request-agree", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut group_lock = state.group.write().await; + let me = group_lock.clone_user(&gid)?; + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + let mut results = HandleResult::new(); + + if let Some(mut request) = Request::get_id(&db, id)? { + group_lock.broadcast( + &gid, + InnerEvent::SessionRequestHandle(request.gid, true, vec![]), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + request.is_ok = true; + request.is_over = true; + request.update(&db)?; + + let f = Friend::from_request(&db, request)?; + layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id); + results.rpcs.push(json!([id, f.to_rpc()])); + + let proof = group_lock.prove_addr(&gid, &f.addr)?; + let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?; + results.layers.push((gid, f.gid, msg)); + } + db.close()?; + drop(group_lock); + drop(layer_lock); + Ok(results) + }, + ); + + handler.add_method( + "request-reject", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + let mut req = Request::get_id(&db, id)??; + req.is_ok = false; + req.is_over = true; + req.update(&db)?; + drop(db); + let msg = layer_lock.reject_message(id, req.addr, gid); + drop(layer_lock); + + let mut results = HandleResult::layer(gid, req.gid, msg); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestHandle(req.gid, false, vec![]), + REQUEST_TABLE_PATH, + req.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "request-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + let base = layer_lock.base().clone(); + drop(layer_lock); + let req = Request::get_id(&db, id)??; + req.delete(&db)?; + + // delete avatar. check had friend. + if Friend::get(&db, &req.gid)?.is_none() { + delete_avatar(&base, &gid, &req.gid).await?; + } + drop(db); + + let mut results = HandleResult::new(); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestDelete(req.gid), + REQUEST_TABLE_PATH, + req.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "message-list", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + drop(layer_lock); + + Friend::readed(&db, fid)?; + let messages = Message::get(&db, &fid)?; + drop(db); + Ok(HandleResult::rpc(message_list(messages))) + }, + ); + + handler.add_method( + "message-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + let fgid = GroupId::from_hex(params[1].as_str()?)?; + let m_type = MessageType::from_int(params[2].as_i64()?); + let content = params[3].as_str()?.to_string(); + + let mut layer_lock = state.layer.write().await; + let base = layer_lock.base(); + let faddr = layer_lock.running(&gid)?.online(&fgid)?; + + let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?; + let event = LayerEvent::Message(msg.hash, nw); + let s = layer_lock.event_message(msg.id, gid, faddr, &event); + drop(layer_lock); + + let mut results = HandleResult::rpc(json!(msg.to_rpc())); + results.layers.push((gid, fgid, s)); + + match event { + LayerEvent::Message(hash, nw) => { + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionMessageCreate(fgid, true, hash, nw), + MESSAGE_TABLE_PATH, + msg.id, + &mut results, + )?; + } + _ => {} + } + + Ok(results) + }, + ); + + handler.add_method( + "message-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(&layer_lock.base(), &gid)?; + drop(layer_lock); + + let msg = Message::get_id(&db, id)??; + msg.delete(&db)?; + drop(db); + let mut results = HandleResult::new(); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionMessageDelete(msg.hash), + MESSAGE_TABLE_PATH, + msg.id, + &mut results, + )?; + Ok(results) + }, + ); +} diff --git a/src/apps/device/mod.rs b/src/apps/device/mod.rs new file mode 100644 index 0000000..32f135b --- /dev/null +++ b/src/apps/device/mod.rs @@ -0,0 +1,5 @@ +mod models; + +pub(crate) mod rpc; +pub(crate) use models::Device; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/device.rs b/src/apps/device/models.rs similarity index 100% rename from src/models/device.rs rename to src/apps/device/models.rs diff --git a/src/apps/device/rpc.rs b/src/apps/device/rpc.rs new file mode 100644 index 0000000..957669a --- /dev/null +++ b/src/apps/device/rpc.rs @@ -0,0 +1,138 @@ +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + primitive::{new_io_error, HandleResult, PeerAddr}, + rpc::{json, rpc_response, RpcHandler, RpcParam}, +}; + +use crate::group::GroupEvent; +use crate::rpc::RpcState; +use crate::storage::consensus_db; +use crate::utils::device_status::device_status as local_device_status; + +use super::Device; + +#[inline] +pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam { + rpc_response(0, "device-create", json!(device.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-remove", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-online", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-offline", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_status( + mgid: GroupId, + cpu: u32, + memory: u32, + swap: u32, + disk: u32, + cpu_p: u16, + memory_p: u16, + swap_p: u16, + disk_p: u16, + uptime: u32, +) -> RpcParam { + rpc_response( + 0, + "device-status", + json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]), + mgid, + ) +} + +#[inline] +fn device_list(devices: Vec) -> RpcParam { + let mut results = vec![]; + for device in devices { + results.push(device.to_rpc()); + } + json!(results) +} + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("device-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "device-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let db = consensus_db(state.layer.read().await.base(), &gid)?; + let devices = Device::all(&db)?; + drop(db); + let online_devices = state.group.read().await.online_devices(&gid, devices); + Ok(HandleResult::rpc(device_list(online_devices))) + }, + ); + + handler.add_method( + "device-status", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let group_lock = state.group.read().await; + if &addr == group_lock.addr() { + let uptime = group_lock.uptime(&gid)?; + let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = + local_device_status(); + return Ok(HandleResult::rpc(json!([ + cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime + ]))); + } + drop(group_lock); + + let msg = state + .group + .write() + .await + .event_message(addr, &GroupEvent::StatusRequest)?; + + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let msg = state.group.read().await.create_message(&gid, addr)?; + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-connect", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let msg = state.group.read().await.connect_message(&gid, addr)?; + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-delete", + |_gid: GroupId, params: Vec, _state: Arc| async move { + let _id = params[0].as_i64()?; + // TODO delete a device. + Ok(HandleResult::new()) + }, + ); +} diff --git a/src/apps/domain/mod.rs b/src/apps/domain/mod.rs new file mode 100644 index 0000000..6968310 --- /dev/null +++ b/src/apps/domain/mod.rs @@ -0,0 +1,12 @@ +use tdn::types::{ + primitive::HandleResult, + rpc::{json, RpcHandler}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("domain-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); +} diff --git a/src/apps/file/mod.rs b/src/apps/file/mod.rs new file mode 100644 index 0000000..f9b898c --- /dev/null +++ b/src/apps/file/mod.rs @@ -0,0 +1,5 @@ +mod models; +mod rpc; + +pub(crate) use models::{FileId, FileType}; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/file.rs b/src/apps/file/models.rs similarity index 100% rename from src/models/file.rs rename to src/apps/file/models.rs diff --git a/src/apps/file/rpc.rs b/src/apps/file/rpc.rs new file mode 100644 index 0000000..eeb8818 --- /dev/null +++ b/src/apps/file/rpc.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + primitive::HandleResult, + rpc::{json, RpcHandler, RpcParam}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("files-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "files-folder", + |_gid: GroupId, params: Vec, _state: Arc| async move { + let _path = params[0].as_str()?; + Ok(HandleResult::new()) + }, + ); +} diff --git a/src/models/consensus.rs b/src/consensus.rs similarity index 100% rename from src/models/consensus.rs rename to src/consensus.rs diff --git a/src/daemon.rs b/src/daemon.rs index 69db1af..2d2a35f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -4,11 +4,13 @@ extern crate log; use std::env::args; use tdn::smol::{self, io::Result}; +mod account; +mod apps; +mod consensus; mod event; mod group; mod layer; mod migrate; -mod models; mod primitives; mod rpc; mod server; diff --git a/src/event.rs b/src/event.rs index 972ee7f..b432b94 100644 --- a/src/event.rs +++ b/src/event.rs @@ -12,16 +12,18 @@ use tdn::types::{ use tdn_did::user::User; use tdn_storage::local::DStorage; +use crate::account::Account; +use crate::consensus::Event; use crate::group::{Group, GroupEvent}; use crate::layer::running::Online; use crate::layer::{Layer, LayerEvent}; use crate::migrate::consensus::{ ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH, }; -use crate::models::account::Account; -use crate::models::consensus::Event; -use crate::models::file::{FileId, FileType}; -use crate::models::session::{Friend, Message, NetworkMessage, Request}; + +use crate::apps::chat::rpc as chat_rpc; +use crate::apps::chat::{Friend, Message, NetworkMessage, Request}; +use crate::apps::file::{FileId, FileType}; use crate::rpc; use crate::storage::{ account_db, consensus_db, delete_avatar_sync, read_avatar_sync, session_db, write_avatar_sync, @@ -255,7 +257,7 @@ impl InnerEvent { } if let Some(req) = Request::get(&db, &remote.id)? { req.delete(&db)?; // delete the old request. - results.rpcs.push(rpc::request_delete(gid, req.id)); + results.rpcs.push(chat_rpc::request_delete(gid, req.id)); } let mut request = Request::new(remote.id, remote.addr, remote.name, remark, is_me, true); @@ -264,7 +266,7 @@ impl InnerEvent { drop(db); // save the avatar. write_avatar_sync(group.base(), &gid, &remote.id, remote.avatar)?; - results.rpcs.push(rpc::request_create(gid, &request)); + results.rpcs.push(chat_rpc::request_create(gid, &request)); (REQUEST_TABLE_PATH, request.id) } InnerEvent::SessionRequestHandle(rgid, is_ok, avatar) => { @@ -292,9 +294,11 @@ impl InnerEvent { } }) .detach(); - results.rpcs.push(rpc::request_agree(gid, rid, &friend)); + results + .rpcs + .push(chat_rpc::request_agree(gid, rid, &friend)); } else { - results.rpcs.push(rpc::request_reject(gid, rid)); + results.rpcs.push(chat_rpc::request_reject(gid, rid)); } (REQUEST_TABLE_PATH, rid) } else { @@ -310,7 +314,7 @@ impl InnerEvent { if Friend::get(&db, &request.gid)?.is_none() { delete_avatar_sync(group.base(), &gid, &request.gid)?; } - results.rpcs.push(rpc::request_delete(gid, rid)); + results.rpcs.push(chat_rpc::request_delete(gid, rid)); (REQUEST_TABLE_PATH, rid) } else { return Ok(()); @@ -340,7 +344,7 @@ impl InnerEvent { } let msg = m.handle(is_me, gid, group.base(), &db, f.id, hash)?; - results.rpcs.push(rpc::message_create(gid, &msg)); + results.rpcs.push(chat_rpc::message_create(gid, &msg)); (MESSAGE_TABLE_PATH, msg.id) } else { return Ok(()); @@ -350,7 +354,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(m) = Message::get_it(&db, &hash)? { m.delete(&db)?; - results.rpcs.push(rpc::message_delete(gid, m.id)); + results.rpcs.push(chat_rpc::message_delete(gid, m.id)); (MESSAGE_TABLE_PATH, m.id) } else { return Ok(()); @@ -365,7 +369,7 @@ impl InnerEvent { if ravatar.len() > 0 { write_avatar_sync(group.base(), &gid, &rgid, ravatar)?; } - results.rpcs.push(rpc::friend_info(gid, &f)); + results.rpcs.push(chat_rpc::friend_info(gid, &f)); (FRIEND_TABLE_PATH, f.id) } else { return Ok(()); @@ -379,7 +383,7 @@ impl InnerEvent { f.me_update(&db)?; results .rpcs - .push(rpc::friend_update(gid, f.id, is_top, &f.remark)); + .push(chat_rpc::friend_update(gid, f.id, is_top, &f.remark)); (FRIEND_TABLE_PATH, f.id) } else { return Ok(()); @@ -389,7 +393,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { f.close(&db)?; - results.rpcs.push(rpc::friend_close(gid, f.id)); + results.rpcs.push(chat_rpc::friend_close(gid, f.id)); let rfid = f.id; let layer_lock = layer.clone(); @@ -418,7 +422,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { f.delete(&db)?; - results.rpcs.push(rpc::friend_delete(gid, f.id)); + results.rpcs.push(chat_rpc::friend_delete(gid, f.id)); delete_avatar_sync(group.base(), &gid, &f.gid)?; let rfid = f.id; @@ -494,7 +498,9 @@ impl StatusEvent { StatusEvent::SessionFriendOnline(rgid) => { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { - results.rpcs.push(rpc::friend_online(gid, f.id, f.addr)); + results + .rpcs + .push(chat_rpc::friend_online(gid, f.id, f.addr)); let layer_lock = layer.clone(); let rgid = f.gid; let ggid = gid.clone(); @@ -517,8 +523,11 @@ impl StatusEvent { tdn::smol::spawn(async move { if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { if running.check_offline(&rgid, &addr) { - let msg = - SendMessage::Rpc(uid, rpc::friend_offline(ggid, rid), true); + let msg = SendMessage::Rpc( + uid, + chat_rpc::friend_offline(ggid, rid), + true, + ); let _ = sender.send(msg).await; } } @@ -752,7 +761,7 @@ impl SyncEvent { if Friend::get(&session_db, &rgid)?.is_none() { delete_avatar_sync(&base, &gid, &rgid)?; } - results.rpcs.push(rpc::request_delete(gid, req.id)); + results.rpcs.push(chat_rpc::request_delete(gid, req.id)); } req.is_ok = is_ok; @@ -768,13 +777,13 @@ impl SyncEvent { // save to db. request.insert(&session_db)?; let rid = request.id; - results.rpcs.push(rpc::request_create(gid, &request)); + results.rpcs.push(chat_rpc::request_create(gid, &request)); if is_delete { if Friend::get(&session_db, &rgid)?.is_none() { delete_avatar_sync(&base, &gid, &rgid)?; } - results.rpcs.push(rpc::request_delete(gid, rid)); + results.rpcs.push(chat_rpc::request_delete(gid, rid)); } request @@ -795,9 +804,11 @@ impl SyncEvent { } }) .detach(); - results.rpcs.push(rpc::request_agree(gid, rid, &friend)); + results + .rpcs + .push(chat_rpc::request_agree(gid, rid, &friend)); } else { - results.rpcs.push(rpc::request_reject(gid, rid)); + results.rpcs.push(chat_rpc::request_reject(gid, rid)); } session_db.close()?; @@ -859,9 +870,9 @@ impl SyncEvent { } if friend.is_deleted { - results.rpcs.push(rpc::friend_delete(gid, friend.id)); + results.rpcs.push(chat_rpc::friend_delete(gid, friend.id)); } else { - results.rpcs.push(rpc::friend_info(gid, &friend)); + results.rpcs.push(chat_rpc::friend_info(gid, &friend)); } friend.id @@ -887,7 +898,7 @@ impl SyncEvent { let id = if let Some(f) = Friend::get_it(&session_db, &fgid)? { let msg = m.handle(is_me, gid, &base, &session_db, f.id, eid)?; - results.rpcs.push(rpc::message_create(gid, &msg)); + results.rpcs.push(chat_rpc::message_create(gid, &msg)); msg.id } else { -1 diff --git a/src/group.rs b/src/group.rs index ef7b7cb..73d3db5 100644 --- a/src/group.rs +++ b/src/group.rs @@ -12,9 +12,12 @@ use tdn::{ }; use tdn_did::{user::User, Proof}; +use crate::account::Account; +use crate::apps::device::rpc as device_rpc; +use crate::apps::device::Device; +use crate::consensus::Event; use crate::event::{InnerEvent, StatusEvent, SyncEvent}; use crate::layer::Layer; -use crate::models::{account::Account, consensus::Event, device::Device}; use crate::rpc; use crate::storage::{account_db, account_init, consensus_db}; use crate::utils::device_status::device_status as local_device_status; @@ -97,7 +100,7 @@ impl Group { for (_, account) in &mut self.runnings { if let Some(device) = account.distributes.get_mut(&addr) { device.1 = false; - results.rpcs.push(rpc::device_offline(gid, device.0)); + results.rpcs.push(device_rpc::device_offline(gid, device.0)); } } } @@ -182,7 +185,7 @@ impl Group { if let Some(v) = running.distributes.get_mut(&addr) { v.1 = true; - results.rpcs.push(rpc::device_online(*gid, v.0)); + results.rpcs.push(device_rpc::device_online(*gid, v.0)); (remote_height, remote_event, new_addrs) } else { let mut device = Device::new(device_name, device_info, addr); @@ -190,8 +193,10 @@ impl Group { device.insert(&db)?; db.close()?; running.distributes.insert(addr, (device.id, true)); - results.rpcs.push(rpc::device_create(*gid, &device)); - results.rpcs.push(rpc::device_online(*gid, device.id)); + results.rpcs.push(device_rpc::device_create(*gid, &device)); + results + .rpcs + .push(device_rpc::device_online(*gid, device.id)); (remote_height, remote_event, new_addrs) } } @@ -215,7 +220,7 @@ impl Group { let v = self.running_mut(gid)?; let did = v.add_online(&addr)?; - results.rpcs.push(rpc::device_online(*gid, did)); + results.rpcs.push(device_rpc::device_online(*gid, did)); (remote_height, remote_event, vec![]) } }; @@ -575,7 +580,7 @@ impl Group { let (ancestors, hashes, is_min) = if to >= from { let (ancestors, is_min) = Self::ancestor(from, to); let db = consensus_db(&self.base, gid)?; - let hashes = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?; + let hashes = crate::consensus::Event::get_assign_hash(&db, &ancestors)?; db.close()?; (ancestors, hashes, is_min) } else { @@ -665,7 +670,7 @@ impl GroupEvent { GroupEvent::DeviceOffline => { let v = group.running_mut(&gid)?; let did = v.offline(&addr)?; - results.rpcs.push(rpc::device_offline(gid, did)); + results.rpcs.push(device_rpc::device_offline(gid, did)); } GroupEvent::StatusRequest => { let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) = @@ -700,7 +705,7 @@ impl GroupEvent { swap_p, disk_p, uptime, - ) => results.rpcs.push(rpc::device_status( + ) => results.rpcs.push(device_rpc::device_status( gid, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime, )), GroupEvent::Event(eheight, eid, pre, inner_event) => { @@ -726,7 +731,7 @@ impl GroupEvent { if account.height != remote_height || account.event != remote_event { // check ancestor and merge. let db = consensus_db(&group.base, &gid)?; - let ours = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?; + let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?; drop(db); if ours.len() == 0 { diff --git a/src/group/running.rs b/src/group/running.rs index a3ce8bb..200d695 100644 --- a/src/group/running.rs +++ b/src/group/running.rs @@ -8,7 +8,7 @@ use tdn::types::{ use tdn_did::Keypair; -use crate::models::device::Device; +use crate::apps::device::Device; use crate::storage::consensus_db; pub(crate) struct RunningAccount { diff --git a/src/layer.rs b/src/layer.rs index 3e3d9e1..31ff2f8 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -12,11 +12,12 @@ use tdn::{ }; use tdn_did::{user::User, Proof}; +use crate::apps::chat::{Friend, Message, MessageType, NetworkMessage, Request}; use crate::event::{InnerEvent, StatusEvent}; use crate::group::Group; use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; -use crate::models::session::{Friend, Message, MessageType, NetworkMessage, Request}; -use crate::rpc; + +use crate::apps::chat::rpc; use crate::storage::{ read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image, }; diff --git a/src/layer/running.rs b/src/layer/running.rs index 80610ff..227cb4a 100644 --- a/src/layer/running.rs +++ b/src/layer/running.rs @@ -5,7 +5,7 @@ use tdn::types::{ primitive::{new_io_error, PeerAddr, Result}, }; -use crate::models::session::Friend; +use crate::apps::chat::Friend; use crate::storage::session_db; /// online info. diff --git a/src/lib.rs b/src/lib.rs index f742181..6c66541 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,11 +5,13 @@ use std::ffi::CStr; use std::os::raw::c_char; use tdn::smol; +mod account; +mod apps; +mod consensus; mod event; mod group; mod layer; mod migrate; -mod models; mod primitives; mod rpc; mod server; diff --git a/src/models.rs b/src/models.rs deleted file mode 100644 index 773f21e..0000000 --- a/src/models.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub(crate) mod account; -pub(crate) mod consensus; -pub(crate) mod device; -pub(crate) mod file; -pub(crate) mod service; -pub(crate) mod session; diff --git a/src/rpc.rs b/src/rpc.rs index 21d8034..e4b6378 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,22 +7,30 @@ use tdn::{ types::{ group::GroupId, message::{NetworkType, SendMessage, SendType, StateRequest, StateResponse}, - primitive::{new_io_error, HandleResult, PeerAddr}, + primitive::{new_io_error, HandleResult, PeerAddr, Result}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }, }; -use tdn_did::user::User; +use crate::apps::app_rpc_inject; use crate::event::InnerEvent; -use crate::group::{Group, GroupEvent}; +use crate::group::Group; use crate::layer::{Layer, LayerEvent}; -use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; -use crate::models::{ - device::Device, - session::{Friend, Message, MessageType, Request}, -}; -use crate::storage::{consensus_db, delete_avatar, session_db}; -use crate::utils::device_status::device_status as local_device_status; + +pub(crate) fn init_rpc( + addr: PeerAddr, + group: Arc>, + layer: Arc>, +) -> RpcHandler { + let mut handler = new_rpc_handler(addr, group, layer); + app_rpc_inject(&mut handler); + handler +} + +pub(crate) struct RpcState { + pub group: Arc>, + pub layer: Arc>, +} #[inline] pub(crate) fn network_stable(peers: Vec<(PeerAddr, bool)>) -> RpcParam { @@ -52,159 +60,11 @@ pub(crate) fn network_seed(peers: Vec) -> RpcParam { rpc_response(0, "network-seed", json!(s_peers), GroupId::default()) } -#[inline] -pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam { - rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid) -} - -#[inline] -pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-offline", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam { - rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam { - rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid) -} - -#[inline] -pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-close", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-delete", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam { - rpc_response(0, "request-create", json!(req.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { - rpc_response(0, "request-delivery", json!([id, is_d]), mgid) -} - -#[inline] -pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam { - rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid) -} - -#[inline] -pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "request-reject", json!([id]), mgid) -} - -#[inline] -pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "request-delete", json!([id]), mgid) -} - -#[inline] -pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { - rpc_response(0, "message-create", json!(msg.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { - rpc_response(0, "message-delivery", json!([id, is_d]), mgid) -} - -#[inline] -pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "message-delete", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam { - rpc_response(0, "device-create", json!(device.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-remove", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-online", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-offline", json!([id]), mgid) -} - #[inline] pub(crate) fn account_update(mgid: GroupId, name: &str, avatar: String) -> RpcParam { rpc_response(0, "account-update", json!([name, avatar]), mgid) } -#[inline] -pub(crate) fn device_status( - mgid: GroupId, - cpu: u32, - memory: u32, - swap: u32, - disk: u32, - cpu_p: u16, - memory_p: u16, - swap_p: u16, - disk_p: u16, - uptime: u32, -) -> RpcParam { - rpc_response( - 0, - "device-status", - json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]), - mgid, - ) -} - -#[inline] -fn friend_list(friends: Vec) -> RpcParam { - let mut results = vec![]; - for friend in friends { - results.push(friend.to_rpc()); - } - - json!(results) -} - -#[inline] -fn request_list(requests: Vec) -> RpcParam { - let mut results = vec![]; - for request in requests { - results.push(request.to_rpc()); - } - json!(results) -} - -#[inline] -fn message_list(messages: Vec) -> RpcParam { - let mut results = vec![]; - for msg in messages { - results.push(msg.to_rpc()); - } - json!(results) -} - -#[inline] -fn device_list(devices: Vec) -> RpcParam { - let mut results = vec![]; - for device in devices { - results.push(device.to_rpc()); - } - json!(results) -} - #[inline] pub(crate) async fn sleep_waiting_close_stable( sender: Sender, @@ -239,7 +99,7 @@ pub(crate) async fn inner_rpc( uid: u64, method: &str, sender: &async_channel::Sender, -) -> Result<(), std::io::Error> { +) -> Result<()> { // Inner network default rpc method. only use in http-rpc. if method == "network-stable" || method == "network-dht" || method == "network-seed" { let req = match method { @@ -275,28 +135,22 @@ pub(crate) async fn inner_rpc( Err(new_io_error("not found")) } -pub(crate) struct RpcState { - group: Arc>, - layer: Arc>, -} - -#[inline] -pub(crate) fn new_rpc_handler( +fn new_rpc_handler( addr: PeerAddr, group: Arc>, layer: Arc>, ) -> RpcHandler { - let mut rpc_handler = RpcHandler::new(RpcState { group, layer }); + let mut handler = RpcHandler::new(RpcState { group, layer }); - rpc_handler.add_method("echo", |_, params, _| async move { + handler.add_method("echo", |_, params, _| async move { Ok(HandleResult::rpc(json!(params))) }); - rpc_handler.add_method("system-info", move |_, _, _| async move { + handler.add_method("system-info", move |_, _, _| async move { Ok(HandleResult::rpc(json!(vec![addr.to_hex()]))) }); - rpc_handler.add_method( + handler.add_method( "add-bootstrap", |_gid, params: Vec, _| async move { let socket = params[0].as_str()?; @@ -308,7 +162,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-list", |_gid, _params: Vec, state: Arc| async move { let mut users: Vec> = vec![]; @@ -327,7 +181,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-create", |_gid, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -353,7 +207,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-restore", |_gid, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -390,76 +244,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( - "device-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = consensus_db(state.layer.read().await.base(), &gid)?; - let devices = Device::all(&db)?; - drop(db); - let online_devices = state.group.read().await.online_devices(&gid, devices); - Ok(HandleResult::rpc(device_list(online_devices))) - }, - ); - - rpc_handler.add_method( - "device-status", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let group_lock = state.group.read().await; - if &addr == group_lock.addr() { - let uptime = group_lock.uptime(&gid)?; - let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = - local_device_status(); - return Ok(HandleResult::rpc(json!([ - cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime - ]))); - } - drop(group_lock); - - let msg = state - .group - .write() - .await - .event_message(addr, &GroupEvent::StatusRequest)?; - - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let msg = state.group.read().await.create_message(&gid, addr)?; - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-connect", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let msg = state.group.read().await.connect_message(&gid, addr)?; - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-delete", - |_gid: GroupId, params: Vec, _state: Arc| async move { - let _id = params[0].as_i64()?; - // TODO delete a device. - Ok(HandleResult::new()) - }, - ); - - rpc_handler.add_method( + handler.add_method( "account-update", |gid: GroupId, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -484,7 +269,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-pin", |gid: GroupId, params: Vec, state: Arc| async move { let old = params[0].as_str()?; @@ -495,7 +280,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-mnemonic", |gid: GroupId, params: Vec, state: Arc| async move { let lock = params[0].as_str()?; @@ -505,7 +290,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-login", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -524,7 +309,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-logout", |_gid: GroupId, _params: Vec, state: Arc| async move { let mut results = HandleResult::new(); @@ -562,7 +347,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-online", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -588,7 +373,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-offline", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -618,386 +403,5 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( - "friend-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let friends = state.layer.read().await.all_friends_with_online(&gid)?; - Ok(HandleResult::rpc(friend_list(friends))) - }, - ); - - rpc_handler.add_method( - "friend-update", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - let remark = params[1].as_str()?; - let is_top = params[2].as_bool()?; - - let mut results = HandleResult::new(); - let db = session_db(state.layer.read().await.base(), &gid)?; - let f = if let Some(mut f) = Friend::get_id(&db, id)? { - f.is_top = is_top; - f.remark = remark.to_owned(); - f.me_update(&db)?; - f - } else { - return Ok(results); - }; - drop(db); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark), - FRIEND_TABLE_PATH, - f.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "friend-readed", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - - let db = session_db(state.layer.read().await.base(), &gid)?; - Friend::readed(&db, fid)?; - drop(db); - - Ok(HandleResult::new()) - }, - ); - - rpc_handler.add_method( - "friend-close", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut results = HandleResult::new(); - let mut layer_lock = state.layer.write().await; - - let db = session_db(layer_lock.base(), &gid)?; - let friend = Friend::get_id(&db, id)??; - friend.close(&db)?; - drop(db); - - let online = layer_lock.remove_friend(&gid, &friend.gid); - drop(layer_lock); - - if let Some(faddr) = online { - let mut addrs: HashMap = HashMap::new(); - addrs.insert(faddr, friend.gid); - let sender = state.group.read().await.sender(); - tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) - .detach(); - } - - let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); - let msg = SendType::Event(0, friend.addr, data); - results.layers.push((gid, friend.gid, msg)); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendClose(friend.gid), - FRIEND_TABLE_PATH, - friend.id, - &mut results, - )?; - - Ok(results) - }, - ); - - rpc_handler.add_method( - "friend-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut results = HandleResult::new(); - let mut layer_lock = state.layer.write().await; - - let db = session_db(layer_lock.base(), &gid)?; - let friend = Friend::get_id(&db, id)??; - friend.delete(&db)?; - drop(db); - - let online = layer_lock.remove_friend(&gid, &friend.gid); - delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; - drop(layer_lock); - - if let Some(faddr) = online { - let mut addrs: HashMap = HashMap::new(); - addrs.insert(faddr, friend.gid); - let sender = state.group.read().await.sender(); - tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) - .detach(); - } - - let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); - let msg = SendType::Event(0, friend.addr, data); - results.layers.push((gid, friend.gid, msg)); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendDelete(friend.gid), - FRIEND_TABLE_PATH, - friend.id, - &mut results, - )?; - - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - drop(layer_lock); - let requests = Request::all(&db)?; - drop(db); - Ok(HandleResult::rpc(request_list(requests))) - }, - ); - - rpc_handler.add_method( - "request-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let remote_gid = GroupId::from_hex(params[0].as_str()?)?; - let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?; - let remote_name = params[2].as_str()?.to_string(); - let remark = params[3].as_str()?.to_string(); - - let mut request = Request::new( - remote_gid, - remote_addr, - remote_name.clone(), - remark.clone(), - true, - false, - ); - - let mut results = HandleResult::rpc(Default::default()); - let me = state.group.read().await.clone_user(&gid)?; - - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - if Friend::is_friend(&db, &request.gid)? { - debug!("had friend."); - drop(layer_lock); - return Ok(results); - } - - if let Some(req) = Request::get(&db, &request.gid)? { - println!("Had this request."); - req.delete(&db)?; - } - request.insert(&db)?; - drop(db); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestCreate( - true, - User::new(remote_gid, remote_addr, remote_name, vec![])?, - remark, - ), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - - results - .layers - .push((gid, remote_gid, layer_lock.req_message(me, request))); - - drop(layer_lock); - - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-agree", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut group_lock = state.group.write().await; - let me = group_lock.clone_user(&gid)?; - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - let mut results = HandleResult::new(); - - if let Some(mut request) = Request::get_id(&db, id)? { - group_lock.broadcast( - &gid, - InnerEvent::SessionRequestHandle(request.gid, true, vec![]), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_ok = true; - request.is_over = true; - request.update(&db)?; - - let f = Friend::from_request(&db, request)?; - layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id); - results.rpcs.push(json!([id, f.to_rpc()])); - - let proof = group_lock.prove_addr(&gid, &f.addr)?; - let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?; - results.layers.push((gid, f.gid, msg)); - } - db.close()?; - drop(group_lock); - drop(layer_lock); - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-reject", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - let mut req = Request::get_id(&db, id)??; - req.is_ok = false; - req.is_over = true; - req.update(&db)?; - drop(db); - let msg = layer_lock.reject_message(id, req.addr, gid); - drop(layer_lock); - - let mut results = HandleResult::layer(gid, req.gid, msg); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestHandle(req.gid, false, vec![]), - REQUEST_TABLE_PATH, - req.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - let base = layer_lock.base().clone(); - drop(layer_lock); - let req = Request::get_id(&db, id)??; - req.delete(&db)?; - - // delete avatar. check had friend. - if Friend::get(&db, &req.gid)?.is_none() { - delete_avatar(&base, &gid, &req.gid).await?; - } - drop(db); - - let mut results = HandleResult::new(); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestDelete(req.gid), - REQUEST_TABLE_PATH, - req.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "message-list", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - drop(layer_lock); - - Friend::readed(&db, fid)?; - let messages = Message::get(&db, &fid)?; - drop(db); - Ok(HandleResult::rpc(message_list(messages))) - }, - ); - - rpc_handler.add_method( - "message-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - let fgid = GroupId::from_hex(params[1].as_str()?)?; - let m_type = MessageType::from_int(params[2].as_i64()?); - let content = params[3].as_str()?.to_string(); - - let mut layer_lock = state.layer.write().await; - let base = layer_lock.base(); - let faddr = layer_lock.running(&gid)?.online(&fgid)?; - - let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?; - let event = LayerEvent::Message(msg.hash, nw); - let s = layer_lock.event_message(msg.id, gid, faddr, &event); - drop(layer_lock); - - let mut results = HandleResult::rpc(json!(msg.to_rpc())); - results.layers.push((gid, fgid, s)); - - match event { - LayerEvent::Message(hash, nw) => { - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionMessageCreate(fgid, true, hash, nw), - MESSAGE_TABLE_PATH, - msg.id, - &mut results, - )?; - } - _ => {} - } - - Ok(results) - }, - ); - - rpc_handler.add_method( - "message-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(&layer_lock.base(), &gid)?; - drop(layer_lock); - - let msg = Message::get_id(&db, id)??; - msg.delete(&db)?; - drop(db); - let mut results = HandleResult::new(); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionMessageDelete(msg.hash), - MESSAGE_TABLE_PATH, - msg.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "files-folder", - |_gid: GroupId, params: Vec, _state: Arc| async move { - let _path = params[0].as_str()?; - - Ok(HandleResult::new()) - }, - ); - - rpc_handler + handler } diff --git a/src/server.rs b/src/server.rs index ff61122..d4fd600 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,12 +12,12 @@ use tdn::{ types::primitive::HandleResult, }; +use crate::account::Account; use crate::group::Group; use crate::layer::Layer; use crate::migrate::main_migrate; -use crate::models::account::Account; use crate::primitives::network_seeds; -use crate::rpc::{inner_rpc, new_rpc_handler}; +use crate::rpc::{init_rpc, inner_rpc}; use crate::storage::account_db; pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:8080"; @@ -67,7 +67,7 @@ pub async fn start(db_path: String) -> Result<()> { Layer::init(db_path, peer_id, group.clone()).await?, )); - let rpc = new_rpc_handler(peer_id, group.clone(), layer.clone()); + let rpc = init_rpc(peer_id, group.clone(), layer.clone()); //let mut group_rpcs: HashMap = HashMap::new(); let mut now_rpc_uid = 0;