From cf5bc0b5e9cc439a9f45593e436ae7033862c15a Mon Sep 17 00:00:00 2001 From: Sun Date: Tue, 5 Apr 2022 16:53:41 +0800 Subject: [PATCH] rename group to own for distributed --- src/apps.rs | 2 +- src/apps/chat/layer.rs | 18 ++++---- src/apps/chat/rpc.rs | 38 ++++++++--------- src/apps/dao/rpc.rs | 12 +++--- src/apps/device/rpc.rs | 10 ++--- src/apps/domain/layer.rs | 2 +- src/apps/domain/rpc.rs | 12 +++--- src/apps/file/rpc.rs | 16 +++---- src/apps/group/layer.rs | 8 ++-- src/apps/group/rpc.rs | 14 +++---- src/apps/jarvis/rpc.rs | 6 +-- src/apps/wallet/rpc.rs | 24 +++++------ src/daemon.rs | 2 +- src/event.rs | 8 ++-- src/global.rs | 10 ++--- src/layer.rs | 2 +- src/lib.rs | 2 +- src/{group.rs => own.rs} | 74 ++++++++++++++++----------------- src/{group => own}/consensus.rs | 0 src/{group => own}/running.rs | 0 src/rpc.rs | 32 +++++++------- src/server.rs | 18 ++++++-- 22 files changed, 159 insertions(+), 151 deletions(-) rename src/{group.rs => own.rs} (93%) rename src/{group => own}/consensus.rs (100%) rename src/{group => own}/running.rs (100%) diff --git a/src/apps.rs b/src/apps.rs index 07c169c..c87e69a 100644 --- a/src/apps.rs +++ b/src/apps.rs @@ -63,7 +63,7 @@ pub(crate) async fn app_layer_handle( let mut delete: HashMap> = HashMap::new(); let pid = global.pid().await; - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = group_db(&global.base, &pid, &db_key)?; for (gid, session) in &layer.groups { diff --git a/src/apps/chat/layer.rs b/src/apps/chat/layer.rs index 20f2574..0f10c17 100644 --- a/src/apps/chat/layer.rs +++ b/src/apps/chat/layer.rs @@ -87,7 +87,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) -> Result { @@ -137,7 +137,7 @@ async fn handle_connect( global: &Arc, results: &mut HandleResult, ) -> Result { - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; // 1. check friendship. @@ -194,7 +194,7 @@ impl LayerEvent { results.rpcs.push(session_connect(&sid, &fpid)); } LayerEvent::Request(name, remark) => { - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; if Friend::get_id(&db, &fpid).is_err() { @@ -218,7 +218,7 @@ impl LayerEvent { } } LayerEvent::Agree => { - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; // 1. check friendship. @@ -248,7 +248,7 @@ impl LayerEvent { } } LayerEvent::Reject => { - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; if let Ok(mut request) = Request::get_id(&db, &fpid) { @@ -260,7 +260,7 @@ impl LayerEvent { } LayerEvent::Message(hash, m) => { let (_sid, fid) = global.layer.read().await.chat_session(&fpid)?; - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; if !Message::exist(&db, &hash)? { @@ -303,7 +303,7 @@ impl LayerEvent { } LayerEvent::InfoRes(remote) => { let (sid, fid) = global.layer.read().await.chat_session(&fpid)?; - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; let mut f = Friend::get(&db, &fid)?; @@ -329,7 +329,7 @@ impl LayerEvent { let keep = layer.is_addr_online(&fpid); drop(layer); - let db_key = global.group.read().await.db_key(&pid)?; + let db_key = global.own.read().await.db_key(&pid)?; let db = chat_db(&global.base, &pid, &db_key)?; Friend::id_close(&db, fid)?; diff --git a/src/apps/chat/rpc.rs b/src/apps/chat/rpc.rs index 20c8647..8b05bcf 100644 --- a/src/apps/chat/rpc.rs +++ b/src/apps/chat/rpc.rs @@ -112,7 +112,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let need_online = params[0].as_bool().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let friends = Friend::list(&db)?; @@ -143,7 +143,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let mut f = Friend::get(&db, &id)?; @@ -151,7 +151,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { f.me_update(&db)?; drop(db); - // state.group.write().await.broadcast( + // state.own.write().await.broadcast( // &gid, // InnerEvent::SessionFriendUpdate(f.gid, f.remark), // FRIEND_TABLE_PATH, @@ -170,7 +170,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let friend = Friend::get(&db, &id)?; @@ -189,7 +189,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .push((ESSE_ID, SendType::Disconnect(friend.pid))); } - // state.group.write().await.broadcast( + // state.own.write().await.broadcast( // &gid, // InnerEvent::SessionFriendClose(friend.gid), // FRIEND_TABLE_PATH, @@ -208,7 +208,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let friend = Friend::get(&db, &id)?; @@ -229,7 +229,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .push((ESSE_ID, SendType::Disconnect(friend.pid))); } - // state.group.write().await.broadcast( + // state.own.write().await.broadcast( // &gid, // InnerEvent::SessionFriendDelete(friend.gid), // FRIEND_TABLE_PATH, @@ -245,7 +245,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { "chat-request-list", |_params: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let requests = Request::list(&db)?; drop(db); @@ -261,7 +261,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let remark = params[2].as_str().ok_or(RpcError::ParseError)?.to_string(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; if Friend::is_friend(&db, &remote_pid)? { @@ -281,7 +281,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::rpc(json!(request.to_rpc())); - let name = state.group.read().await.account(&pid)?.name.clone(); + let name = state.own.read().await.account(&pid)?.name.clone(); let req = LayerEvent::Request(name, request.remark); let data = bincode::serialize(&req).unwrap_or(vec![]); let msg = SendType::Event(0, request.pid, data); @@ -298,7 +298,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let mut request = Request::get(&db, &id)?; @@ -344,7 +344,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let mut req = Request::get(&db, &id)?; @@ -357,7 +357,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let msg = SendType::Event(0, req.pid, data); let mut results = HandleResult::layer(ESSE_ID, msg); - // state.group.write().await.broadcast( + // state.own.write().await.broadcast( // &gid, // InnerEvent::SessionRequestHandle(req.gid, false, vec![]), // REQUEST_TABLE_PATH, @@ -374,7 +374,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let req = Request::get(&db, &id)?; @@ -387,7 +387,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { drop(db); let results = HandleResult::new(); - // state.group.write().await.broadcast( + // state.own.write().await.broadcast( // &gid, // InnerEvent::SessionRequestDelete(req.gid), // REQUEST_TABLE_PATH, @@ -404,7 +404,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let friend = Friend::get(&db, &id)?; @@ -421,7 +421,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let fid = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let messages = Message::get_by_fid(&db, &fid)?; @@ -439,7 +439,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let content = params[3].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let (nm, raw) = @@ -470,7 +470,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = chat_db(&state.base, &pid, &db_key)?; let msg = Message::get(&db, &id)?; diff --git a/src/apps/dao/rpc.rs b/src/apps/dao/rpc.rs index 4b57cae..058d64a 100644 --- a/src/apps/dao/rpc.rs +++ b/src/apps/dao/rpc.rs @@ -274,7 +274,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let _ = write_avatar(&base, &gid, &gcd, &avatar_bytes).await; let mut results = HandleResult::new(); - let me = state.group.read().await.clone_user(&gid)?; + let me = state.own.read().await.clone_user(&gid)?; // add to rpcs. results.rpcs.push(json!(gc.to_rpc())); @@ -343,8 +343,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { drop(db); // load avatar - let avatar = read_avatar(state.group.read().await.base(), &gid, &gc.g_id).await?; - let owner_avatar = state.group.read().await.clone_user(&gid)?.avatar; + let avatar = read_avatar(state.own.read().await.base(), &gid, &gc.g_id).await?; + let owner_avatar = state.own.read().await.clone_user(&gid)?.avatar; let addr = gc.g_addr; let info = gc.to_group_info(mname, avatar, owner_avatar); @@ -389,7 +389,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { } drop(db); - let me = state.group.read().await.clone_user(&gid)?; + let me = state.own.read().await.clone_user(&gid)?; let join_proof = match gtype { GroupType::Encrypted => { // remark is inviter did. @@ -424,7 +424,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .filter_map(|v| v.as_i64()) .collect(); - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; let base = group_lock.base().clone(); let chat = chat_db(&base, &gid)?; @@ -533,7 +533,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let addr = state.layer.read().await.running(&gid)?.online(&gcd)?; let mut results = HandleResult::new(); - let base = state.group.read().await.base().clone(); + let base = state.own.read().await.base().clone(); let (nmsg, datetime) = to_network_message(&base, &gid, m_type, m_content).await?; let event = Event::MessageCreate(gid, nmsg, datetime); let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?; diff --git a/src/apps/device/rpc.rs b/src/apps/device/rpc.rs index eb3bd67..ef01317 100644 --- a/src/apps/device/rpc.rs +++ b/src/apps/device/rpc.rs @@ -5,7 +5,7 @@ use tdn::types::{ }; use crate::global::Global; -//use crate::group::GroupEvent; +//use crate::own::OwnEvent; use crate::utils::device_status::device_status as local_device_status; use super::Device; @@ -66,7 +66,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "device-list", |_params: Vec, state: Arc| async move { - let devices = &state.group.read().await.distributes; + let devices = &state.own.read().await.distributes; Ok(HandleResult::rpc(device_list(devices))) }, ); @@ -76,7 +76,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; if id == group_lock.device()?.id { let uptime = group_lock.uptime; let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = @@ -87,7 +87,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { } drop(group_lock); - //let msg = state.group.write().await.event_message(addr, &GroupEvent::StatusRequest)?; + //let msg = state.own.write().await.event_message(addr, &OwnEvent::StatusRequest)?; //Ok(HandleResult::group(msg)) Ok(HandleResult::new()) }, @@ -96,7 +96,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { handler.add_method( "device-search", |_params: Vec, state: Arc| async move { - //let msg = state.group.read().await.create_message(&gid, Peer::peer(addr))?; + //let msg = state.own.read().await.create_message(&gid, Peer::peer(addr))?; //Ok(HandleResult::group(gid, msg)) Ok(HandleResult::new()) }, diff --git a/src/apps/domain/layer.rs b/src/apps/domain/layer.rs index 77c4667..61ffcf4 100644 --- a/src/apps/domain/layer.rs +++ b/src/apps/domain/layer.rs @@ -27,7 +27,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) { "domain-list", |_params: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = domain_db(&state.base, &pid, &db_key)?; // list providers. @@ -83,7 +83,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = domain_db(&state.base, &pid, &db_key)?; let mut p = Provider::prepare(provider); @@ -102,7 +102,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = domain_db(&state.base, &pid, &db_key)?; let provider = Provider::get(&db, &id)?; @@ -124,7 +124,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = domain_db(&state.base, &pid, &db_key)?; let names = Name::get_by_provider(&db, &id)?; @@ -147,10 +147,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { // save to db. let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = domain_db(&state.base, &pid, &db_key)?; - let me = state.group.read().await.clone_user(&pid)?; + let me = state.own.read().await.clone_user(&pid)?; let mut u = Name::prepare(name, bio, provider); u.insert(&db)?; diff --git a/src/apps/file/rpc.rs b/src/apps/file/rpc.rs index 15024e6..391a00a 100644 --- a/src/apps/file/rpc.rs +++ b/src/apps/file/rpc.rs @@ -22,7 +22,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let parent = params[1].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; let files: Vec = File::list(&db, &root, &parent)? @@ -42,7 +42,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; // genereate new file. @@ -71,7 +71,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { .to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; let mut file = File::generate(root, parent, name); @@ -91,7 +91,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { // create new folder. let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; let mut file = File::generate(root, parent, name); @@ -110,7 +110,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let name = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; let mut file = File::get(&db, &id)?; @@ -130,7 +130,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let starred = params[1].as_bool().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; File::star(&db, &id, starred)?; @@ -144,7 +144,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; // TODO trash a directory. @@ -160,7 +160,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = file_db(&state.base, &pid, &db_key)?; // TODO deleted file & directory. diff --git a/src/apps/group/layer.rs b/src/apps/group/layer.rs index 0e4b7d3..37158c5 100644 --- a/src/apps/group/layer.rs +++ b/src/apps/group/layer.rs @@ -51,7 +51,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) { "group-list", |_params: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = group_db(&state.base, &pid, &db_key)?; Ok(HandleResult::rpc(group_list(GroupChat::all(&db)?))) @@ -89,7 +89,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = group_db(&state.base, &pid, &db_key)?; let group = GroupChat::get(&db, &id)?; let members = Member::list(&db, &id)?; @@ -104,7 +104,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; let db_key = group_lock.db_key(&pid)?; let me = group_lock.clone_user(&pid)?; drop(group_lock); @@ -157,7 +157,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let fid = params[1].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let group_db = group_db(&state.base, &pid, &db_key)?; let chat_db = chat_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; @@ -220,7 +220,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let m_content = params[2].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = group_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; @@ -267,7 +267,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = group_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; @@ -299,7 +299,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = group_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; diff --git a/src/apps/jarvis/rpc.rs b/src/apps/jarvis/rpc.rs index 10e78f5..c63268c 100644 --- a/src/apps/jarvis/rpc.rs +++ b/src/apps/jarvis/rpc.rs @@ -51,7 +51,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { "jarvis-list", |_params: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = jarvis_db(&state.base, &pid, &db_key)?; let devices = Message::list(&db)?; db.close()?; @@ -71,7 +71,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let content = params[2].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = jarvis_db(&state.base, &pid, &db_key)?; let (_, raw) = @@ -91,7 +91,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { |params: Vec, state: Arc| async move { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = jarvis_db(&state.base, &pid, &db_key)?; Message::delete(&db, id)?; db.close()?; diff --git a/src/apps/wallet/rpc.rs b/src/apps/wallet/rpc.rs index e269863..529ab54 100644 --- a/src/apps/wallet/rpc.rs +++ b/src/apps/wallet/rpc.rs @@ -320,7 +320,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { "wallet-list", |_params: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let addresses = Address::list(&db)?; @@ -335,10 +335,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let lock = params[1].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; let mnemonic = group_lock.mnemonic(&pid, lock, &state.secret)?; let account = group_lock.account(&pid)?; let lang = account.lang(); @@ -371,7 +371,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { results.rpcs.push(address.to_rpc()); if address.main { let a_db = account_db(&state.base, &state.secret)?; - let mut group_lock = state.group.write().await; + let mut group_lock = state.own.write().await; let account = group_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; @@ -398,7 +398,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let pid = state.pid().await; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; let ckey = &group_lock.account(&pid)?.encrypt; let db_key = group_lock.db_key(&pid)?; let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?; @@ -419,7 +419,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let address = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let c_str = if params.len() == 4 { @@ -445,7 +445,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let c = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; tokio::spawn(token_check(state.rpc_send.clone(), db, chain, net, addr, c)); @@ -480,7 +480,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let lock = params[6].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; if !group_lock.check_lock(&pid, &lock) { return Err(RpcError::Custom("Lock is invalid!".to_owned())); } @@ -560,7 +560,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let token = params[1].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let nfts = Balance::list(&db, &address, &token)?; @@ -580,7 +580,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let hash = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let t = Token::get(&db, &token)?; let a = Address::get(&db, &address)?; @@ -608,7 +608,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = wallet_db(&state.base, &pid, &db_key)?; let a_db = account_db(&state.base, &state.secret)?; @@ -617,7 +617,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); - let mut group_lock = state.group.write().await; + let mut group_lock = state.own.write().await; let account = group_lock.account_mut(&pid)?; account.wallet = address.chain.update_main(&address.address, &account.wallet); account.pub_height = account.pub_height + 1; diff --git a/src/daemon.rs b/src/daemon.rs index 8a566fa..26f3ce5 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,9 +11,9 @@ mod apps; //mod consensus; //mod event; mod global; -mod group; mod layer; mod migrate; +mod own; mod primitives; mod rpc; mod server; diff --git a/src/event.rs b/src/event.rs index 6d35090..1d6a593 100644 --- a/src/event.rs +++ b/src/event.rs @@ -15,11 +15,11 @@ use tokio::sync::{mpsc::Sender, RwLock}; use crate::account::{Account, User}; use crate::apps::chat::LayerEvent; use crate::consensus::Event as OldEvent; -use crate::group::{Group, GroupEvent}; use crate::layer::Layer; use crate::migrate::consensus::{ ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH, }; +use crate::own::{Own, OwnEvent}; use crate::apps::chat::rpc as chat_rpc; use crate::apps::chat::{from_model, Friend, Message, Request}; @@ -33,14 +33,14 @@ pub(crate) enum State { Account, Session, ChatMessage, - GroupMessage, + OwnMessage, } pub(crate) async fn _handle_state( _gid: GroupId, _addr: PeerId, state: State, - _group: &Arc>, + _group: &Arc>, _layer: &Arc>, _results: &mut HandleResult, ) -> Result<()> { @@ -48,7 +48,7 @@ pub(crate) async fn _handle_state( State::Account => {} State::Session => {} State::ChatMessage => {} - State::GroupMessage => {} + State::OwnMessage => {} } Ok(()) } diff --git a/src/global.rs b/src/global.rs index d33a7e0..66af2ff 100644 --- a/src/global.rs +++ b/src/global.rs @@ -7,8 +7,8 @@ use tdn::{ use tokio::{sync::mpsc::Sender, sync::RwLock}; use crate::account::Account; -use crate::group::Group; use crate::layer::Layer; +use crate::own::Own; /// global status. pub(crate) struct Global { @@ -18,8 +18,8 @@ pub(crate) struct Global { pub peer_pub_height: RwLock, /// current account own height. pub peer_own_height: RwLock, - /// current group. - pub group: RwLock, + /// current own. + pub own: RwLock, /// current layer. pub layer: RwLock, /// message delivery tracking. uuid, me_gid, db_id. @@ -61,7 +61,7 @@ impl Global { peer_id: RwLock::new(PeerId::default()), peer_pub_height: RwLock::new(0), peer_own_height: RwLock::new(0), - group: RwLock::new(Group::init(accounts)), + own: RwLock::new(Own::init(accounts)), layer: RwLock::new(Layer::init()), p2p_send: RwLock::new(None), _delivery: RwLock::new(HashMap::new()), @@ -104,7 +104,7 @@ impl Global { } let (pheight, oheight) = - self.group + self.own .write() .await .reset(pid, lock, &self.base, &self.secret)?; diff --git a/src/layer.rs b/src/layer.rs index 4e908cc..5985338 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -13,7 +13,7 @@ use tokio::sync::RwLock; use crate::account::User; use crate::apps::chat::LayerEvent as ChatLayerEvent; //use crate::apps::group::{group_conn, GROUP_ID}; -use crate::group::Group; +use crate::own::Own; use crate::session::{Session, SessionType}; /// ESSE layers. diff --git a/src/lib.rs b/src/lib.rs index 28ef2ff..545edda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,9 +12,9 @@ mod apps; //mod consensus; //mod event; mod global; -mod group; mod layer; mod migrate; +mod own; mod primitives; mod rpc; mod server; diff --git a/src/group.rs b/src/own.rs similarity index 93% rename from src/group.rs rename to src/own.rs index 8cb53d6..ae74968 100644 --- a/src/group.rs +++ b/src/own.rs @@ -24,8 +24,8 @@ use crate::storage::{account_db, account_init, consensus_db, wallet_db, write_av use crate::utils::crypto::{decrypt, encrypt}; use crate::utils::device_status::{device_info, device_status as local_device_status}; -/// Esse group. -pub(crate) struct Group { +/// ESSE own distributed accounts. +pub(crate) struct Own { /// all accounts. pub accounts: HashMap, /// current account secret keypair. @@ -38,7 +38,7 @@ pub(crate) struct Group { /// Request for make distributed. #[derive(Serialize, Deserialize)] -enum GroupConnect { +enum OwnConnect { /// Params: User, consensus height, event_id, remote_name, remote_info, other_devices addr. Create(User, u64, EventId, String, String, Vec), /// connected. @@ -47,7 +47,7 @@ enum GroupConnect { /// Esse group's Event. #[derive(Serialize, Deserialize)] -pub(crate) enum GroupEvent { +pub(crate) enum OwnEvent { /// Sync event. Event(u64, EventId, EventId), //Event(u64, EventId, EventId, InnerEvent), @@ -97,8 +97,8 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result { - //let event: GroupEvent = bincode::deserialize(&bytes)?; - //return GroupEvent::handle(self, event, pid, addr, uid).await; + //let event: OwnEvent = bincode::deserialize(&bytes)?; + //return OwnEvent::handle(self, event, pid, addr, uid).await; } RecvType::Stream(_uid, _stream, _bytes) => { todo!(); @@ -121,7 +121,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) -> Result { +// OwnConnect::Connect(remote_height, remote_event) => { // if self // .runnings // .get(pid) @@ -229,9 +229,9 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc) -> Result) -> Group { - Group { +impl Own { + pub fn init(accounts: HashMap) -> Own { + Own { accounts, keypair: PeerKey::default(), distributes: vec![], @@ -562,7 +562,7 @@ impl Group { // Ok(SendType::Connect( // 0, // addr, - // bincode::serialize(&GroupConnect::Create( + // bincode::serialize(&OwnConnect::Create( // proof, // user, // height, @@ -579,7 +579,7 @@ impl Group { // let account = self.account(pid)?; // let height = account.own_height; // let event = account.event; - // let data = bincode::serialize(&GroupConnect::Connect(height, event)).unwrap_or(vec![]); + // let data = bincode::serialize(&OwnConnect::Connect(height, event)).unwrap_or(vec![]); // Ok(SendType::Connect(0, addr, data)) // } @@ -587,7 +587,7 @@ impl Group { // let account = self.account(pid)?; // let height = account.own_height; // let event = account.event; - // let data = bincode::serialize(&GroupConnect::Connect(height, event)).unwrap_or(vec![]); + // let data = bincode::serialize(&OwnConnect::Connect(height, event)).unwrap_or(vec![]); // Ok(SendType::Result(0, addr, true, false, data)) // } @@ -604,7 +604,7 @@ impl Group { // addr, // true, // false, - // bincode::serialize(&GroupConnect::Create( + // bincode::serialize(&OwnConnect::Create( // proof, // me, // height, @@ -640,12 +640,12 @@ impl Group { // (vec![], vec![], true) // }; - // let event = GroupEvent::SyncCheck(ancestors, hashes, is_min); + // let event = OwnEvent::SyncCheck(ancestors, hashes, is_min); // let data = bincode::serialize(&event).unwrap_or(vec![]); // Ok(SendType::Event(0, addr, data)) // } - // pub fn event_message(&self, addr: PeerId, event: &GroupEvent) -> Result { + // pub fn event_message(&self, addr: PeerId, event: &OwnEvent) -> Result { // let data = bincode::serialize(event).unwrap_or(vec![]); // Ok(SendType::Event(0, addr, data)) // } @@ -673,7 +673,7 @@ impl Group { // account_db.close()?; // drop(account); - // let e = GroupEvent::Event(eheight, eid, pre_event, event); + // let e = OwnEvent::Event(eheight, eid, pre_event, event); // let data = bincode::serialize(&e).unwrap_or(vec![]); // let running = self.running(pid)?; // for (addr, (_peer, _id, online)) in &running.distributes { @@ -692,7 +692,7 @@ impl Group { // results: &mut HandleResult, // ) -> Result<()> { // let running = self.running(pid)?; - // let data = bincode::serialize(&GroupEvent::Status(event)).unwrap_or(vec![]); + // let data = bincode::serialize(&OwnEvent::Status(event)).unwrap_or(vec![]); // for (addr, (_peer, _id, online)) in &running.distributes { // if *online { // let msg = SendType::Event(0, *addr, data.clone()); @@ -703,10 +703,10 @@ impl Group { // } } -// impl GroupEvent { +// impl OwnEvent { // pub async fn handle( -// group: &mut Group, -// event: GroupEvent, +// group: &mut Own, +// event: OwnEvent, // pid: PeerId, // addr: PeerId, // //layer: &Arc>, @@ -714,18 +714,18 @@ impl Group { // ) -> Result { // let mut results = HandleResult::new(); // match event { -// GroupEvent::DeviceUpdate(_at, _name) => { +// OwnEvent::DeviceUpdate(_at, _name) => { // // TODO // } -// GroupEvent::DeviceDelete(_at) => { +// OwnEvent::DeviceDelete(_at) => { // // TODO // } -// GroupEvent::DeviceOffline => { +// OwnEvent::DeviceOffline => { // let v = group.running_mut(&pid)?; // let did = v.offline(&addr)?; // results.rpcs.push(device_rpc::device_offline(pid, did)); // } -// GroupEvent::StatusRequest => { +// OwnEvent::StatusRequest => { // let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) = // local_device_status(); // results.groups.push(( @@ -733,7 +733,7 @@ impl Group { // SendType::Event( // 0, // addr, -// bincode::serialize(&GroupEvent::StatusResponse( +// bincode::serialize(&OwnEvent::StatusResponse( // cpu_n, // mem_s, // swap_s, @@ -748,7 +748,7 @@ impl Group { // ), // )) // } -// GroupEvent::StatusResponse( +// OwnEvent::StatusResponse( // cpu_n, // mem_s, // swap_s, @@ -761,13 +761,13 @@ impl Group { // ) => results.rpcs.push(device_rpc::device_status( // pid, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime, // )), -// GroupEvent::Event(eheight, eid, pre) => { +// OwnEvent::Event(eheight, eid, pre) => { // //inner_event.handle(group, pid, addr, eheight, eid, pre, &mut results, layer)?; // } -// GroupEvent::Status => { +// OwnEvent::Status => { // //status_event.handle(group, pid, addr, &mut results, layer, uid)?; // } -// GroupEvent::SyncCheck(ancestors, hashes, is_min) => { +// OwnEvent::SyncCheck(ancestors, hashes, is_min) => { // println!("sync check: {:?}", ancestors); // let account = group.account(&pid)?; // if ancestors.len() == 0 || hashes.len() == 0 { @@ -789,7 +789,7 @@ impl Group { // drop(db); // if ours.len() == 0 { -// let event = GroupEvent::SyncRequest(1, remote_height); +// let event = OwnEvent::SyncRequest(1, remote_height); // let data = bincode::serialize(&event).unwrap_or(vec![]); // results.groups.push((pid, SendType::Event(0, addr, data))); // return Ok(results); @@ -827,7 +827,7 @@ impl Group { // } // if ancestor != 0 { -// let event = GroupEvent::SyncRequest(ancestor, remote_height); +// let event = OwnEvent::SyncRequest(ancestor, remote_height); // let data = bincode::serialize(&event).unwrap_or(vec![]); // results.groups.push((pid, SendType::Event(0, addr, data))); // } else { @@ -838,7 +838,7 @@ impl Group { // } // } // } -// GroupEvent::SyncRequest(from, to) => { +// OwnEvent::SyncRequest(from, to) => { // println!("====== DEBUG Sync Request: from: {} to {}", from, to); // // every time sync MAX is 100. // let last_to = if to - from > 100 { to - 100 } else { to }; @@ -851,17 +851,17 @@ impl Group { // // last_to, // // ) // // .await?; -// let event = GroupEvent::SyncResponse(from, last_to, to); +// let event = OwnEvent::SyncResponse(from, last_to, to); // let data = bincode::serialize(&event).unwrap_or(vec![]); // results.groups.push((pid, SendType::Event(0, addr, data))); // } -// GroupEvent::SyncResponse(from, last_to, to) => { +// OwnEvent::SyncResponse(from, last_to, to) => { // println!( // "====== DEBUG Sync Response: from: {} last {}, to {}", // from, last_to, to // ); // if last_to < to { -// let event = GroupEvent::SyncRequest(last_to + 1, to); +// let event = OwnEvent::SyncRequest(last_to + 1, to); // let data = bincode::serialize(&event).unwrap_or(vec![]); // results.groups.push((pid, SendType::Event(0, addr, data))); // } diff --git a/src/group/consensus.rs b/src/own/consensus.rs similarity index 100% rename from src/group/consensus.rs rename to src/own/consensus.rs diff --git a/src/group/running.rs b/src/own/running.rs similarity index 100% rename from src/group/running.rs rename to src/own/running.rs diff --git a/src/rpc.rs b/src/rpc.rs index 8eecae2..c99b3f1 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -174,7 +174,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { handler.add_method("account-list", |_, state: Arc| async move { let mut accounts: Vec> = vec![]; - let group_lock = state.group.read().await; + let group_lock = state.own.read().await; for (pid, account) in group_lock.list_accounts().iter() { accounts.push(vec![ id_to_str(pid), @@ -210,7 +210,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]); let (_id, pid) = state - .group + .own .write() .await .add_account( @@ -240,7 +240,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let lock = params[4].as_str().ok_or(RpcError::ParseError)?; let (_id, pid) = state - .group + .own .write() .await .add_account( @@ -270,7 +270,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]); let pid = state.pid().await; - let mut group_lock = state.group.write().await; + let mut group_lock = state.own.write().await; group_lock.update_account( pid, name, @@ -298,7 +298,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { |params: Vec, state: Arc| async move { let pid = id_from_str(params[0].as_str().ok_or(RpcError::ParseError)?)?; let lock = params[1].as_str().ok_or(RpcError::ParseError)?; - let res = state.group.read().await.check_lock(&pid, lock); + let res = state.own.read().await.check_lock(&pid, lock); Ok(HandleResult::rpc(json!([res]))) }, ); @@ -311,7 +311,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let pid = state.pid().await; let result = HandleResult::rpc(json!([new])); state - .group + .own .write() .await .pin(&pid, old, new, &state.base, &state.secret)?; @@ -324,11 +324,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { |params: Vec, state: Arc| async move { let lock = params[0].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let mnemonic = state - .group - .read() - .await - .mnemonic(&pid, lock, &state.secret)?; + let mnemonic = state.own.read().await.mnemonic(&pid, lock, &state.secret)?; Ok(HandleResult::rpc(json!([mnemonic]))) }, ); @@ -348,7 +344,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { } // load all local services created by this account. - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let group_db = group_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?; // 1. group chat. @@ -362,7 +358,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { } drop(layer); - let key = state.group.read().await.keypair(); + let key = state.own.read().await.keypair(); let peer_id = start_main( state.gids.clone(), state.p2p_config.clone(), @@ -394,7 +390,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { "session-list", |_: Vec, state: Arc| async move { let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = session_db(&state.base, &pid, &db_key)?; Ok(HandleResult::rpc(session_list(Session::list(&db)?))) }, @@ -407,7 +403,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let remote = params[1].as_str().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = session_db(&state.base, &pid, &db_key)?; Session::readed(&db, &id)?; let s = Session::get(&db, &id)?; @@ -449,7 +445,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let must = params[2].as_bool().ok_or(RpcError::ParseError)?; // if need must suspend. let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = session_db(&state.base, &pid, &db_key)?; let s = Session::get(&db, &id)?; drop(db); @@ -491,7 +487,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = session_db(&state.base, &pid, &db_key)?; Session::readed(&db, &id)?; Ok(HandleResult::new()) @@ -506,7 +502,7 @@ fn new_rpc_handler(global: Arc) -> RpcHandler { let is_close = params[2].as_bool().ok_or(RpcError::ParseError)?; let pid = state.pid().await; - let db_key = state.group.read().await.db_key(&pid)?; + let db_key = state.own.read().await.db_key(&pid)?; let db = session_db(&state.base, &pid, &db_key)?; Session::update(&db, &id, is_top, is_close)?; Ok(HandleResult::new()) diff --git a/src/server.rs b/src/server.rs index 8256578..c809c2b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,9 +20,9 @@ use tdn_storage::local::DStorage; use crate::account::Account; use crate::apps::app_layer_handle; use crate::global::Global; -use crate::group::{handle as group_handle, Group}; use crate::layer::Layer; use crate::migrate::{main_migrate, ACCOUNT_DB}; +use crate::own::{handle as own_handle, Own}; use crate::primitives::network_seeds; use crate::rpc::{init_rpc, inner_rpc}; @@ -90,7 +90,7 @@ pub async fn start(db_path: String) -> Result<()> { while let Some(message) = self_recv.recv().await { match message { ReceiveMessage::Own(o_msg) => { - if let Ok(handle_result) = group_handle(o_msg, &global).await { + if let Ok(handle_result) = own_handle(o_msg, &global).await { handle(handle_result, now_rpc_uid, true, &global).await; } } @@ -221,7 +221,7 @@ async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, global: &Arc mut rpcs, mut layers, mut networks, - groups: _, // no-group message. + mut groups, } = handle_result; loop { @@ -250,6 +250,18 @@ async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, global: &Arc } } + loop { + if groups.len() != 0 { + let msg = groups.remove(0); + sender + .send(SendMessage::Group(msg)) + .await + .expect("TDN channel closed"); + } else { + break; + } + } + loop { if layers.len() != 0 { let (tgid, msg) = layers.remove(0);