Browse Source

fix chat conn & delivery

pull/18/head
Sun 3 years ago
parent
commit
3ac5af3306
  1. 28
      src/apps.rs
  2. 2
      src/apps/chat/layer.rs
  3. 3
      src/apps/chat/rpc.rs
  4. 7
      src/global.rs
  5. 19
      src/layer.rs
  6. 30
      src/rpc.rs
  7. 4
      src/server.rs

28
src/apps.rs

@ -1,3 +1,8 @@ @@ -1,3 +1,8 @@
use chat_types::CHAT_ID;
use cloud_types::CLOUD_ID;
use dao_types::DAO_ID;
use domain_types::DOMAIN_ID;
use group_types::GROUP_CHAT_ID;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
@ -31,22 +36,22 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) { @@ -31,22 +36,22 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) {
//cloud::new_rpc_handler(handler);
}
#[allow(non_snake_case)]
pub(crate) async fn app_layer_handle(
fgid: GroupId,
tgid: GroupId,
msg: RecvType,
global: &Arc<Global>,
) -> Result<HandleResult> {
match fgid {
CHAT_ID => chat::handle(msg, global).await,
//CHAT_ID => chat::handle_peer(layer, mgid, msg).await,
//(_, group::GROUP_ID) => group::handle_server(layer, fgid, msg).await,
//(dao::GROUP_ID, _) => dao::handle(layer, fgid, mgid, false, msg).await,
//(domain::GROUP_ID, _) => domain::handle(layer, mgid, msg).await,
//(cloud::GROUP_ID, _) => cloud::handle(layer, mgid, msg).await,
//_ => chat::handle(layer, fgid, mgid, msg).await,
debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid);
match (fgid, tgid) {
(CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await,
(GROUP_CHAT_ID, 0) => chat::handle(msg, global).await,
(DAO_ID, 0) => chat::handle(msg, global).await,
(DOMAIN_ID, 0) => chat::handle(msg, global).await,
(CLOUD_ID, 0) => chat::handle(msg, global).await,
_ => match msg {
RecvType::Leave(peer) => {
debug!("Peer leaved: {}", peer.id.to_hex());
let mut results = HandleResult::new();
let mut layer = global.layer.write().await;
@ -68,7 +73,10 @@ pub(crate) async fn app_layer_handle( @@ -68,7 +73,10 @@ pub(crate) async fn app_layer_handle(
Ok(results)
}
_ => Err(anyhow!("nothing!")),
_ => {
warn!("LAYER MISSING: {:?}", msg);
Err(anyhow!("nothing!"))
}
},
}
}

2
src/apps/chat/layer.rs

@ -114,7 +114,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle @@ -114,7 +114,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
rpc::request_delivery(id, is_ok)
}
DeliveryType::Result => {
// response. TODO better for it.
// response. TODO better for agree send.
Request::delivery(&db, id, is_ok)?;
rpc::request_delivery(id, is_ok)
}

3
src/apps/chat/rpc.rs

@ -443,11 +443,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { @@ -443,11 +443,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let mut results = HandleResult::rpc(json!(msg.to_rpc()));
let tid = state.layer.write().await.delivery(msg.id);
let event = LayerEvent::Message(msg.hash, nm);
let data = bincode::serialize(&event).unwrap_or(vec![]);
results
.layers
.push((CHAT_ID, SendType::Event(0, fpid, data)));
.push((CHAT_ID, SendType::Event(tid, fpid, data)));
// UPDATE SESSION.
let s_db = session_db(&state.base, &pid, &db_key)?;

7
src/global.rs

@ -1,8 +1,3 @@ @@ -1,8 +1,3 @@
use chat_types::CHAT_ID;
use cloud_types::CLOUD_ID;
use dao_types::DAO_ID;
use domain_types::DOMAIN_ID;
use group_types::GROUP_CHAT_ID;
use std::collections::HashMap;
use std::path::PathBuf;
use tdn::{
@ -54,7 +49,7 @@ impl Global { @@ -54,7 +49,7 @@ impl Global {
self_send: Sender<ReceiveMessage>,
rpc_send: Sender<RpcSendMessage>,
) -> Self {
let gids = vec![0, CHAT_ID, GROUP_CHAT_ID, DAO_ID, DOMAIN_ID, CLOUD_ID];
let gids = vec![0]; // ESSE DEFAULT IS 0
Global {
base,

19
src/layer.rs

@ -5,7 +5,6 @@ use std::collections::HashMap; @@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::SendType,
primitives::{HandleResult, Peer, PeerId, Result},
};
@ -17,18 +16,6 @@ use crate::account::User; @@ -17,18 +16,6 @@ use crate::account::User;
use crate::group::Group;
use crate::session::{Session, SessionType};
/// ESSE app's `BaseLayerEvent`.
/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
/// Offline. params: remote_id.
Offline(GroupId),
/// Suspend. params: remote_id.
Suspend(GroupId),
/// Actived. params: remote_id.
Actived(GroupId),
}
/// ESSE layers.
pub(crate) struct Layer {
/// friend pid => Session
@ -51,9 +38,11 @@ impl Layer { @@ -51,9 +38,11 @@ impl Layer {
}
}
pub fn delivery(&mut self, db_id: i64) {
self.delivery.insert(self.delivery_count as u64, db_id);
pub fn delivery(&mut self, db_id: i64) -> u64 {
let next = self.delivery_count as u64;
self.delivery.insert(next, db_id);
self.delivery_count += 1;
next
}
pub fn clear(&mut self) {

30
src/rpc.rs

@ -1,5 +1,7 @@ @@ -1,5 +1,7 @@
use chat_types::CHAT_ID;
use esse_primitives::{id_from_str, id_to_str};
use group_types::GroupChatId;
use group_types::GROUP_CHAT_ID;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@ -22,12 +24,12 @@ use tokio::sync::{ @@ -22,12 +24,12 @@ use tokio::sync::{
use crate::account::lang_from_i64;
use crate::apps::app_rpc_inject;
use crate::apps::chat::chat_conn;
use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent};
use crate::global::Global;
//use crate::apps::group::{add_layer, group_conn, GroupChat};
//use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, LayerEvent};
use crate::layer::Layer;
use crate::session::{connect_session, Session, SessionType};
use crate::storage::session_db;
@ -504,26 +506,22 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> { @@ -504,26 +506,22 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
match s.s_type {
SessionType::Chat => {
let remote_id = id_from_str(remote)?;
let addr = layer_lock.chat_suspend(&remote_id, true, must)?;
if addr.is_some() {
results.rpcs.push(json!([id]))
if layer_lock.chat_suspend(&remote_id, true, must)?.is_some() {
results.rpcs.push(json!([id]));
}
//let event = LayerEvent::Suspend(CHAT_GROUP_ID);
//let data = bincode::serialize(&event)?;
//let msg = SendType::Event(0, s.pid, data);
//results.layers.push((gid, s.gid, msg));
let data = bincode::serialize(&ChatLayerEvent::Suspend)?;
let msg = SendType::Event(0, remote_id, data);
results.layers.push((CHAT_ID, msg));
}
SessionType::Group => {
let remote_gid: GroupChatId =
remote.parse().map_err(|_| RpcError::ParseError)?;
let addr = layer_lock.group_suspend(&remote_gid, true, must)?;
if addr.is_some() {
results.rpcs.push(json!([id]))
if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() {
results.rpcs.push(json!([id]));
}
//let event = LayerEvent::Suspend(GROUP_CHAT_ID);
//let data = bincode::serialize(&event)?;
//let msg = SendType::Event(0, s.pid, data);
//add_layer(&mut results, gid, msg);
//let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?;
//let msg = SendType::Event(0, s.addr, data);
//results.layers.push((GROUP_CHAT_ID, msg));
}
_ => {
return Ok(HandleResult::new()); // others has no online.

4
src/server.rs

@ -97,8 +97,8 @@ pub async fn start(db_path: String) -> Result<()> { @@ -97,8 +97,8 @@ pub async fn start(db_path: String) -> Result<()> {
handle(handle_result, now_rpc_uid, true, &global).await;
}
}
ReceiveMessage::Layer(fgid, l_msg) => {
if let Ok(handle_result) = app_layer_handle(fgid, l_msg, &global).await {
ReceiveMessage::Layer(fgid, tgid, l_msg) => {
if let Ok(handle_result) = app_layer_handle(fgid, tgid, l_msg, &global).await {
handle(handle_result, now_rpc_uid, true, &global).await;
}
}

Loading…
Cancel
Save