diff --git a/src/apps/chat/mod.rs b/src/apps/chat/mod.rs index ce7f990..d81c28c 100644 --- a/src/apps/chat/mod.rs +++ b/src/apps/chat/mod.rs @@ -6,7 +6,7 @@ pub(crate) use layer::handle; pub(crate) use layer::LayerEvent; pub(crate) use layer::{chat_conn, event_message, update_session}; pub(crate) use models::{ - from_model, from_network_message, handle_nmsg, raw_to_network_message, Friend, InviteType, - Message, Request, + from_model, from_network_message, handle_nmsg, raw_to_network_message, to_network_message, + Friend, InviteType, Message, Request, }; pub(crate) use rpc::new_rpc_handler; diff --git a/src/apps/chat/models.rs b/src/apps/chat/models.rs index 95f5b97..d09b6b7 100644 --- a/src/apps/chat/models.rs +++ b/src/apps/chat/models.rs @@ -16,8 +16,9 @@ use tdn::types::{ use crate::apps::group::GroupChat; use crate::rpc::session_create; use crate::storage::{ - chat_db, group_db, read_avatar, read_file, read_record, session_db, write_avatar_sync, - write_file, write_file_sync, write_image, write_image_sync, write_record_sync, + chat_db, group_db, read_avatar, read_db_file, read_file, read_image, read_record, session_db, + write_avatar_sync, write_file, write_file_sync, write_image, write_image_sync, + write_record_sync, }; pub(crate) fn from_network_message( @@ -155,6 +156,51 @@ pub(crate) async fn raw_to_network_message( } } +pub(crate) async fn to_network_message( + base: &PathBuf, + gid: &GroupId, + mtype: MessageType, + content: String, +) -> Result { + // handle message's type. + match mtype { + MessageType::String => Ok(NetworkMessage::String(content)), + MessageType::Image => { + let bytes = read_image(base, gid, &content).await?; + Ok(NetworkMessage::Image(bytes)) + } + MessageType::File => { + let bytes = read_db_file(base, gid, &content).await?; + Ok(NetworkMessage::File(content, bytes)) + } + MessageType::Contact => { + let v: Vec<&str> = content.split(";;").collect(); + if v.len() != 3 { + return Err(anyhow!("message is invalid")); + } + let cname = v[0].to_owned(); + let cgid = GroupId::from_hex(v[1])?; + let caddr = PeerId::from_hex(v[2])?; + let avatar_bytes = read_avatar(base, gid, &cgid).await?; + Ok(NetworkMessage::Contact(cname, cgid, caddr, avatar_bytes)) + } + MessageType::Record => { + let (bytes, time) = if let Some(i) = content.find('-') { + let time = content[0..i].parse().unwrap_or(0); + let bytes = read_record(base, gid, &content[i + 1..]).await?; + (bytes, time) + } else { + (vec![], 0) + }; + Ok(NetworkMessage::Record(bytes, time)) + } + MessageType::Invite => Ok(NetworkMessage::Invite(content)), + MessageType::Emoji => Ok(NetworkMessage::Emoji), + MessageType::Phone => Ok(NetworkMessage::Phone), + MessageType::Video => Ok(NetworkMessage::Video), + } +} + pub(crate) async fn _clear_message( _base: &PathBuf, _ogid: &GroupId, diff --git a/src/apps/chat/models/message.rs b/src/apps/chat/models/message.rs index b77053f..fcc4c41 100644 --- a/src/apps/chat/models/message.rs +++ b/src/apps/chat/models/message.rs @@ -2,16 +2,14 @@ use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ group::{EventId, GroupId}, - primitive::{HandleResult, PeerId, Result}, + primitive::{HandleResult, Result}, rpc::{json, RpcParam}, }; use tdn_storage::local::{DStorage, DsValue}; use chat_types::{MessageType, NetworkMessage}; -use crate::storage::{read_avatar_sync, read_file_sync, read_image_sync, read_record_sync}; - -use super::from_network_message; +use super::{from_network_message, to_network_message}; pub(crate) fn handle_nmsg( nmsg: NetworkMessage, @@ -30,44 +28,12 @@ pub(crate) fn handle_nmsg( Ok(msg) } -pub(crate) fn from_model(base: &PathBuf, gid: &GroupId, model: Message) -> Result { - // handle message's type. - match model.m_type { - MessageType::String => Ok(NetworkMessage::String(model.content)), - MessageType::Image => { - let bytes = read_image_sync(base, gid, &model.content)?; - Ok(NetworkMessage::Image(bytes)) - } - MessageType::File => { - let bytes = read_file_sync(base, gid, &model.content)?; - Ok(NetworkMessage::File(model.content, bytes)) - } - MessageType::Contact => { - let v: Vec<&str> = model.content.split(";;").collect(); - if v.len() != 3 { - return Err(anyhow!("message is invalid")); - } - let cname = v[0].to_owned(); - let cgid = GroupId::from_hex(v[1])?; - let caddr = PeerId::from_hex(v[2])?; - let avatar_bytes = read_avatar_sync(base, gid, &cgid)?; - Ok(NetworkMessage::Contact(cname, cgid, caddr, avatar_bytes)) - } - MessageType::Record => { - let (bytes, time) = if let Some(i) = model.content.find('-') { - let time = model.content[0..i].parse().unwrap_or(0); - let bytes = read_record_sync(base, gid, &model.content[i + 1..])?; - (bytes, time) - } else { - (vec![], 0) - }; - Ok(NetworkMessage::Record(bytes, time)) - } - MessageType::Invite => Ok(NetworkMessage::Invite(model.content)), - MessageType::Emoji => Ok(NetworkMessage::Emoji), - MessageType::Phone => Ok(NetworkMessage::Phone), - MessageType::Video => Ok(NetworkMessage::Video), - } +pub(crate) async fn from_model( + base: &PathBuf, + gid: &GroupId, + model: Message, +) -> Result { + to_network_message(base, gid, model.m_type, model.content).await } pub(crate) struct Message { diff --git a/src/apps/chat/rpc.rs b/src/apps/chat/rpc.rs index fcc7f51..2401e71 100644 --- a/src/apps/chat/rpc.rs +++ b/src/apps/chat/rpc.rs @@ -12,8 +12,7 @@ use chat_types::MessageType; use crate::account::User; use crate::event::InnerEvent; use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; -use crate::rpc::{session_create, session_last, sleep_waiting_close_stable, RpcState}; -use crate::session::{Session, SessionType}; +use crate::rpc::{session_create, sleep_waiting_close_stable, RpcState}; use crate::storage::{chat_db, delete_avatar, session_db}; use super::layer::{update_session, LayerEvent}; @@ -139,6 +138,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let mut results = HandleResult::new(); let db = chat_db(state.layer.read().await.base(), &gid)?; let mut f = Friend::get(&db, &id)?; + f.remark = remark.to_owned(); + f.me_update(&db)?; drop(db); state.group.write().await.broadcast( &gid, diff --git a/src/apps/group/layer.rs b/src/apps/group/layer.rs index 9f84856..fd9260d 100644 --- a/src/apps/group/layer.rs +++ b/src/apps/group/layer.rs @@ -196,7 +196,7 @@ fn handle_connect( // 1.3 online to UI. results.rpcs.push(session_connect(ogid, &sid, &addr.id)); - println!("will sync remote: {}, my: {}", height, group.height); + debug!("will sync remote: {}, my: {}", height, group.height); // 1.4 sync group height. if group.height < height { add_layer(results, ogid, sync(gcd, addr.id, group.height)); @@ -325,24 +325,19 @@ async fn handle_server_event( println!("Got sync request. height: {} from: {}", height, from); if height >= from { - let to = if height - from > 100 { - from + 100 + let to = if height - from > 20 { + from + 20 } else { height }; - // TODO - - // let packed = Consensus::pack(&db, &base, &gcd, &id, &from, &to).await?; - // let event = LayerEvent::Packed(gcd, height, from, to, packed); - - //let packed_members = vec![]; - //let packed_messages = vec![]; - - //let data = bincode::serialize(&event).unwrap_or(vec![]); - //let s = SendType::Event(0, addr, data); - //add_server_layer(results, fgid, s); - //println!("Sended sync request results. from: {}, to: {}", from, to); + let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &to).await?; + let messages = Message::sync(&base, &ogid, &db, &id, &to).await?; + let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages); + let data = bincode::serialize(&event).unwrap_or(vec![]); + let s = SendType::Event(0, addr, data); + add_server_layer(results, fgid, s); + println!("Sended sync request results. from: {}, to: {}", from, to); } } LayerEvent::Suspend(..) => {} @@ -481,22 +476,47 @@ async fn handle_peer_event( } } } - LayerEvent::SyncMember(gcd, height, from, to, adds, leaves) => { - println!("Start handle sync packed... {}, {}, {}", height, from, to); - // TODO - // handle_sync(&db, ogid, id, gcd, addr, height, from, to, events, base, results)?; - // update or leave. - } - LayerEvent::SyncMessage(gcd, height, mut from, to, adds) => { + LayerEvent::SyncRes(gcd, height, mut from, to, adds, leaves, messages) => { if to >= height { // when last packed sync, start sync online members. add_layer(results, ogid, sync_online(gcd, addr)); } - println!("Start handle sync packed... {}, {}, {}", height, from, to); + debug!("Start handle sync packed... {}, {}, {}", height, from, to); let mut last_message = None; - for (height, mgid, nm, time) in adds { + for (height, mgid, maddr, mname, mavatar) in adds { + let mdid_res = Member::get_id(&db, &id, &mgid); + if let Ok(mdid) = mdid_res { + Member::update(&db, &height, &mdid, &maddr, &mname)?; + if mavatar.len() > 0 { + write_avatar_sync(&base, &ogid, &mgid, mavatar)?; + } + let mem = Member::info(mdid, id, mgid, maddr, mname); + results.rpcs.push(rpc::member_join(ogid, &mem)); + } else { + let mut member = Member::new(height, id, mgid, maddr, mname); + member.insert(&db)?; + if mavatar.len() > 0 { + write_avatar_sync(&base, &ogid, &mgid, mavatar)?; + } + results.rpcs.push(rpc::member_join(ogid, &member)); + } + } + + for (height, mgid) in leaves { + if let Ok(mdid) = Member::get_id(&db, &id, &mgid) { + Member::leave(&db, &height, &mdid)?; + // check mid is my chat friend. if not, delete avatar. + let s_db = chat_db(&base, &mgid)?; + if Friend::get_id(&s_db, &mgid).is_err() { + let _ = delete_avatar(&base, &ogid, &mgid).await; + } + results.rpcs.push(rpc::member_leave(ogid, id, mdid)); + } + } + + for (height, mgid, nm, time) in messages { let msg = handle_network_message(height, id, mgid, &ogid, nm, time, &base, results)?; results.rpcs.push(rpc::message_create(ogid, &msg)); diff --git a/src/apps/group/models/member.rs b/src/apps/group/models/member.rs index 83a939b..42f4a0a 100644 --- a/src/apps/group/models/member.rs +++ b/src/apps/group/models/member.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use tdn::types::{ group::GroupId, primitive::{PeerId, Result}, @@ -5,6 +6,8 @@ use tdn::types::{ }; use tdn_storage::local::{DStorage, DsValue}; +use crate::storage::read_avatar; + /// Group Member Model. pub(crate) struct Member { /// db auto-increment id. @@ -71,7 +74,7 @@ impl Member { } } - pub fn all(db: &DStorage, fid: &i64) -> Result> { + pub fn list(db: &DStorage, fid: &i64) -> Result> { let matrix = db.query(&format!( "SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {}", fid @@ -114,7 +117,7 @@ impl Member { Ok(()) } - pub fn get(db: &DStorage, id: &i64) -> Result { + pub fn _get(db: &DStorage, id: &i64) -> Result { let mut matrix = db.query(&format!( "SELECT id, height, fid, mid, addr, name, leave FROM members WHERE id = {}", id, @@ -180,4 +183,30 @@ impl Member { let sql = format!("DELETE FROM members WHERE fid = {}", fid); db.delete(&sql) } + + pub async fn sync( + base: &PathBuf, + gid: &GroupId, + db: &DStorage, + fid: &i64, + height: &i64, + ) -> Result<( + Vec<(i64, GroupId, PeerId, String, Vec)>, + Vec<(i64, GroupId)>, + )> { + let sql = format!("SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {} AND height >= {}", fid, height); + let matrix = db.query(&sql)?; + let mut adds = vec![]; + let mut leaves = vec![]; + for values in matrix { + let m = Self::from_values(values); + if m.leave { + leaves.push((m.height, m.m_id)); + } else { + let mavatar = read_avatar(base, gid, &m.m_id).await.unwrap_or(vec![]); + adds.push((m.height, m.m_id, m.m_addr, m.m_name, mavatar)) + } + } + Ok((adds, leaves)) + } } diff --git a/src/apps/group/models/message.rs b/src/apps/group/models/message.rs index b9f1865..214ebef 100644 --- a/src/apps/group/models/message.rs +++ b/src/apps/group/models/message.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use tdn::types::{ @@ -9,7 +10,7 @@ use tdn_storage::local::{DStorage, DsValue}; use chat_types::{MessageType, NetworkMessage}; -use crate::apps::chat::{from_network_message, raw_to_network_message}; +use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm}; use crate::storage::group_db; use super::Member; @@ -58,22 +59,6 @@ impl Message { id: 0, } } - pub(crate) fn new( - height: i64, - fid: i64, - mid: i64, - is_me: bool, - m_type: MessageType, - content: String, - ) -> Message { - let start = SystemTime::now(); - let datetime = start - .duration_since(UNIX_EPOCH) - .map(|s| s.as_secs()) - .unwrap_or(0) as i64; // safe for all life. - - Self::new_with_time(height, fid, mid, is_me, m_type, content, datetime) - } /// here is zero-copy and unwrap is safe. checked. fn from_values(mut v: Vec) -> Message { @@ -104,7 +89,7 @@ impl Message { ]) } - pub fn get(db: &DStorage, id: &i64) -> Result { + pub fn _get(db: &DStorage, id: &i64) -> Result { let mut matrix = db.query(&format!("SELECT id, height, fid, mid, is_me, m_type, content, is_delivery, datetime FROM messages WHERE id = {}", id))?; if matrix.len() > 0 { Ok(Message::from_values(matrix.pop().unwrap())) // safe unwrap. @@ -113,7 +98,7 @@ impl Message { } } - pub fn all(db: &DStorage, fid: &i64) -> Result> { + pub fn list(db: &DStorage, fid: &i64) -> Result> { let matrix = db.query(&format!("SELECT id, height, fid, mid, is_me, m_type, content, is_delivery, datetime FROM messages WHERE fid = {}", fid))?; let mut groups = vec![]; for values in matrix { @@ -151,6 +136,37 @@ impl Message { let sql = format!("DELETE FROM messages WHERE fid = {}", fid); db.delete(&sql) } + + pub async fn sync( + base: &PathBuf, + gid: &GroupId, + db: &DStorage, + fid: &i64, + height: &i64, + ) -> Result> { + let sql = format!("SELECT id, mid FROM members WHERE fid = {}", fid); + let m = db.query(&sql)?; + let mut members = HashMap::new(); + for mut v in m { + let m_s = v.pop().unwrap().as_string(); + let mid = GroupId::from_hex(m_s).unwrap_or(Default::default()); + let id = v.pop().unwrap().as_i64(); + members.insert(id, mid); + } + + let sql = format!("SELECT id, height, fid, mid, is_me, m_type, content, is_delivery, datetime FROM messages WHERE fid = {} AND height >= {}", fid, height); + let matrix = db.query(&sql)?; + let mut messages = vec![]; + for values in matrix { + let msg = Message::from_values(values); + if let Ok(nmsg) = tnm(base, gid, msg.m_type, msg.content).await { + let mid = members.get(&msg.mid).cloned().unwrap_or(GroupId::default()); + messages.push((msg.height, mid, nmsg, msg.datetime)) + } + } + + Ok(messages) + } } pub(crate) async fn to_network_message( diff --git a/src/apps/group/rpc.rs b/src/apps/group/rpc.rs index 8965131..e578eb3 100644 --- a/src/apps/group/rpc.rs +++ b/src/apps/group/rpc.rs @@ -11,7 +11,7 @@ use group_types::{Event, LayerEvent}; use crate::apps::chat::{Friend, InviteType}; use crate::layer::Online; -use crate::rpc::{session_create, session_delete, session_last, RpcState}; +use crate::rpc::{session_create, session_delete, RpcState}; use crate::session::{Session, SessionType}; use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar}; @@ -94,8 +94,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let db = group_db(state.layer.read().await.base(), &gid)?; let group = GroupChat::get(&db, &id)?; - let members = Member::all(&db, &id)?; - let messages = Message::all(&db, &id)?; + let members = Member::list(&db, &id)?; + let messages = Message::list(&db, &id)?; Ok(HandleResult::rpc(detail_list(group, members, messages))) }, ); diff --git a/src/event.rs b/src/event.rs index 5d6bcb0..85b0967 100644 --- a/src/event.rs +++ b/src/event.rs @@ -499,7 +499,7 @@ impl StatusEvent { } impl SyncEvent { - pub fn sync( + pub async fn sync( base: &PathBuf, gid: &GroupId, account: &Account, @@ -616,7 +616,7 @@ impl SyncEvent { // create let mid = msg.hash; let is_me = msg.is_me; - let nm = from_model(base, gid, msg)?; + let nm = from_model(base, gid, msg).await?; SyncEvent::Message(hash, fgid, mid, is_me, nm) } else { SyncEvent::None diff --git a/src/group.rs b/src/group.rs index 42269bd..4a831ee 100644 --- a/src/group.rs +++ b/src/group.rs @@ -77,7 +77,7 @@ pub(crate) enum GroupEvent { } impl Group { - pub fn handle( + pub async fn handle( &mut self, gid: GroupId, msg: RecvType, @@ -113,7 +113,7 @@ impl Group { } RecvType::Event(addr, bytes) => { let event: GroupEvent = bincode::deserialize(&bytes)?; - return GroupEvent::handle(self, event, gid, addr, layer, uid); + return GroupEvent::handle(self, event, gid, addr, layer, uid).await; } RecvType::Stream(_uid, _stream, _bytes) => { todo!(); @@ -691,7 +691,7 @@ impl Group { } impl GroupEvent { - pub fn handle( + pub async fn handle( group: &mut Group, event: GroupEvent, gid: GroupId, @@ -829,7 +829,7 @@ impl GroupEvent { // every time sync MAX is 100. let last_to = if to - from > 100 { to - 100 } else { to }; let sync_events = - SyncEvent::sync(&group.base, &gid, group.account(&gid)?, from, last_to)?; + SyncEvent::sync(&group.base, &gid, group.account(&gid)?, from, last_to).await?; let event = GroupEvent::SyncResponse(from, last_to, to, sync_events); let data = bincode::serialize(&event).unwrap_or(vec![]); results.groups.push((gid, SendType::Event(0, addr, data))); diff --git a/src/server.rs b/src/server.rs index dc038d6..e26a542 100644 --- a/src/server.rs +++ b/src/server.rs @@ -84,8 +84,11 @@ pub async fn start(db_path: String) -> Result<()> { while let Some(message) = recver.recv().await { match message { ReceiveMessage::Group(fgid, g_msg) => { - if let Ok(handle_result) = - group.write().await.handle(fgid, g_msg, &layer, now_rpc_uid) + if let Ok(handle_result) = group + .write() + .await + .handle(fgid, g_msg, &layer, now_rpc_uid) + .await { handle(handle_result, now_rpc_uid, true, &sender).await; } diff --git a/src/storage.rs b/src/storage.rs index d680ebd..7ee5414 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -100,12 +100,28 @@ pub(crate) fn write_file_sync( Ok(name.to_owned()) } -pub(crate) fn read_file_sync(base: &PathBuf, gid: &GroupId, name: &str) -> Result> { +pub(crate) async fn read_db_file(base: &PathBuf, gid: &GroupId, name: &str) -> Result> { let mut path = base.clone(); path.push(gid.to_hex()); path.push(FILES_DIR); path.push(name); - Ok(std::fs::read(base)?) + if path.exists() { + Ok(fs::read(path).await?) + } else { + Ok(vec![]) + } +} + +pub(crate) async fn read_image(base: &PathBuf, gid: &GroupId, name: &str) -> Result> { + let mut path = base.clone(); + path.push(gid.to_hex()); + path.push(IMAGE_DIR); + path.push(name); + if path.exists() { + Ok(fs::read(path).await?) + } else { + Ok(vec![]) + } } #[inline] @@ -173,14 +189,6 @@ pub(crate) async fn write_image(base: &PathBuf, gid: &GroupId, bytes: &[u8]) -> Ok(name) } -pub(crate) fn read_image_sync(base: &PathBuf, gid: &GroupId, name: &str) -> Result> { - let mut path = base.clone(); - path.push(gid.to_hex()); - path.push(IMAGE_DIR); - path.push(name); - Ok(std::fs::read(base)?) -} - #[inline] fn avatar_png(gid: &GroupId) -> String { let mut gs = gid.to_hex(); @@ -284,14 +292,6 @@ pub(crate) async fn read_record(base: &PathBuf, gid: &GroupId, name: &str) -> Re } } -pub(crate) fn read_record_sync(base: &PathBuf, gid: &GroupId, name: &str) -> Result> { - let mut path = base.clone(); - path.push(gid.to_hex()); - path.push(RECORD_DIR); - path.push(name); - Ok(std::fs::read(path)?) -} - pub(crate) fn write_record_sync( base: &PathBuf, gid: &GroupId,