diff --git a/src/apps/group_chat/layer.rs b/src/apps/group_chat/layer.rs index c14579a..0f3919c 100644 --- a/src/apps/group_chat/layer.rs +++ b/src/apps/group_chat/layer.rs @@ -11,6 +11,7 @@ use tdn::{ use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent, PackedEvent}; use tdn_did::Proof; +use tdn_storage::local::DStorage; use crate::layer::{Layer, Online}; use crate::storage::{group_chat_db, write_avatar_sync}; @@ -76,6 +77,8 @@ pub(crate) async fn handle( // 4. online ping. add_layer(&mut results, mgid, ping(gcd, addr)); + println!("will sync remote: {}, my: {}", height, group.height); + // 5. sync group height. if group.height < height { add_layer(&mut results, mgid, sync(gcd, addr, group.height)); @@ -144,6 +147,7 @@ async fn handle_event( layer: &Arc>, results: &mut HandleResult, ) -> Result<()> { + println!("Got event......."); let gid = match event { LayerEvent::Offline(gcd) | LayerEvent::OnlinePing(gcd) @@ -152,9 +156,7 @@ async fn handle_event( | LayerEvent::MemberOffline(gcd, ..) | LayerEvent::Sync(gcd, ..) | LayerEvent::SyncReq(gcd, ..) - | LayerEvent::PackedSync(gcd, ..) => { - layer.read().await.get_running_remote_id(&mgid, &gcd)? - } + | LayerEvent::Packed(gcd, ..) => layer.read().await.get_running_remote_id(&mgid, &gcd)?, }; match event { @@ -178,6 +180,7 @@ async fn handle_event( results.rpcs.push(rpc::member_offline(mgid, gid, mid, ma)); } LayerEvent::Sync(_, height, event) => { + println!("Sync: height: {}", height); let base = layer.read().await.base().clone(); let db = group_chat_db(&base, &mgid)?; @@ -205,18 +208,23 @@ async fn handle_event( } Event::MemberLeave(_mid) => {} Event::MessageCreate(mid, nmsg, mtime) => { + println!("Sync: create message start"); let base = layer.read().await.base.clone(); - let msg = - from_network_message(height as i64, gid, mid, mgid, nmsg, mtime, base)?; + let msg = from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?; results.rpcs.push(rpc::message_create(mgid, msg)); + println!("Sync: create message ok"); } } // save event. GroupChat::add_height(&db, gid, height)?; } - LayerEvent::PackedSync(gcd, height, from, to, events) => { - handle_sync(mgid, gcd, addr, height, from, to, events, results); + LayerEvent::Packed(gcd, height, from, to, events) => { + println!("Start handle sync packed... {}, {}, {}", height, from, to); + let base = layer.read().await.base().clone(); + handle_sync( + mgid, gid, gcd, addr, height, from, to, events, base, results, + )?; } LayerEvent::SyncReq(..) => {} // Never here. } @@ -242,22 +250,27 @@ fn ping(gcd: GroupId, addr: PeerAddr) -> SendType { } fn sync(gcd: GroupId, addr: PeerAddr, height: i64) -> SendType { + println!("Send sync request..."); let data = postcard::to_allocvec(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]); SendType::Event(0, addr, data) } fn handle_sync( mgid: GroupId, + fid: i64, gcd: GroupId, addr: PeerAddr, height: i64, mut from: i64, to: i64, events: Vec, + base: PathBuf, results: &mut HandleResult, -) { +) -> Result<()> { + let db = group_chat_db(&base, &mgid)?; + for event in events { - handle_sync_event(from, event); + let _ = handle_sync_event(&mgid, &fid, from, event, &base, &db, results); from += 1; } @@ -266,8 +279,56 @@ fn handle_sync( } // update group chat height. + GroupChat::add_height(&db, fid, to)?; + + Ok(()) } -fn handle_sync_event(height: i64, event: PackedEvent) { - // +fn handle_sync_event( + mgid: &GroupId, + fid: &i64, + height: i64, + event: PackedEvent, + base: &PathBuf, + db: &DStorage, + results: &mut HandleResult, +) -> Result<()> { + match event { + PackedEvent::GroupInfo => { + // TODO + } + PackedEvent::GroupTransfer => { + // TODO + } + PackedEvent::GroupManagerAdd => { + // TODO + } + PackedEvent::GroupManagerDel => { + // TODO + } + PackedEvent::GroupClose => { + // TOOD + } + PackedEvent::MemberInfo(mid, maddr, mname, mavatar) => { + // TODO + } + PackedEvent::MemberJoin(mid, maddr, mname, mavatar, mtime) => { + if mavatar.len() > 0 { + write_avatar_sync(&base, &mgid, &mid, mavatar)?; + } + let mut member = Member::new(*fid, mid, maddr, mname, false, mtime); + member.insert(&db)?; + results.rpcs.push(rpc::member_join(*mgid, member)); + } + PackedEvent::MemberLeave(mid) => { + // TODO + } + PackedEvent::MessageCreate(mid, nmsg, time) => { + let msg = from_network_message(height, *fid, mid, mgid, nmsg, time, base)?; + results.rpcs.push(rpc::message_create(*mgid, msg)); + } + PackedEvent::None => {} + } + + Ok(()) } diff --git a/src/apps/group_chat/models.rs b/src/apps/group_chat/models.rs index 5935caf..e938043 100644 --- a/src/apps/group_chat/models.rs +++ b/src/apps/group_chat/models.rs @@ -763,28 +763,28 @@ pub(super) fn from_network_message( height: i64, gdid: i64, mid: GroupId, - mgid: GroupId, + mgid: &GroupId, msg: NetworkMessage, datetime: i64, - base: PathBuf, + base: &PathBuf, ) -> Result { - let db = group_chat_db(&base, &mgid)?; + let db = group_chat_db(base, mgid)?; let mdid = Member::get_id(&db, &gdid, &mid)?; - let is_me = mid == mgid; + let is_me = &mid == mgid; // handle event. let (m_type, raw) = match msg { NetworkMessage::String(content) => (MessageType::String, content), NetworkMessage::Image(bytes) => { - let image_name = write_image_sync(&base, &mgid, bytes)?; + let image_name = write_image_sync(base, mgid, bytes)?; (MessageType::Image, image_name) } NetworkMessage::File(old_name, bytes) => { - let filename = write_file_sync(&base, &mgid, &old_name, bytes)?; + let filename = write_file_sync(base, mgid, &old_name, bytes)?; (MessageType::File, filename) } NetworkMessage::Contact(name, rgid, addr, avatar_bytes) => { - write_avatar_sync(&base, &mgid, &rgid, avatar_bytes)?; + write_avatar_sync(base, mgid, &rgid, avatar_bytes)?; let tmp_name = name.replace(";", "-;"); let contact_values = format!("{};;{};;{}", tmp_name, rgid.to_hex(), addr.to_hex()); (MessageType::Contact, contact_values) @@ -794,7 +794,7 @@ pub(super) fn from_network_message( (MessageType::Emoji, "".to_owned()) } NetworkMessage::Record(bytes, time) => { - let record_name = write_record_sync(&base, &mgid, gdid, time, bytes)?; + let record_name = write_record_sync(base, mgid, gdid, time, bytes)?; (MessageType::Record, record_name) } NetworkMessage::Phone => {