Browse Source

apply suspend timeout

pull/18/head
Sun 5 years ago
parent
commit
cd253abb3b
  1. 1
      Cargo.toml
  2. 88
      src/apps/chat/layer.rs
  3. 20
      src/apps/chat/models.rs
  4. 23
      src/apps/chat/rpc.rs
  5. 42
      src/layer.rs
  6. 63
      src/rpc.rs
  7. 53
      src/server.rs
  8. 11
      src/session.rs

1
Cargo.toml

@ -22,6 +22,7 @@ panic = 'abort' @@ -22,6 +22,7 @@ panic = 'abort'
[dependencies]
log = "0.4"
rand = "0.8"
once_cell = "1.7"
simplelog = "0.8"
image = "0.23"
base64 = "0.13"

88
src/apps/chat/layer.rs

@ -33,10 +33,6 @@ pub(crate) enum LayerEvent { @@ -33,10 +33,6 @@ pub(crate) enum LayerEvent {
Suspend(GroupId),
/// actived. extend BaseLayerEvent.
Actived(GroupId),
/// receiver gid, sender gid. as BaseLayerEvent.
OnlinePing,
/// receiver gid, sender gid. as BaseLayerEvent.
OnlinePong,
/// make friendship request.
Request(User, String),
/// agree friendship request.
@ -71,7 +67,9 @@ pub(crate) async fn handle( @@ -71,7 +67,9 @@ pub(crate) async fn handle(
RecvType::Connect(addr, data) | RecvType::ResultConnect(addr, data) => {
// ESSE chat layer connect date structure.
if handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)? {
let msg = conn_res_message(&layer, &mgid, addr).await?;
let proof = layer.group.read().await.prove_addr(&mgid, &addr)?;
let data = postcard::to_allocvec(&proof).unwrap_or(vec![]);
let msg = SendType::Result(0, addr, true, false, data);
results.layers.push((mgid, fgid, msg));
} else {
let msg = SendType::Result(0, addr, false, false, vec![]);
@ -235,7 +233,11 @@ impl LayerEvent { @@ -235,7 +233,11 @@ impl LayerEvent {
results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results);
} else {
let msg = conn_agree_message(layer, 0, &mgid, addr).await?;
let group_lock = layer.group.read().await;
let me = group_lock.clone_user(&mgid)?;
let proof = group_lock.prove_addr(&mgid, &addr)?;
drop(group_lock);
let msg = agree_message(proof, me, addr)?;
results.layers.push((mgid, fgid, msg));
}
}
@ -270,14 +272,7 @@ impl LayerEvent { @@ -270,14 +272,7 @@ impl LayerEvent {
// ADD NEW SESSION.
let s_db = session_db(&layer.base, &mgid)?;
let mut session = Session::new(
friend.id,
friend.gid,
friend.addr,
SessionType::Chat,
friend.name,
friend.datetime,
);
let mut session = friend.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
}
@ -335,14 +330,7 @@ impl LayerEvent { @@ -335,14 +330,7 @@ impl LayerEvent {
} else {
let c_db = chat_db(&layer.base, &mgid)?;
if let Some(f) = Friend::get_id(&c_db, fid)? {
let mut session = Session::new(
f.id,
f.gid,
f.addr,
SessionType::Chat,
f.name,
f.datetime,
);
let mut session = f.to_session();
session.last_content = msg.content;
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
@ -370,22 +358,6 @@ impl LayerEvent { @@ -370,22 +358,6 @@ impl LayerEvent {
)?;
results.rpcs.push(rpc::friend_info(mgid, &f));
}
LayerEvent::OnlinePing => {
let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr), sid, fid)?;
let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerEvent::OnlinePong => {
let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr), sid, fid)?;
}
LayerEvent::Close => {
let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
layer.group.write().await.broadcast(
@ -477,9 +449,6 @@ impl LayerEvent { @@ -477,9 +449,6 @@ impl LayerEvent {
let mut msg = Message::new(&mgid, fid, true, m_type, raw, false);
msg.insert(&db)?;
// TODO UPDATE SESSION
drop(db);
Ok((msg, nm_type))
}
@ -548,43 +517,12 @@ pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType { @@ -548,43 +517,12 @@ pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType {
SendType::Connect(0, addr, None, None, data)
}
async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
let proof = layer.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&proof).unwrap_or(vec![]);
Ok(SendType::Result(0, addr, true, false, data))
}
async fn conn_agree_message(
layer: &mut Layer,
tid: i64,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (*mgid, tid));
let group_lock = layer.group.read().await;
let proof = group_lock.prove_addr(mgid, &addr)?;
let me = group_lock.clone_user(mgid)?;
drop(group_lock);
let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Event(uid, addr, data))
}
pub(super) fn rpc_agree_message(
layer: &mut Layer,
tid: i64,
proof: Proof,
me: User,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (*mgid, tid));
pub(super) fn agree_message(proof: Proof, me: User, addr: PeerAddr) -> Result<SendType> {
let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Event(uid, addr, data))
Ok(SendType::Event(0, addr, data))
}
// maybe need if gid or addr in blocklist.
fn res_reject() -> Vec<u8> {
fn _res_reject() -> Vec<u8> {
postcard::to_allocvec(&LayerEvent::Reject).unwrap_or(vec![])
}

20
src/apps/chat/models.rs

@ -8,6 +8,7 @@ use tdn::types::{ @@ -8,6 +8,7 @@ use tdn::types::{
};
use tdn_storage::local::{DStorage, DsValue};
use crate::session::{Session, SessionType};
use crate::storage::{
read_avatar_sync, read_file_sync, read_image_sync, read_record_sync, write_avatar_sync,
write_file_sync, write_image_sync, write_record_sync,
@ -111,8 +112,6 @@ impl NetworkMessage { @@ -111,8 +112,6 @@ impl NetworkMessage {
let mut msg = Message::new_with_id(hash, fid, is_me, m_type, raw, true);
msg.insert(db)?;
// TODO UPDATE SESSION
Ok(msg)
}
@ -267,6 +266,17 @@ impl Friend { @@ -267,6 +266,17 @@ impl Friend {
}
}
pub fn to_session(&self) -> Session {
Session::new(
self.id,
self.gid,
self.addr,
SessionType::Chat,
self.name.clone(),
self.datetime,
)
}
pub fn to_rpc(&self) -> RpcParam {
json!([
self.id,
@ -455,11 +465,7 @@ impl Request { @@ -455,11 +465,7 @@ impl Request {
}
pub fn to_friend(self) -> Friend {
let mut friend = Friend::new(self.gid, self.addr, self.name, self.remark);
// TODO add new session.
friend
Friend::new(self.gid, self.addr, self.name, self.remark)
}
/// here is zero-copy and unwrap is safe. checked.

23
src/apps/chat/rpc.rs

@ -306,8 +306,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -306,8 +306,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let mut group_lock = state.group.write().await;
let me = group_lock.clone_user(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = chat_db(layer_lock.base(), &gid)?;
let db = chat_db(group_lock.base(), &gid)?;
let mut results = HandleResult::new();
if let Some(mut request) = Request::get_id(&db, id)? {
@ -322,24 +321,20 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -322,24 +321,20 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
request.is_over = true;
request.update(&db)?;
let f = Friend::from_request(&db, request)?;
results.rpcs.push(json!([id, f.to_rpc()]));
let friend = Friend::from_request(&db, request)?;
results.rpcs.push(json!([id, friend.to_rpc()]));
// ADD NEW SESSION.
let s_db = session_db(layer_lock.base(), &gid)?;
let mut session =
Session::new(f.id, f.gid, f.addr, SessionType::Chat, f.name, f.datetime);
let s_db = session_db(group_lock.base(), &gid)?;
let mut session = friend.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(gid, &session));
let proof = group_lock.prove_addr(&gid, &f.addr)?;
let msg =
super::layer::rpc_agree_message(&mut layer_lock, id, proof, me, &gid, f.addr)?;
results.layers.push((gid, f.gid, msg));
let proof = group_lock.prove_addr(&gid, &friend.addr)?;
let msg = super::layer::agree_message(proof, me, friend.addr)?;
results.layers.push((gid, friend.gid, msg));
}
db.close()?;
drop(group_lock);
drop(layer_lock);
Ok(results)
},
);

42
src/layer.rs

@ -112,11 +112,6 @@ impl Layer { @@ -112,11 +112,6 @@ impl Layer {
self.running(mgid)?.get_online_id(fgid)
}
pub fn merge_online(&self, mgid: &GroupId, gids: Vec<&GroupId>) -> Result<Vec<bool>> {
let runnings = self.running(mgid)?;
Ok(gids.iter().map(|g| runnings.is_online(g)).collect())
}
pub fn remove_online(&mut self, gid: &GroupId, fgid: &GroupId) -> Option<PeerAddr> {
self.running_mut(gid).ok()?.remove_online(fgid)
}
@ -195,6 +190,19 @@ impl OnlineSession { @@ -195,6 +190,19 @@ impl OnlineSession {
remain: 0,
}
}
fn close_suspend(&mut self) -> bool {
if self.suspend_me && self.suspend_remote {
if self.remain == 0 {
true
} else {
self.remain -= 1;
false
}
} else {
false
}
}
}
pub(crate) struct RunningAccount {
@ -233,7 +241,7 @@ impl RunningAccount { @@ -233,7 +241,7 @@ impl RunningAccount {
}
if online.suspend_remote && online.suspend_me {
online.remain = 120; // keep-alive 2~3 minutes
online.remain = 6; // keep-alive 10~11 minutes 120s/time
Ok(true)
} else {
Ok(false)
@ -250,11 +258,6 @@ impl RunningAccount { @@ -250,11 +258,6 @@ impl RunningAccount {
.ok_or(new_io_error("remote not online"))
}
/// get all sessions's groupid
pub fn is_online(&self, gid: &GroupId) -> bool {
self.sessions.contains_key(gid)
}
/// get online peer's addr.
pub fn online(&self, gid: &GroupId) -> Result<PeerAddr> {
self.sessions
@ -368,10 +371,17 @@ impl RunningAccount { @@ -368,10 +371,17 @@ impl RunningAccount {
}
/// list all onlines groups.
pub fn _list_onlines(&self) -> Vec<(&GroupId, &PeerAddr)> {
self.sessions
.iter()
.map(|(k, online)| (k, online.online.addr()))
.collect()
pub fn close_suspend(&mut self) -> Vec<(GroupId, PeerAddr, i64)> {
let mut needed = vec![];
for (fgid, online) in &mut self.sessions {
if online.close_suspend() {
needed.push((*fgid, *online.online.addr(), online.db_id));
}
}
for (gid, _, _) in needed.iter() {
self.sessions.remove(gid);
}
needed
}
}

63
src/rpc.rs

@ -510,6 +510,21 @@ fn new_rpc_handler( @@ -510,6 +510,21 @@ fn new_rpc_handler(
let id = params[0].as_i64()?;
let remote = GroupId::from_hex(params[1].as_str()?)?;
let db = session_db(state.group.read().await.base(), &gid)?;
let s = Session::get(&db, &id)?;
drop(db);
let msg = match s.s_type {
SessionType::Chat | SessionType::Group => {
let event = LayerEvent::Suspend(s.gid);
let data = postcard::to_allocvec(&event).unwrap_or(vec![]);
SendType::Event(0, s.addr, data)
}
_ => {
return Ok(HandleResult::new()); // others has no online.
}
};
let mut layer_lock = state.layer.write().await;
let suspend = layer_lock.running_mut(&gid)?.suspend(&remote, true)?;
drop(layer_lock);
@ -519,26 +534,42 @@ fn new_rpc_handler( @@ -519,26 +534,42 @@ fn new_rpc_handler(
results.rpcs.push(json!([id]))
}
// let group_lock = state.group.read().await;
// let db = session_db(group_lock.base(), &gid)?;
// let s = Session::get(&db, &id)?;
// drop(db);
// match s.s_type {
// SessionType::Chat => {
// let proof = group_lock.prove_addr(&gid, &s.addr)?;
// results.layers.push((gid, s.gid, chat_conn(proof, s.addr)));
// }
// SessionType::Group => {
// let proof = group_lock.prove_addr(&gid, &s.addr)?;
// add_layer(&mut results, gid, group_chat_conn(proof, s.addr, s.gid));
// }
// _ => {}
// }
match s.s_type {
SessionType::Chat => {
results.layers.push((gid, s.gid, msg));
}
SessionType::Group => {
add_layer(&mut results, gid, msg);
}
_ => {}
}
Ok(results)
},
);
handler.add_method(
"session-readed",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let db = session_db(state.group.read().await.base(), &gid)?;
Session::readed(&db, &id)?;
Ok(HandleResult::new())
},
);
handler.add_method(
"session-update",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64()?;
let is_top = params[1].as_bool()?;
let is_close = params[2].as_bool()?;
let db = session_db(state.group.read().await.base(), &gid)?;
Session::update(&db, &id, is_top, is_close)?;
Ok(HandleResult::new())
},
);
handler
}

53
src/server.rs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
use once_cell::sync::OnceCell;
use simplelog::{CombinedLogger, Config as LogConfig, LevelFilter};
use std::collections::HashMap;
use std::path::PathBuf;
@ -24,6 +25,8 @@ use crate::storage::account_db; @@ -24,6 +25,8 @@ use crate::storage::account_db;
pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:8080";
pub const DEFAULT_LOG_FILE: &'static str = "esse.log.txt";
pub static RPC_WS_UID: OnceCell<u64> = OnceCell::new();
pub async fn start(db_path: String) -> Result<()> {
let db_path = PathBuf::from(db_path);
if !db_path.exists() {
@ -72,6 +75,9 @@ pub async fn start(db_path: String) -> Result<()> { @@ -72,6 +75,9 @@ pub async fn start(db_path: String) -> Result<()> {
//let mut group_rpcs: HashMap<u64, GroupId> = HashMap::new();
let mut now_rpc_uid = 0;
// running session remain task.
tdn::smol::spawn(session_remain(layer.clone(), sender.clone())).detach();
while let Ok(message) = recver.recv().await {
match message {
ReceiveMessage::Group(fgid, g_msg) => {
@ -97,6 +103,7 @@ pub async fn start(db_path: String) -> Result<()> { @@ -97,6 +103,7 @@ pub async fn start(db_path: String) -> Result<()> {
}
if now_rpc_uid != uid && is_ws {
let _ = RPC_WS_UID.set(uid);
now_rpc_uid = uid
}
@ -147,6 +154,52 @@ async fn sleep_waiting_reboot( @@ -147,6 +154,52 @@ async fn sleep_waiting_reboot(
Ok(())
}
async fn session_remain(layer: Arc<RwLock<Layer>>, sender: Sender<SendMessage>) -> Result<()> {
loop {
tdn::smol::Timer::after(std::time::Duration::from_secs(120)).await;
if let Some(uid) = RPC_WS_UID.get() {
let mut layer_lock = layer.write().await;
let mut rpcs = vec![];
let mut addrs = HashMap::new();
for (_, running) in layer_lock.runnings.iter_mut() {
let closed = running.close_suspend();
for (gid, addr, sid) in closed {
addrs.insert(addr, false);
rpcs.push(crate::rpc::session_lost(gid, &sid));
}
}
drop(layer_lock);
let layer_lock = layer.read().await;
for (_, running) in layer_lock.runnings.iter() {
for (addr, keep) in addrs.iter_mut() {
if running.check_addr_online(addr) {
*keep = true;
}
}
}
drop(layer_lock);
for rpc in rpcs {
let _ = sender.send(SendMessage::Rpc(*uid, rpc, true)).await;
}
for (addr, keep) in addrs {
if !keep {
let _ = sender
.send(SendMessage::Layer(
GroupId::default(),
GroupId::default(),
SendType::Disconnect(addr),
))
.await;
}
}
}
}
}
#[inline]
async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, sender: &Sender<SendMessage>) {
let HandleResult {

11
src/session.rs

@ -152,8 +152,13 @@ impl Session { @@ -152,8 +152,13 @@ impl Session {
Ok(sessions)
}
pub fn top(db: &DStorage, id: &i64, is_top: bool) -> Result<usize> {
db.update(&format!("UPDATE sessions SET is_top = 1 WHERE id = {}", id))
pub fn update(db: &DStorage, id: &i64, is_top: bool, is_close: bool) -> Result<usize> {
db.update(&format!(
"UPDATE sessions SET is_top = {}, is_close = {} WHERE id = {}",
if is_top { 1 } else { 0 },
if is_close { 1 } else { 0 },
id
))
}
pub fn last(
@ -180,7 +185,7 @@ impl Session { @@ -180,7 +185,7 @@ impl Session {
}
}
pub fn read(db: &DStorage, id: &i64) -> Result<usize> {
pub fn readed(db: &DStorage, id: &i64) -> Result<usize> {
db.update(&format!(
"UPDATE sessions SET last_readed = 1 WHERE id = {}",
id

Loading…
Cancel
Save