Browse Source

upgrade group chat

pull/18/head
Sun 3 years ago
parent
commit
3803d313e3
  1. 3
      lib/apps/group/detail.dart
  2. 6
      lib/apps/group/models.dart
  3. 712
      src/apps/group/layer.rs
  4. 8
      src/apps/group/models/group.rs
  5. 8
      src/apps/group/models/message.rs
  6. 7
      src/apps/group/rpc.rs
  7. 41
      src/layer.rs
  8. 90
      src/rpc.rs
  9. 26
      types/group/src/lib.rs

3
lib/apps/group/detail.dart

@ -85,10 +85,9 @@ class _GroupChatDetailState extends State<GroupChatDetail> { @@ -85,10 +85,9 @@ class _GroupChatDetailState extends State<GroupChatDetail> {
}
}
// [group_id, member_id, member_addr]
// [group_id, member_id]
_memberOnline(List params) {
if (_group.id == params[0] && this._members.containsKey(params[1])) {
this._members[params[1]]!.addr = params[2];
this._members[params[1]]!.online = true;
setState(() {});
}

6
lib/apps/group/models.dart

@ -35,7 +35,6 @@ class Member { @@ -35,7 +35,6 @@ class Member {
int id = 0;
int fid = 0;
String mid = '';
String addr = '';
String name = '';
bool leave = false;
bool online = false;
@ -44,9 +43,8 @@ class Member { @@ -44,9 +43,8 @@ class Member {
this.id = params[0];
this.fid = params[1];
this.mid = params[2];
this.addr = params[3];
this.name = params[4];
this.leave = params[5];
this.name = params[3];
this.leave = params[4];
if (this.mid == Global.pid) {
this.online = true;
}

712
src/apps/group/layer.rs

@ -9,13 +9,12 @@ use tdn_storage::local::DStorage; @@ -9,13 +9,12 @@ use tdn_storage::local::DStorage;
use crate::apps::chat::Friend;
use crate::global::Global;
use crate::layer::Layer;
use crate::rpc::{
session_close, session_connect, session_last, session_lost, session_suspend,
session_update_name,
};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{delete_avatar, group_db, session_db, write_avatar_sync};
use crate::storage::{chat_db, delete_avatar, group_db, session_db, write_avatar_sync};
use super::models::{handle_network_message, GroupChat, Member, Message};
use super::rpc;
@ -56,7 +55,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle @@ -56,7 +55,7 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
let db = group_db(&global.base, &pid, &db_key)?;
let s_db = session_db(&global.base, &pid, &db_key)?;
let group = GroupChat::close(&db, &gid, &peer.id)?;
let group = GroupChat::close_id(&db, &gid, &peer.id)?;
let sid = Session::close(&s_db, &group.id, &SessionType::Group)?;
results.rpcs.push(session_close(&sid));
}
@ -71,22 +70,16 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle @@ -71,22 +70,16 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
results.layers.push((GROUP_CHAT_ID, msg));
}
}
RecvType::Leave(..) => {}
RecvType::Event(addr, bytes) => {
// PEER & SERVER
let event: LayerEvent = bincode::deserialize(&bytes)?;
debug!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT");
//handle_server_event(fgid, addr, event, layer, &mut results).await?;
debug!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT");
debug!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT");
//handle_peer_event(ogid, addr, event, layer, &mut results).await?;
debug!("----------- DEBUG GROUP CHAT: PEER OVER LAYER EVENT");
handle_event(addr, event, global, &mut results).await?;
}
RecvType::Delivery(..) => {}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
}
RecvType::Delivery(..) => {}
RecvType::Leave(..) => {} // nerver here.
}
Ok(results)
@ -98,7 +91,7 @@ async fn handle_connect( @@ -98,7 +91,7 @@ async fn handle_connect(
gid: GroupChatId,
results: &mut HandleResult,
) -> Result<()> {
let (height, sid, id) = global.layer.read().await.group(&gid)?.info();
let (height, _, id, _) = global.layer.read().await.group(&gid)?.info();
let pid = global.pid().await;
let db_key = global.group.read().await.db_key(&pid)?;
@ -117,7 +110,7 @@ async fn handle_connect( @@ -117,7 +110,7 @@ async fn handle_connect(
results.rpcs.push(rpc::member_online(id, mid));
let data = LayerEvent::MemberOnline(gid, peer.id);
broadcast(&gid, global, &data, results).await;
broadcast(&gid, global, &data, results).await?;
Ok(())
}
@ -182,384 +175,310 @@ async fn handle_result( @@ -182,384 +175,310 @@ async fn handle_result(
Ok(())
}
// async fn handle_server_event(
// fgid: GroupId,
// addr: PeerId,
// event: LayerEvent,
// layer: &Arc<RwLock<Layer>>,
// results: &mut HandleResult,
// ) -> Result<()> {
// let gcd = event.gcd();
// let base = layer.read().await.base().clone();
// let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id();
// let db = layer.read().await.group.read().await.group_db(&ogid)?;
// match event {
// LayerEvent::Offline(gcd) => {
// // 1. check member online.
// if layer.write().await.remove_online(&gcd, &fgid).is_none() {
// return Ok(());
// }
// // 2. UI: offline the member.
// if let Ok(mid) = Member::get_id(&db, &id, &fgid) {
// results.rpcs.push(rpc::member_offline(ogid, id, mid));
// }
// // 3. broadcast offline event.
// broadcast(&LayerEvent::MemberOffline(gcd, fgid), layer, &gcd, results).await?;
// }
// LayerEvent::GroupName(gcd, name) => {
// // 1. update group name
// let _ = GroupChat::update_name(&db, &id, &name)?;
// // 2. UI: update
// results.rpcs.push(rpc::group_name(ogid, &id, &name));
// if let Ok(sid) = Session::update_name_by_id(
// &layer.read().await.group.read().await.session_db(&ogid)?,
// &id,
// &SessionType::Group,
// &name,
// ) {
// results.rpcs.push(session_update_name(ogid, &sid, &name));
// }
// // 3. broadcast
// broadcast(&LayerEvent::GroupName(gcd, name), layer, &gcd, results).await?;
// }
// LayerEvent::Sync(gcd, _, event) => {
// match event {
// Event::MemberJoin(mgid, maddr, mname, mavatar) => {
// let mdid_res = Member::get_id(&db, &id, &mgid);
// let h = layer.write().await.running_mut(&gcd)?.increased();
// let new_e = Event::MemberJoin(mgid, maddr, mname.clone(), mavatar.clone());
// if let Ok(mdid) = mdid_res {
// Member::update(&db, &h, &mdid, &maddr, &mname)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// let mem = Member::info(mdid, id, mgid, maddr, mname);
// results.rpcs.push(rpc::member_join(ogid, &mem));
// } else {
// let mut member = Member::new(h, id, mgid, maddr, mname);
// member.insert(&db)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// results.rpcs.push(rpc::member_join(ogid, &member));
// }
// // broadcast
// GroupChat::add_height(&db, id, h)?;
// broadcast(&LayerEvent::Sync(gcd, h, new_e), layer, &gcd, results).await?;
// }
// Event::MemberLeave(mgid) => {
// let mdid = Member::get_id(&db, &id, &mgid)?;
// let h = layer.write().await.running_mut(&gcd)?.increased();
// Member::leave(&db, &mdid, &h)?;
// // check mid is my chat friend. if not, delete avatar.
// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
// if Friend::get_id(&s_db, &mgid).is_err() {
// let _ = delete_avatar(&base, &ogid, &mgid).await;
// }
// results.rpcs.push(rpc::member_leave(ogid, id, mdid));
// // broadcast
// GroupChat::add_height(&db, id, h)?;
// broadcast(&LayerEvent::Sync(gcd, h, event), layer, &gcd, results).await?;
// }
// Event::MessageCreate(mgid, nmsg, mtime) => {
// debug!("Sync: create message start");
// let _mdid = Member::get_id(&db, &id, &mgid)?;
// let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime);
// let new_h = layer.write().await.running_mut(&gcd)?.increased();
// broadcast(&LayerEvent::Sync(gcd, new_h, new_e), layer, &gcd, results).await?;
// GroupChat::add_height(&db, id, new_h)?;
// let msg = handle_network_message(
// &layer.read().await.group,
// new_h,
// id,
// mgid,
// &ogid,
// nmsg,
// mtime,
// &base,
// results,
// )
// .await?;
// results.rpcs.push(rpc::message_create(ogid, &msg));
// debug!("Sync: create message ok");
// // UPDATE SESSION.
// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
// update_session(&s_db, &ogid, &id, &msg, results);
// }
// }
// }
// }
// LayerEvent::MemberOnlineSync(gcd) => {
// let onlines = layer
// .read()
// .await
// .running(&gcd)?
// .onlines()
// .iter()
// .map(|(g, a)| (**g, **a))
// .collect();
// let event = LayerEvent::MemberOnlineSyncResult(gcd, onlines);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// let s = SendType::Event(0, addr, data);
// add_server_layer(results, fgid, s);
// }
// LayerEvent::SyncReq(gcd, from) => {
// debug!("Got sync request. height: {} from: {}", height, from);
// if height >= from {
// let to = if height - from > 20 {
// from + 20
// } else {
// height
// };
// let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &from, &to).await?;
// let messages = Message::sync(&base, &ogid, &db, &id, &from, &to).await?;
// let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// let s = SendType::Event(0, addr, data);
// add_server_layer(results, fgid, s);
// debug!("Sended sync request results. from: {}, to: {}", from, to);
// }
// }
// LayerEvent::Suspend(..) => {}
// LayerEvent::Actived(..) => {}
// _ => error!("group server handle event nerver here"),
// }
// Ok(())
// }
// async fn handle_peer_event(
// ogid: GroupId,
// addr: PeerId,
// event: LayerEvent,
// layer: &Arc<RwLock<Layer>>,
// results: &mut HandleResult,
// ) -> Result<()> {
// let base = layer.read().await.base().clone();
// let gcd = event.gcd();
// let (sid, id) = layer.read().await.get_running_remote_id(&ogid, gcd)?;
// let db = layer.read().await.group.read().await.group_db(&ogid)?;
// match event {
// LayerEvent::Offline(gcd) => {
// // 1. offline group chat.
// layer
// .write()
// .await
// .running_mut(&ogid)?
// .check_offline(&gcd, &addr);
// // 2. UI: offline the session.
// results.rpcs.push(session_lost(ogid, &sid));
// }
// LayerEvent::Suspend(gcd) => {
// if layer
// .write()
// .await
// .running_mut(&ogid)?
// .suspend(&gcd, false, true)?
// {
// results.rpcs.push(session_suspend(ogid, &sid));
// }
// }
// LayerEvent::Actived(gcd) => {
// let _ = layer.write().await.running_mut(&ogid)?.active(&gcd, false);
// results.rpcs.push(session_connect(ogid, &sid, &addr));
// }
// LayerEvent::MemberOnline(_gcd, mgid, maddr) => {
// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) {
// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr));
// }
// }
// LayerEvent::MemberOffline(_gcd, mgid) => {
// if let Ok(mid) = Member::get_id(&db, &id, &mgid) {
// results.rpcs.push(rpc::member_offline(ogid, id, mid));
// }
// }
// LayerEvent::MemberOnlineSyncResult(_gcd, onlines) => {
// for (mgid, maddr) in onlines {
// if let Ok(mid) = Member::addr_update(&db, &id, &mgid, &maddr) {
// results.rpcs.push(rpc::member_online(ogid, id, mid, &maddr));
// }
// }
// }
// LayerEvent::GroupName(_gcd, name) => {
// let _ = GroupChat::update_name(&db, &id, &name)?;
// results.rpcs.push(rpc::group_name(ogid, &id, &name));
// let _ = Session::update_name(
// &layer.read().await.group.read().await.session_db(&ogid)?,
// &sid,
// &name,
// );
// results.rpcs.push(session_update_name(ogid, &sid, &name));
// }
// LayerEvent::GroupClose(_gcd) => {
// let group = GroupChat::close(&db, &gcd)?;
// let sid = Session::close(
// &layer.read().await.group.read().await.session_db(&ogid)?,
// &group.id,
// &SessionType::Group,
// )?;
// results.rpcs.push(session_close(ogid, &sid));
// }
// LayerEvent::Sync(_gcd, height, event) => {
// debug!("Sync: handle height: {}", height);
// match event {
// Event::MemberJoin(mgid, maddr, mname, mavatar) => {
// let mdid_res = Member::get_id(&db, &id, &mgid);
// if let Ok(mdid) = mdid_res {
// Member::update(&db, &height, &mdid, &maddr, &mname)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// let mem = Member::info(mdid, id, mgid, maddr, mname);
// results.rpcs.push(rpc::member_join(ogid, &mem));
// } else {
// let mut member = Member::new(height, id, mgid, maddr, mname);
// member.insert(&db)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// results.rpcs.push(rpc::member_join(ogid, &member));
// }
// // save consensus.
// GroupChat::add_height(&db, id, height)?;
// }
// Event::MemberLeave(mgid) => {
// let mdid = Member::get_id(&db, &id, &mgid)?;
// Member::leave(&db, &height, &mdid)?;
// // check mid is my chat friend. if not, delete avatar.
// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
// if Friend::get_id(&s_db, &mgid).is_err() {
// let _ = delete_avatar(&base, &ogid, &mgid).await;
// }
// results.rpcs.push(rpc::member_leave(ogid, id, mdid));
// // save consensus.
// GroupChat::add_height(&db, id, height)?;
// }
// Event::MessageCreate(mgid, nmsg, mtime) => {
// debug!("Sync: create message start");
// let _mdid = Member::get_id(&db, &id, &mgid)?;
// let msg = handle_network_message(
// &layer.read().await.group,
// height,
// id,
// mgid,
// &ogid,
// nmsg,
// mtime,
// &base,
// results,
// )
// .await?;
// results.rpcs.push(rpc::message_create(ogid, &msg));
// GroupChat::add_height(&db, id, height)?;
// debug!("Sync: create message ok");
// // UPDATE SESSION.
// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
// update_session(&s_db, &ogid, &id, &msg, results);
// }
// }
// }
// }
// LayerEvent::SyncRes(gcd, height, from, to, adds, leaves, messages) => {
// if to >= height {
// // when last packed sync, start sync online members.
// add_layer(results, ogid, sync_online(gcd, addr));
// }
// debug!("Start handle sync packed... {}, {}, {}", height, from, to);
// let mut last_message = None;
// for (height, mgid, maddr, mname, mavatar) in adds {
// let mdid_res = Member::get_id(&db, &id, &mgid);
// if let Ok(mdid) = mdid_res {
// Member::update(&db, &height, &mdid, &maddr, &mname)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// let mem = Member::info(mdid, id, mgid, maddr, mname);
// results.rpcs.push(rpc::member_join(ogid, &mem));
// } else {
// let mut member = Member::new(height, id, mgid, maddr, mname);
// member.insert(&db)?;
// if mavatar.len() > 0 {
// write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
// }
// results.rpcs.push(rpc::member_join(ogid, &member));
// }
// }
// for (height, mgid) in leaves {
// if let Ok(mdid) = Member::get_id(&db, &id, &mgid) {
// Member::leave(&db, &height, &mdid)?;
// // check mid is my chat friend. if not, delete avatar.
// let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
// if Friend::get_id(&s_db, &mgid).is_err() {
// let _ = delete_avatar(&base, &ogid, &mgid).await;
// }
// results.rpcs.push(rpc::member_leave(ogid, id, mdid));
// }
// }
// for (height, mgid, nm, time) in messages {
// if let Ok(msg) = handle_network_message(
// &layer.read().await.group,
// height,
// id,
// mgid,
// &ogid,
// nm,
// time,
// &base,
// results,
// )
// .await
// {
// results.rpcs.push(rpc::message_create(ogid, &msg));
// last_message = Some(msg);
// }
// }
// if to < height {
// add_layer(results, ogid, sync(gcd, addr, to + 1));
// }
// // update group chat height.
// GroupChat::add_height(&db, id, to)?;
// // UPDATE SESSION.
// if let Some(msg) = last_message {
// if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
// update_session(&s_db, &ogid, &id, &msg, results);
// }
// }
// debug!("Over handle sync packed... {}, {}, {}", height, from, to);
// }
// _ => error!("group peer handle event nerver here"),
// }
// Ok(())
// }
async fn handle_event(
addr: PeerId,
event: LayerEvent,
global: &Arc<Global>,
results: &mut HandleResult,
) -> Result<()> {
let gid = event.gid();
let (height, sid, id, gaddr) = global.layer.read().await.group(&gid)?.info();
let pid = global.pid().await;
let db_key = global.group.read().await.db_key(&pid)?;
let db = group_db(&global.base, &pid, &db_key)?;
let is_server = gaddr == pid;
if !is_server && gaddr != addr {
warn!("INVALID EVENT NOT FROM THE SERVER.");
return Err(anyhow!("NOT THE SERVER EVENT"));
}
match event {
LayerEvent::Offline(gid) => {
// SERVER & PEER
if is_server {
// 1. check member online.
if !global.layer.write().await.group_del_online(&gid, &addr) {
return Ok(());
}
// 2. UI: offline the member.
if let Ok(mid) = Member::get_id(&db, &id, &addr) {
results.rpcs.push(rpc::member_offline(id, mid));
}
// 3. broadcast offline event.
broadcast(&gid, global, &LayerEvent::MemberOffline(gid, addr), results).await?;
} else {
// 1. offline group chat.
global.layer.write().await.group_del(&gid);
// 2. UI: offline the session.
results.rpcs.push(session_lost(&sid));
}
}
LayerEvent::Suspend(gid) => {
// PEER
if global
.layer
.write()
.await
.group_mut(&gid)?
.suspend(false, true)
.is_some()
{
results.rpcs.push(session_suspend(&sid));
}
}
LayerEvent::Actived(gid) => {
// PEER
let _ = global.layer.write().await.group_mut(&gid)?.active(false);
results.rpcs.push(session_connect(&sid, &addr));
}
LayerEvent::MemberOnline(_gid, mpid) => {
// PEER
if let Ok(mid) = Member::get_id(&db, &id, &mpid) {
results.rpcs.push(rpc::member_online(id, mid));
}
}
LayerEvent::MemberOffline(_gid, mpid) => {
// PEER
if let Ok(mid) = Member::get_id(&db, &id, &mpid) {
results.rpcs.push(rpc::member_offline(id, mid));
}
}
LayerEvent::MemberOnlineSync(gid) => {
// SERVER
let onlines = global.layer.read().await.group(&gid)?.addrs.clone();
let event = LayerEvent::MemberOnlineSyncResult(gid, onlines);
let data = bincode::serialize(&event).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
results.layers.push((GROUP_CHAT_ID, msg));
}
LayerEvent::MemberOnlineSyncResult(_gid, onlines) => {
// PEER
for mpid in onlines {
if let Ok(mid) = Member::get_id(&db, &id, &mpid) {
results.rpcs.push(rpc::member_online(id, mid));
}
}
}
LayerEvent::GroupName(gid, name) => {
// SERVER & PEER
// 1. update group name
let _ = GroupChat::update_name(&db, &id, &name)?;
// 2. UI: update
results.rpcs.push(rpc::group_name(&id, &name));
let s_db = session_db(&global.base, &pid, &db_key)?;
let _ = Session::update_name(&s_db, &sid, &name);
results.rpcs.push(session_update_name(&sid, &name));
if is_server {
// 3. broadcast
broadcast(&gid, global, &LayerEvent::GroupName(gid, name), results).await?;
}
}
LayerEvent::GroupClose(gid) => {
// PEER
let group = GroupChat::close(&db, &id)?;
let s_db = session_db(&global.base, &pid, &db_key)?;
let sid = Session::close(&s_db, &group.id, &SessionType::Group)?;
results.rpcs.push(session_close(&sid));
}
LayerEvent::Sync(gid, height, event) => {
// SERVER & PEER
debug!("Sync: handle is_server: {} height: {} ", is_server, height);
match event {
Event::MemberJoin(mpid, mname, mavatar) => {
let mid_res = Member::get_id(&db, &id, &mpid);
let h = if is_server {
global.layer.write().await.group_mut(&gid)?.increased()
} else {
height
};
if let Ok(mid) = mid_res {
Member::update(&db, &h, &mid, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&global.base, &pid, &mpid, mavatar.clone())?;
}
let mem = Member::info(mid, id, mpid, mname.clone());
results.rpcs.push(rpc::member_join(&mem));
} else {
let mut member = Member::new(h, id, mpid, mname.clone());
member.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&global.base, &pid, &mpid, mavatar.clone())?;
}
results.rpcs.push(rpc::member_join(&member));
}
GroupChat::add_height(&db, id, h)?;
if is_server {
// broadcast
let new_e = Event::MemberJoin(mpid, mname, mavatar);
broadcast(&gid, global, &LayerEvent::Sync(gid, h, new_e), results).await?;
}
}
Event::MemberLeave(mpid) => {
let mid = Member::get_id(&db, &id, &mpid)?;
let h = if is_server {
global.layer.write().await.group_mut(&gid)?.increased()
} else {
height
};
Member::leave(&db, &mid, &h)?;
// check mid is my chat friend. if not, delete avatar.
let c_db = chat_db(&global.base, &pid, &db_key)?;
if Friend::get_id(&c_db, &mpid).is_err() {
let _ = delete_avatar(&global.base, &pid, &mpid).await;
}
results.rpcs.push(rpc::member_leave(id, mid));
// broadcast
GroupChat::add_height(&db, id, h)?;
if is_server {
broadcast(&gid, global, &LayerEvent::Sync(gid, h, event), results).await?;
}
}
Event::MessageCreate(mpid, nmsg, mtime) => {
debug!("Sync: create message start");
let _mid = Member::get_id(&db, &id, &mpid)?;
let h = if is_server {
global.layer.write().await.group_mut(&gid)?.increased()
} else {
height
};
let msg = handle_network_message(
&pid,
&global.base,
&db_key,
h,
id,
mpid,
nmsg.clone(),
mtime,
results,
)
.await?;
results.rpcs.push(rpc::message_create(&msg));
debug!("Sync: create message ok");
// UPDATE SESSION.
let s_db = session_db(&global.base, &pid, &db_key)?;
update_session(&s_db, &id, &msg, results);
GroupChat::add_height(&db, id, h)?;
if is_server {
let new_e = Event::MessageCreate(mpid, nmsg, mtime);
broadcast(&gid, global, &LayerEvent::Sync(gid, h, new_e), results).await?;
}
}
}
}
LayerEvent::SyncReq(gid, from) => {
// SERVER
debug!("Got sync request. height: {} from: {}", height, from);
if height >= from {
let to = if height - from > 20 {
from + 20
} else {
height
};
let (members, leaves) =
Member::sync(&global.base, &pid, &db, &id, &from, &to).await?;
let messages = Message::sync(&global.base, &pid, &db, &id, &from, &to).await?;
let event = LayerEvent::SyncRes(gid, height, from, to, members, leaves, messages);
let data = bincode::serialize(&event).unwrap_or(vec![]);
let s = SendType::Event(0, addr, data);
results.layers.push((GROUP_CHAT_ID, s));
debug!("Sended sync request results. from: {}, to: {}", from, to);
}
}
LayerEvent::SyncRes(gid, height, from, to, adds, leaves, messages) => {
// PEER
if to >= height {
results.layers.push((GROUP_CHAT_ID, sync_online(gid, addr)));
// when last packed sync, start sync online members.
}
debug!("Start handle sync packed... {}, {}, {}", height, from, to);
let mut last_message = None;
for (height, mpid, mname, mavatar) in adds {
let mid_res = Member::get_id(&db, &id, &mpid);
if let Ok(mid) = mid_res {
Member::update(&db, &height, &mid, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&global.base, &pid, &mpid, mavatar)?;
}
let mem = Member::info(mid, id, mpid, mname);
results.rpcs.push(rpc::member_join(&mem));
} else {
let mut member = Member::new(height, id, mpid, mname);
member.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&global.base, &pid, &mpid, mavatar)?;
}
results.rpcs.push(rpc::member_join(&member));
}
}
let c_db = chat_db(&global.base, &pid, &db_key)?;
for (height, mpid) in leaves {
if let Ok(mid) = Member::get_id(&db, &id, &mpid) {
Member::leave(&db, &height, &mid)?;
// check mid is my chat friend. if not, delete avatar.
if Friend::get_id(&c_db, &mpid).is_err() {
let _ = delete_avatar(&global.base, &pid, &mpid).await;
}
results.rpcs.push(rpc::member_leave(id, mid));
}
}
for (height, mpid, nm, time) in messages {
if let Ok(msg) = handle_network_message(
&pid,
&global.base,
&db_key,
height,
id,
mpid,
nm,
time,
results,
)
.await
{
results.rpcs.push(rpc::message_create(&msg));
last_message = Some(msg);
}
}
if to < height {
results
.layers
.push((GROUP_CHAT_ID, sync(gid, addr, to + 1)));
}
// update group chat height.
GroupChat::add_height(&db, id, to)?;
// UPDATE SESSION.
if let Some(msg) = last_message {
let s_db = session_db(&global.base, &pid, &db_key)?;
update_session(&s_db, &id, &msg, results);
}
debug!("Over handle sync packed... {}, {}, {}", height, from, to);
}
}
Ok(())
}
pub(crate) async fn broadcast(
gid: &GroupChatId,
@ -569,7 +488,7 @@ pub(crate) async fn broadcast( @@ -569,7 +488,7 @@ pub(crate) async fn broadcast(
) -> Result<()> {
let new_data = bincode::serialize(event)?;
for mpid in global.layer.read().await.group(gid)?.addrs.iter() {
for mpid in global.layer.read().await.group(gid)?.addrs.iter().skip(1) {
let s = SendType::Event(0, *mpid, new_data.clone());
results.layers.push((GROUP_CHAT_ID, s));
debug!("--- DEBUG broadcast to: {:?}", mpid);
@ -601,9 +520,10 @@ pub(crate) fn update_session(s_db: &DStorage, id: &i64, msg: &Message, results: @@ -601,9 +520,10 @@ pub(crate) fn update_session(s_db: &DStorage, id: &i64, msg: &Message, results:
}
}
pub(crate) fn group_conn(addr: Peer, gid: GroupChatId) -> SendType {
pub(crate) fn group_conn(addr: PeerId, gid: GroupChatId, results: &mut HandleResult) {
let data = bincode::serialize(&LayerConnect(gid)).unwrap_or(vec![]);
SendType::Connect(0, addr, data)
let msg = SendType::Connect(0, Peer::peer(addr), data);
results.layers.push((GROUP_CHAT_ID, msg));
}
fn sync(gid: GroupChatId, addr: PeerId, height: i64) -> SendType {

8
src/apps/group/models/group.rs

@ -177,7 +177,13 @@ impl GroupChat { @@ -177,7 +177,13 @@ impl GroupChat {
db.update(&sql)
}
pub fn close(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result<GroupChat> {
pub fn close(db: &DStorage, id: &i64) -> Result<GroupChat> {
let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", id);
db.update(&sql)?;
Self::get(db, id)
}
pub fn close_id(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result<GroupChat> {
let group = Self::get_id(db, gid, addr)?;
let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", group.id);
db.update(&sql)?;

8
src/apps/group/models/message.rs

@ -138,8 +138,8 @@ impl Message { @@ -138,8 +138,8 @@ impl Message {
}
pub async fn sync(
own: &PeerId,
base: &PathBuf,
own: &PeerId,
db: &DStorage,
fid: &i64,
from: &i64,
@ -191,17 +191,17 @@ pub(crate) async fn handle_network_message( @@ -191,17 +191,17 @@ pub(crate) async fn handle_network_message(
base: &PathBuf,
db_key: &str,
height: i64,
gdid: i64,
id: i64,
mid: PeerId,
msg: NetworkMessage,
datetime: i64,
results: &mut HandleResult,
) -> Result<Message> {
let db = group_db(base, own, db_key)?;
let mdid = Member::get_id(&db, &gdid, &mid)?;
let mdid = Member::get_id(&db, &id, &mid)?;
let is_me = &mid == own;
let (m_type, raw) = from_network_message(own, base, db_key, msg, results).await?;
let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime);
let mut msg = Message::new_with_time(height, id, mdid, is_me, m_type, raw, datetime);
msg.insert(&db)?;
Ok(msg)
}

7
src/apps/group/rpc.rs

@ -124,7 +124,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { @@ -124,7 +124,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let mut m = Member::new(gh, id, pid, me.name);
m.insert(&db)?;
let mid = m.id;
let _ = write_avatar(&state.base, &pid, &pid, &me.avatar).await;
// Add new session.
@ -192,7 +191,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { @@ -192,7 +191,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
if g.local {
// local save.
let new_h = state.layer.write().await.group_increased(&gid)?;
let new_h = state.layer.write().await.group_mut(&gid)?.increased();
let mut mem = Member::new(new_h, g.id, f.pid, f.name);
mem.insert(&group_db)?;
@ -236,7 +235,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { @@ -236,7 +235,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
if group.local {
// local save.
let new_h = state.layer.write().await.group_increased(&gid)?;
let new_h = state.layer.write().await.group_mut(&gid)?.increased();
let mut msg = Message::new_with_time(new_h, id, mid, true, m_type, raw, datetime);
msg.insert(&db)?;
@ -311,7 +310,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) { @@ -311,7 +310,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
if g.local {
// dissolve group.
let data = bincode::serialize(&LayerEvent::GroupClose(g.gid))?;
if let Some(addrs) = state.layer.write().await.group_rm_online(&g.gid) {
if let Some(addrs) = state.layer.write().await.group_del(&g.gid) {
for addr in addrs {
let s = SendType::Event(0, addr, data.clone());
results.layers.push((GROUP_CHAT_ID, s));

41
src/layer.rs

@ -119,12 +119,6 @@ impl Layer { @@ -119,12 +119,6 @@ impl Layer {
}
}
pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) {
if !self.groups.contains_key(&gid) {
self.groups.insert(gid, LayerSession::new(pid, sid, fid, h));
}
}
pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> {
if let Some(session) = self.groups.get(gid) {
Ok(session)
@ -133,18 +127,24 @@ impl Layer { @@ -133,18 +127,24 @@ impl Layer {
}
}
pub fn group_rm_online(&mut self, gid: &GroupChatId) -> Option<Vec<PeerId>> {
self.groups.remove(gid).map(|session| session.addrs)
}
pub fn group_increased(&mut self, gid: &GroupChatId) -> Result<i64> {
pub fn group_mut(&mut self, gid: &GroupChatId) -> Result<&mut LayerSession> {
if let Some(session) = self.groups.get_mut(gid) {
Ok(session.increased())
Ok(session)
} else {
Err(anyhow!("session missing!"))
}
}
pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) {
if !self.groups.contains_key(&gid) {
self.groups.insert(gid, LayerSession::new(pid, sid, fid, h));
}
}
pub fn group_del(&mut self, gid: &GroupChatId) -> Option<Vec<PeerId>> {
self.groups.remove(gid).map(|session| session.addrs)
}
pub fn group_add_member(&mut self, gid: &GroupChatId, addr: PeerId) {
if let Some(session) = self.groups.get_mut(gid) {
session.addrs.push(addr);
@ -157,6 +157,16 @@ impl Layer { @@ -157,6 +157,16 @@ impl Layer {
}
}
pub fn group_del_online(&mut self, gid: &GroupChatId, addr: &PeerId) -> bool {
if let Some(session) = self.groups.get_mut(gid) {
if let Some(pos) = session.addrs.iter().position(|x| x == addr) {
session.addrs.remove(pos);
return true;
}
}
false
}
// pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerId, GroupId> {
// // check close the stable connection.
// let mut addrs: HashMap<PeerId, GroupId> = HashMap::new();
@ -248,7 +258,6 @@ impl Layer { @@ -248,7 +258,6 @@ impl Layer {
// }
pub fn broadcast(&self, user: User, results: &mut HandleResult) {
let gid = user.id;
let info = ChatLayerEvent::InfoRes(user);
let data = bincode::serialize(&info).unwrap_or(vec![]);
@ -256,6 +265,8 @@ impl Layer { @@ -256,6 +265,8 @@ impl Layer {
let msg = SendType::Event(0, *fpid, data.clone());
results.layers.push((CHAT_ID, msg));
}
// TODO GROUPS
}
}
@ -326,8 +337,8 @@ impl LayerSession { @@ -326,8 +337,8 @@ impl LayerSession {
}
}
pub fn info(&self) -> (i64, i64, i64) {
(self.height, self.s_id, self.db_id)
pub fn info(&self) -> (i64, i64, i64, PeerId) {
(self.height, self.s_id, self.db_id, self.addrs[0])
}
pub fn increased(&mut self) -> i64 {

90
src/rpc.rs

@ -1,14 +1,11 @@ @@ -1,14 +1,11 @@
use chat_types::CHAT_ID;
use esse_primitives::{id_from_str, id_to_str};
use group_types::GroupChatId;
use group_types::GROUP_CHAT_ID;
use std::collections::HashMap;
use group_types::{GroupChatId, LayerEvent as GroupLayerEvent, GROUP_CHAT_ID};
use std::net::SocketAddr;
use std::sync::Arc;
use tdn::{
prelude::{new_send_channel, start_main},
types::{
group::GroupId,
message::{
NetworkType, RpcSendMessage, SendMessage, SendType, StateRequest, StateResponse,
},
@ -17,21 +14,15 @@ use tdn::{ @@ -17,21 +14,15 @@ use tdn::{
},
};
use tdn_did::{generate_mnemonic, Count};
use tokio::sync::{
mpsc::{self, error::SendError, Sender},
RwLock,
};
use crate::account::lang_from_i64;
use crate::apps::app_rpc_inject;
use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent};
use crate::apps::group::{group_conn, GroupChat};
use crate::global::Global;
//use crate::apps::group::{add_layer, group_conn, GroupChat};
//use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::Layer;
use crate::session::{connect_session, Session, SessionType};
use crate::storage::session_db;
use crate::storage::{group_db, session_db};
pub(crate) fn init_rpc(global: Arc<Global>) -> RpcHandler<Global> {
let mut handler = new_rpc_handler(global);
@ -129,28 +120,6 @@ fn session_list(sessions: Vec<Session>) -> RpcParam { @@ -129,28 +120,6 @@ fn session_list(sessions: Vec<Session>) -> RpcParam {
json!(results)
}
#[inline]
pub(crate) async fn sleep_waiting_close_stable(
sender: Sender<SendMessage>,
groups: HashMap<PeerId, ()>,
layers: HashMap<PeerId, GroupId>,
) -> std::result::Result<(), SendError<SendMessage>> {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
for (addr, _) in groups {
sender
.send(SendMessage::Group(SendType::Disconnect(addr)))
.await?;
}
for (faddr, fgid) in layers {
sender
.send(SendMessage::Layer(fgid, SendType::Disconnect(faddr)))
.await?;
}
Ok(())
}
#[inline]
pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc<Global>) -> Result<()> {
// Inner network default rpc method. only use in http-rpc.
@ -161,7 +130,7 @@ pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc<Global>) -> R @@ -161,7 +130,7 @@ pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc<Global>) -> R
_ => return Ok(()),
};
let (s, mut r) = mpsc::channel::<StateResponse>(128);
let (s, mut r) = tokio::sync::mpsc::channel::<StateResponse>(128);
let _ = global
.send(SendMessage::Network(NetworkType::NetworkState(req, s)))
.await?;
@ -371,7 +340,7 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> { @@ -371,7 +340,7 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
let pid = id_from_str(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let me_lock = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::rpc(json!([id_to_str(&pid)]));
let results = HandleResult::rpc(json!([id_to_str(&pid)]));
let (tdn_send, tdn_recv) = new_send_channel();
let running = state.reset(&pid, me_lock, tdn_send).await?;
@ -379,31 +348,20 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> { @@ -379,31 +348,20 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
return Ok(results);
}
// TODO load all local services created by this account.
// load all local services created by this account.
let db_key = state.group.read().await.db_key(&pid)?;
let group_db = group_db(&state.base, &pid, &db_key)?;
let s_db = session_db(&state.base, &pid, &db_key)?;
// 1. group chat.
// let self_addr = layer_lock.addr.clone();
// let group_lock = state.group.read().await;
// let group_db = group_lock.group_db(&ogid)?;
// let s_db = group_lock.session_db(&ogid)?;
// drop(group_lock);
// let group_chats = GroupChat::local(&group_db)?;
// for g in group_chats {
// layer_lock.add_running(&g.g_id, ogid, g.id, g.height)?;
// results.networks.push(NetworkType::AddGroup(g.g_id));
// // 2. online group to self group onlines.
// if let Some(session) =
// connect_session(&s_db, &SessionType::Group, &g.id, &self_addr)?
// {
// layer_lock.running_mut(&ogid)?.check_add_online(
// g.g_id,
// Online::Direct(self_addr),
// session.id,
// g.id,
// )?;
// }
// }
// drop(layer_lock);
let group_chats = GroupChat::local(&group_db)?;
let mut layer = state.layer.write().await;
for g in group_chats {
// 2. online group to self group onlines.
if let Some(s) = connect_session(&s_db, &SessionType::Group, &g.id, &pid)? {
layer.group_add(g.gid, g.addr, s.id, g.id, g.height);
}
}
drop(layer);
let key = state.group.read().await.keypair();
let peer_id = start_main(
@ -475,11 +433,7 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> { @@ -475,11 +433,7 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
if let Some(addr) = online {
return Ok(HandleResult::rpc(json!([id, id_to_str(&addr)])));
}
// add_layer(
// &mut results,
// gid,
// group_conn(proof, Peer::peer(s.addr), s.gid),
// );
group_conn(s.addr, remote_gid, &mut results);
}
_ => {}
}
@ -519,9 +473,9 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> { @@ -519,9 +473,9 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
if layer_lock.group_suspend(&remote_gid, true, must)?.is_some() {
results.rpcs.push(json!([id]));
}
//let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?;
//let msg = SendType::Event(0, s.addr, data);
//results.layers.push((GROUP_CHAT_ID, msg));
let data = bincode::serialize(&GroupLayerEvent::Suspend(remote_gid))?;
let msg = SendType::Event(0, s.addr, data);
results.layers.push((GROUP_CHAT_ID, msg));
}
_ => {
return Ok(HandleResult::new()); // others has no online.

26
types/group/src/lib.rs

@ -62,20 +62,20 @@ pub enum LayerEvent { @@ -62,20 +62,20 @@ pub enum LayerEvent {
impl LayerEvent {
/// get event's group id.
pub fn gcd(&self) -> &GroupChatId {
pub fn gid(&self) -> &GroupChatId {
match self {
Self::Offline(gcd) => gcd,
Self::Suspend(gcd) => gcd,
Self::Actived(gcd) => gcd,
Self::MemberOnline(gcd, ..) => gcd,
Self::MemberOffline(gcd, ..) => gcd,
Self::MemberOnlineSync(gcd) => gcd,
Self::MemberOnlineSyncResult(gcd, ..) => gcd,
Self::GroupName(gcd, ..) => gcd,
Self::GroupClose(gcd) => gcd,
Self::Sync(gcd, ..) => gcd,
Self::SyncReq(gcd, ..) => gcd,
Self::SyncRes(gcd, ..) => gcd,
Self::Offline(gid) => gid,
Self::Suspend(gid) => gid,
Self::Actived(gid) => gid,
Self::MemberOnline(gid, ..) => gid,
Self::MemberOffline(gid, ..) => gid,
Self::MemberOnlineSync(gid) => gid,
Self::MemberOnlineSyncResult(gid, ..) => gid,
Self::GroupName(gid, ..) => gid,
Self::GroupClose(gid) => gid,
Self::Sync(gid, ..) => gid,
Self::SyncReq(gid, ..) => gid,
Self::SyncRes(gid, ..) => gid,
}
}
}

Loading…
Cancel
Save