Browse Source

refactor chat to group

pull/24/head
Sun 3 years ago
parent
commit
a339cbacdb
  1. 10
      src/apps/chat/mod.rs
  2. 2
      src/apps/dao/layer.rs
  3. 2
      src/apps/dao/models/message.rs
  4. 15
      src/apps/dao/rpc.rs
  5. 8
      src/apps/device/rpc.rs
  6. 2
      src/apps/group/layer.rs
  7. 2
      src/apps/group/models/message.rs
  8. 21
      src/apps/group/rpc.rs
  9. 2
      src/apps/jarvis/rpc.rs
  10. 10
      src/apps/mod.rs
  11. 50
      src/apps/wallet/rpc.rs
  12. 1
      src/daemon.rs
  13. 10
      src/event.rs
  14. 6
      src/global.rs
  15. 153
      src/group/handle.rs
  16. 212
      src/group/mod.rs
  17. 0
      src/group/models.rs
  18. 0
      src/group/models/friend.rs
  19. 0
      src/group/models/message.rs
  20. 0
      src/group/models/request.rs
  21. 65
      src/group/rpc.rs
  22. 80
      src/layer.rs
  23. 1
      src/lib.rs
  24. 0
      src/migrate/mod.rs
  25. 0
      src/own/mod.rs
  26. 58
      src/rpc.rs
  27. 7
      src/server.rs
  28. 0
      src/utils/mod.rs

10
src/apps/chat/mod.rs

@ -1,10 +0,0 @@
mod layer;
mod models;
pub(crate) mod rpc;
pub(crate) use layer::{chat_conn, handle, update_session, LayerEvent};
pub(crate) use models::{
from_model, from_network_message, raw_to_network_message, to_network_message, Friend,
InviteType, Message, Request,
};
pub(crate) use rpc::new_rpc_handler;

2
src/apps/dao/layer.rs

@ -14,7 +14,7 @@ use group_types::{
use tdn_did::Proof; use tdn_did::Proof;
use tdn_storage::local::DStorage; use tdn_storage::local::DStorage;
use crate::apps::chat::Friend; use crate::group::Friend;
use crate::layer::{Layer, Online}; use crate::layer::{Layer, Online};
use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend};
use crate::session::{connect_session, Session, SessionType}; use crate::session::{connect_session, Session, SessionType};

2
src/apps/dao/models/message.rs

@ -9,7 +9,7 @@ use tdn_storage::local::{DStorage, DsValue};
use group_types::NetworkMessage; use group_types::NetworkMessage;
use crate::apps::chat::{Friend, MessageType}; use crate::group::{Friend, MessageType};
use crate::storage::{ use crate::storage::{
chat_db, group_db, read_avatar, read_file, read_record, write_avatar_sync, write_file_sync, chat_db, group_db, read_avatar, read_file, read_record, write_avatar_sync, write_file_sync,
write_image_sync, write_record_sync, write_image_sync, write_record_sync,

15
src/apps/dao/rpc.rs

@ -9,7 +9,7 @@ use tdn_did::Proof;
use group_types::{Event, GroupLocation, GroupType, JoinProof, LayerEvent}; use group_types::{Event, GroupLocation, GroupType, JoinProof, LayerEvent};
use crate::apps::chat::{Friend, MessageType}; use crate::group::{Friend, MessageType};
use crate::layer::Online; use crate::layer::Online;
use crate::rpc::{session_close, session_create, session_delete, session_last, RpcState}; use crate::rpc::{session_close, session_create, session_delete, session_last, RpcState};
use crate::session::{Session, SessionType}; use crate::session::{Session, SessionType};
@ -424,8 +424,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
.filter_map(|v| v.as_i64()) .filter_map(|v| v.as_i64())
.collect(); .collect();
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
let base = group_lock.base().clone(); let base = own_lock.base().clone();
let chat = chat_db(&base, &gid)?; let chat = chat_db(&base, &gid)?;
let group_db = group_db(&base, &gid)?; let group_db = group_db(&base, &gid)?;
@ -434,7 +434,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
for fid in ids { for fid in ids {
let friend = Friend::get_id(&chat, fid)?.ok_or(RpcError::ParseError)?; let friend = Friend::get_id(&chat, fid)?.ok_or(RpcError::ParseError)?;
if Member::get_id(&group_db, &id, &friend.gid).is_err() { if Member::get_id(&group_db, &id, &friend.gid).is_err() {
let proof = group_lock.prove_addr(&gid, &friend.gid.into())?; let proof = own_lock.prove_addr(&gid, &friend.gid.into())?;
invites.push((friend.id, friend.gid, friend.addr, proof)); invites.push((friend.id, friend.gid, friend.addr, proof));
} }
} }
@ -467,7 +467,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
} }
} }
let (msg, nw, sc) = crate::apps::chat::LayerEvent::from_message( let (msg, nw, sc) = crate::group::GroupEvent::from_message(
&base, &base,
gid, gid,
fid, fid,
@ -475,9 +475,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
contact_values, contact_values,
) )
.await?; .await?;
let event = crate::apps::chat::LayerEvent::Message(msg.hash, nw); let event = crate::group::GroupEvent::Message(msg.hash, nw);
let s = let s = crate::group::event_message(&mut layer_lock, msg.id, gid, faddr, &event);
crate::apps::chat::event_message(&mut layer_lock, msg.id, gid, faddr, &event);
results.layers.push((gid, fgid, s)); results.layers.push((gid, fgid, s));
if let Ok(id) = if let Ok(id) =

8
src/apps/device/rpc.rs

@ -76,16 +76,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
|params: Vec<RpcParam>, state: Arc<Global>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
if id == group_lock.device()?.id { if id == own_lock.device()?.id {
let uptime = group_lock.uptime; let uptime = own_lock.uptime;
let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) =
local_device_status(); local_device_status();
return Ok(HandleResult::rpc(json!([ return Ok(HandleResult::rpc(json!([
cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime
]))); ])));
} }
drop(group_lock); drop(own_lock);
//let msg = state.own.write().await.event_message(addr, &OwnEvent::StatusRequest)?; //let msg = state.own.write().await.event_message(addr, &OwnEvent::StatusRequest)?;
//Ok(HandleResult::group(msg)) //Ok(HandleResult::group(msg))

2
src/apps/group/layer.rs

@ -7,8 +7,8 @@ use tdn::types::{
}; };
use tdn_storage::local::DStorage; use tdn_storage::local::DStorage;
use crate::apps::chat::Friend;
use crate::global::Global; use crate::global::Global;
use crate::group::Friend;
use crate::rpc::{ use crate::rpc::{
session_close, session_connect, session_last, session_lost, session_suspend, session_close, session_connect, session_last, session_lost, session_suspend,
session_update_name, session_update_name,

2
src/apps/group/models/message.rs

@ -8,7 +8,7 @@ use tdn::types::{
}; };
use tdn_storage::local::{DStorage, DsValue}; use tdn_storage::local::{DStorage, DsValue};
use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm}; use crate::group::{from_network_message, raw_to_network_message, to_network_message as tnm};
use crate::storage::group_db; use crate::storage::group_db;
use super::Member; use super::Member;

21
src/apps/group/rpc.rs

@ -1,4 +1,4 @@
use esse_primitives::{MessageType, ESSE_ID}; use esse_primitives::MessageType;
use group_types::{Event, LayerEvent, GROUP_CHAT_ID}; use group_types::{Event, LayerEvent, GROUP_CHAT_ID};
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
@ -7,8 +7,8 @@ use tdn::types::{
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
}; };
use crate::apps::chat::{raw_to_network_message, Friend, InviteType};
use crate::global::Global; use crate::global::Global;
use crate::group::{raw_to_network_message, Friend, InviteType};
use crate::rpc::{session_create, session_delete, session_update_name}; use crate::rpc::{session_create, session_delete, session_update_name};
use crate::session::{Session, SessionType}; use crate::session::{Session, SessionType};
use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar}; use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar};
@ -104,10 +104,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned();
let pid = state.pid().await; let pid = state.pid().await;
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
let db_key = group_lock.db_key(&pid)?; let db_key = own_lock.db_key(&pid)?;
let me = group_lock.clone_user(&pid)?; let me = own_lock.clone_user(&pid)?;
drop(group_lock); drop(own_lock);
let db = group_db(&state.base, &pid, &db_key)?; let db = group_db(&state.base, &pid, &db_key)?;
let s_db = session_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?;
@ -172,16 +172,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let m_type = MessageType::Invite; let m_type = MessageType::Invite;
let (nm, raw) = let (nm, raw) =
raw_to_network_message(&pid, &state.base, &db_key, &m_type, &contact).await?; raw_to_network_message(&pid, &state.base, &db_key, &m_type, &contact).await?;
let mut msg = crate::apps::chat::Message::new(&pid, f.id, true, m_type, raw, false); let mut msg = crate::group::Message::new(&pid, f.id, true, m_type, raw, false);
msg.insert(&chat_db)?; msg.insert(&chat_db)?;
let event = crate::apps::chat::LayerEvent::Message(msg.hash, nm); let event = crate::group::GroupEvent::Message(msg.hash, nm);
let tid = state.layer.write().await.delivery(msg.id); let tid = state.layer.write().await.delivery(msg.id);
let data = bincode::serialize(&event).unwrap_or(vec![]); let data = bincode::serialize(&event).unwrap_or(vec![]);
let lmsg = SendType::Event(tid, f.pid, data); results.groups.push(SendType::Event(tid, f.pid, data));
results.layers.push((ESSE_ID, lmsg));
// update session. // update session.
crate::apps::chat::update_session(&s_db, &id, &msg, &mut results); crate::group::update_session(&s_db, &id, &msg, &mut results);
// handle group member // handle group member
let avatar = read_avatar(&state.base, &pid, &f.pid) let avatar = read_avatar(&state.base, &pid, &f.pid)

2
src/apps/jarvis/rpc.rs

@ -11,8 +11,8 @@ use tdn_storage::local::DStorage;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::account::lang_from_i64; use crate::account::lang_from_i64;
use crate::apps::chat::raw_to_network_message;
use crate::global::Global; use crate::global::Global;
use crate::group::raw_to_network_message;
use crate::storage::jarvis_db; use crate::storage::jarvis_db;
use crate::utils::answer::load_answer; use crate::utils::answer::load_answer;

10
src/apps.rs → src/apps/mod.rs

@ -1,7 +1,6 @@
use cloud_types::CLOUD_ID; use cloud_types::CLOUD_ID;
use dao_types::DAO_ID; use dao_types::DAO_ID;
use domain_types::DOMAIN_ID; use domain_types::DOMAIN_ID;
use esse_primitives::ESSE_ID;
use group_types::{GroupChatId, GROUP_CHAT_ID}; use group_types::{GroupChatId, GROUP_CHAT_ID};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -16,7 +15,6 @@ use crate::global::Global;
use crate::rpc::session_lost; use crate::rpc::session_lost;
use crate::storage::group_db; use crate::storage::group_db;
pub(crate) mod chat;
pub(crate) mod cloud; pub(crate) mod cloud;
pub(crate) mod device; pub(crate) mod device;
pub(crate) mod domain; pub(crate) mod domain;
@ -28,7 +26,6 @@ pub(crate) mod wallet;
pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) { pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) {
device::new_rpc_handler(handler); device::new_rpc_handler(handler);
chat::new_rpc_handler(handler);
jarvis::new_rpc_handler(handler); jarvis::new_rpc_handler(handler);
domain::new_rpc_handler(handler); domain::new_rpc_handler(handler);
file::new_rpc_handler(handler); file::new_rpc_handler(handler);
@ -46,21 +43,16 @@ pub(crate) async fn app_layer_handle(
) -> Result<HandleResult> { ) -> Result<HandleResult> {
debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid); debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid);
match (fgid, tgid) { match (fgid, tgid) {
(ESSE_ID, 0) | (0, ESSE_ID) => chat::handle(msg, global).await,
(GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await, (GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await,
(DOMAIN_ID, 0) | (0, DOMAIN_ID) => domain::handle(msg, global).await, (DOMAIN_ID, 0) | (0, DOMAIN_ID) => domain::handle(msg, global).await,
(CLOUD_ID, 0) | (0, CLOUD_ID) => cloud::handle(msg, global).await, (CLOUD_ID, 0) | (0, CLOUD_ID) => cloud::handle(msg, global).await,
(DAO_ID, 0) | (0, DAO_ID) => chat::handle(msg, global).await, (DAO_ID, 0) | (0, DAO_ID) => cloud::handle(msg, global).await, // TODO DAO
_ => match msg { _ => match msg {
RecvType::Leave(peer) => { RecvType::Leave(peer) => {
debug!("Peer leaved: {}", peer.id.to_hex()); debug!("Peer leaved: {}", peer.id.to_hex());
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let mut layer = global.layer.write().await; let mut layer = global.layer.write().await;
if let Some(session) = layer.chats.remove(&peer.id) {
results.rpcs.push(session_lost(&session.s_id));
}
let mut delete: HashMap<GroupChatId, Vec<usize>> = HashMap::new(); let mut delete: HashMap<GroupChatId, Vec<usize>> = HashMap::new();
let pid = global.pid().await; let pid = global.pid().await;
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;

50
src/apps/wallet/rpc.rs

@ -338,13 +338,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let db_key = state.own.read().await.db_key(&pid)?; let db_key = state.own.read().await.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?; let db = wallet_db(&state.base, &pid, &db_key)?;
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
let mnemonic = group_lock.mnemonic(&pid, lock, &state.secret)?; let mnemonic = own_lock.mnemonic(&pid, lock, &state.secret)?;
let account = group_lock.account(&pid)?; let account = own_lock.account(&pid)?;
let lang = account.lang(); let lang = account.lang();
let pass = account.pass.to_string(); let pass = account.pass.to_string();
let account_index = account.index as u32; let account_index = account.index as u32;
drop(group_lock); drop(own_lock);
let mut results = HandleResult::new(); let mut results = HandleResult::new();
@ -371,16 +371,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
results.rpcs.push(address.to_rpc()); results.rpcs.push(address.to_rpc());
if address.main { if address.main {
let a_db = account_db(&state.base, &state.secret)?; let a_db = account_db(&state.base, &state.secret)?;
let mut group_lock = state.own.write().await; let mut own_lock = state.own.write().await;
let account = group_lock.account_mut(&pid)?; let account = own_lock.account_mut(&pid)?;
account.wallet = address.chain.update_main(&address.address, &account.wallet); account.wallet = address.chain.update_main(&address.address, &account.wallet);
account.pub_height = account.pub_height + 1; account.pub_height = account.pub_height + 1;
account.update_info(&a_db)?; account.update_info(&a_db)?;
let user = group_lock.clone_user(&pid)?; let user = own_lock.clone_user(&pid)?;
drop(group_lock); drop(own_lock);
// broadcast to all friends. // broadcast to all friends.
state.layer.read().await.broadcast(user, &mut results); state.group.read().await.broadcast(user, &mut results);
} }
Ok(results) Ok(results)
}, },
@ -398,11 +398,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let pid = state.pid().await; let pid = state.pid().await;
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
let ckey = &group_lock.account(&pid)?.encrypt; let ckey = &own_lock.account(&pid)?.encrypt;
let db_key = group_lock.db_key(&pid)?; let db_key = own_lock.db_key(&pid)?;
let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?; let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?;
drop(group_lock); drop(own_lock);
let db = wallet_db(&state.base, &pid, &db_key)?; let db = wallet_db(&state.base, &pid, &db_key)?;
@ -480,26 +480,26 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let lock = params[6].as_str().ok_or(RpcError::ParseError)?; let lock = params[6].as_str().ok_or(RpcError::ParseError)?;
let pid = state.pid().await; let pid = state.pid().await;
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
if !group_lock.check_lock(&pid, &lock) { if !own_lock.check_lock(&pid, &lock) {
return Err(RpcError::Custom("Lock is invalid!".to_owned())); return Err(RpcError::Custom("Lock is invalid!".to_owned()));
} }
let db_key = group_lock.db_key(&pid)?; let db_key = own_lock.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?; let db = wallet_db(&state.base, &pid, &db_key)?;
let address = Address::get(&db, &from)?; let address = Address::get(&db, &from)?;
let (mnemonic, pbytes) = if address.is_gen() { let (mnemonic, pbytes) = if address.is_gen() {
(group_lock.mnemonic(&pid, lock, &state.secret)?, vec![]) (own_lock.mnemonic(&pid, lock, &state.secret)?, vec![])
} else { } else {
let ckey = &group_lock.account(&pid)?.encrypt; let ckey = &own_lock.account(&pid)?.encrypt;
let pbytes = decrypt(&state.secret, lock, ckey, address.secret.as_ref())?; let pbytes = decrypt(&state.secret, lock, ckey, address.secret.as_ref())?;
(String::new(), pbytes) (String::new(), pbytes)
}; };
let account = group_lock.account(&pid)?; let account = own_lock.account(&pid)?;
let lang = account.lang(); let lang = account.lang();
let pass = account.pass.to_string(); let pass = account.pass.to_string();
let account_index = account.index as u32; let account_index = account.index as u32;
drop(group_lock); drop(own_lock);
let pass = if pass.len() > 0 { let pass = if pass.len() > 0 {
Some(pass.as_ref()) Some(pass.as_ref())
} else { } else {
@ -617,16 +617,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let mut group_lock = state.own.write().await; let mut own_lock = state.own.write().await;
let account = group_lock.account_mut(&pid)?; let account = own_lock.account_mut(&pid)?;
account.wallet = address.chain.update_main(&address.address, &account.wallet); account.wallet = address.chain.update_main(&address.address, &account.wallet);
account.pub_height = account.pub_height + 1; account.pub_height = account.pub_height + 1;
account.update_info(&a_db)?; account.update_info(&a_db)?;
let user = group_lock.clone_user(&pid)?; let user = own_lock.clone_user(&pid)?;
drop(group_lock); drop(own_lock);
// broadcast all friends. // broadcast all friends.
state.layer.read().await.broadcast(user, &mut results); state.group.read().await.broadcast(user, &mut results);
Ok(HandleResult::new()) Ok(HandleResult::new())
}, },

1
src/daemon.rs

@ -11,6 +11,7 @@ mod apps;
//mod consensus; //mod consensus;
//mod event; //mod event;
mod global; mod global;
mod group;
mod layer; mod layer;
mod migrate; mod migrate;
mod own; mod own;

10
src/event.rs

@ -13,17 +13,17 @@ use tdn_storage::local::DStorage;
use tokio::sync::{mpsc::Sender, RwLock}; use tokio::sync::{mpsc::Sender, RwLock};
use crate::account::{Account, User}; use crate::account::{Account, User};
use crate::apps::chat::LayerEvent;
use crate::consensus::Event as OldEvent; use crate::consensus::Event as OldEvent;
use crate::group::GroupEvent;
use crate::layer::Layer; use crate::layer::Layer;
use crate::migrate::consensus::{ use crate::migrate::consensus::{
ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH, ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH,
}; };
use crate::own::{Own, OwnEvent}; use crate::own::{Own, OwnEvent};
use crate::apps::chat::rpc as chat_rpc;
use crate::apps::chat::{from_model, Friend, Message, Request};
use crate::apps::file::{FileDid, RootDirectory}; use crate::apps::file::{FileDid, RootDirectory};
use crate::group::rpc as chat_rpc;
use crate::group::{from_model, Friend, Message, Request};
use crate::rpc; use crate::rpc;
use crate::storage::{delete_avatar_sync, read_avatar_sync, write_avatar_sync}; use crate::storage::{delete_avatar_sync, read_avatar_sync, write_avatar_sync};
@ -156,7 +156,7 @@ impl InnerEvent {
layer: Arc<RwLock<Layer>>, layer: Arc<RwLock<Layer>>,
gid: GroupId, gid: GroupId,
fgid: GroupId, fgid: GroupId,
event: LayerEvent, event: GroupEvent,
) -> Result<()> { ) -> Result<()> {
let addr = layer.read().await.running(&gid)?.online_direct(&fgid)?; let addr = layer.read().await.running(&gid)?.online_direct(&fgid)?;
let data = bincode::serialize(&event).unwrap_or(vec![]); let data = bincode::serialize(&event).unwrap_or(vec![]);
@ -342,7 +342,7 @@ impl InnerEvent {
let ggid = gid.clone(); let ggid = gid.clone();
let fgid = f.gid; let fgid = f.gid;
let sender = group.sender(); let sender = group.sender();
let layer_event = LayerEvent::Message(hash, m.clone()); let layer_event = GroupEvent::Message(hash, m.clone());
tokio::spawn(InnerEvent::direct_layer_session( tokio::spawn(InnerEvent::direct_layer_session(
sender, sender,
layer_lock, layer_lock,

6
src/global.rs

@ -7,6 +7,7 @@ use tdn::{
use tokio::{sync::mpsc::Sender, sync::RwLock}; use tokio::{sync::mpsc::Sender, sync::RwLock};
use crate::account::Account; use crate::account::Account;
use crate::group::Group;
use crate::layer::Layer; use crate::layer::Layer;
use crate::own::Own; use crate::own::Own;
@ -20,6 +21,8 @@ pub(crate) struct Global {
pub peer_own_height: RwLock<u64>, pub peer_own_height: RwLock<u64>,
/// current own. /// current own.
pub own: RwLock<Own>, pub own: RwLock<Own>,
/// current group.
pub group: RwLock<Group>,
/// current layer. /// current layer.
pub layer: RwLock<Layer>, pub layer: RwLock<Layer>,
/// message delivery tracking. uuid, me_gid, db_id. /// message delivery tracking. uuid, me_gid, db_id.
@ -62,6 +65,7 @@ impl Global {
peer_pub_height: RwLock::new(0), peer_pub_height: RwLock::new(0),
peer_own_height: RwLock::new(0), peer_own_height: RwLock::new(0),
own: RwLock::new(Own::init(accounts)), own: RwLock::new(Own::init(accounts)),
group: RwLock::new(Group::init()),
layer: RwLock::new(Layer::init()), layer: RwLock::new(Layer::init()),
p2p_send: RwLock::new(None), p2p_send: RwLock::new(None),
_delivery: RwLock::new(HashMap::new()), _delivery: RwLock::new(HashMap::new()),
@ -90,6 +94,7 @@ impl Global {
pub async fn clear(&self) { pub async fn clear(&self) {
*self.peer_id.write().await = PeerId::default(); *self.peer_id.write().await = PeerId::default();
self.group.write().await.clear();
self.layer.write().await.clear(); self.layer.write().await.clear();
} }
@ -108,6 +113,7 @@ impl Global {
.write() .write()
.await .await
.reset(pid, lock, &self.base, &self.secret)?; .reset(pid, lock, &self.base, &self.secret)?;
self.group.write().await.clear();
self.layer.write().await.clear(); self.layer.write().await.clear();
*self.p2p_send.write().await = Some(send); *self.p2p_send.write().await = Some(send);

153
src/apps/chat/layer.rs → src/group/handle.rs

@ -1,5 +1,6 @@
use esse_primitives::{MessageType, NetworkMessage, ESSE_ID}; use esse_primitives::{MessageType, NetworkMessage};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
group::EventId, group::EventId,
@ -17,74 +18,44 @@ use crate::rpc::{
use crate::session::{connect_session, Session, SessionType}; use crate::session::{connect_session, Session, SessionType};
use crate::storage::{account_db, chat_db, session_db, write_avatar_sync}; use crate::storage::{account_db, chat_db, session_db, write_avatar_sync};
use super::models::{handle_nmsg, Friend, Message, Request};
use super::rpc; use super::rpc;
use super::{
from_model, from_network_message, handle_nmsg, Friend, GroupEvent, InviteType, Message, Request,
};
/// Chat connect data structure. pub(crate) async fn group_handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
/// params: Friend about me height debug!("---------DEBUG--------- GOT GROUP MESSAGE");
//#[derive(Serialize, Deserialize)]
//pub struct LayerConnect(pub i64);
/// ESSE chat layer Event.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
/// offline. extend BaseLayerEvent.
Offline,
/// suspend. extend BaseLayerEvent.
Suspend,
/// actived. extend BaseLayerEvent.
Actived,
/// make friendship request.
/// params is name, remark.
Request(String, String),
/// agree friendship request.
/// params is gid.
Agree,
/// reject friendship request.
Reject,
/// receiver gid, sender gid, message.
Message(EventId, NetworkMessage),
/// request user info.
InfoReq(u64),
/// user full info.
InfoRes(User),
/// close friendship.
Close,
}
pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
debug!("---------DEBUG--------- GOT CHAT EVENT");
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let pid = global.pid().await; let pid = global.pid().await;
match msg { match msg {
RecvType::Connect(peer, _) | RecvType::ResultConnect(peer, _) => { RecvType::Connect(peer, _) | RecvType::ResultConnect(peer, _) => {
// ESSE chat layer connect date structure. // ESSE group connect date structure.
if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await { if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await {
let peer_id = peer.id; let peer_id = peer.id;
let msg = SendType::Result(0, peer, true, false, vec![]); let msg = SendType::Result(0, peer, true, false, vec![]);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
let info = LayerEvent::InfoReq(height); let info = GroupEvent::InfoReq(height);
let data = bincode::serialize(&info).unwrap_or(vec![]); let data = bincode::serialize(&info).unwrap_or(vec![]);
let msg = SendType::Event(0, peer_id, data); let msg = SendType::Event(0, peer_id, data);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} else { } else {
let msg = SendType::Result(0, peer, false, false, vec![]); let msg = SendType::Result(0, peer, false, false, vec![]);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} }
} }
RecvType::Result(peer, is_ok, _) => { RecvType::Result(peer, is_ok, _) => {
// ESSE chat layer result date structure. // ESSE group result date structure.
if is_ok { if is_ok {
if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await { if let Ok(height) = handle_connect(pid, &peer, global, &mut results).await {
let info = LayerEvent::InfoReq(height); let info = GroupEvent::InfoReq(height);
let data = bincode::serialize(&info).unwrap_or(vec![]); let data = bincode::serialize(&info).unwrap_or(vec![]);
let msg = SendType::Event(0, peer.id, data); let msg = SendType::Event(0, peer.id, data);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} else { } else {
let msg = SendType::Result(0, peer, false, false, vec![]); let msg = SendType::Result(0, peer, false, false, vec![]);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} }
} else { } else {
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
@ -95,12 +66,12 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
} }
} }
RecvType::Event(fpid, bytes) => { RecvType::Event(fpid, bytes) => {
return LayerEvent::handle(pid, fpid, global, bytes).await; return GroupEvent::handle(pid, fpid, global, bytes).await;
} }
RecvType::Delivery(t, tid, is_ok) => { RecvType::Delivery(t, tid, is_ok) => {
let mut layer = global.layer.write().await; let mut group = global.group.write().await;
let id = layer.delivery.remove(&tid).ok_or(anyhow!("delivery err"))?; let id = group.delivery.remove(&tid).ok_or(anyhow!("delivery err"))?;
drop(layer); drop(group);
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
let resp = match t { let resp = match t {
@ -125,7 +96,15 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
RecvType::Stream(_uid, _stream, _bytes) => { RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream // TODO stream
} }
RecvType::Leave(..) => {} // nerver here. RecvType::Leave(peer) => {
debug!("Peer leaved: {}", peer.id.to_hex());
let mut group_lock = global.group.write().await;
if let Ok((sid, _fid)) = group_lock.get(&peer.id) {
results.rpcs.push(session_lost(&sid));
}
group_lock.rm_online(&peer.id);
drop(group_lock);
}
} }
Ok(results) Ok(results)
@ -159,41 +138,41 @@ async fn handle_connect(
results.rpcs.push(session_connect(&sid, &peer.id)); results.rpcs.push(session_connect(&sid, &peer.id));
// 4. active this session. // 4. active this session.
global.layer.write().await.chat_add(peer.id, sid, f.id, 0); global.group.write().await.add(peer.id, sid, f.id, 0);
Ok(f.height as u64) Ok(f.height as u64)
} }
impl LayerEvent { impl GroupEvent {
pub async fn handle( pub async fn handle(
pid: PeerId, pid: PeerId,
fpid: PeerId, fpid: PeerId,
global: &Arc<Global>, global: &Arc<Global>,
bytes: Vec<u8>, bytes: Vec<u8>,
) -> Result<HandleResult> { ) -> Result<HandleResult> {
let event: LayerEvent = bincode::deserialize(&bytes)?; let event: GroupEvent = bincode::deserialize(&bytes)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
match event { match event {
LayerEvent::Offline => { GroupEvent::Offline => {
let mut layer = global.layer.write().await; let mut group = global.group.write().await;
let (sid, _fid) = layer.chat_session(&fpid)?; let (sid, _fid) = group.get(&fpid)?;
let _ = layer.chat_rm_online(&fpid); group.rm_online(&fpid);
results.rpcs.push(session_lost(&sid)); results.rpcs.push(session_lost(&sid));
} }
LayerEvent::Suspend => { GroupEvent::Suspend => {
let mut layer = global.layer.write().await; let mut group = global.group.write().await;
let (sid, _fid) = layer.chat_session(&fpid)?; let (sid, _fid) = group.get(&fpid)?;
let _ = layer.chat_suspend(&fpid, false, false)?; group.suspend(&fpid, false, false)?;
results.rpcs.push(session_suspend(&sid)); results.rpcs.push(session_suspend(&sid));
} }
LayerEvent::Actived => { GroupEvent::Actived => {
let mut layer = global.layer.write().await; let mut group = global.group.write().await;
let (sid, _fid) = layer.chat_session(&fpid)?; let (sid, _fid) = group.get(&fpid)?;
let _ = layer.chat_active(&fpid, false); group.active(&fpid, false)?;
results.rpcs.push(session_connect(&sid, &fpid)); results.rpcs.push(session_connect(&sid, &fpid));
} }
LayerEvent::Request(name, remark) => { GroupEvent::Request(name, remark) => {
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -212,12 +191,12 @@ impl LayerEvent {
results.rpcs.push(notice_menu(&SessionType::Chat)); results.rpcs.push(notice_menu(&SessionType::Chat));
return Ok(results); return Ok(results);
} else { } else {
let data = bincode::serialize(&LayerEvent::Agree).unwrap_or(vec![]); let data = bincode::serialize(&GroupEvent::Agree).unwrap_or(vec![]);
let msg = SendType::Event(0, fpid, data); let msg = SendType::Event(0, fpid, data);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} }
} }
LayerEvent::Agree => { GroupEvent::Agree => {
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -247,7 +226,7 @@ impl LayerEvent {
drop(db); drop(db);
} }
} }
LayerEvent::Reject => { GroupEvent::Reject => {
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -258,8 +237,8 @@ impl LayerEvent {
results.rpcs.push(rpc::request_reject(request.id)); results.rpcs.push(rpc::request_reject(request.id));
} }
} }
LayerEvent::Message(hash, m) => { GroupEvent::Message(hash, m) => {
let (_sid, fid) = global.layer.read().await.chat_session(&fpid)?; let (_sid, fid) = global.group.read().await.get(&fpid)?;
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -283,12 +262,12 @@ impl LayerEvent {
update_session(&s_db, &fid, &msg, &mut results); update_session(&s_db, &fid, &msg, &mut results);
} }
} }
LayerEvent::InfoReq(height) => { GroupEvent::InfoReq(height) => {
// check sync remote height. // check sync remote height.
let a_db = account_db(&global.base, &global.secret)?; let a_db = account_db(&global.base, &global.secret)?;
let account = Account::get(&a_db, &pid)?; let account = Account::get(&a_db, &pid)?;
if account.pub_height > height { if account.pub_height > height {
let info = LayerEvent::InfoRes(User::info( let info = GroupEvent::InfoRes(User::info(
account.pub_height, account.pub_height,
account.name, account.name,
account.wallet, account.wallet,
@ -298,11 +277,11 @@ impl LayerEvent {
)); ));
let data = bincode::serialize(&info).unwrap_or(vec![]); let data = bincode::serialize(&info).unwrap_or(vec![]);
let msg = SendType::Event(0, fpid, data); let msg = SendType::Event(0, fpid, data);
results.layers.push((ESSE_ID, msg)); results.groups.push(msg);
} }
} }
LayerEvent::InfoRes(remote) => { GroupEvent::InfoRes(remote) => {
let (sid, fid) = global.layer.read().await.chat_session(&fpid)?; let (sid, fid) = global.group.read().await.get(&fpid)?;
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -322,12 +301,12 @@ impl LayerEvent {
let _ = Session::update_name(&s_db, &sid, &name); let _ = Session::update_name(&s_db, &sid, &name);
results.rpcs.push(session_update_name(&sid, &name)); results.rpcs.push(session_update_name(&sid, &name));
} }
LayerEvent::Close => { GroupEvent::Close => {
let mut layer = global.layer.write().await; let mut group = global.group.write().await;
let _ = layer.chat_rm_online(&fpid); group.rm_online(&fpid);
let (sid, fid) = global.layer.read().await.chat_session(&fpid)?; let (sid, fid) = group.get(&fpid)?;
let keep = layer.is_addr_online(&fpid); let keep = group.is_online(&fpid);
drop(layer); drop(group);
let db_key = global.own.read().await.db_key(&pid)?; let db_key = global.own.read().await.db_key(&pid)?;
let db = chat_db(&global.base, &pid, &db_key)?; let db = chat_db(&global.base, &pid, &db_key)?;
@ -336,7 +315,7 @@ impl LayerEvent {
drop(db); drop(db);
results.rpcs.push(rpc::friend_close(fid)); results.rpcs.push(rpc::friend_close(fid));
if !keep { if !keep {
results.layers.push((ESSE_ID, SendType::Disconnect(fpid))) results.groups.push(SendType::Disconnect(fpid))
} }
// TODO close session // TODO close session
} }
@ -346,10 +325,10 @@ impl LayerEvent {
} }
} }
pub(crate) fn chat_conn(pid: PeerId, results: &mut HandleResult) { pub(crate) fn group_conn(pid: PeerId, results: &mut HandleResult) {
results results
.layers .groups
.push((ESSE_ID, SendType::Connect(0, Peer::peer(pid), vec![]))); .push(SendType::Connect(0, Peer::peer(pid), vec![]));
} }
// UPDATE SESSION. // UPDATE SESSION.

212
src/group/mod.rs

@ -0,0 +1,212 @@
use esse_primitives::{MessageType, NetworkMessage};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tdn::types::{
group::EventId,
message::{RecvType, SendType},
primitives::{DeliveryType, HandleResult, Peer, PeerId, Result},
};
use tdn_storage::local::DStorage;
use crate::account::{Account, User};
use crate::global::Global;
use crate::rpc::{
notice_menu, session_connect, session_create, session_last, session_lost, session_suspend,
session_update_name,
};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{account_db, chat_db, session_db, write_avatar_sync};
mod handle;
mod models;
mod rpc;
pub(crate) use handle::{group_conn, group_handle, update_session};
pub(crate) use models::{
from_model, from_network_message, handle_nmsg, raw_to_network_message, to_network_message,
Friend, InviteType, Message, Request,
};
pub(crate) use rpc::group_rpc;
/// ESSE groups.
pub(crate) struct Group {
/// friend pid => Session
pub sessions: HashMap<PeerId, GroupSession>,
/// delivery feedback.
pub delivery: HashMap<u64, i64>,
/// delivery counter.
delivery_count: usize,
}
/// online connected layer session.
pub(crate) struct GroupSession {
/// consensus height.
pub height: i64,
/// session database id.
pub sid: i64,
/// friend database id.
pub fid: i64,
/// if session is suspend by me.
pub suspend_me: bool,
/// if session is suspend by remote.
pub suspend_remote: bool,
/// keep alive remain minutes.
pub remain: u16,
}
/// ESSE group Event (Chat).
#[derive(Serialize, Deserialize)]
pub(crate) enum GroupEvent {
/// offline. extend BaseGroupEvent.
Offline,
/// suspend. extend BaseGroupEvent.
Suspend,
/// actived. extend BaseGroupEvent.
Actived,
/// make friendship request.
/// params is name, remark.
Request(String, String),
/// agree friendship request.
/// params is gid.
Agree,
/// reject friendship request.
Reject,
/// receiver gid, sender gid, message.
Message(EventId, NetworkMessage),
/// request user info.
InfoReq(u64),
/// user full info.
InfoRes(User),
/// close friendship.
Close,
}
impl Group {
pub fn init() -> Group {
Group {
sessions: HashMap::new(),
delivery: HashMap::new(),
delivery_count: 0,
}
}
pub fn delivery(&mut self, db_id: i64) -> u64 {
let next = self.delivery_count as u64;
self.delivery.insert(next, db_id);
self.delivery_count += 1;
next
}
pub fn clear(&mut self) {
self.sessions.clear();
self.delivery.clear();
self.delivery_count = 0;
}
pub fn add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) {
self.sessions
.entry(pid)
.and_modify(|s| {
s.sid = sid;
s.fid = fid;
s.height = h;
})
.or_insert(GroupSession::new(sid, fid, h));
}
pub fn get(&self, pid: &PeerId) -> Result<(i64, i64)> {
if let Some(session) = self.sessions.get(pid) {
Ok((session.sid, session.fid))
} else {
Err(anyhow!("session missing!"))
}
}
pub fn is_online(&self, pid: &PeerId) -> bool {
self.sessions.contains_key(pid)
}
pub fn rm_online(&mut self, pid: &PeerId) -> bool {
if self.sessions.contains_key(pid) {
self.sessions.remove(pid);
true
} else {
false
}
}
pub fn active(&mut self, pid: &PeerId, is_me: bool) -> Result<()> {
if let Some(session) = self.sessions.get_mut(pid) {
Ok(session.active(is_me))
} else {
Err(anyhow!("session missing!"))
}
}
pub fn suspend(&mut self, pid: &PeerId, me: bool, m: bool) -> Result<()> {
if let Some(session) = self.sessions.get_mut(pid) {
Ok(session.suspend(me, m))
} else {
Err(anyhow!("session missing!"))
}
}
pub fn broadcast(&self, user: User, results: &mut HandleResult) {
let info = GroupEvent::InfoRes(user);
let data = bincode::serialize(&info).unwrap_or(vec![]);
for fpid in self.sessions.keys() {
let msg = SendType::Event(0, *fpid, data.clone());
results.groups.push(msg);
}
}
}
impl GroupSession {
fn new(sid: i64, fid: i64, height: i64) -> Self {
Self {
sid,
fid,
height,
suspend_me: false,
suspend_remote: false,
remain: 0,
}
}
pub fn info(&self) -> (i64, i64, i64) {
(self.height, self.sid, self.fid)
}
pub fn increased(&mut self) -> i64 {
self.height += 1;
self.height
}
pub fn active(&mut self, is_me: bool) {
if is_me {
self.suspend_me = false;
} else {
self.suspend_remote = false;
}
self.remain = 0;
}
pub fn suspend(&mut self, is_me: bool, must: bool) {
if must {
self.suspend_me = true;
self.suspend_remote = true;
}
if is_me {
self.suspend_me = true;
} else {
self.suspend_remote = true;
}
if self.suspend_remote && self.suspend_me {
self.remain = 6; // keep-alive 10~11 minutes 120s/time
}
}
}

0
src/apps/chat/models.rs → src/group/models.rs

0
src/apps/chat/models/friend.rs → src/group/models/friend.rs

0
src/apps/chat/models/message.rs → src/group/models/message.rs

0
src/apps/chat/models/request.rs → src/group/models/request.rs

65
src/apps/chat/rpc.rs → src/group/rpc.rs

@ -1,4 +1,4 @@
use esse_primitives::{id_from_str, MessageType, ESSE_ID}; use esse_primitives::{id_from_str, MessageType};
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
message::SendType, message::SendType,
@ -11,8 +11,7 @@ use crate::global::Global;
use crate::rpc::session_create; use crate::rpc::session_create;
use crate::storage::{chat_db, delete_avatar, session_db}; use crate::storage::{chat_db, delete_avatar, session_db};
use super::layer::{update_session, LayerEvent}; use super::{raw_to_network_message, update_session, Friend, GroupEvent, Message, Request};
use super::{raw_to_network_message, Friend, Message, Request};
#[inline] #[inline]
pub(crate) fn friend_info(friend: &Friend) -> RpcParam { pub(crate) fn friend_info(friend: &Friend) -> RpcParam {
@ -101,7 +100,7 @@ fn detail_list(friend: Friend, messages: Vec<Message>) -> RpcParam {
json!([friend.to_rpc(), message_results]) json!([friend.to_rpc(), message_results])
} }
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { pub(crate) fn group_rpc(handler: &mut RpcHandler<Global>) {
handler.add_method("chat-echo", |params, _| async move { handler.add_method("chat-echo", |params, _| async move {
Ok(HandleResult::rpc(json!(params))) Ok(HandleResult::rpc(json!(params)))
}); });
@ -118,10 +117,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let friends = Friend::list(&db)?; let friends = Friend::list(&db)?;
let mut results = vec![]; let mut results = vec![];
let layer_lock = state.layer.read().await; let group_lock = state.group.read().await;
if need_online { if need_online {
for friend in friends { for friend in friends {
let online = layer_lock.chat_is_online(&friend.pid); let online = group_lock.is_online(&friend.pid);
results.push(friend.to_rpc_online(online)); results.push(friend.to_rpc_online(online));
} }
} else { } else {
@ -129,7 +128,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
results.push(friend.to_rpc()); results.push(friend.to_rpc());
} }
} }
drop(layer_lock); drop(group_lock);
Ok(HandleResult::rpc(json!(results))) Ok(HandleResult::rpc(json!(results)))
}, },
@ -177,16 +176,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
friend.close(&db)?; friend.close(&db)?;
drop(db); drop(db);
let online = state.layer.write().await.chat_rm_online(&friend.pid); let online = state.group.write().await.rm_online(&friend.pid);
if let Some(faddr) = online { if online {
let data = bincode::serialize(&LayerEvent::Close)?; let data = bincode::serialize(&GroupEvent::Close)?;
results results.groups.push(SendType::Event(0, friend.pid, data));
.layers results.groups.push(SendType::Disconnect(friend.pid));
.push((ESSE_ID, SendType::Event(0, friend.pid, data)));
results
.layers
.push((ESSE_ID, SendType::Disconnect(friend.pid)));
} }
// state.own.write().await.broadcast( // state.own.write().await.broadcast(
@ -215,18 +209,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
Friend::delete(&db, &id)?; Friend::delete(&db, &id)?;
drop(db); drop(db);
let online = state.layer.write().await.chat_rm_online(&friend.pid); let online = state.group.write().await.rm_online(&friend.pid);
delete_avatar(&state.base, &pid, &friend.pid).await?; delete_avatar(&state.base, &pid, &friend.pid).await?;
if let Some(faddr) = online { if online {
let data = bincode::serialize(&LayerEvent::Close)?; let data = bincode::serialize(&GroupEvent::Close)?;
results results.groups.push(SendType::Event(0, friend.pid, data));
.layers results.groups.push(SendType::Disconnect(friend.pid));
.push((ESSE_ID, SendType::Event(0, friend.pid, data)));
results
.layers
.push((ESSE_ID, SendType::Disconnect(friend.pid)));
} }
// state.own.write().await.broadcast( // state.own.write().await.broadcast(
@ -282,10 +271,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let mut results = HandleResult::rpc(json!(request.to_rpc())); let mut results = HandleResult::rpc(json!(request.to_rpc()));
let name = state.own.read().await.account(&pid)?.name.clone(); let name = state.own.read().await.account(&pid)?.name.clone();
let req = LayerEvent::Request(name, request.remark); let req = GroupEvent::Request(name, request.remark);
let data = bincode::serialize(&req).unwrap_or(vec![]); let data = bincode::serialize(&req).unwrap_or(vec![]);
let msg = SendType::Event(0, request.pid, data); results.groups.push(SendType::Event(0, request.pid, data));
results.layers.push((ESSE_ID, msg));
Ok(results) Ok(results)
}, },
@ -330,9 +318,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
session.insert(&s_db)?; session.insert(&s_db)?;
results.rpcs.push(session_create(&session)); results.rpcs.push(session_create(&session));
let data = bincode::serialize(&LayerEvent::Agree).unwrap_or(vec![]); let data = bincode::serialize(&GroupEvent::Agree).unwrap_or(vec![]);
let msg = SendType::Event(0, friend.pid, data); results.groups.push(SendType::Event(0, friend.pid, data));
results.layers.push((ESSE_ID, msg));
Ok(results) Ok(results)
}, },
@ -353,9 +340,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
req.update(&db)?; req.update(&db)?;
drop(db); drop(db);
let data = bincode::serialize(&LayerEvent::Reject).unwrap_or(vec![]); let data = bincode::serialize(&GroupEvent::Reject).unwrap_or(vec![]);
let msg = SendType::Event(0, req.pid, data); let msg = SendType::Event(0, req.pid, data);
let mut results = HandleResult::layer(ESSE_ID, msg); let mut results = HandleResult::group(msg);
// state.own.write().await.broadcast( // state.own.write().await.broadcast(
// &gid, // &gid,
@ -449,12 +436,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let mut results = HandleResult::rpc(json!(msg.to_rpc())); let mut results = HandleResult::rpc(json!(msg.to_rpc()));
let tid = state.layer.write().await.delivery(msg.id); let tid = state.group.write().await.delivery(msg.id);
let event = LayerEvent::Message(msg.hash, nm); let event = GroupEvent::Message(msg.hash, nm);
let data = bincode::serialize(&event).unwrap_or(vec![]); let data = bincode::serialize(&event).unwrap_or(vec![]);
results results.groups.push(SendType::Event(tid, fpid, data));
.layers
.push((ESSE_ID, SendType::Event(tid, fpid, data)));
// UPDATE SESSION. // UPDATE SESSION.
let s_db = session_db(&state.base, &pid, &db_key)?; let s_db = session_db(&state.base, &pid, &db_key)?;

80
src/layer.rs

@ -1,4 +1,4 @@
use esse_primitives::{id_to_str, ESSE_ID}; use esse_primitives::id_to_str;
use group_types::GroupChatId; use group_types::GroupChatId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
@ -11,15 +11,13 @@ use tdn::types::{
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::account::User; use crate::account::User;
use crate::apps::chat::LayerEvent as ChatLayerEvent; use crate::group::GroupEvent;
//use crate::apps::group::{group_conn, GROUP_ID}; //use crate::apps::group::{group_conn, GROUP_ID};
use crate::own::Own; use crate::own::Own;
use crate::session::{Session, SessionType}; use crate::session::{Session, SessionType};
/// ESSE layers. /// ESSE layers.
pub(crate) struct Layer { pub(crate) struct Layer {
/// friend pid => Session
pub chats: HashMap<PeerId, LayerSession>,
/// group chat id => Session /// group chat id => Session
pub groups: HashMap<GroupChatId, LayerSession>, pub groups: HashMap<GroupChatId, LayerSession>,
/// delivery feedback. /// delivery feedback.
@ -31,7 +29,6 @@ pub(crate) struct Layer {
impl Layer { impl Layer {
pub fn init() -> Layer { pub fn init() -> Layer {
Layer { Layer {
chats: HashMap::new(),
groups: HashMap::new(), groups: HashMap::new(),
delivery: HashMap::new(), delivery: HashMap::new(),
delivery_count: 0, delivery_count: 0,
@ -46,30 +43,18 @@ impl Layer {
} }
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.chats.clear();
self.groups.clear(); self.groups.clear();
self.delivery.clear(); self.delivery.clear();
} }
pub fn is_addr_online(&self, addr: &PeerId) -> bool { pub fn is_addr_online(&self, addr: &PeerId) -> bool {
if self.chats.contains_key(addr) { for (_, session) in &self.groups {
return true; if session.addrs.contains(addr) {
} else { return true;
for (_, session) in &self.groups {
if session.addrs.contains(addr) {
return true;
}
} }
} }
false
}
pub fn chat_active(&mut self, pid: &PeerId, is_me: bool) -> Option<PeerId> { false
if let Some(session) = self.chats.get_mut(pid) {
Some(session.active(is_me))
} else {
None
}
} }
pub fn group_active(&mut self, gid: &GroupChatId, is_me: bool) -> Option<PeerId> { pub fn group_active(&mut self, gid: &GroupChatId, is_me: bool) -> Option<PeerId> {
@ -80,14 +65,6 @@ impl Layer {
} }
} }
pub fn chat_suspend(&mut self, pid: &PeerId, me: bool, m: bool) -> Result<Option<PeerId>> {
if let Some(session) = self.chats.get_mut(pid) {
Ok(session.suspend(me, m))
} else {
Err(anyhow!("session missing!"))
}
}
pub fn group_suspend(&mut self, g: &GroupChatId, me: bool, m: bool) -> Result<Option<PeerId>> { pub fn group_suspend(&mut self, g: &GroupChatId, me: bool, m: bool) -> Result<Option<PeerId>> {
if let Some(session) = self.groups.get_mut(g) { if let Some(session) = self.groups.get_mut(g) {
Ok(session.suspend(me, m)) Ok(session.suspend(me, m))
@ -96,28 +73,6 @@ impl Layer {
} }
} }
pub fn chat_is_online(&self, pid: &PeerId) -> bool {
self.chats.contains_key(pid)
}
pub fn chat_rm_online(&mut self, pid: &PeerId) -> Option<PeerId> {
self.chats.remove(pid).map(|session| session.addrs[0])
}
pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) {
if !self.chats.contains_key(&pid) {
self.chats.insert(pid, LayerSession::new(pid, sid, fid, h));
}
}
pub fn chat_session(&self, pid: &PeerId) -> Result<(i64, i64)> {
if let Some(session) = self.chats.get(pid) {
Ok((session.s_id, session.db_id))
} else {
Err(anyhow!("session missing!"))
}
}
pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> { pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> {
if let Some(session) = self.groups.get(gid) { if let Some(session) = self.groups.get(gid) {
Ok(session) Ok(session)
@ -211,22 +166,22 @@ impl Layer {
// pub async fn all_layer_conns(&self) -> Result<HashMap<GroupId, Vec<(GroupId, SendType)>>> { // pub async fn all_layer_conns(&self) -> Result<HashMap<GroupId, Vec<(GroupId, SendType)>>> {
// let mut conns = HashMap::new(); // let mut conns = HashMap::new();
// let group_lock = self.group.read().await; // let own_lock = self.group.read().await;
// for mgid in self.runnings.keys() { // for mgid in self.runnings.keys() {
// let mut vecs = vec![]; // let mut vecs = vec![];
// let db = group_lock.session_db(&mgid)?; // let db = own_lock.session_db(&mgid)?;
// let sessions = Session::list(&db)?; // let sessions = Session::list(&db)?;
// drop(db); // drop(db);
// for s in sessions { // for s in sessions {
// match s.s_type { // match s.s_type {
// SessionType::Chat => { // SessionType::Chat => {
// let proof = group_lock.prove_addr(mgid, &s.addr)?; // let proof = own_lock.prove_addr(mgid, &s.addr)?;
// vecs.push((s.gid, chat_conn(proof, Peer::peer(s.addr)))); // vecs.push((s.gid, chat_conn(proof, Peer::peer(s.addr))));
// } // }
// SessionType::Group => { // SessionType::Group => {
// let proof = group_lock.prove_addr(mgid, &s.addr)?; // let proof = own_lock.prove_addr(mgid, &s.addr)?;
// vecs.push((GROUP_ID, group_conn(proof, Peer::peer(s.addr), s.gid))); // vecs.push((GROUP_ID, group_conn(proof, Peer::peer(s.addr), s.gid)));
// } // }
// _ => {} // _ => {}
@ -256,17 +211,12 @@ impl Layer {
// } // }
// } // }
pub fn broadcast(&self, user: User, results: &mut HandleResult) { // pub fn broadcast(&self, user: User, results: &mut HandleResult) {
let info = ChatLayerEvent::InfoRes(user); // let info = GroupEvent::InfoRes(user);
let data = bincode::serialize(&info).unwrap_or(vec![]); // let data = bincode::serialize(&info).unwrap_or(vec![]);
for fpid in self.chats.keys() { // // TODO GROUPS
let msg = SendType::Event(0, *fpid, data.clone()); // }
results.layers.push((ESSE_ID, msg));
}
// TODO GROUPS
}
} }
// pub(crate) struct OnlineSession { // pub(crate) struct OnlineSession {

1
src/lib.rs

@ -12,6 +12,7 @@ mod apps;
//mod consensus; //mod consensus;
//mod event; //mod event;
mod global; mod global;
mod group;
mod layer; mod layer;
mod migrate; mod migrate;
mod own; mod own;

0
src/migrate.rs → src/migrate/mod.rs

0
src/own.rs → src/own/mod.rs

58
src/rpc.rs

@ -1,4 +1,4 @@
use esse_primitives::{id_from_str, id_to_str, ESSE_ID}; use esse_primitives::{id_from_str, id_to_str};
use group_types::{GroupChatId, LayerEvent as GroupLayerEvent, GROUP_CHAT_ID}; use group_types::{GroupChatId, LayerEvent as GroupLayerEvent, GROUP_CHAT_ID};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -16,15 +16,20 @@ use tdn_did::{generate_mnemonic, Count};
use crate::account::lang_from_i64; use crate::account::lang_from_i64;
use crate::apps::app_rpc_inject; use crate::apps::app_rpc_inject;
use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; use crate::apps::group::{group_conn as group_chat_conn, GroupChat};
use crate::apps::group::{group_conn, GroupChat};
use crate::global::Global; use crate::global::Global;
use crate::group::{group_conn, group_rpc, GroupEvent};
//use crate::event::InnerEvent; //use crate::event::InnerEvent;
use crate::session::{connect_session, Session, SessionType}; use crate::session::{connect_session, Session, SessionType};
use crate::storage::{group_db, session_db}; use crate::storage::{group_db, session_db};
pub(crate) fn init_rpc(global: Arc<Global>) -> RpcHandler<Global> { pub(crate) fn init_rpc(global: Arc<Global>) -> RpcHandler<Global> {
let mut handler = new_rpc_handler(global); let mut handler = new_rpc_handler(global);
// inject group rpcs
group_rpc(&mut handler);
// inject layers rpcs
app_rpc_inject(&mut handler); app_rpc_inject(&mut handler);
handler handler
} }
@ -174,15 +179,15 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
handler.add_method("account-list", |_, state: Arc<Global>| async move { handler.add_method("account-list", |_, state: Arc<Global>| async move {
let mut accounts: Vec<Vec<String>> = vec![]; let mut accounts: Vec<Vec<String>> = vec![];
let group_lock = state.own.read().await; let own_lock = state.own.read().await;
for (pid, account) in group_lock.list_accounts().iter() { for (pid, account) in own_lock.list_accounts().iter() {
accounts.push(vec![ accounts.push(vec![
id_to_str(pid), id_to_str(pid),
account.name.clone(), account.name.clone(),
base64::encode(&account.avatar), base64::encode(&account.avatar),
]); ]);
} }
drop(group_lock); drop(own_lock);
Ok(HandleResult::rpc(json!(accounts))) Ok(HandleResult::rpc(json!(accounts)))
}); });
@ -270,21 +275,15 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]); let avatar_bytes = base64::decode(avatar).unwrap_or(vec![]);
let pid = state.pid().await; let pid = state.pid().await;
let mut group_lock = state.own.write().await; let mut own_lock = state.own.write().await;
group_lock.update_account( own_lock.update_account(pid, name, avatar_bytes.clone(), &state.base, &state.secret)?;
pid, drop(own_lock);
name,
avatar_bytes.clone(),
&state.base,
&state.secret,
)?;
drop(group_lock);
let results = HandleResult::new(); let results = HandleResult::new();
// TODO broadcast to all devices. // TODO broadcast to all devices.
//let user = group_lock.clone_user(&pid)?; //let user = own_lock.clone_user(&pid)?;
//group_lock.broadcast(&pid, &mut results)?; //own_lock.broadcast(&pid, &mut results)?;
// TODO broadcast to all layers. // TODO broadcast to all layers.
//state.layer.read().await.broadcast(user, &mut results); //state.layer.read().await.broadcast(user, &mut results);
@ -409,26 +408,23 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
let s = Session::get(&db, &id)?; let s = Session::get(&db, &id)?;
drop(db); drop(db);
let mut layer_lock = state.layer.write().await;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
match s.s_type { match s.s_type {
SessionType::Chat => { SessionType::Chat => {
let remote_pid = id_from_str(remote)?; let remote_pid = id_from_str(remote)?;
let online = layer_lock.chat_active(&remote_pid, true); if state.group.write().await.active(&remote_pid, true).is_ok() {
if let Some(addr) = online { return Ok(HandleResult::rpc(json!([id, id_to_str(&remote_pid)])));
return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)])));
} }
chat_conn(remote_pid, &mut results); group_conn(remote_pid, &mut results);
} }
SessionType::Group => { SessionType::Group => {
let remote_gid: GroupChatId = let remote_gid: GroupChatId =
remote.parse().map_err(|_| RpcError::ParseError)?; remote.parse().map_err(|_| RpcError::ParseError)?;
let online = layer_lock.group_active(&remote_gid, true); let online = state.layer.write().await.group_active(&remote_gid, true);
if let Some(addr) = online { if let Some(addr) = online {
return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)]))); return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)])));
} }
group_conn(s.addr, remote_gid, &mut results); group_chat_conn(s.addr, remote_gid, &mut results);
} }
_ => {} _ => {}
} }
@ -451,23 +447,23 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
drop(db); drop(db);
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
match s.s_type { match s.s_type {
SessionType::Chat => { SessionType::Chat => {
let remote_id = id_from_str(remote)?; let rid = id_from_str(remote)?;
if layer_lock.chat_suspend(&remote_id, true, must)?.is_some() { if state.group.write().await.suspend(&rid, true, must).is_ok() {
results.rpcs.push(json!([id])); results.rpcs.push(json!([id]));
} }
let data = bincode::serialize(&ChatLayerEvent::Suspend)?; let data = bincode::serialize(&GroupEvent::Suspend)?;
let msg = SendType::Event(0, remote_id, data); results.groups.push(SendType::Event(0, rid, data));
results.layers.push((ESSE_ID, msg));
} }
SessionType::Group => { SessionType::Group => {
let remote_gid: GroupChatId = let remote_gid: GroupChatId =
remote.parse().map_err(|_| RpcError::ParseError)?; remote.parse().map_err(|_| RpcError::ParseError)?;
let mut layer_lock = state.layer.write().await;
if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() { if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() {
results.rpcs.push(json!([id])); results.rpcs.push(json!([id]));
} }
drop(layer_lock);
let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?; let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?;
let msg = SendType::Event(0, s.addr, data); let msg = SendType::Event(0, s.addr, data);
results.layers.push((GROUP_CHAT_ID, msg)); results.layers.push((GROUP_CHAT_ID, msg));

7
src/server.rs

@ -20,6 +20,7 @@ use tdn_storage::local::DStorage;
use crate::account::Account; use crate::account::Account;
use crate::apps::app_layer_handle; use crate::apps::app_layer_handle;
use crate::global::Global; use crate::global::Global;
use crate::group::group_handle;
use crate::layer::Layer; use crate::layer::Layer;
use crate::migrate::{main_migrate, ACCOUNT_DB}; use crate::migrate::{main_migrate, ACCOUNT_DB};
use crate::own::{handle as own_handle, Own}; use crate::own::{handle as own_handle, Own};
@ -94,8 +95,10 @@ pub async fn start(db_path: String) -> Result<()> {
handle(handle_result, now_rpc_uid, true, &global).await; handle(handle_result, now_rpc_uid, true, &global).await;
} }
} }
ReceiveMessage::Group(_) => { ReceiveMessage::Group(g_msg) => {
warn!("ESSE has no Group Message!"); if let Ok(handle_result) = group_handle(g_msg, &global).await {
handle(handle_result, now_rpc_uid, true, &global).await;
}
} }
ReceiveMessage::Layer(fgid, tgid, l_msg) => { ReceiveMessage::Layer(fgid, tgid, l_msg) => {
if let Ok(handle_result) = app_layer_handle(fgid, tgid, l_msg, &global).await { if let Ok(handle_result) = app_layer_handle(fgid, tgid, l_msg, &global).await {

0
src/utils.rs → src/utils/mod.rs

Loading…
Cancel
Save