Browse Source

update group chat layer event

pull/18/head
Sun 4 years ago
parent
commit
9ac4507f4f
  1. 6
      src/apps/chat/layer.rs
  2. 77
      src/apps/group_chat/layer.rs
  3. 20
      src/apps/group_chat/rpc.rs
  4. 8
      src/layer.rs
  5. 4
      src/rpc.rs
  6. 5
      src/server.rs

6
src/apps/chat/layer.rs

@ -53,12 +53,12 @@ pub(crate) enum LayerResponse {
/// ESSE chat layer Event. /// ESSE chat layer Event.
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent { pub(crate) enum LayerEvent {
/// receiver gid, sender gid. as BaseLayerEvent.
Offline(GroupId),
/// receiver gid, sender gid. as BaseLayerEvent. /// receiver gid, sender gid. as BaseLayerEvent.
OnlinePing, OnlinePing,
/// receiver gid, sender gid. as BaseLayerEvent. /// receiver gid, sender gid. as BaseLayerEvent.
OnlinePong, OnlinePong,
/// receiver gid, sender gid. as BaseLayerEvent.
Offline,
/// receiver gid, sender gid, message. /// receiver gid, sender gid, message.
Message(EventId, NetworkMessage), Message(EventId, NetworkMessage),
/// receiver gid, sender user. /// receiver gid, sender user.
@ -525,7 +525,7 @@ impl LayerEvent {
.check_add_online(fgid, Online::Direct(addr), fid)?; .check_add_online(fgid, Online::Direct(addr), fid)?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr)); results.rpcs.push(rpc::friend_online(mgid, fid, addr));
} }
LayerEvent::Offline => { LayerEvent::Offline(_) => {
layer.group.write().await.status( layer.group.write().await.status(
&mgid, &mgid,
StatusEvent::SessionFriendOffline(fgid), StatusEvent::SessionFriendOffline(fgid),

77
src/apps/group_chat/layer.rs

@ -9,9 +9,8 @@ use tdn::{
}, },
}; };
use group_chat_types::{GroupConnect, GroupResult, JoinProof}; use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent};
use tdn_did::Proof; use tdn_did::Proof;
//use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
use crate::layer::{Layer, Online}; use crate::layer::{Layer, Online};
use crate::storage::group_chat_db; use crate::storage::group_chat_db;
@ -27,9 +26,7 @@ pub(crate) async fn handle(
let mut results = HandleResult::new(); let mut results = HandleResult::new();
match msg { match msg {
RecvType::Connect(_addr, _data) => { RecvType::Connect(..) => {} // Never to here.
// Never to here.
}
RecvType::Leave(_addr) => { RecvType::Leave(_addr) => {
// //
} }
@ -101,8 +98,10 @@ pub(crate) async fn handle(
let _res: GroupResult = postcard::from_bytes(&data) let _res: GroupResult = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?; .map_err(|_e| new_io_error("Deseralize result failure"))?;
} }
RecvType::Event(_addr, _bytes) => { RecvType::Event(addr, bytes) => {
// let event: LayerEvent =
postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?;
handle_event(mgid, addr, event, layer, &mut results).await?;
} }
RecvType::Stream(_uid, _stream, _bytes) => { RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream // TODO stream
@ -115,6 +114,70 @@ pub(crate) async fn handle(
Ok(results) Ok(results)
} }
async fn handle_event(
mgid: GroupId,
addr: PeerAddr,
event: LayerEvent,
layer: &Arc<RwLock<Layer>>,
results: &mut HandleResult,
) -> Result<()> {
let gid = match event {
LayerEvent::Offline(gcd)
| LayerEvent::OnlinePing(gcd)
| LayerEvent::OnlinePong(gcd)
| LayerEvent::MemberOnline(gcd, ..)
| LayerEvent::MemberOffline(gcd, ..)
| LayerEvent::Sync(gcd, ..) => layer.read().await.get_running_remote_id(&mgid, &gcd)?,
};
match event {
LayerEvent::Offline(_) => {
results.rpcs.push(rpc::group_offline(mgid, gid));
}
LayerEvent::OnlinePing(gcd) => {
results.rpcs.push(rpc::group_online(mgid, gid));
let data = postcard::to_allocvec(&LayerEvent::OnlinePong(gcd)).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
add_layer(results, mgid, msg);
}
LayerEvent::OnlinePong(_) => {
results.rpcs.push(rpc::group_online(mgid, gid));
}
LayerEvent::Sync(_gcd, _, event) => {
match event {
Event::Message => {
//
}
Event::GroupUpdate => {
//
}
Event::GroupTransfer => {
//
}
Event::UserInfo => {
//
}
Event::Close => {
//
}
}
// save event.
// update to UI.
}
LayerEvent::MemberOnline(_, mid, maddr) => {
results.rpcs.push(rpc::member_online(mgid, gid, mid, maddr));
}
LayerEvent::MemberOffline(_, mid, ma) => {
results.rpcs.push(rpc::member_offline(mgid, gid, mid, ma));
}
}
Ok(())
}
#[inline] #[inline]
fn load_group(base: &PathBuf, mgid: &GroupId, gcd: &GroupId) -> Result<Option<GroupChat>> { fn load_group(base: &PathBuf, mgid: &GroupId, gcd: &GroupId) -> Result<Option<GroupChat>> {
let db = group_chat_db(base, mgid)?; let db = group_chat_db(base, mgid)?;

20
src/apps/group_chat/rpc.rs

@ -36,6 +36,26 @@ pub(crate) fn group_offline(mgid: GroupId, gid: i64) -> RpcParam {
rpc_response(0, "group-chat-offline", json!([gid]), mgid) rpc_response(0, "group-chat-offline", json!([gid]), mgid)
} }
#[inline]
pub(crate) fn member_online(mgid: GroupId, gid: i64, mid: GroupId, maddr: PeerAddr) -> RpcParam {
rpc_response(
0,
"group-chat-member-online",
json!([gid, mid.to_hex(), maddr.to_hex()]),
mgid,
)
}
#[inline]
pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: GroupId, maddr: PeerAddr) -> RpcParam {
rpc_response(
0,
"group-chat-member-offline",
json!([gid, mid.to_hex(), maddr.to_hex()]),
mgid,
)
}
#[inline] #[inline]
fn group_list(groups: Vec<GroupChat>) -> RpcParam { fn group_list(groups: Vec<GroupChat>) -> RpcParam {
let mut results = vec![]; let mut results = vec![];

8
src/layer.rs

@ -21,12 +21,8 @@ use crate::storage::{group_chat_db, session_db, write_avatar_sync};
/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS. /// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS.
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent { pub(crate) enum LayerEvent {
/// receiver gid, sender gid. /// offline, remote_gid.
OnlinePing, Offline(GroupId),
/// receiver gid, sender gid.
OnlinePong,
/// receiver gid, sender gid.
Offline,
} }
/// ESSE layers. /// ESSE layers.

4
src/rpc.rs

@ -329,7 +329,7 @@ fn new_rpc_handler(
for gid in keys { for gid in keys {
for (fgid, addr) in layer_lock.running(&gid)?.onlines() { for (fgid, addr) in layer_lock.running(&gid)?.onlines() {
// send a event that is offline. // send a event that is offline.
let data = postcard::to_allocvec(&LayerEvent::Offline).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::Offline(*fgid)).unwrap_or(vec![]);
let msg = SendType::Event(0, *addr, data); let msg = SendType::Event(0, *addr, data);
results.layers.push((gid, *fgid, msg)); results.layers.push((gid, *fgid, msg));
} }
@ -400,7 +400,7 @@ fn new_rpc_handler(
let layer_lock = state.layer.read().await; let layer_lock = state.layer.read().await;
for (fgid, addr) in layer_lock.running(&gid)?.onlines() { for (fgid, addr) in layer_lock.running(&gid)?.onlines() {
// send a event that is offline. // send a event that is offline.
let data = postcard::to_allocvec(&LayerEvent::Offline).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::Offline(*fgid)).unwrap_or(vec![]);
let msg = SendType::Event(0, *addr, data); let msg = SendType::Event(0, *addr, data);
results.layers.push((gid, *fgid, msg)); results.layers.push((gid, *fgid, msg));
} }

5
src/server.rs

@ -82,11 +82,6 @@ pub async fn start(db_path: String) -> Result<()> {
} }
} }
ReceiveMessage::Layer(fgid, tgid, l_msg) => { ReceiveMessage::Layer(fgid, tgid, l_msg) => {
// 1. check to account is online. if not online, nothing.
if !layer.read().await.runnings.contains_key(&tgid) {
continue;
}
if let Ok(handle_result) = app_layer_handle(&layer, fgid, tgid, l_msg).await { if let Ok(handle_result) = app_layer_handle(&layer, fgid, tgid, l_msg).await {
handle(handle_result, now_rpc_uid, true, &sender).await; handle(handle_result, now_rpc_uid, true, &sender).await;
} }

Loading…
Cancel
Save