Browse Source

refactor chat connect/result

pull/18/head
Sun 4 years ago
parent
commit
bd9372a205
  1. 642
      src/apps/chat/layer.rs
  2. 14
      src/layer.rs
  3. 2
      src/rpc.rs
  4. 33
      src/session.rs

642
src/apps/chat/layer.rs

@ -11,11 +11,11 @@ use tdn::{
}; };
use tdn_did::{user::User, Proof}; use tdn_did::{user::User, Proof};
use crate::event::{InnerEvent, StatusEvent}; use crate::event::InnerEvent;
use crate::layer::{Layer, Online}; use crate::layer::{Layer, Online};
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend}; use crate::rpc::{session_connect, session_create, session_last, session_lost, session_suspend};
use crate::session::{Session, SessionType}; use crate::session::{connect_session, Session, SessionType};
use crate::storage::{ use crate::storage::{
chat_db, read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, chat_db, read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file,
write_image, write_image,
@ -24,35 +24,6 @@ use crate::storage::{
use super::models::{Friend, Message, MessageType, NetworkMessage, Request}; use super::models::{Friend, Message, MessageType, NetworkMessage, Request};
use super::rpc; use super::rpc;
/// Layer Request for stable connected.
/// Params: User, remote_id, remark.
/// this user if already friend, only has gid.
#[derive(Serialize, Deserialize)]
enum LayerRequest {
/// Requst for connect, had friendship.
/// params: signature with PeerAddr.
Connect(Proof),
/// Requst for make friendship.
/// Params: remote_user, me_id, remark.
Friend(User, String),
}
/// Layer Response for stable connected.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerResponse {
/// Connected with stable, had friendship.
Connect(Proof),
/// Agree a friend request.
/// Params: User, remote_id.
/// this user if already, only has gid.
Agree(User, Proof),
/// Reject a friend request.
/// Params: me_id, remote_id.
Reject,
// TODO service connected info.
//Service,
}
/// ESSE chat layer Event. /// ESSE chat layer Event.
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent { pub(crate) enum LayerEvent {
@ -66,6 +37,12 @@ pub(crate) enum LayerEvent {
OnlinePing, OnlinePing,
/// receiver gid, sender gid. as BaseLayerEvent. /// receiver gid, sender gid. as BaseLayerEvent.
OnlinePong, OnlinePong,
/// make friendship request.
Request(User, String),
/// agree friendship request.
Agree(User, Proof),
/// reject friendship request.
Reject,
/// receiver gid, sender gid, message. /// receiver gid, sender gid, message.
Message(EventId, NetworkMessage), Message(EventId, NetworkMessage),
/// receiver gid, sender user. /// receiver gid, sender user.
@ -84,145 +61,31 @@ pub(crate) async fn handle(
let mut layer = arc_layer.write().await; let mut layer = arc_layer.write().await;
match msg { match msg {
RecvType::Connect(addr, data) => {
let request: LayerRequest = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize request friend failure"))?;
match request {
LayerRequest::Connect(proof) => {
let friend = load_friend(&layer.base, &mgid, &fgid)?;
if friend.is_none() {
let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]);
let msg = SendType::Result(0, addr, false, false, data);
results.layers.push((mgid, fgid, msg));
return Ok(results);
}
let f = friend.unwrap(); // safe.
// 0. get session. TODO
let sid = 0;
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. online this group.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
f.id,
)?;
// 3. update remote addr. TODO
if f.addr != addr {
let db = chat_db(&layer.base, &mgid)?;
let _ = Friend::addr_update(&db, f.id, &addr);
drop(db);
}
// 4. online to UI. TODO
// 5. connected.
let msg = conn_res_message(&layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerRequest::Friend(remote, remark) => {
let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_friend.is_none() {
// check if exist request.
let db = chat_db(&layer.base, &mgid)?;
if let Some(req) = Request::get(&db, &remote.id)? {
req.delete(&db)?; // delete the old request.
results.rpcs.push(rpc::request_delete(mgid, req.id));
}
let mut request = Request::new(
remote.id,
remote.addr,
remote.name.clone(),
remark.clone(),
false,
true,
);
// save to db.
request.insert(&db)?;
drop(db);
// save the avatar.
write_avatar_sync(&layer.base, &mgid, &request.gid, remote.avatar.clone())?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestCreate(false, remote, remark),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results);
}
let mut friend = some_friend.unwrap(); // safe checked.
// already friendship & update.
// 0. get session. TODO
let sid = 0;
// 1. online this group.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
friend.id,
)?;
// 2. update remote user.
friend.name = remote.name;
friend.addr = remote.addr;
let db = chat_db(&layer.base, &mgid)?;
friend.remote_update(&db)?;
drop(db);
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
// 3. online to UI.
// TODO UPDATE SESSION
results.rpcs.push(rpc::friend_info(mgid, &friend));
// 4. connected.
let msg = conn_agree_message(&mut layer, 0, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
}
}
RecvType::Leave(addr) => { RecvType::Leave(addr) => {
let group_pin = layer.group.clone();
let mut group_lock = group_pin.write().await;
for (mgid, running) in &mut layer.runnings { for (mgid, running) in &mut layer.runnings {
let peers = running.peer_leave(&addr); for sid in running.peer_leave(&addr) {
for (fgid, fid) in peers { results.rpcs.push(session_lost(*mgid, &sid));
results
.rpcs
.push(crate::apps::group_chat::rpc::group_offline(
*mgid, fid, &fgid,
));
group_lock.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),
&mut results,
)?;
} }
} }
} }
RecvType::Connect(addr, data) | RecvType::ResultConnect(addr, data) => {
// ESSE chat layer connect date structure.
if handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)? {
let msg = conn_res_message(&layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
} else {
let msg = SendType::Result(0, addr, false, false, vec![]);
results.layers.push((mgid, fgid, msg));
}
}
RecvType::Result(addr, is_ok, data) => { RecvType::Result(addr, is_ok, data) => {
// check to close. // ESSE chat layer result date structure.
if !is_ok { if is_ok {
if !handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)? {
let msg = SendType::Result(0, addr, false, false, vec![]);
results.layers.push((mgid, fgid, msg));
}
} else {
let db = chat_db(&layer.base, &mgid)?; let db = chat_db(&layer.base, &mgid)?;
if let Some(friend) = Friend::get_it(&db, &fgid)? { if let Some(friend) = Friend::get_it(&db, &fgid)? {
if friend.contains_addr(&addr) { if friend.contains_addr(&addr) {
@ -230,260 +93,6 @@ pub(crate) async fn handle(
friend.close(&db)?; friend.close(&db)?;
} }
} }
drop(db);
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Reject => {
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
_ => {}
}
return Ok(results);
}
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove.
let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_friend.is_none() {
return Ok(results);
}
let fid = some_friend.unwrap().id; // safe.
// 0. get session. TODO
let sid = 0;
// 3. online this group.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
fid,
)?;
// 4. update remote addr.
let db = chat_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI.
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? {
// 0. get session. TODO
let sid = 0;
// already friendship.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
friend.id,
)?;
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
// ADD NEW SESSION.
let s_db = session_db(&layer.base, &mgid)?;
let mut session = Session::new(
friend.id,
friend.gid,
friend.addr,
SessionType::Chat,
friend.name,
friend.datetime,
);
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
}
drop(db);
}
let data = postcard::to_allocvec(&LayerEvent::OnlinePing).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {}
}
}
RecvType::ResultConnect(addr, data) => {
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove.
let some_friend = load_friend(&layer.base, &mgid, &fgid)?;
if some_friend.is_none() {
return Ok(results);
}
let fid = some_friend.unwrap().id; // safe.
// 0. get session. TODO
let sid = 0;
// 3. online this group.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
fid,
)?;
// 4. update remote addr.
let db = chat_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI. TODO
// 6. connected.
let msg = conn_res_message(&layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
if let Some(friend) = load_friend(&layer.base, &mgid, &fgid)? {
// 0. get session. TODO
let sid = 0;
// already friendship.
layer.running_mut(&mgid)?.check_add_online(
fgid,
Online::Direct(addr),
sid,
friend.id,
)?;
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
// ADD NEW SESSION.
let s_db = session_db(&layer.base, &mgid)?;
let mut session = Session::new(
friend.id,
friend.gid,
friend.addr,
SessionType::Chat,
friend.name,
friend.datetime,
);
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
}
drop(db);
}
let msg = conn_res_message(&layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
} }
} }
RecvType::Event(addr, bytes) => { RecvType::Event(addr, bytes) => {
@ -524,6 +133,45 @@ pub(crate) async fn handle(
Ok(results) Ok(results)
} }
fn handle_connect(
mgid: &GroupId,
fgid: &GroupId,
addr: &PeerAddr,
data: Vec<u8>,
layer: &mut Layer,
results: &mut HandleResult,
) -> Result<bool> {
// 0. deserialize connect data.
let proof: Proof = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize chat layer connect failure"))?;
// 1. check verify.
proof.verify(fgid, addr, &layer.addr)?;
// 2. check friendship.
let friend = update_friend(&layer.base, mgid, fgid, addr)?;
if friend.is_none() {
return Ok(false);
}
let fid = friend.unwrap().id; // safe.
// 3. get session.
let session_some = connect_session(&layer.base, mgid, &SessionType::Chat, &fid, addr)?;
if session_some.is_none() {
return Ok(false);
}
let sid = session_some.unwrap().id;
// 4. active this session.
layer
.running_mut(mgid)?
.check_add_online(*fgid, Online::Direct(*addr), sid, fid)?;
// 5. session online to UI.
results.rpcs.push(session_connect(*mgid, &sid, addr));
Ok(true)
}
impl LayerEvent { impl LayerEvent {
pub async fn handle( pub async fn handle(
fgid: GroupId, fgid: GroupId,
@ -534,25 +182,127 @@ impl LayerEvent {
) -> Result<HandleResult> { ) -> Result<HandleResult> {
let event: LayerEvent = let event: LayerEvent =
postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?; postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?;
let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
match event { match event {
LayerEvent::Offline(_) => { LayerEvent::Offline(_) => {
let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?;
layer.running_mut(&mgid)?.check_offline(&fgid, &addr); layer.running_mut(&mgid)?.check_offline(&fgid, &addr);
results.rpcs.push(session_lost(mgid, &sid)); results.rpcs.push(session_lost(mgid, &sid));
} }
LayerEvent::Suspend(_) => { LayerEvent::Suspend(_) => {
let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?;
if layer.running_mut(&mgid)?.suspend(&fgid, false)? { if layer.running_mut(&mgid)?.suspend(&fgid, false)? {
results.rpcs.push(session_suspend(mgid, &sid)); results.rpcs.push(session_suspend(mgid, &sid));
} }
} }
LayerEvent::Actived(_) => { LayerEvent::Actived(_) => {
let (sid, _fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let _ = layer.running_mut(&mgid)?.active(&fgid, false); let _ = layer.running_mut(&mgid)?.active(&fgid, false);
results.rpcs.push(session_connect(mgid, &sid, &addr)); results.rpcs.push(session_connect(mgid, &sid, &addr));
} }
LayerEvent::Request(remote, remark) => {
if load_friend(&layer.base, &mgid, &fgid)?.is_none() {
// check if exist request.
let db = chat_db(&layer.base, &mgid)?;
if let Some(req) = Request::get(&db, &remote.id)? {
req.delete(&db)?; // delete the old request.
results.rpcs.push(rpc::request_delete(mgid, req.id));
}
let mut request = Request::new(
remote.id,
remote.addr,
remote.name.clone(),
remark.clone(),
false,
true,
);
// save to db.
request.insert(&db)?;
drop(db);
// save the avatar.
write_avatar_sync(&layer.base, &mgid, &request.gid, remote.avatar.clone())?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestCreate(false, remote, remark),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results);
} else {
let msg = conn_agree_message(layer, 0, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
}
}
LayerEvent::Agree(remote, proof) => {
// 0. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 1. check friendship.
if load_friend(&layer.base, &mgid, &fgid)?.is_none() {
// agree request for friend.
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
// ADD NEW SESSION.
let s_db = session_db(&layer.base, &mgid)?;
let mut session = Session::new(
friend.id,
friend.gid,
friend.addr,
SessionType::Chat,
friend.name,
friend.datetime,
);
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
}
drop(db);
}
}
LayerEvent::Reject => {
let db = chat_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
LayerEvent::Message(hash, m) => { LayerEvent::Message(hash, m) => {
let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let db = chat_db(&layer.base, &mgid)?; let db = chat_db(&layer.base, &mgid)?;
if !Message::exist(&db, &hash)? { if !Message::exist(&db, &hash)? {
let msg = m.clone().handle(false, mgid, &layer.base, &db, fid, hash)?; let msg = m.clone().handle(false, mgid, &layer.base, &db, fid, hash)?;
@ -601,6 +351,7 @@ impl LayerEvent {
} }
} }
LayerEvent::Info(remote) => { LayerEvent::Info(remote) => {
let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let avatar = remote.avatar.clone(); let avatar = remote.avatar.clone();
let db = chat_db(&layer.base, &mgid)?; let db = chat_db(&layer.base, &mgid)?;
let mut f = Friend::get_id(&db, fid)?.ok_or(new_io_error(""))?; let mut f = Friend::get_id(&db, fid)?.ok_or(new_io_error(""))?;
@ -620,11 +371,7 @@ impl LayerEvent {
results.rpcs.push(rpc::friend_info(mgid, &f)); results.rpcs.push(rpc::friend_info(mgid, &f));
} }
LayerEvent::OnlinePing => { LayerEvent::OnlinePing => {
layer.group.write().await.status( let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr), sid, fid)?; .check_add_online(fgid, Online::Direct(addr), sid, fid)?;
@ -634,16 +381,13 @@ impl LayerEvent {
results.layers.push((mgid, fgid, msg)); results.layers.push((mgid, fgid, msg));
} }
LayerEvent::OnlinePong => { LayerEvent::OnlinePong => {
layer.group.write().await.status( let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer layer
.running_mut(&mgid)? .running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr), sid, fid)?; .check_add_online(fgid, Online::Direct(addr), sid, fid)?;
} }
LayerEvent::Close => { LayerEvent::Close => {
let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
layer.group.write().await.broadcast( layer.group.write().await.broadcast(
&mgid, &mgid,
InnerEvent::SessionFriendClose(fgid), InnerEvent::SessionFriendClose(fgid),
@ -747,13 +491,31 @@ fn load_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId) -> Result<Option<
Friend::get(&db, fgid) Friend::get(&db, fgid)
} }
#[inline]
fn update_friend(
base: &PathBuf,
mgid: &GroupId,
fgid: &GroupId,
addr: &PeerAddr,
) -> Result<Option<Friend>> {
let db = chat_db(base, mgid)?;
if let Some(friend) = Friend::get(&db, fgid)? {
if &friend.addr != addr {
let _ = Friend::addr_update(&db, friend.id, addr);
}
Ok(Some(friend))
} else {
Ok(None)
}
}
pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType { pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType {
// update delivery. // update delivery.
let uid = layer.delivery.len() as u64 + 1; let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (me.id, request.id)); layer.delivery.insert(uid, (me.id, request.id));
let req = LayerRequest::Friend(me, request.remark); let req = LayerEvent::Request(me, request.remark);
let data = postcard::to_allocvec(&req).unwrap_or(vec![]); let data = postcard::to_allocvec(&req).unwrap_or(vec![]);
SendType::Connect(uid, request.addr, None, None, data) SendType::Event(uid, request.addr, data)
} }
pub(super) fn reject_message( pub(super) fn reject_message(
@ -762,10 +524,10 @@ pub(super) fn reject_message(
addr: PeerAddr, addr: PeerAddr,
me_id: GroupId, me_id: GroupId,
) -> SendType { ) -> SendType {
let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::Reject).unwrap_or(vec![]);
let uid = layer.delivery.len() as u64 + 1; let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (me_id, tid)); layer.delivery.insert(uid, (me_id, tid));
SendType::Result(uid, addr, false, false, data) SendType::Event(uid, addr, data)
} }
pub(super) fn event_message( pub(super) fn event_message(
@ -782,13 +544,13 @@ pub(super) fn event_message(
} }
pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType { pub(crate) fn chat_conn(proof: Proof, addr: PeerAddr) -> SendType {
let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]); let data = postcard::to_allocvec(&proof).unwrap_or(vec![]);
SendType::Connect(0, addr, None, None, data) SendType::Connect(0, addr, None, None, data)
} }
async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> { async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
let proof = layer.group.read().await.prove_addr(mgid, &addr)?; let proof = layer.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerResponse::Connect(proof)).unwrap_or(vec![]); let data = postcard::to_allocvec(&proof).unwrap_or(vec![]);
Ok(SendType::Result(0, addr, true, false, data)) Ok(SendType::Result(0, addr, true, false, data))
} }
@ -804,8 +566,8 @@ async fn conn_agree_message(
let proof = group_lock.prove_addr(mgid, &addr)?; let proof = group_lock.prove_addr(mgid, &addr)?;
let me = group_lock.clone_user(mgid)?; let me = group_lock.clone_user(mgid)?;
drop(group_lock); drop(group_lock);
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data)) Ok(SendType::Event(uid, addr, data))
} }
pub(super) fn rpc_agree_message( pub(super) fn rpc_agree_message(
@ -818,11 +580,11 @@ pub(super) fn rpc_agree_message(
) -> Result<SendType> { ) -> Result<SendType> {
let uid = layer.delivery.len() as u64 + 1; let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (*mgid, tid)); layer.delivery.insert(uid, (*mgid, tid));
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data)) Ok(SendType::Event(uid, addr, data))
} }
// maybe need if gid or addr in blocklist. // maybe need if gid or addr in blocklist.
fn res_reject() -> Vec<u8> { fn res_reject() -> Vec<u8> {
postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]) postcard::to_allocvec(&LayerEvent::Reject).unwrap_or(vec![])
} }

14
src/layer.rs

@ -7,7 +7,7 @@ use tdn::{
types::{ types::{
group::GroupId, group::GroupId,
message::SendType, message::SendType,
primitive::{new_io_error, HandleResult, PeerAddr, Result}, primitive::{new_io_error, PeerAddr, Result},
}, },
}; };
@ -351,17 +351,19 @@ impl RunningAccount {
} }
/// peer leave, remove online peer. /// peer leave, remove online peer.
pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<(GroupId, i64)> { pub fn peer_leave(&mut self, addr: &PeerAddr) -> Vec<i64> {
let mut peers = vec![]; let mut peers = vec![];
let mut deletes = vec![];
for (fgid, online) in &self.sessions { for (fgid, online) in &self.sessions {
if online.online.addr() == addr { if online.online.addr() == addr {
peers.push((*fgid, online.db_id)) peers.push(online.db_id);
deletes.push(*fgid);
} }
} }
for i in &deletes {
for i in &peers { self.sessions.remove(&i);
self.sessions.remove(&i.0);
} }
peers peers
} }

2
src/rpc.rs

@ -91,7 +91,7 @@ pub(crate) fn session_last(
} }
#[inline] #[inline]
pub(crate) fn session_update( pub(crate) fn _session_update(
mgid: GroupId, mgid: GroupId,
id: &i64, id: &i64,
addr: &PeerAddr, addr: &PeerAddr,

33
src/session.rs

@ -1,3 +1,4 @@
use std::path::PathBuf;
use tdn::types::{ use tdn::types::{
group::GroupId, group::GroupId,
primitive::{new_io_error, PeerAddr, Result}, primitive::{new_io_error, PeerAddr, Result},
@ -5,6 +6,8 @@ use tdn::types::{
}; };
use tdn_storage::local::{DStorage, DsValue}; use tdn_storage::local::{DStorage, DsValue};
use crate::storage::session_db;
pub(crate) enum SessionType { pub(crate) enum SessionType {
Chat, Chat,
Group, Group,
@ -43,7 +46,7 @@ impl SessionType {
} }
pub(crate) struct Session { pub(crate) struct Session {
id: i64, pub id: i64,
fid: i64, fid: i64,
pub gid: GroupId, pub gid: GroupId,
pub addr: PeerAddr, pub addr: PeerAddr,
@ -184,3 +187,31 @@ impl Session {
)) ))
} }
} }
#[inline]
pub(crate) fn connect_session(
base: &PathBuf,
mgid: &GroupId,
s_type: &SessionType,
fid: &i64,
addr: &PeerAddr,
) -> Result<Option<Session>> {
let db = session_db(base, mgid)?;
let sql = format!("SELECT id, fid, gid, addr, s_type, name, is_top, is_close, last_datetime, last_content, last_readed FROM sessions WHERE s_type = {} AND fid = {}", s_type.to_int(), fid);
let mut matrix = db.query(&sql)?;
if matrix.len() > 0 {
let session = Session::from_values(matrix.pop().unwrap()); // safe unwrap()
let _ = db.update(&format!(
"UPDATE sessions SET addr = '{}' WHERE id = {}",
addr.to_hex(),
session.id,
));
Ok(Some(session))
} else {
Ok(None)
}
}

Loading…
Cancel
Save