Browse Source

refactor layer's friend chat

pull/18/head
Sun 4 years ago
parent
commit
e0ce8586eb
  1. 130
      src/apps/chat/layer.rs
  2. 2
      src/apps/chat/mod.rs
  3. 17
      src/apps/chat/rpc.rs
  4. 13
      src/apps/group_chat/layer.rs
  5. 3
      src/apps/group_chat/mod.rs
  6. 17
      src/apps/group_chat/models.rs
  7. 12
      src/apps/group_chat/rpc.rs
  8. 27
      src/event.rs
  9. 216
      src/layer.rs
  10. 33
      src/rpc.rs
  11. 7
      src/server.rs

130
src/apps/chat/layer.rs

@ -50,19 +50,19 @@ pub(crate) enum LayerResponse {
//Service, //Service,
} }
/// Esse app's Event. /// ESSE chat layer Event.
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent { pub(crate) enum LayerEvent {
/// receiver gid, sender gid. as BaseLayerEvent.
OnlinePing,
/// receiver gid, sender gid. as BaseLayerEvent.
OnlinePong,
/// receiver gid, sender gid. as BaseLayerEvent.
Offline,
/// receiver gid, sender gid, message. /// receiver gid, sender gid, message.
Message(EventId, NetworkMessage), Message(EventId, NetworkMessage),
/// receiver gid, sender user. /// receiver gid, sender user.
Info(User), Info(User),
/// receiver gid, sender gid.
OnlinePing,
/// receiver gid, sender gid.
OnlinePong,
/// receiver gid, sender gid.
Offline,
/// close friendship. /// close friendship.
Close, Close,
} }
@ -83,19 +83,27 @@ pub(crate) async fn handle(
match request { match request {
LayerRequest::Connect(proof) => { LayerRequest::Connect(proof) => {
let fid = layer.get_remote_id(&mgid, &fgid)?; let friend = load_friend(&layer.base, &mgid, &fgid)?;
if friend.is_none() {
let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]);
let msg = SendType::Result(0, addr, false, false, data);
results.layers.push((mgid, fgid, msg));
return Ok(results);
}
let f = friend.unwrap(); // safe.
// 1. check verify. // 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?; proof.verify(&fgid, &addr, &layer.addr)?;
// 2. online this group. // 2. online this group.
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?; .check_add_online(fgid, Online::Direct(addr), f.id)?;
// 3. update remote addr. TODO // 3. update remote addr. TODO
let db = session_db(&layer.base, &mgid)?; let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?; Friend::addr_update(&db, f.id, &addr)?;
drop(db); drop(db);
// 4. online to UI. // 4. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); results.rpcs.push(rpc::friend_online(mgid, f.id, addr));
// 5. connected. // 5. connected.
let msg = conn_res_message(&layer, &mgid, addr).await?; let msg = conn_res_message(&layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg)); results.layers.push((mgid, fgid, msg));
@ -106,8 +114,8 @@ pub(crate) async fn handle(
)?; )?;
} }
LayerRequest::Friend(remote, remark) => { LayerRequest::Friend(remote, remark) => {
let some_fid = layer.get_remote_id(&mgid, &fgid); let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_fid.is_err() { if some_friend.is_none() {
// check if exist request. // check if exist request.
let db = session_db(&layer.base, &mgid)?; let db = session_db(&layer.base, &mgid)?;
if let Some(req) = Request::get(&db, &remote.id)? { if let Some(req) = Request::get(&db, &remote.id)? {
@ -139,15 +147,23 @@ pub(crate) async fn handle(
results.rpcs.push(rpc::request_create(mgid, &request)); results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results); return Ok(results);
} }
let fid = some_fid.unwrap(); // safe checked. let mut friend = some_friend.unwrap(); // safe checked.
// already friendship & update. // already friendship & update.
// 1. online this group. // 1. online this group.
layer layer.running_mut(&mgid)?.check_add_online(
.running_mut(&mgid)? fgid,
.check_add_online(fgid, Online::Direct(addr))?; Online::Direct(addr),
friend.id,
)?;
// 2. update remote user. // 2. update remote user.
let mut friend = layer.update_friend(&mgid, fid, remote)?; friend.name = remote.name;
friend.addr = remote.addr;
let db = session_db(&layer.base, &mgid)?;
friend.remote_update(&db)?;
drop(db);
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
// 3. online to UI. // 3. online to UI.
friend.online = true; friend.online = true;
results.rpcs.push(rpc::friend_info(mgid, &friend)); results.rpcs.push(rpc::friend_info(mgid, &friend));
@ -223,11 +239,16 @@ pub(crate) async fn handle(
// 1. check verify. // 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?; proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove. // 2. check has this remove.
let fid = layer.get_remote_id(&mgid, &fgid)?; let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_friend.is_none() {
return Ok(results);
}
let fid = some_friend.unwrap().id; // safe.
// 3. online this group. // 3. online this group.
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?; .check_add_online(fgid, Online::Direct(addr), fid)?;
// 4. update remote addr. // 4. update remote addr.
let db = session_db(&layer.base, &mgid)?; let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?; Friend::addr_update(&db, fid, &addr)?;
@ -243,12 +264,14 @@ pub(crate) async fn handle(
LayerResponse::Agree(remote, proof) => { LayerResponse::Agree(remote, proof) => {
// 1. check verify. // 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?; proof.verify(&fgid, &addr, &layer.addr)?;
if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) { if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? {
// already friendship. // already friendship.
layer layer.running_mut(&mgid)?.check_add_online(
.running_mut(&mgid)? fgid,
.check_add_online(fgid, Online::Direct(addr))?; Online::Direct(addr),
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); friend.id,
)?;
results.rpcs.push(rpc::friend_online(mgid, friend.id, addr));
layer.group.write().await.status( layer.group.write().await.status(
&mgid, &mgid,
StatusEvent::SessionFriendOnline(fgid), StatusEvent::SessionFriendOnline(fgid),
@ -275,7 +298,6 @@ pub(crate) async fn handle(
let request_id = request.id; let request_id = request.id;
let friend = Friend::from_request(&db, request)?; let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results results
.rpcs .rpcs
.push(rpc::request_agree(mgid, request_id, &friend)); .push(rpc::request_agree(mgid, request_id, &friend));
@ -299,11 +321,16 @@ pub(crate) async fn handle(
// 1. check verify. // 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?; proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove. // 2. check has this remove.
let fid = layer.get_remote_id(&mgid, &fgid)?; let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_friend.is_none() {
return Ok(results);
}
let fid = some_friend.unwrap().id; // safe.
// 3. online this group. // 3. online this group.
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?; .check_add_online(fgid, Online::Direct(addr), fid)?;
// 4. update remote addr. // 4. update remote addr.
let db = session_db(&layer.base, &mgid)?; let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?; Friend::addr_update(&db, fid, &addr)?;
@ -322,12 +349,14 @@ pub(crate) async fn handle(
LayerResponse::Agree(remote, proof) => { LayerResponse::Agree(remote, proof) => {
// 1. check verify. // 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?; proof.verify(&fgid, &addr, &layer.addr)?;
if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) { if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? {
// already friendship. // already friendship.
layer layer.running_mut(&mgid)?.check_add_online(
.running_mut(&mgid)? fgid,
.check_add_online(fgid, Online::Direct(addr))?; Online::Direct(addr),
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); friend.id,
)?;
results.rpcs.push(rpc::friend_online(mgid, friend.id, addr));
layer.group.write().await.status( layer.group.write().await.status(
&mgid, &mgid,
StatusEvent::SessionFriendOnline(fgid), StatusEvent::SessionFriendOnline(fgid),
@ -354,7 +383,6 @@ pub(crate) async fn handle(
let request_id = request.id; let request_id = request.id;
let friend = Friend::from_request(&db, request)?; let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?; write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results results
.rpcs .rpcs
.push(rpc::request_agree(mgid, request_id, &friend)); .push(rpc::request_agree(mgid, request_id, &friend));
@ -432,7 +460,7 @@ impl LayerEvent {
) -> Result<HandleResult> { ) -> Result<HandleResult> {
let event: LayerEvent = let event: LayerEvent =
postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?; postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?;
let fid = layer.get_remote_id(&mgid, &fgid)?; let fid = layer.get_running_remote_id(&mgid, &fgid)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
@ -453,7 +481,14 @@ impl LayerEvent {
} }
LayerEvent::Info(remote) => { LayerEvent::Info(remote) => {
let avatar = remote.avatar.clone(); let avatar = remote.avatar.clone();
let f = layer.update_friend(&mgid, fid, remote)?; let db = session_db(&layer.base, &mgid)?;
let mut f = Friend::get_id(&db, fid)?.ok_or(new_io_error(""))?;
f.name = remote.name;
f.addr = remote.addr;
f.remote_update(&db)?;
drop(db);
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
layer.group.write().await.broadcast( layer.group.write().await.broadcast(
&mgid, &mgid,
InnerEvent::SessionFriendInfo(f.gid, f.addr, f.name.clone(), avatar), InnerEvent::SessionFriendInfo(f.gid, f.addr, f.name.clone(), avatar),
@ -471,7 +506,7 @@ impl LayerEvent {
)?; )?;
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?; .check_add_online(fgid, Online::Direct(addr), fid)?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); results.rpcs.push(rpc::friend_online(mgid, fid, addr));
let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data); let msg = SendType::Event(0, addr, data);
@ -485,7 +520,7 @@ impl LayerEvent {
)?; )?;
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?; .check_add_online(fgid, Online::Direct(addr), fid)?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); results.rpcs.push(rpc::friend_online(mgid, fid, addr));
} }
LayerEvent::Offline => { LayerEvent::Offline => {
@ -505,7 +540,7 @@ impl LayerEvent {
fid, fid,
&mut results, &mut results,
)?; )?;
layer.remove_friend(&mgid, &fgid); layer.remove_online(&mgid, &fgid);
let db = session_db(&layer.base, &mgid)?; let db = session_db(&layer.base, &mgid)?;
Friend::id_close(&db, fid)?; Friend::id_close(&db, fid)?;
drop(db); drop(db);
@ -593,6 +628,12 @@ impl LayerEvent {
} }
} }
#[inline]
fn load_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId) -> Result<Option<Friend>> {
let db = session_db(base, mgid)?;
Friend::get(&db, fgid)
}
pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType { pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType {
// update delivery. // update delivery.
let uid = layer.delivery.len() as u64 + 1; let uid = layer.delivery.len() as u64 + 1;
@ -627,14 +668,9 @@ pub(super) fn event_message(
SendType::Event(uid, addr, data) SendType::Event(uid, addr, data)
} }
pub(crate) async fn conn_req_message( pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType {
layer: &Layer,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let proof = layer.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]);
Ok(SendType::Connect(0, addr, None, None, data)) SendType::Connect(0, addr, None, None, data)
} }
async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> { async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
@ -674,6 +710,6 @@ pub(super) fn rpc_agree_message(
} }
// maybe need if gid or addr in blocklist. // maybe need if gid or addr in blocklist.
fn _res_reject() -> Vec<u8> { fn res_reject() -> Vec<u8> {
postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]) postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![])
} }

2
src/apps/chat/mod.rs

@ -2,7 +2,7 @@ mod layer;
mod models; mod models;
pub(crate) mod rpc; pub(crate) mod rpc;
pub(crate) use layer::conn_req_message; pub(crate) use layer::chat_conn;
pub(crate) use layer::handle as layer_handle; pub(crate) use layer::handle as layer_handle;
pub(crate) use layer::LayerEvent; pub(crate) use layer::LayerEvent;
pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request}; pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request};

17
src/apps/chat/rpc.rs

@ -122,7 +122,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"chat-friend-list", "chat-friend-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move { |gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let friends = state.layer.read().await.all_friends_with_online(&gid)?; let layer_lock = state.layer.read().await;
let db = session_db(&layer_lock.base, &gid)?;
let mut friends = Friend::all(&db)?;
drop(db);
let gids: Vec<&GroupId> = friends.iter().map(|f| &f.gid).collect();
let onlines = layer_lock.merge_online(&gid, gids)?;
for (index, online) in onlines.iter().enumerate() {
friends[index].online = *online;
}
Ok(HandleResult::rpc(friend_list(friends))) Ok(HandleResult::rpc(friend_list(friends)))
}, },
); );
@ -182,7 +192,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
friend.close(&db)?; friend.close(&db)?;
drop(db); drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid); let online = layer_lock.remove_online(&gid, &friend.gid);
drop(layer_lock); drop(layer_lock);
if let Some(faddr) = online { if let Some(faddr) = online {
@ -222,7 +232,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
friend.delete(&db)?; friend.delete(&db)?;
drop(db); drop(db);
let online = layer_lock.remove_friend(&gid, &friend.gid); let online = layer_lock.remove_online(&gid, &friend.gid);
delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; delete_avatar(layer_lock.base(), &gid, &friend.gid).await?;
drop(layer_lock); drop(layer_lock);
@ -346,7 +356,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
request.update(&db)?; request.update(&db)?;
let f = Friend::from_request(&db, request)?; let f = Friend::from_request(&db, request)?;
layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id);
results.rpcs.push(json!([id, f.to_rpc()])); results.rpcs.push(json!([id, f.to_rpc()]));
let proof = group_lock.prove_addr(&gid, &f.addr)?; let proof = group_lock.prove_addr(&gid, &f.addr)?;

13
src/apps/group_chat/layer.rs

@ -3,12 +3,13 @@ use tdn::{
smol::lock::RwLock, smol::lock::RwLock,
types::{ types::{
group::GroupId, group::GroupId,
message::RecvType, message::{RecvType, SendType},
primitive::{new_io_error, HandleResult, Result}, primitive::{new_io_error, HandleResult, PeerAddr, Result},
}, },
}; };
use group_chat_types::GroupResult; use group_chat_types::{GroupConnect, GroupResult, JoinProof};
use tdn_did::Proof;
//use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType}; //use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
use crate::layer::Layer; use crate::layer::Layer;
@ -72,3 +73,9 @@ pub(crate) async fn handle(
Ok(results) Ok(results)
} }
pub(crate) fn group_chat_conn(proof: Proof, addr: PeerAddr, gid: GroupId, height: u64) -> SendType {
let data = postcard::to_allocvec(&GroupConnect::Join(gid, JoinProof::Had(proof), height))
.unwrap_or(vec![]);
SendType::Connect(0, addr, None, None, data)
}

3
src/apps/group_chat/mod.rs

@ -6,11 +6,12 @@ use tdn::types::{group::GroupId, message::SendType, primitive::HandleResult};
/// Group chat server to ESSE. /// Group chat server to ESSE.
#[inline] #[inline]
pub(super) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) { pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) {
results.layers.push((gid, GROUP_ID, msg)); results.layers.push((gid, GROUP_ID, msg));
} }
pub(crate) use models::GroupChat; pub(crate) use models::GroupChat;
pub(crate) mod rpc; pub(crate) mod rpc;
pub(crate) use layer::group_chat_conn;
pub(crate) use layer::handle as layer_handle; pub(crate) use layer::handle as layer_handle;
pub(crate) use rpc::new_rpc_handler; pub(crate) use rpc::new_rpc_handler;

17
src/apps/group_chat/models.rs

@ -59,7 +59,7 @@ pub(crate) struct GroupChat {
/// group chat type. /// group chat type.
g_type: GroupType, g_type: GroupType,
/// group chat server addresse. /// group chat server addresse.
g_addr: PeerAddr, pub g_addr: PeerAddr,
/// group chat name. /// group chat name.
g_name: String, g_name: String,
/// group chat simple intro. /// group chat simple intro.
@ -213,6 +213,16 @@ impl GroupChat {
Ok(groups) Ok(groups)
} }
/// use in rpc when load account groups.
pub fn all_ok(db: &DStorage) -> Result<Vec<GroupChat>> {
let matrix = db.query("SELECT id, owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime FROM groups WHERE is_closed = false ORDER BY last_datetime DESC")?;
let mut groups = vec![];
for values in matrix {
groups.push(GroupChat::from_values(values, false));
}
Ok(groups)
}
pub fn get(db: &DStorage, gid: &GroupId) -> Result<Option<GroupChat>> { pub fn get(db: &DStorage, gid: &GroupId) -> Result<Option<GroupChat>> {
let sql = format!("SELECT id, owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime FROM groups WHERE gcd = '{}' AND is_deleted = false", gid.to_hex()); let sql = format!("SELECT id, owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime FROM groups WHERE gcd = '{}' AND is_deleted = false", gid.to_hex());
let mut matrix = db.query(&sql)?; let mut matrix = db.query(&sql)?;
@ -223,6 +233,11 @@ impl GroupChat {
Ok(None) Ok(None)
} }
pub fn get_height(&self, db: &DStorage) -> Result<i64> {
// TODO
Ok(0)
}
pub fn insert(&mut self, db: &DStorage) -> Result<()> { pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let sql = format!("INSERT INTO groups (owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime, is_deleted) VALUES ('{}', '{}', {}, '{}', '{}', '{}', {}, {}, {}, {}, '{}', {}, '{}', {}, {}, false)", let sql = format!("INSERT INTO groups (owner, gcd, gtype, addr, name, bio, is_top, is_ok, is_need_agree, is_closed, key, last_datetime, last_content, last_readed, datetime, is_deleted) VALUES ('{}', '{}', {}, '{}', '{}', '{}', {}, {}, {}, {}, '{}', {}, '{}', {}, {}, false)",
self.owner.to_hex(), self.owner.to_hex(),

12
src/apps/group_chat/rpc.rs

@ -59,7 +59,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-chat-list", "group-chat-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move { |gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let groups = state.layer.read().await.all_groups_with_online(&gid)?; let layer_lock = state.layer.read().await;
let db = group_chat_db(&layer_lock.base, &gid)?;
let mut groups = GroupChat::all(&db)?;
drop(db);
let gids: Vec<&GroupId> = groups.iter().map(|g| &g.g_id).collect();
let onlines = layer_lock.merge_online(&gid, gids)?;
for (index, online) in onlines.iter().enumerate() {
groups[index].online = *online;
}
Ok(HandleResult::rpc(group_list(groups))) Ok(HandleResult::rpc(group_list(groups)))
}, },
); );

27
src/event.rs

@ -285,15 +285,6 @@ impl InnerEvent {
write_avatar_sync(group.base(), &gid, &request.gid, avatar)?; write_avatar_sync(group.base(), &gid, &request.gid, avatar)?;
} }
let friend = Friend::from_request(&db, request)?; let friend = Friend::from_request(&db, request)?;
let layer_lock = layer.clone();
let rfid = friend.id;
let ggid = gid.clone();
tdn::smol::spawn(async move {
if let Ok(running) = layer_lock.write().await.running_mut(&ggid) {
running.add_permissioned(rgid, rfid);
}
})
.detach();
results results
.rpcs .rpcs
.push(chat_rpc::request_agree(gid, rid, &friend)); .push(chat_rpc::request_agree(gid, rid, &friend));
@ -400,7 +391,7 @@ impl InnerEvent {
let ggid = gid.clone(); let ggid = gid.clone();
let sender = group.sender(); let sender = group.sender();
tdn::smol::spawn(async move { tdn::smol::spawn(async move {
let online = layer_lock.write().await.remove_friend(&ggid, &f.gid); let online = layer_lock.write().await.remove_online(&ggid, &f.gid);
if let Some(faddr) = online { if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new(); let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, f.gid); addrs.insert(faddr, f.gid);
@ -430,7 +421,7 @@ impl InnerEvent {
let ggid = gid.clone(); let ggid = gid.clone();
let sender = group.sender(); let sender = group.sender();
tdn::smol::spawn(async move { tdn::smol::spawn(async move {
let online = layer_lock.write().await.remove_friend(&ggid, &f.gid); let online = layer_lock.write().await.remove_online(&ggid, &f.gid);
if let Some(faddr) = online { if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new(); let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, f.gid); addrs.insert(faddr, f.gid);
@ -503,10 +494,11 @@ impl StatusEvent {
.push(chat_rpc::friend_online(gid, f.id, f.addr)); .push(chat_rpc::friend_online(gid, f.id, f.addr));
let layer_lock = layer.clone(); let layer_lock = layer.clone();
let rgid = f.gid; let rgid = f.gid;
let fid = f.id;
let ggid = gid.clone(); let ggid = gid.clone();
tdn::smol::spawn(async move { tdn::smol::spawn(async move {
if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { if let Ok(running) = layer_lock.write().await.running_mut(&ggid) {
let _ = running.check_add_online(rgid, Online::Relay(addr)); let _ = running.check_add_online(rgid, Online::Relay(addr), fid);
} }
}) })
.detach(); .detach();
@ -795,15 +787,6 @@ impl SyncEvent {
write_avatar_sync(&base, &gid, &request.gid, avatar)?; write_avatar_sync(&base, &gid, &request.gid, avatar)?;
} }
let friend = Friend::from_request(&session_db, request)?; let friend = Friend::from_request(&session_db, request)?;
let layer_lock = layer.clone();
let rfid = friend.id;
let ggid = gid.clone();
tdn::smol::spawn(async move {
if let Ok(running) = layer_lock.write().await.running_mut(&ggid) {
running.add_permissioned(rgid, rfid);
}
})
.detach();
results results
.rpcs .rpcs
.push(chat_rpc::request_agree(gid, rid, &friend)); .push(chat_rpc::request_agree(gid, rid, &friend));
@ -854,7 +837,7 @@ impl SyncEvent {
let fgid = friend.gid; let fgid = friend.gid;
let sender = group.sender(); let sender = group.sender();
tdn::smol::spawn(async move { tdn::smol::spawn(async move {
let online = layer_lock.write().await.remove_friend(&ggid, &fgid); let online = layer_lock.write().await.remove_online(&ggid, &fgid);
if let Some(faddr) = online { if let Some(faddr) = online {
let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new(); let mut addrs: HashMap<PeerAddr, GroupId> = HashMap::new();
addrs.insert(faddr, fgid); addrs.insert(faddr, fgid);

216
src/layer.rs

@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@ -5,18 +6,29 @@ use tdn::{
smol::lock::RwLock, smol::lock::RwLock,
types::{ types::{
group::GroupId, group::GroupId,
message::{RecvType, SendType}, message::SendType,
primitive::{new_io_error, HandleResult, PeerAddr, Result}, primitive::{new_io_error, PeerAddr, Result},
}, },
}; };
use tdn_did::user::User; use tdn_did::user::User;
use crate::apps::chat::conn_req_message; use crate::apps::chat::{chat_conn, Friend};
use crate::apps::chat::Friend; use crate::apps::group_chat::{group_chat_conn, GroupChat, GROUP_ID};
use crate::apps::group_chat::GroupChat;
use crate::group::Group; use crate::group::Group;
use crate::storage::{group_chat_db, session_db, write_avatar_sync}; use crate::storage::{group_chat_db, session_db, write_avatar_sync};
/// ESSE app's BaseLayerEvent.
/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
/// receiver gid, sender gid.
OnlinePing,
/// receiver gid, sender gid.
OnlinePong,
/// receiver gid, sender gid.
Offline,
}
/// ESSE layers. /// ESSE layers.
pub(crate) struct Layer { pub(crate) struct Layer {
/// account_gid => running_account. /// account_gid => running_account.
@ -56,8 +68,7 @@ impl Layer {
pub fn add_running(&mut self, gid: &GroupId) -> Result<()> { pub fn add_running(&mut self, gid: &GroupId) -> Result<()> {
if !self.runnings.contains_key(gid) { if !self.runnings.contains_key(gid) {
self.runnings self.runnings.insert(*gid, RunningAccount::init());
.insert(*gid, RunningAccount::init(&self.base, gid)?);
} }
Ok(()) Ok(())
@ -97,90 +108,47 @@ impl Layer {
addrs addrs
} }
pub fn get_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<i64> { pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<i64> {
self.running(mgid)?.get_permissioned(fgid) self.running(mgid)?.get_online_id(fgid)
} }
pub fn all_friends(&self, gid: &GroupId) -> Result<Vec<Friend>> { pub fn merge_online(&self, mgid: &GroupId, gids: Vec<&GroupId>) -> Result<Vec<bool>> {
let db = session_db(&self.base, &gid)?; let runnings = self.running(mgid)?;
let friends = Friend::all_ok(&db)?; Ok(gids.iter().map(|g| runnings.is_online(g)).collect())
drop(db);
Ok(friends)
} }
pub fn all_friends_with_online(&self, gid: &GroupId) -> Result<Vec<Friend>> { pub fn remove_online(&mut self, gid: &GroupId, fgid: &GroupId) -> Option<PeerAddr> {
let db = session_db(&self.base, &gid)?; self.running_mut(gid).ok()?.remove_online(fgid)
let mut friends = Friend::all(&db)?;
drop(db);
let keys: HashMap<GroupId, usize> = friends
.iter()
.enumerate()
.map(|(i, f)| (f.gid, i))
.collect();
for fgid in self.running(gid)?.online_groups() {
if keys.contains_key(fgid) {
friends[keys[fgid]].online = true; // safe vec index.
}
}
Ok(friends)
} }
pub fn all_groups_with_online(&self, gid: &GroupId) -> Result<Vec<GroupChat>> { pub async fn all_layer_conns(&self) -> Result<HashMap<GroupId, Vec<(GroupId, SendType)>>> {
let db = group_chat_db(&self.base, &gid)?; let mut conns = HashMap::new();
let mut groups = GroupChat::all(&db)?; let group_lock = self.group.read().await;
drop(db); for mgid in self.runnings.keys() {
let mut vecs = vec![];
let keys: HashMap<GroupId, usize> = groups
.iter()
.enumerate()
.map(|(i, g)| (g.g_id, i))
.collect();
for fgid in self.running(gid)?.online_groups() { // load friend chat.
if keys.contains_key(fgid) { let db = session_db(&self.base, mgid)?;
groups[keys[fgid]].online = true; for friend in Friend::all_ok(&db)? {
let proof = group_lock.prove_addr(mgid, &friend.addr).unwrap();
vecs.push((friend.gid, chat_conn(proof, friend.addr)));
} }
}
Ok(groups)
}
pub fn update_friend(&self, gid: &GroupId, fid: i64, remote: User) -> Result<Friend> {
let db = session_db(&self.base, &gid)?;
if let Some(mut friend) = Friend::get_id(&db, fid)? {
friend.name = remote.name;
friend.addr = remote.addr;
friend.remote_update(&db)?;
drop(db); drop(db);
write_avatar_sync(&self.base, gid, &remote.id, remote.avatar)?;
Ok(friend)
} else {
drop(db);
Err(new_io_error("missing friend id"))
}
}
pub fn remove_friend(&mut self, gid: &GroupId, fgid: &GroupId) -> Option<PeerAddr> {
self.running_mut(gid).ok()?.remove_permissioned(fgid)
}
pub async fn all_friend_conns(&self) -> HashMap<GroupId, Vec<(GroupId, SendType)>> { // load group chat.
let mut conns = HashMap::new(); let db = group_chat_db(&self.base, mgid)?;
for mgid in self.runnings.keys() { let groups = GroupChat::all_ok(&db)?;
if let Ok(friends) = self.all_friends(mgid) { for g in groups {
let mut vecs = vec![]; let height = g.get_height(&db)? as u64;
for friend in friends { let proof = group_lock.prove_addr(mgid, &g.g_addr)?;
if let Ok(msg) = conn_req_message(self, &friend.gid, friend.addr).await { vecs.push((GROUP_ID, group_chat_conn(proof, g.g_addr, g.g_id, height)));
vecs.push((friend.gid, msg));
}
}
conns.insert(*mgid, vecs);
} }
drop(db);
conns.insert(*mgid, vecs);
} }
conns
Ok(conns)
} }
pub fn is_online(&self, faddr: &PeerAddr) -> bool { pub fn is_online(&self, faddr: &PeerAddr) -> bool {
@ -208,46 +176,39 @@ impl Online {
} }
pub(crate) struct RunningAccount { pub(crate) struct RunningAccount {
permissioned: HashMap<GroupId, i64>, /// online group (friends/services) => (group's address, group's db id)
/// online group (friends/services) => group's address. onlines: HashMap<GroupId, (Online, i64)>,
onlines: HashMap<GroupId, Online>,
} }
impl RunningAccount { impl RunningAccount {
pub fn init(base: &PathBuf, gid: &GroupId) -> Result<Self> { pub fn init() -> Self {
let mut permissioned = HashMap::new(); RunningAccount {
onlines: HashMap::new(),
// load friends to cache.
let db = session_db(base, gid)?;
let friends = Friend::all_id(&db)?;
for (fgid, db_id) in friends {
permissioned.insert(fgid, db_id);
} }
}
// TODO load services to cache. pub fn get_online_id(&self, gid: &GroupId) -> Result<i64> {
self.onlines
// TODO load permissioned .get(gid)
Ok(RunningAccount { .map(|(_, id)| *id)
permissioned, .ok_or(new_io_error("remote not online"))
onlines: HashMap::new(),
})
} }
/// get all onlines's groupid /// get all onlines's groupid
pub fn online_groups(&self) -> Vec<&GroupId> { pub fn is_online(&self, gid: &GroupId) -> bool {
self.onlines.keys().map(|k| k).collect() self.onlines.contains_key(gid)
} }
/// get online peer's addr. /// get online peer's addr.
pub fn online(&self, gid: &GroupId) -> Result<PeerAddr> { pub fn online(&self, gid: &GroupId) -> Result<PeerAddr> {
self.onlines self.onlines
.get(gid) .get(gid)
.map(|online| *online.addr()) .map(|(online, _)| *online.addr())
.ok_or(new_io_error("remote not online")) .ok_or(new_io_error("remote not online"))
} }
pub fn online_direct(&self, gid: &GroupId) -> Result<PeerAddr> { pub fn online_direct(&self, gid: &GroupId) -> Result<PeerAddr> {
if let Some(online) = self.onlines.get(gid) { if let Some((online, _)) = self.onlines.get(gid) {
match online { match online {
Online::Direct(addr) => return Ok(*addr), Online::Direct(addr) => return Ok(*addr),
_ => {} _ => {}
@ -260,29 +221,29 @@ impl RunningAccount {
pub fn onlines(&self) -> Vec<(&GroupId, &PeerAddr)> { pub fn onlines(&self) -> Vec<(&GroupId, &PeerAddr)> {
self.onlines self.onlines
.iter() .iter()
.map(|(fgid, online)| (fgid, online.addr())) .map(|(fgid, (online, _))| (fgid, online.addr()))
.collect() .collect()
} }
/// check add online. /// check add online.
pub fn check_add_online(&mut self, gid: GroupId, online: Online) -> Result<()> { pub fn check_add_online(&mut self, gid: GroupId, online: Online, id: i64) -> Result<()> {
if let Some(o) = self.onlines.get(&gid) { if let Some((o, _)) = self.onlines.get(&gid) {
match (o, &online) { match (o, &online) {
(Online::Relay(..), Online::Direct(..)) => { (Online::Relay(..), Online::Direct(..)) => {
self.onlines.insert(gid, online); self.onlines.insert(gid, (online, id));
Ok(()) Ok(())
} }
_ => Err(new_io_error("remote had online")), _ => Err(new_io_error("remote had online")),
} }
} else { } else {
self.onlines.insert(gid, online); self.onlines.insert(gid, (online, id));
Ok(()) Ok(())
} }
} }
/// check offline, and return is direct. /// check offline, and return is direct.
pub fn check_offline(&mut self, gid: &GroupId, addr: &PeerAddr) -> bool { pub fn check_offline(&mut self, gid: &GroupId, addr: &PeerAddr) -> bool {
if let Some(online) = self.onlines.remove(gid) { if let Some((online, _)) = self.onlines.remove(gid) {
if online.addr() != addr { if online.addr() != addr {
return false; return false;
} }
@ -297,10 +258,14 @@ impl RunningAccount {
false false
} }
pub fn remove_online(&mut self, gid: &GroupId) -> Option<PeerAddr> {
self.onlines.remove(gid).map(|(online, _)| *online.addr())
}
/// remove all onlines peer. /// remove all onlines peer.
pub fn remove_onlines(self) -> Vec<(PeerAddr, GroupId)> { pub fn remove_onlines(self) -> Vec<(PeerAddr, GroupId)> {
let mut peers = vec![]; let mut peers = vec![];
for (fgid, online) in self.onlines { for (fgid, (online, _)) in self.onlines {
match online { match online {
Online::Direct(addr) => peers.push((addr, fgid)), Online::Direct(addr) => peers.push((addr, fgid)),
_ => {} _ => {}
@ -311,7 +276,7 @@ impl RunningAccount {
/// check if addr is online. /// check if addr is online.
pub fn check_addr_online(&self, addr: &PeerAddr) -> bool { pub fn check_addr_online(&self, addr: &PeerAddr) -> bool {
for (_, online) in &self.onlines { for (_, (online, _)) in &self.onlines {
if online.addr() == addr { if online.addr() == addr {
return true; return true;
} }
@ -322,11 +287,9 @@ impl RunningAccount {
/// peer leave, remove online peer. /// peer leave, remove online peer.
pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<(GroupId, i64)> { pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<(GroupId, i64)> {
let mut peers = vec![]; let mut peers = vec![];
for (fgid, online) in &self.onlines { for (fgid, (online, id)) in &self.onlines {
if online.addr() == addr { if online.addr() == addr {
if let Some(i) = self.permissioned.get(fgid) { peers.push((*fgid, *id))
peers.push((*fgid, *i))
}
} }
} }
@ -336,30 +299,11 @@ impl RunningAccount {
peers peers
} }
/// add the permissioned group.
pub fn add_permissioned(&mut self, gid: GroupId, id: i64) {
self.permissioned.insert(gid, id);
}
/// remove the permissioned group.
pub fn remove_permissioned(&mut self, gid: &GroupId) -> Option<PeerAddr> {
self.permissioned.remove(gid);
self.onlines.remove(gid).and_then(|o| match o {
Online::Direct(addr) => Some(addr),
_ => None,
})
}
/// check the group is permissioned.
pub fn get_permissioned(&self, gid: &GroupId) -> Result<i64> {
self.permissioned
.get(gid)
.cloned()
.ok_or(new_io_error("remote missing"))
}
/// list all onlines groups. /// list all onlines groups.
pub fn _list_onlines(&self) -> Vec<(&GroupId, &PeerAddr)> { pub fn _list_onlines(&self) -> Vec<(&GroupId, &PeerAddr)> {
self.onlines.iter().map(|(k, v)| (k, v.addr())).collect() self.onlines
.iter()
.map(|(k, (v, _))| (k, v.addr()))
.collect()
} }
} }

33
src/rpc.rs

@ -13,10 +13,12 @@ use tdn::{
}; };
use crate::apps::app_rpc_inject; use crate::apps::app_rpc_inject;
use crate::apps::chat::{conn_req_message, LayerEvent}; use crate::apps::chat::{chat_conn, Friend};
use crate::apps::group_chat::{add_layer, group_chat_conn, GroupChat};
use crate::event::InnerEvent; use crate::event::InnerEvent;
use crate::group::Group; use crate::group::Group;
use crate::layer::Layer; use crate::layer::{Layer, LayerEvent};
use crate::storage::{group_chat_db, session_db};
pub(crate) fn init_rpc( pub(crate) fn init_rpc(
addr: PeerAddr, addr: PeerAddr,
@ -360,13 +362,28 @@ fn new_rpc_handler(
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let layer_lock = state.layer.read().await; let group_lock = state.group.read().await;
let friends = layer_lock.all_friends(&gid)?; let db = session_db(group_lock.base(), &gid)?;
for friend in friends { let friends = Friend::all_ok(&db)?;
let msg = conn_req_message(&layer_lock, &gid, friend.addr).await?; drop(db);
results.layers.push((gid, friend.gid, msg)); for f in friends {
let proof = group_lock.prove_addr(&gid, &f.addr)?;
results.layers.push((gid, f.gid, chat_conn(proof, f.addr)));
} }
drop(layer_lock);
let db = group_chat_db(group_lock.base(), &gid)?;
let groups = GroupChat::all_ok(&db)?;
for g in groups {
let height = g.get_height(&db)? as u64;
let proof = group_lock.prove_addr(&gid, &g.g_addr)?;
add_layer(
&mut results,
gid,
group_chat_conn(proof, g.g_addr, g.g_id, height),
);
}
drop(db);
drop(group_lock);
let devices = state.group.read().await.distribute_conns(&gid); let devices = state.group.read().await.distribute_conns(&gid);
for device in devices { for device in devices {

7
src/server.rs

@ -116,7 +116,12 @@ pub async fn start(db_path: String) -> Result<()> {
.expect("TDN channel closed"); .expect("TDN channel closed");
let t_sender = sender.clone(); let t_sender = sender.clone();
let g_conns = group.read().await.all_distribute_conns(); let g_conns = group.read().await.all_distribute_conns();
let l_conns = layer.read().await.all_friend_conns().await; let l_conns = layer
.read()
.await
.all_layer_conns()
.await
.unwrap_or(HashMap::new());
tdn::smol::spawn(sleep_waiting_reboot(t_sender, g_conns, l_conns)).detach(); tdn::smol::spawn(sleep_waiting_reboot(t_sender, g_conns, l_conns)).detach();
} }
} }

Loading…
Cancel
Save