Browse Source

packed sync events

pull/18/head
Sun 4 years ago
parent
commit
de10e0d9c0
  1. 83
      src/apps/group_chat/layer.rs
  2. 16
      src/apps/group_chat/models.rs

83
src/apps/group_chat/layer.rs

@ -11,6 +11,7 @@ use tdn::{
use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent, PackedEvent}; use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent, PackedEvent};
use tdn_did::Proof; use tdn_did::Proof;
use tdn_storage::local::DStorage;
use crate::layer::{Layer, Online}; use crate::layer::{Layer, Online};
use crate::storage::{group_chat_db, write_avatar_sync}; use crate::storage::{group_chat_db, write_avatar_sync};
@ -76,6 +77,8 @@ pub(crate) async fn handle(
// 4. online ping. // 4. online ping.
add_layer(&mut results, mgid, ping(gcd, addr)); add_layer(&mut results, mgid, ping(gcd, addr));
println!("will sync remote: {}, my: {}", height, group.height);
// 5. sync group height. // 5. sync group height.
if group.height < height { if group.height < height {
add_layer(&mut results, mgid, sync(gcd, addr, group.height)); add_layer(&mut results, mgid, sync(gcd, addr, group.height));
@ -144,6 +147,7 @@ async fn handle_event(
layer: &Arc<RwLock<Layer>>, layer: &Arc<RwLock<Layer>>,
results: &mut HandleResult, results: &mut HandleResult,
) -> Result<()> { ) -> Result<()> {
println!("Got event.......");
let gid = match event { let gid = match event {
LayerEvent::Offline(gcd) LayerEvent::Offline(gcd)
| LayerEvent::OnlinePing(gcd) | LayerEvent::OnlinePing(gcd)
@ -152,9 +156,7 @@ async fn handle_event(
| LayerEvent::MemberOffline(gcd, ..) | LayerEvent::MemberOffline(gcd, ..)
| LayerEvent::Sync(gcd, ..) | LayerEvent::Sync(gcd, ..)
| LayerEvent::SyncReq(gcd, ..) | LayerEvent::SyncReq(gcd, ..)
| LayerEvent::PackedSync(gcd, ..) => { | LayerEvent::Packed(gcd, ..) => layer.read().await.get_running_remote_id(&mgid, &gcd)?,
layer.read().await.get_running_remote_id(&mgid, &gcd)?
}
}; };
match event { match event {
@ -178,6 +180,7 @@ async fn handle_event(
results.rpcs.push(rpc::member_offline(mgid, gid, mid, ma)); results.rpcs.push(rpc::member_offline(mgid, gid, mid, ma));
} }
LayerEvent::Sync(_, height, event) => { LayerEvent::Sync(_, height, event) => {
println!("Sync: height: {}", height);
let base = layer.read().await.base().clone(); let base = layer.read().await.base().clone();
let db = group_chat_db(&base, &mgid)?; let db = group_chat_db(&base, &mgid)?;
@ -205,18 +208,23 @@ async fn handle_event(
} }
Event::MemberLeave(_mid) => {} Event::MemberLeave(_mid) => {}
Event::MessageCreate(mid, nmsg, mtime) => { Event::MessageCreate(mid, nmsg, mtime) => {
println!("Sync: create message start");
let base = layer.read().await.base.clone(); let base = layer.read().await.base.clone();
let msg = let msg = from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?;
from_network_message(height as i64, gid, mid, mgid, nmsg, mtime, base)?;
results.rpcs.push(rpc::message_create(mgid, msg)); results.rpcs.push(rpc::message_create(mgid, msg));
println!("Sync: create message ok");
} }
} }
// save event. // save event.
GroupChat::add_height(&db, gid, height)?; GroupChat::add_height(&db, gid, height)?;
} }
LayerEvent::PackedSync(gcd, height, from, to, events) => { LayerEvent::Packed(gcd, height, from, to, events) => {
handle_sync(mgid, gcd, addr, height, from, to, events, results); println!("Start handle sync packed... {}, {}, {}", height, from, to);
let base = layer.read().await.base().clone();
handle_sync(
mgid, gid, gcd, addr, height, from, to, events, base, results,
)?;
} }
LayerEvent::SyncReq(..) => {} // Never here. LayerEvent::SyncReq(..) => {} // Never here.
} }
@ -242,22 +250,27 @@ fn ping(gcd: GroupId, addr: PeerAddr) -> SendType {
} }
fn sync(gcd: GroupId, addr: PeerAddr, height: i64) -> SendType { fn sync(gcd: GroupId, addr: PeerAddr, height: i64) -> SendType {
println!("Send sync request...");
let data = postcard::to_allocvec(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]); let data = postcard::to_allocvec(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]);
SendType::Event(0, addr, data) SendType::Event(0, addr, data)
} }
fn handle_sync( fn handle_sync(
mgid: GroupId, mgid: GroupId,
fid: i64,
gcd: GroupId, gcd: GroupId,
addr: PeerAddr, addr: PeerAddr,
height: i64, height: i64,
mut from: i64, mut from: i64,
to: i64, to: i64,
events: Vec<PackedEvent>, events: Vec<PackedEvent>,
base: PathBuf,
results: &mut HandleResult, results: &mut HandleResult,
) { ) -> Result<()> {
let db = group_chat_db(&base, &mgid)?;
for event in events { for event in events {
handle_sync_event(from, event); let _ = handle_sync_event(&mgid, &fid, from, event, &base, &db, results);
from += 1; from += 1;
} }
@ -266,8 +279,56 @@ fn handle_sync(
} }
// update group chat height. // update group chat height.
GroupChat::add_height(&db, fid, to)?;
Ok(())
} }
fn handle_sync_event(height: i64, event: PackedEvent) { fn handle_sync_event(
// mgid: &GroupId,
fid: &i64,
height: i64,
event: PackedEvent,
base: &PathBuf,
db: &DStorage,
results: &mut HandleResult,
) -> Result<()> {
match event {
PackedEvent::GroupInfo => {
// TODO
}
PackedEvent::GroupTransfer => {
// TODO
}
PackedEvent::GroupManagerAdd => {
// TODO
}
PackedEvent::GroupManagerDel => {
// TODO
}
PackedEvent::GroupClose => {
// TOOD
}
PackedEvent::MemberInfo(mid, maddr, mname, mavatar) => {
// TODO
}
PackedEvent::MemberJoin(mid, maddr, mname, mavatar, mtime) => {
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
let mut member = Member::new(*fid, mid, maddr, mname, false, mtime);
member.insert(&db)?;
results.rpcs.push(rpc::member_join(*mgid, member));
}
PackedEvent::MemberLeave(mid) => {
// TODO
}
PackedEvent::MessageCreate(mid, nmsg, time) => {
let msg = from_network_message(height, *fid, mid, mgid, nmsg, time, base)?;
results.rpcs.push(rpc::message_create(*mgid, msg));
}
PackedEvent::None => {}
}
Ok(())
} }

16
src/apps/group_chat/models.rs

@ -763,28 +763,28 @@ pub(super) fn from_network_message(
height: i64, height: i64,
gdid: i64, gdid: i64,
mid: GroupId, mid: GroupId,
mgid: GroupId, mgid: &GroupId,
msg: NetworkMessage, msg: NetworkMessage,
datetime: i64, datetime: i64,
base: PathBuf, base: &PathBuf,
) -> Result<Message> { ) -> Result<Message> {
let db = group_chat_db(&base, &mgid)?; let db = group_chat_db(base, mgid)?;
let mdid = Member::get_id(&db, &gdid, &mid)?; let mdid = Member::get_id(&db, &gdid, &mid)?;
let is_me = mid == mgid; let is_me = &mid == mgid;
// handle event. // handle event.
let (m_type, raw) = match msg { let (m_type, raw) = match msg {
NetworkMessage::String(content) => (MessageType::String, content), NetworkMessage::String(content) => (MessageType::String, content),
NetworkMessage::Image(bytes) => { NetworkMessage::Image(bytes) => {
let image_name = write_image_sync(&base, &mgid, bytes)?; let image_name = write_image_sync(base, mgid, bytes)?;
(MessageType::Image, image_name) (MessageType::Image, image_name)
} }
NetworkMessage::File(old_name, bytes) => { NetworkMessage::File(old_name, bytes) => {
let filename = write_file_sync(&base, &mgid, &old_name, bytes)?; let filename = write_file_sync(base, mgid, &old_name, bytes)?;
(MessageType::File, filename) (MessageType::File, filename)
} }
NetworkMessage::Contact(name, rgid, addr, avatar_bytes) => { NetworkMessage::Contact(name, rgid, addr, avatar_bytes) => {
write_avatar_sync(&base, &mgid, &rgid, avatar_bytes)?; write_avatar_sync(base, mgid, &rgid, avatar_bytes)?;
let tmp_name = name.replace(";", "-;"); let tmp_name = name.replace(";", "-;");
let contact_values = format!("{};;{};;{}", tmp_name, rgid.to_hex(), addr.to_hex()); let contact_values = format!("{};;{};;{}", tmp_name, rgid.to_hex(), addr.to_hex());
(MessageType::Contact, contact_values) (MessageType::Contact, contact_values)
@ -794,7 +794,7 @@ pub(super) fn from_network_message(
(MessageType::Emoji, "".to_owned()) (MessageType::Emoji, "".to_owned())
} }
NetworkMessage::Record(bytes, time) => { NetworkMessage::Record(bytes, time) => {
let record_name = write_record_sync(&base, &mgid, gdid, time, bytes)?; let record_name = write_record_sync(base, mgid, gdid, time, bytes)?;
(MessageType::Record, record_name) (MessageType::Record, record_name)
} }
NetworkMessage::Phone => { NetworkMessage::Phone => {

Loading…
Cancel
Save