Browse Source

move friend chat to apps

pull/18/head
Sun 4 years ago
parent
commit
ce29814657
  1. 4
      Cargo.toml
  2. BIN
      assets/logo/logo_assistant.png
  3. BIN
      assets/logo/logo_files.png
  4. BIN
      assets/logo/logo_group_chat.png
  5. 2
      lib/apps/service/models.dart
  6. 1
      pubspec.yaml
  7. 11
      src/apps.rs
  8. 672
      src/apps/chat/layer.rs
  9. 4
      src/apps/chat/mod.rs
  10. 17
      src/apps/chat/rpc.rs
  11. 21
      src/apps/group_chat/layer.rs
  12. 5
      src/apps/group_chat/mod.rs
  13. 6
      src/apps/group_chat/rpc.rs
  14. 3
      src/event.rs
  15. 675
      src/layer.rs
  16. 5
      src/rpc.rs

4
Cargo.toml

@ -37,8 +37,8 @@ sysinfo = "0.16" @@ -37,8 +37,8 @@ sysinfo = "0.16"
tdn = { git = "https://github.com/cypherlink/TDN", branch="main", default-features = false, features = ["full"] }
tdn-did = { git = "https://github.com/cypherlink/tdn-did", branch="main" }
tdn-storage = { git = "https://github.com/cypherlink/tdn-storage", branch="main" }
group-chat-types = { git = "https://github.com/cympletech/group-chat", branch="main" }
# group-chat-types = { path = "../group-chat/types" }
# group-chat-types = { git = "https://github.com/cympletech/group-chat", branch="main" }
group-chat-types = { path = "../group-chat/types" }
[target.'cfg(target_os="android")'.dependencies]
jni = { version = "0.19", default-features = false }

BIN
assets/logo/logo_assistant.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.7 KiB

After

Width:  |  Height:  |  Size: 2.8 KiB

BIN
assets/logo/logo_files.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.7 KiB

After

Width:  |  Height:  |  Size: 2.4 KiB

BIN
assets/logo/logo_group_chat.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 KiB

2
lib/apps/service/models.dart

@ -23,7 +23,7 @@ extension InnerServiceExtension on InnerService { @@ -23,7 +23,7 @@ extension InnerServiceExtension on InnerService {
case InnerService.Assistant:
return [lang.assistant, lang.assistantBio, 'assets/logo/logo_assistant.png'];
case InnerService.GroupChat:
return [lang.groupChat, lang.groupChatBio, 'assets/logo/logo_assistant.png'];
return [lang.groupChat, lang.groupChatBio, 'assets/logo/logo_group_chat.png'];
}
}

1
pubspec.yaml

@ -72,6 +72,7 @@ flutter: @@ -72,6 +72,7 @@ flutter:
- assets/logo/logo_40.jpg
- assets/logo/logo_assistant.png
- assets/logo/logo_files.png
- assets/logo/logo_group_chat.png
- assets/images/background_light.jpg
- assets/images/background_dark.jpg
- assets/images/image_missing.png

11
src/apps.rs

@ -1,10 +1,11 @@ @@ -1,10 +1,11 @@
use tdn::types::{
group::GroupId,
message::RecvType,
primitive::{HandleResult, PeerAddr, Result},
primitive::{HandleResult, Result},
rpc::RpcHandler,
};
use crate::layer::Layer;
use crate::rpc::RpcState;
pub(crate) mod assistant;
@ -23,17 +24,15 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<RpcState>) { @@ -23,17 +24,15 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<RpcState>) {
group_chat::new_rpc_handler(handler);
}
pub(crate) fn app_layer_handle(
pub(crate) async fn app_layer_handle(
layer: &mut Layer,
fgid: GroupId,
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
match fgid {
group_chat::GROUP_ID => group_chat::layer_handle(mgid, msg),
_ => {
// todo!()
Ok(HandleResult::new())
}
_ => chat::layer_handle(layer, fgid, mgid, msg).await,
}
}

672
src/apps/chat/layer.rs

@ -0,0 +1,672 @@ @@ -0,0 +1,672 @@
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tdn::types::{
group::{EventId, GroupId},
message::{RecvType, SendType},
primitive::{new_io_error, DeliveryType, HandleResult, PeerAddr, Result},
};
use tdn_did::{user::User, Proof};
use crate::event::{InnerEvent, StatusEvent};
use crate::layer::running::Online;
use crate::layer::Layer;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::storage::{
read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image,
};
use super::models::{Friend, Message, MessageType, NetworkMessage, Request};
use super::rpc;
/// Layer Request for stable connected.
/// Params: User, remote_id, remark.
/// this user if already friend, only has gid.
#[derive(Serialize, Deserialize)]
enum LayerRequest {
/// Requst for connect, had friendship.
/// params: signature with PeerAddr.
Connect(Proof),
/// Requst for make friendship.
/// Params: remote_user, me_id, remark.
Friend(User, String),
}
/// Layer Response for stable connected.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerResponse {
/// Connected with stable, had friendship.
Connect(Proof),
/// Agree a friend request.
/// Params: User, remote_id.
/// this user if already, only has gid.
Agree(User, Proof),
/// Reject a friend request.
/// Params: me_id, remote_id.
Reject,
// TODO service connected info.
//Service,
}
/// Esse app's Event.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
/// receiver gid, sender gid, message.
Message(EventId, NetworkMessage),
/// receiver gid, sender user.
Info(User),
/// receiver gid, sender gid.
OnlinePing,
/// receiver gid, sender gid.
OnlinePong,
/// receiver gid, sender gid.
Offline,
/// close friendship.
Close,
}
pub(crate) async fn handle(
layer: &mut Layer,
fgid: GroupId,
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
let mut results = HandleResult::new();
match msg {
RecvType::Connect(addr, data) => {
let request: LayerRequest = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize request friend failure"))?;
match request {
LayerRequest::Connect(proof) => {
let fid = layer.get_remote_id(&mgid, &fgid)?;
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. online this group.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 3. update remote addr. TODO
let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 4. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
// 5. connected.
let msg = conn_res_message(layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerRequest::Friend(remote, remark) => {
let some_fid = layer.get_remote_id(&mgid, &fgid);
if some_fid.is_err() {
// check if exist request.
let db = session_db(&layer.base, &mgid)?;
if let Some(req) = Request::get(&db, &remote.id)? {
req.delete(&db)?; // delete the old request.
results.rpcs.push(rpc::request_delete(mgid, req.id));
}
let mut request = Request::new(
remote.id,
remote.addr,
remote.name.clone(),
remark.clone(),
false,
true,
);
// save to db.
request.insert(&db)?;
drop(db);
// save the avatar.
write_avatar_sync(&layer.base, &mgid, &request.gid, remote.avatar.clone())?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestCreate(false, remote, remark),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results);
}
let fid = some_fid.unwrap(); // safe checked.
// already friendship & update.
// 1. online this group.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 2. update remote user.
let mut friend = layer.update_friend(&mgid, fid, remote)?;
// 3. online to UI.
friend.online = true;
results.rpcs.push(rpc::friend_info(mgid, &friend));
// 4. connected.
let msg = conn_agree_message(layer, 0, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
}
}
RecvType::Leave(addr) => {
for (mgid, running) in &mut layer.runnings {
let peers = running.peer_leave(&addr);
for (fgid, fid) in peers {
results.rpcs.push(rpc::friend_offline(*mgid, fid));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),
&mut results,
)?;
}
}
}
RecvType::Result(addr, is_ok, data) => {
// check to close.
if !is_ok {
let db = session_db(&layer.base, &mgid)?;
if let Some(friend) = Friend::get_it(&db, &fgid)? {
if friend.contains_addr(&addr) {
results.rpcs.push(rpc::friend_close(mgid, friend.id));
friend.close(&db)?;
}
}
drop(db);
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Reject => {
let db = session_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
_ => {}
}
return Ok(results);
}
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove.
let fid = layer.get_remote_id(&mgid, &fgid)?;
// 3. online this group.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 4. update remote addr.
let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) {
// already friendship.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = session_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
}
drop(db);
}
let data = postcard::to_allocvec(&LayerEvent::OnlinePing).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {}
}
}
RecvType::ResultConnect(addr, data) => {
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 2. check has this remove.
let fid = layer.get_remote_id(&mgid, &fgid)?;
// 3. online this group.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 4. update remote addr.
let db = session_db(&layer.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
// 6. connected.
let msg = conn_res_message(layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
if let Ok(fid) = layer.get_remote_id(&mgid, &fgid) {
// already friendship.
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = session_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
layer.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
}
drop(db);
}
let msg = conn_res_message(layer, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {
let db = session_db(&layer.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
}
}
RecvType::Event(addr, bytes) => {
return LayerEvent::handle(fgid, mgid, layer, addr, bytes).await;
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
}
RecvType::Delivery(t, tid, is_ok) => {
println!("delivery: tid: {}, is_ok: {}", tid, is_ok);
// TODO maybe send failure need handle.
if is_ok {
if let Some((gid, db_id)) = layer.delivery.remove(&tid) {
let db = session_db(&layer.base, &mgid)?;
let resp = match t {
DeliveryType::Event => {
Message::delivery(&db, db_id, true)?;
rpc::message_delivery(gid, db_id, true)
}
DeliveryType::Connect => {
// request.
Request::delivery(&db, db_id, true)?;
rpc::request_delivery(gid, db_id, true)
}
DeliveryType::Result => {
// response. TODO better for it.
Request::delivery(&db, db_id, true)?;
rpc::request_delivery(gid, db_id, true)
}
};
drop(db);
results.rpcs.push(resp);
}
}
}
}
Ok(results)
}
impl LayerEvent {
pub async fn handle(
fgid: GroupId,
mgid: GroupId,
layer: &mut Layer,
addr: PeerAddr,
bytes: Vec<u8>,
) -> Result<HandleResult> {
let event: LayerEvent =
postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?;
let fid = layer.get_remote_id(&mgid, &fgid)?;
let mut results = HandleResult::new();
match event {
LayerEvent::Message(hash, m) => {
let db = session_db(&layer.base, &mgid)?;
if !Message::exist(&db, &hash)? {
let msg = m.clone().handle(false, mgid, &layer.base, &db, fid, hash)?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionMessageCreate(fgid, false, hash, m),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
results.rpcs.push(rpc::message_create(mgid, &msg));
}
}
LayerEvent::Info(remote) => {
let avatar = remote.avatar.clone();
let f = layer.update_friend(&mgid, fid, remote)?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionFriendInfo(f.gid, f.addr, f.name.clone(), avatar),
FRIEND_TABLE_PATH,
f.id,
&mut results,
)?;
results.rpcs.push(rpc::friend_info(mgid, &f));
}
LayerEvent::OnlinePing => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerEvent::OnlinePong => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
}
LayerEvent::Offline => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),
&mut results,
)?;
layer.running_mut(&mgid)?.check_offline(&fgid, &addr);
results.rpcs.push(rpc::friend_offline(mgid, fid));
}
LayerEvent::Close => {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionFriendClose(fgid),
FRIEND_TABLE_PATH,
fid,
&mut results,
)?;
layer.remove_friend(&mgid, &fgid);
let db = session_db(&layer.base, &mgid)?;
Friend::id_close(&db, fid)?;
drop(db);
results.rpcs.push(rpc::friend_close(mgid, fid));
if !layer.is_online(&addr) {
results
.layers
.push((mgid, fgid, SendType::Disconnect(addr)))
}
}
}
Ok(results)
}
pub async fn from_message(
base: &PathBuf,
mgid: GroupId,
fid: i64,
m_type: MessageType,
content: String,
) -> std::result::Result<(Message, NetworkMessage), tdn::types::rpc::RpcError> {
let db = session_db(&base, &mgid)?;
// handle message's type.
let (nm_type, raw) = match m_type {
MessageType::String => (NetworkMessage::String(content.clone()), content),
MessageType::Image => {
let bytes = read_file(&PathBuf::from(content)).await?;
let image_name = write_image(base, &mgid, &bytes).await?;
(NetworkMessage::Image(bytes), image_name)
}
MessageType::File => {
let file_path = PathBuf::from(content);
let bytes = read_file(&file_path).await?;
let old_name = file_path.file_name()?.to_str()?;
let filename = write_file(base, &mgid, old_name, &bytes).await?;
(NetworkMessage::File(filename.clone(), bytes), filename)
}
MessageType::Contact => {
let cid: i64 = content.parse().map_err(|_e| new_io_error("id error"))?;
let contact = Friend::get_id(&db, cid)??;
let avatar_bytes = read_avatar(base, &mgid, &contact.gid).await?;
let tmp_name = contact.name.replace(";", "-;");
let contact_values = format!(
"{};;{};;{}",
tmp_name,
contact.gid.to_hex(),
contact.addr.to_hex()
);
(
NetworkMessage::Contact(contact.name, contact.gid, contact.addr, avatar_bytes),
contact_values,
)
}
MessageType::Record => {
let (bytes, time) = if let Some(i) = content.find('-') {
let time = content[0..i].parse().unwrap_or(0);
let bytes = read_record(base, &mgid, &content[i + 1..]).await?;
(bytes, time)
} else {
(vec![], 0)
};
(NetworkMessage::Record(bytes, time), content)
}
MessageType::Emoji => {
// TODO
(NetworkMessage::Emoji, content)
}
MessageType::Phone => {
// TODO
(NetworkMessage::Phone, content)
}
MessageType::Video => {
// TODO
(NetworkMessage::Video, content)
}
};
let mut msg = Message::new(&mgid, fid, true, m_type, raw, false);
msg.insert(&db)?;
Friend::update_last_message(&db, fid, &msg, true)?;
drop(db);
Ok((msg, nm_type))
}
}
pub(super) fn req_message(layer: &mut Layer, me: User, request: Request) -> SendType {
// update delivery.
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (me.id, request.id));
let req = LayerRequest::Friend(me, request.remark);
let data = postcard::to_allocvec(&req).unwrap_or(vec![]);
SendType::Connect(uid, request.addr, None, None, data)
}
pub(super) fn reject_message(
layer: &mut Layer,
tid: i64,
addr: PeerAddr,
me_id: GroupId,
) -> SendType {
let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]);
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (me_id, tid));
SendType::Result(uid, addr, false, false, data)
}
pub(super) fn event_message(
layer: &mut Layer,
tid: i64,
me_id: GroupId,
addr: PeerAddr,
event: &LayerEvent,
) -> SendType {
let data = postcard::to_allocvec(event).unwrap_or(vec![]);
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (me_id, tid));
SendType::Event(uid, addr, data)
}
pub(crate) async fn conn_req_message(
layer: &Layer,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let proof = layer.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]);
Ok(SendType::Connect(0, addr, None, None, data))
}
async fn conn_res_message(layer: &Layer, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
let proof = layer.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerResponse::Connect(proof)).unwrap_or(vec![]);
Ok(SendType::Result(0, addr, true, false, data))
}
async fn conn_agree_message(
layer: &mut Layer,
tid: i64,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (*mgid, tid));
let group_lock = layer.group.read().await;
let proof = group_lock.prove_addr(mgid, &addr)?;
let me = group_lock.clone_user(mgid)?;
drop(group_lock);
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data))
}
pub(super) fn rpc_agree_message(
layer: &mut Layer,
tid: i64,
proof: Proof,
me: User,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = layer.delivery.len() as u64 + 1;
layer.delivery.insert(uid, (*mgid, tid));
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data))
}
// maybe need if gid or addr in blocklist.
fn _res_reject() -> Vec<u8> {
postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![])
}

4
src/apps/chat/mod.rs

@ -1,5 +1,9 @@ @@ -1,5 +1,9 @@
mod layer;
mod models;
pub(crate) mod rpc;
pub(crate) use layer::conn_req_message;
pub(crate) use layer::handle as layer_handle;
pub(crate) use layer::LayerEvent;
pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request};
pub(crate) use rpc::new_rpc_handler;

17
src/apps/chat/rpc.rs

@ -9,11 +9,11 @@ use tdn::types::{ @@ -9,11 +9,11 @@ use tdn::types::{
use tdn_did::user::User;
use crate::event::InnerEvent;
use crate::layer::LayerEvent;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::rpc::{sleep_waiting_close_stable, RpcState};
use crate::storage::{delete_avatar, session_db};
use super::layer::LayerEvent;
use super::{Friend, Message, MessageType, Request};
#[inline]
@ -310,9 +310,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -310,9 +310,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
&mut results,
)?;
results
.layers
.push((gid, remote_gid, layer_lock.req_message(me, request)));
results.layers.push((
gid,
remote_gid,
super::layer::req_message(&mut layer_lock, me, request),
));
drop(layer_lock);
@ -348,7 +350,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -348,7 +350,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
results.rpcs.push(json!([id, f.to_rpc()]));
let proof = group_lock.prove_addr(&gid, &f.addr)?;
let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?;
let msg =
super::layer::rpc_agree_message(&mut layer_lock, id, proof, me, &gid, f.addr)?;
results.layers.push((gid, f.gid, msg));
}
db.close()?;
@ -370,7 +373,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -370,7 +373,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
req.is_over = true;
req.update(&db)?;
drop(db);
let msg = layer_lock.reject_message(id, req.addr, gid);
let msg = super::layer::reject_message(&mut layer_lock, id, req.addr, gid);
drop(layer_lock);
let mut results = HandleResult::layer(gid, req.gid, msg);
@ -445,7 +448,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -445,7 +448,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?;
let event = LayerEvent::Message(msg.hash, nw);
let s = layer_lock.event_message(msg.id, gid, faddr, &event);
let s = super::layer::event_message(&mut layer_lock, msg.id, gid, faddr, &event);
drop(layer_lock);
let mut results = HandleResult::rpc(json!(msg.to_rpc()));

21
src/apps/group_chat/layer.rs

@ -1,22 +1,23 @@ @@ -1,22 +1,23 @@
use tdn::types::{
group::GroupId,
message::{RecvType, SendType},
message::RecvType,
primitive::{new_io_error, HandleResult, Result},
};
use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
use group_chat_types::GroupResult;
//use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
pub(crate) fn handle(mgid: GroupId, msg: RecvType) -> Result<HandleResult> {
let mut results = HandleResult::new();
pub(crate) fn handle(_mgid: GroupId, msg: RecvType) -> Result<HandleResult> {
let results = HandleResult::new();
match msg {
RecvType::Connect(addr, data) => {
RecvType::Connect(_addr, _data) => {
// None.
}
RecvType::Leave(addr) => {
RecvType::Leave(_addr) => {
//
}
RecvType::Result(addr, is_ok, data) => {
RecvType::Result(_addr, _is_ok, data) => {
let res: GroupResult = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match res {
@ -28,17 +29,17 @@ pub(crate) fn handle(mgid: GroupId, msg: RecvType) -> Result<HandleResult> { @@ -28,17 +29,17 @@ pub(crate) fn handle(mgid: GroupId, msg: RecvType) -> Result<HandleResult> {
}
}
}
RecvType::ResultConnect(addr, data) => {
RecvType::ResultConnect(_addr, data) => {
let _res: GroupResult = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
}
RecvType::Event(addr, bytes) => {
RecvType::Event(_addr, _bytes) => {
//
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
}
RecvType::Delivery(t, tid, is_ok) => {
RecvType::Delivery(_t, _tid, _is_ok) => {
//
}
}

5
src/apps/group_chat/mod.rs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
mod layer;
mod models;
pub use group_chat_types::GROUP_CHAT_ID as GROUP_ID;
@ -10,7 +11,5 @@ pub(super) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) @@ -10,7 +11,5 @@ pub(super) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType)
}
pub(crate) mod rpc;
pub(crate) use rpc::new_rpc_handler;
mod layer;
pub(crate) use layer::handle as layer_handle;
pub(crate) use rpc::new_rpc_handler;

6
src/apps/group_chat/rpc.rs

@ -1,12 +1,14 @@ @@ -1,12 +1,14 @@
use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::SendType,
primitive::{new_io_error, HandleResult, PeerAddr},
rpc::{json, rpc_response, RpcHandler, RpcParam},
rpc::{json, RpcHandler, RpcParam},
};
use group_chat_types::GroupConnect;
//use group_chat_types::{Event, GroupConnect, GroupEvent, GroupInfo, GroupResult, GroupType};
//use crate::group::GroupEvent;
use super::add_layer;
use crate::rpc::RpcState;

3
src/event.rs

@ -13,10 +13,11 @@ use tdn_did::user::User; @@ -13,10 +13,11 @@ use tdn_did::user::User;
use tdn_storage::local::DStorage;
use crate::account::Account;
use crate::apps::chat::LayerEvent;
use crate::consensus::Event;
use crate::group::{Group, GroupEvent};
use crate::layer::running::Online;
use crate::layer::{Layer, LayerEvent};
use crate::layer::Layer;
use crate::migrate::consensus::{
ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH,
};

675
src/layer.rs

@ -1,72 +1,37 @@ @@ -1,72 +1,37 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tdn::{
smol::lock::RwLock,
types::{
group::{EventId, GroupId},
group::GroupId,
message::{RecvType, SendType},
primitive::{new_io_error, DeliveryType, HandleResult, PeerAddr, Result},
primitive::{new_io_error, HandleResult, PeerAddr, Result},
},
};
use tdn_did::{user::User, Proof};
use crate::apps::chat::{Friend, Message, MessageType, NetworkMessage, Request};
use crate::event::{InnerEvent, StatusEvent};
use crate::group::Group;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use tdn_did::user::User;
use crate::apps::app_layer_handle;
use crate::apps::chat::rpc;
use crate::storage::{
read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image,
};
use crate::apps::chat::conn_req_message;
use crate::apps::chat::Friend;
use crate::group::Group;
use crate::storage::{session_db, write_avatar_sync};
pub mod running;
use running::{Online, RunningAccount};
use running::RunningAccount;
/// Esse layers.
/// ESSE layers.
pub(crate) struct Layer {
/// account_gid => running_account.
runnings: HashMap<GroupId, RunningAccount>,
pub runnings: HashMap<GroupId, RunningAccount>,
/// message delivery tracking. uuid, me_gid, db_id.
delivery: HashMap<u64, (GroupId, i64)>,
pub delivery: HashMap<u64, (GroupId, i64)>,
/// storage base path.
base: PathBuf,
pub base: PathBuf,
/// self peer addr.
addr: PeerAddr,
pub addr: PeerAddr,
/// group info.
group: Arc<RwLock<Group>>,
}
/// Layer Request for stable connected.
/// Params: User, remote_id, remark.
/// this user if already friend, only has gid.
#[derive(Serialize, Deserialize)]
enum LayerRequest {
/// Requst for connect, had friendship.
/// params: signature with PeerAddr.
Connect(Proof),
/// Requst for make friendship.
/// Params: remote_user, me_id, remark.
Friend(User, String),
}
/// Layer Response for stable connected.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerResponse {
/// Connected with stable, had friendship.
Connect(Proof),
/// Agree a friend request.
/// Params: User, remote_id.
/// this user if already, only has gid.
Agree(User, Proof),
/// Reject a friend request.
/// Params: me_id, remote_id.
Reject,
// TODO service connected info.
//Service,
pub group: Arc<RwLock<Group>>,
}
impl Layer {
@ -81,351 +46,7 @@ impl Layer { @@ -81,351 +46,7 @@ impl Layer {
return Err(new_io_error("running account not found."));
}
// TODO app_layer_handle(fgid, mgid, msg)
// 2. handle receive message by type.
let mut results = HandleResult::new();
match msg {
RecvType::Connect(addr, data) => {
let request: LayerRequest = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize request friend failure"))?;
match request {
LayerRequest::Connect(proof) => {
let fid = self.get_remote_id(&mgid, &fgid)?;
// 1. check verify.
proof.verify(&fgid, &addr, &self.addr)?;
// 2. online this group.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 3. update remote addr. TODO
let db = session_db(&self.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 4. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
// 5. connected.
let msg = self.conn_res_message(&mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerRequest::Friend(remote, remark) => {
let some_fid = self.get_remote_id(&mgid, &fgid);
if some_fid.is_err() {
// check if exist request.
let db = session_db(&self.base, &mgid)?;
if let Some(req) = Request::get(&db, &remote.id)? {
req.delete(&db)?; // delete the old request.
results.rpcs.push(rpc::request_delete(mgid, req.id));
}
let mut request = Request::new(
remote.id,
remote.addr,
remote.name.clone(),
remark.clone(),
false,
true,
);
// save to db.
request.insert(&db)?;
drop(db);
// save the avatar.
write_avatar_sync(
&self.base,
&mgid,
&request.gid,
remote.avatar.clone(),
)?;
self.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestCreate(false, remote, remark),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
results.rpcs.push(rpc::request_create(mgid, &request));
return Ok(results);
}
let fid = some_fid.unwrap(); // safe checked.
// already friendship & update.
// 1. online this group.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 2. update remote user.
let mut friend = self.update_friend(&mgid, fid, remote)?;
// 3. online to UI.
friend.online = true;
results.rpcs.push(rpc::friend_info(mgid, &friend));
// 4. connected.
let msg = self.conn_agree_message(0, &mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
}
}
RecvType::Leave(addr) => {
for (mgid, running) in &mut self.runnings {
let peers = running.peer_leave(&addr);
for (fgid, fid) in peers {
results.rpcs.push(rpc::friend_offline(*mgid, fid));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),
&mut results,
)?;
}
}
}
RecvType::Result(addr, is_ok, data) => {
// check to close.
if !is_ok {
let db = session_db(&self.base, &mgid)?;
if let Some(friend) = Friend::get_it(&db, &fgid)? {
if friend.contains_addr(&addr) {
results.rpcs.push(rpc::friend_close(mgid, friend.id));
friend.close(&db)?;
}
}
drop(db);
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Reject => {
let db = session_db(&self.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
self.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
_ => {}
}
return Ok(results);
}
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &self.addr)?;
// 2. check has this remove.
let fid = self.get_remote_id(&mgid, &fgid)?;
// 3. online this group.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 4. update remote addr.
let db = session_db(&self.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &self.addr)?;
if let Ok(fid) = self.get_remote_id(&mgid, &fgid) {
// already friendship.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = session_db(&self.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
self.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&self.base, &mgid, &remote.id, remote.avatar)?;
self.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
}
drop(db);
}
let data = postcard::to_allocvec(&LayerEvent::OnlinePing).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {}
}
}
RecvType::ResultConnect(addr, data) => {
let response: LayerResponse = postcard::from_bytes(&data)
.map_err(|_e| new_io_error("Deseralize result failure"))?;
match response {
LayerResponse::Connect(proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &self.addr)?;
// 2. check has this remove.
let fid = self.get_remote_id(&mgid, &fgid)?;
// 3. online this group.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
// 4. update remote addr.
let db = session_db(&self.base, &mgid)?;
Friend::addr_update(&db, fid, &addr)?;
drop(db);
// 5. online to UI.
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
// 6. connected.
let msg = self.conn_res_message(&mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
}
LayerResponse::Agree(remote, proof) => {
// 1. check verify.
proof.verify(&fgid, &addr, &self.addr)?;
if let Ok(fid) = self.get_remote_id(&mgid, &fgid) {
// already friendship.
self.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
self.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
} else {
// agree request for friend.
let db = session_db(&self.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &remote.id)? {
self.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(
request.gid,
true,
remote.avatar.clone(),
),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = true;
request.update(&db)?;
let request_id = request.id;
let friend = Friend::from_request(&db, request)?;
write_avatar_sync(&self.base, &mgid, &remote.id, remote.avatar)?;
self.running_mut(&mgid)?.add_permissioned(fgid, friend.id);
results
.rpcs
.push(rpc::request_agree(mgid, request_id, &friend));
}
drop(db);
}
let msg = self.conn_res_message(&mgid, addr).await?;
results.layers.push((mgid, fgid, msg));
}
LayerResponse::Reject => {
let db = session_db(&self.base, &mgid)?;
if let Some(mut request) = Request::get(&db, &fgid)? {
self.group.write().await.broadcast(
&mgid,
InnerEvent::SessionRequestHandle(request.gid, false, vec![]),
REQUEST_TABLE_PATH,
request.id,
&mut results,
)?;
request.is_over = true;
request.is_ok = false;
request.update(&db)?;
results.rpcs.push(rpc::request_reject(mgid, request.id));
}
drop(db);
}
}
}
RecvType::Event(addr, bytes) => {
return LayerEvent::handle(fgid, mgid, self, addr, bytes).await;
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
}
RecvType::Delivery(t, tid, is_ok) => {
println!("delivery: tid: {}, is_ok: {}", tid, is_ok);
// TODO maybe send failure need handle.
if is_ok {
if let Some((gid, db_id)) = self.delivery.remove(&tid) {
let db = session_db(&self.base, &mgid)?;
let resp = match t {
DeliveryType::Event => {
Message::delivery(&db, db_id, true)?;
rpc::message_delivery(gid, db_id, true)
}
DeliveryType::Connect => {
// request.
Request::delivery(&db, db_id, true)?;
rpc::request_delivery(gid, db_id, true)
}
DeliveryType::Result => {
// response. TODO better for it.
Request::delivery(&db, db_id, true)?;
rpc::request_delivery(gid, db_id, true)
}
};
drop(db);
results.rpcs.push(resp);
}
}
}
}
Ok(results)
app_layer_handle(self, fgid, mgid, msg).await
}
}
@ -549,7 +170,7 @@ impl Layer { @@ -549,7 +170,7 @@ impl Layer {
if let Ok(friends) = self.all_friends(mgid) {
let mut vecs = vec![];
for friend in friends {
if let Ok(msg) = self.conn_req_message(&friend.gid, friend.addr).await {
if let Ok(msg) = conn_req_message(self, &friend.gid, friend.addr).await {
vecs.push((friend.gid, msg));
}
}
@ -565,268 +186,4 @@ impl Layer { @@ -565,268 +186,4 @@ impl Layer {
}
return false;
}
pub fn req_message(&mut self, me: User, request: Request) -> SendType {
// update delivery.
let uid = self.delivery.len() as u64 + 1;
self.delivery.insert(uid, (me.id, request.id));
let req = LayerRequest::Friend(me, request.remark);
let data = postcard::to_allocvec(&req).unwrap_or(vec![]);
SendType::Connect(uid, request.addr, None, None, data)
}
pub fn reject_message(&mut self, tid: i64, addr: PeerAddr, me_id: GroupId) -> SendType {
let data = postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![]);
let uid = self.delivery.len() as u64 + 1;
self.delivery.insert(uid, (me_id, tid));
SendType::Result(uid, addr, false, false, data)
}
pub fn event_message(
&mut self,
tid: i64,
me_id: GroupId,
addr: PeerAddr,
event: &LayerEvent,
) -> SendType {
let data = postcard::to_allocvec(event).unwrap_or(vec![]);
let uid = self.delivery.len() as u64 + 1;
self.delivery.insert(uid, (me_id, tid));
SendType::Event(uid, addr, data)
}
pub async fn conn_req_message(&self, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
let proof = self.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerRequest::Connect(proof)).unwrap_or(vec![]);
Ok(SendType::Connect(0, addr, None, None, data))
}
pub async fn conn_res_message(&self, mgid: &GroupId, addr: PeerAddr) -> Result<SendType> {
let proof = self.group.read().await.prove_addr(mgid, &addr)?;
let data = postcard::to_allocvec(&LayerResponse::Connect(proof)).unwrap_or(vec![]);
Ok(SendType::Result(0, addr, true, false, data))
}
pub async fn conn_agree_message(
&mut self,
tid: i64,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = self.delivery.len() as u64 + 1;
self.delivery.insert(uid, (*mgid, tid));
let group_lock = self.group.read().await;
let proof = group_lock.prove_addr(mgid, &addr)?;
let me = group_lock.clone_user(mgid)?;
drop(group_lock);
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data))
}
pub fn rpc_agree_message(
&mut self,
tid: i64,
proof: Proof,
me: User,
mgid: &GroupId,
addr: PeerAddr,
) -> Result<SendType> {
let uid = self.delivery.len() as u64 + 1;
self.delivery.insert(uid, (*mgid, tid));
let data = postcard::to_allocvec(&LayerResponse::Agree(me, proof)).unwrap_or(vec![]);
Ok(SendType::Result(uid, addr, true, false, data))
}
// maybe need if gid or addr in blocklist.
pub fn _res_reject() -> Vec<u8> {
postcard::to_allocvec(&LayerResponse::Reject).unwrap_or(vec![])
}
}
/// Esse app's Event.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
/// receiver gid, sender gid, message.
Message(EventId, NetworkMessage),
/// receiver gid, sender user.
Info(User),
/// receiver gid, sender gid.
OnlinePing,
/// receiver gid, sender gid.
OnlinePong,
/// receiver gid, sender gid.
Offline,
/// close friendship.
Close,
}
impl LayerEvent {
pub async fn handle(
fgid: GroupId,
mgid: GroupId,
layer: &mut Layer,
addr: PeerAddr,
bytes: Vec<u8>,
) -> Result<HandleResult> {
let event: LayerEvent =
postcard::from_bytes(&bytes).map_err(|_| new_io_error("serialize event error."))?;
let fid = layer.get_remote_id(&mgid, &fgid)?;
let mut results = HandleResult::new();
match event {
LayerEvent::Message(hash, m) => {
let db = session_db(&layer.base, &mgid)?;
if !Message::exist(&db, &hash)? {
let msg = m.clone().handle(false, mgid, &layer.base, &db, fid, hash)?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionMessageCreate(fgid, false, hash, m),
MESSAGE_TABLE_PATH,
msg.id,
&mut results,
)?;
results.rpcs.push(rpc::message_create(mgid, &msg));
}
}
LayerEvent::Info(remote) => {
let avatar = remote.avatar.clone();
let f = layer.update_friend(&mgid, fid, remote)?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionFriendInfo(f.gid, f.addr, f.name.clone(), avatar),
FRIEND_TABLE_PATH,
f.id,
&mut results,
)?;
results.rpcs.push(rpc::friend_info(mgid, &f));
}
LayerEvent::OnlinePing => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
let data = postcard::to_allocvec(&LayerEvent::OnlinePong).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((mgid, fgid, msg));
}
LayerEvent::OnlinePong => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOnline(fgid),
&mut results,
)?;
layer
.running_mut(&mgid)?
.check_add_online(fgid, Online::Direct(addr))?;
results.rpcs.push(rpc::friend_online(mgid, fid, addr));
}
LayerEvent::Offline => {
layer.group.write().await.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),
&mut results,
)?;
layer.running_mut(&mgid)?.check_offline(&fgid, &addr);
results.rpcs.push(rpc::friend_offline(mgid, fid));
}
LayerEvent::Close => {
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionFriendClose(fgid),
FRIEND_TABLE_PATH,
fid,
&mut results,
)?;
layer.remove_friend(&mgid, &fgid);
let db = session_db(&layer.base, &mgid)?;
Friend::id_close(&db, fid)?;
drop(db);
results.rpcs.push(rpc::friend_close(mgid, fid));
if !layer.is_online(&addr) {
results
.layers
.push((mgid, fgid, SendType::Disconnect(addr)))
}
}
}
Ok(results)
}
pub async fn from_message(
base: &PathBuf,
mgid: GroupId,
fid: i64,
m_type: MessageType,
content: String,
) -> std::result::Result<(Message, NetworkMessage), tdn::types::rpc::RpcError> {
let db = session_db(&base, &mgid)?;
// handle message's type.
let (nm_type, raw) = match m_type {
MessageType::String => (NetworkMessage::String(content.clone()), content),
MessageType::Image => {
let bytes = read_file(&PathBuf::from(content)).await?;
let image_name = write_image(base, &mgid, &bytes).await?;
(NetworkMessage::Image(bytes), image_name)
}
MessageType::File => {
let file_path = PathBuf::from(content);
let bytes = read_file(&file_path).await?;
let old_name = file_path.file_name()?.to_str()?;
let filename = write_file(base, &mgid, old_name, &bytes).await?;
(NetworkMessage::File(filename.clone(), bytes), filename)
}
MessageType::Contact => {
let cid: i64 = content.parse().map_err(|_e| new_io_error("id error"))?;
let contact = Friend::get_id(&db, cid)??;
let avatar_bytes = read_avatar(base, &mgid, &contact.gid).await?;
let tmp_name = contact.name.replace(";", "-;");
let contact_values = format!(
"{};;{};;{}",
tmp_name,
contact.gid.to_hex(),
contact.addr.to_hex()
);
(
NetworkMessage::Contact(contact.name, contact.gid, contact.addr, avatar_bytes),
contact_values,
)
}
MessageType::Record => {
let (bytes, time) = if let Some(i) = content.find('-') {
let time = content[0..i].parse().unwrap_or(0);
let bytes = read_record(base, &mgid, &content[i + 1..]).await?;
(bytes, time)
} else {
(vec![], 0)
};
(NetworkMessage::Record(bytes, time), content)
}
MessageType::Emoji => {
// TODO
(NetworkMessage::Emoji, content)
}
MessageType::Phone => {
// TODO
(NetworkMessage::Phone, content)
}
MessageType::Video => {
// TODO
(NetworkMessage::Video, content)
}
};
let mut msg = Message::new(&mgid, fid, true, m_type, raw, false);
msg.insert(&db)?;
Friend::update_last_message(&db, fid, &msg, true)?;
drop(db);
Ok((msg, nm_type))
}
}

5
src/rpc.rs

@ -13,9 +13,10 @@ use tdn::{ @@ -13,9 +13,10 @@ use tdn::{
};
use crate::apps::app_rpc_inject;
use crate::apps::chat::{conn_req_message, LayerEvent};
use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, LayerEvent};
use crate::layer::Layer;
pub(crate) fn init_rpc(
addr: PeerAddr,
@ -362,7 +363,7 @@ fn new_rpc_handler( @@ -362,7 +363,7 @@ fn new_rpc_handler(
let layer_lock = state.layer.read().await;
let friends = layer_lock.all_friends(&gid)?;
for friend in friends {
let msg = layer_lock.conn_req_message(&gid, friend.addr).await?;
let msg = conn_req_message(&layer_lock, &gid, friend.addr).await?;
results.layers.push((gid, friend.gid, msg));
}
drop(layer_lock);

Loading…
Cancel
Save