Browse Source

add send message when group chat is local

pull/18/head
Sun 4 years ago
parent
commit
0d26c90003
  1. 2
      lib/apps/group_chat/provider.dart
  2. 90
      src/apps/group_chat/common.rs
  3. 2
      src/apps/group_chat/layer.rs
  4. 2
      src/apps/group_chat/mod.rs
  5. 39
      src/apps/group_chat/models.rs
  6. 47
      src/apps/group_chat/rpc.rs
  7. 30
      src/layer.rs
  8. 1
      src/migrate/group_chat.rs
  9. 28
      src/rpc.rs

2
lib/apps/group_chat/provider.dart

@ -142,7 +142,7 @@ class GroupChatProvider extends ChangeNotifier { @@ -142,7 +142,7 @@ class GroupChatProvider extends ChangeNotifier {
messageCreate(MessageType mtype, String content) {
final gcd = this.activedGroup!.gid;
rpc.send('group-chat-message-create', [gcd, mtype.toInt(), content]);
rpc.send('group-chat-message-create', [gcd, this.actived!, mtype.toInt(), content]);
}
close(int id) {

90
src/apps/group_chat/common.rs

@ -0,0 +1,90 @@ @@ -0,0 +1,90 @@
use group_chat_types::{Event, LayerEvent};
use std::path::PathBuf;
use tdn::types::{
group::GroupId,
primitive::{HandleResult, Result},
};
use tdn_storage::local::DStorage;
use crate::apps::chat::Friend;
use crate::rpc::session_last;
use crate::session::{Session, SessionType};
use crate::storage::{chat_db, delete_avatar, session_db, write_avatar_sync};
use super::models::{from_network_message, Member};
use super::rpc;
pub async fn handle_event(
db: DStorage,
base: PathBuf,
gid: i64, // group chat database id.
mgid: GroupId, // me account(group_id)
event: LayerEvent,
results: &mut HandleResult,
) -> Result<()> {
match event {
LayerEvent::Sync(_gcd, height, event) => {
match event {
Event::GroupInfo => {}
Event::GroupTransfer => {}
Event::GroupManagerAdd => {}
Event::GroupManagerDel => {}
Event::GroupClose => {}
Event::MemberInfo(mid, maddr, mname, mavatar) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::update(&db, &id, &maddr, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
results.rpcs.push(rpc::member_info(mgid, id, maddr, mname));
}
Event::MemberJoin(mid, maddr, mname, mavatar, mtime) => {
if Member::get_id(&db, &gid, &mid).is_err() {
let mut member = Member::new(gid, mid, maddr, mname, false, mtime);
member.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
results.rpcs.push(rpc::member_join(mgid, member));
}
}
Event::MemberLeave(mid) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::leave(&db, &id)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
if Friend::get(&s_db, &mid)?.is_none() {
let _ = delete_avatar(&base, &mgid, &mid).await;
}
results.rpcs.push(rpc::member_leave(mgid, id));
}
Event::MessageCreate(mid, nmsg, mtime) => {
println!("Sync: create message start");
let (msg, scontent) =
from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?;
results.rpcs.push(rpc::message_create(mgid, &msg));
println!("Sync: create message ok");
// UPDATE SESSION.
let s_db = session_db(&base, &mgid)?;
if let Ok(id) = Session::last(
&s_db,
&gid,
&SessionType::Group,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(mgid, &id, &msg.datetime, &scontent, false));
}
}
}
}
_ => {} // TODO
}
Ok(())
}

2
src/apps/group_chat/layer.rs

@ -171,7 +171,7 @@ async fn handle_event( @@ -171,7 +171,7 @@ async fn handle_event(
let (rid, key) = Request::over(&db, &gcd, true)?;
// 1. add group chat.
let mut group = GroupChat::from_info(key, info, 0, addr, &base, &mgid)?;
let mut group = GroupChat::from_info(key, info, 0, addr, &base, &mgid, true)?;
group.insert(&db)?;
// 2. ADD NEW SESSION.

2
src/apps/group_chat/mod.rs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
mod common;
mod layer;
mod models;
@ -13,4 +14,5 @@ pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) @@ -13,4 +14,5 @@ pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType)
pub(crate) mod rpc;
pub(crate) use layer::group_chat_conn;
pub(crate) use layer::handle as layer_handle;
pub(crate) use models::GroupChat;
pub(crate) use rpc::new_rpc_handler;

39
src/apps/group_chat/models.rs

@ -85,6 +85,8 @@ pub(crate) struct GroupChat { @@ -85,6 +85,8 @@ pub(crate) struct GroupChat {
pub datetime: i64,
/// is deleted.
is_deleted: bool,
/// is remote.
pub is_remote: bool,
}
impl GroupChat {
@ -96,6 +98,7 @@ impl GroupChat { @@ -96,6 +98,7 @@ impl GroupChat {
g_bio: String,
is_need_agree: bool,
is_ok: bool,
is_remote: bool,
) -> Self {
let g_id = GroupId(rand::thread_rng().gen::<[u8; 32]>());
@ -118,6 +121,7 @@ impl GroupChat { @@ -118,6 +121,7 @@ impl GroupChat {
key,
datetime,
is_ok,
is_remote,
id: 0,
height: 0,
is_closed: false,
@ -135,6 +139,7 @@ impl GroupChat { @@ -135,6 +139,7 @@ impl GroupChat {
g_bio: String,
is_need_agree: bool,
key: GroupChatKey,
is_remote: bool,
) -> Self {
let start = SystemTime::now();
let datetime = start
@ -154,6 +159,7 @@ impl GroupChat { @@ -154,6 +159,7 @@ impl GroupChat {
datetime,
id: 0,
height,
is_remote,
is_ok: true,
is_closed: false,
is_deleted: false,
@ -167,12 +173,13 @@ impl GroupChat { @@ -167,12 +173,13 @@ impl GroupChat {
addr: PeerAddr,
base: &PathBuf,
mgid: &GroupId,
is_remote: bool,
) -> Result<Self> {
match info {
GroupInfo::Common(owner, _, _, g_id, g_type, agree, name, g_bio, avatar) => {
write_avatar_sync(base, &mgid, &g_id, avatar)?;
Ok(Self::new_from(
g_id, height, owner, g_type, addr, name, g_bio, agree, key,
g_id, height, owner, g_type, addr, name, g_bio, agree, key, is_remote,
))
}
GroupInfo::Encrypted(owner, _, _, g_id, agree, _hash, _name, _bio, avatar) => {
@ -185,7 +192,7 @@ impl GroupChat { @@ -185,7 +192,7 @@ impl GroupChat {
write_avatar_sync(base, &mgid, &g_id, avatar)?;
Ok(Self::new_from(
g_id, height, owner, g_type, addr, name, bio, agree, key,
g_id, height, owner, g_type, addr, name, bio, agree, key, is_remote,
))
}
}
@ -254,6 +261,7 @@ impl GroupChat { @@ -254,6 +261,7 @@ impl GroupChat {
Self {
is_deleted,
is_remote: v.pop().unwrap().as_bool(),
datetime: v.pop().unwrap().as_i64(),
key: GroupChatKey::from_hex(v.pop().unwrap().as_string())
.unwrap_or(GroupChatKey::new(vec![])),
@ -273,7 +281,7 @@ impl GroupChat { @@ -273,7 +281,7 @@ impl GroupChat {
/// use in rpc when load account friends.
pub fn all(db: &DStorage) -> Result<Vec<GroupChat>> {
let matrix = db.query("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime FROM groups WHERE is_deleted = false")?;
let matrix = db.query("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_remote FROM groups WHERE is_deleted = false")?;
let mut groups = vec![];
for values in matrix {
groups.push(GroupChat::from_values(values, false));
@ -283,7 +291,7 @@ impl GroupChat { @@ -283,7 +291,7 @@ impl GroupChat {
/// use in rpc when load account groups.
pub fn all_ok(db: &DStorage) -> Result<Vec<GroupChat>> {
let matrix = db.query("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime FROM groups WHERE is_closed = false")?;
let matrix = db.query("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_remote FROM groups WHERE is_closed = false")?;
let mut groups = vec![];
for values in matrix {
groups.push(GroupChat::from_values(values, false));
@ -291,8 +299,21 @@ impl GroupChat { @@ -291,8 +299,21 @@ impl GroupChat {
Ok(groups)
}
/// list all local group chat as running layer.
pub fn all_local(db: &DStorage, owner: &GroupId) -> Result<Vec<(GroupId, i64)>> {
let matrix = db.query(&format!("SELECT gcd, height FROM groups WHERE owner = '{}' and is_remote = false and is_closed = false", owner.to_hex()))?;
let mut groups = vec![];
for mut values in matrix {
let height = values.pop().unwrap().as_i64();
let gcd =
GroupId::from_hex(values.pop().unwrap().as_string()).unwrap_or(Default::default());
groups.push((gcd, height));
}
Ok(groups)
}
pub fn get(db: &DStorage, gid: &GroupId) -> Result<Option<GroupChat>> {
let sql = format!("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime FROM groups WHERE gcd = '{}' AND is_deleted = false", gid.to_hex());
let sql = format!("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_remote FROM groups WHERE gcd = '{}' AND is_deleted = false", gid.to_hex());
let mut matrix = db.query(&sql)?;
if matrix.len() > 0 {
let values = matrix.pop().unwrap(); // safe unwrap()
@ -302,7 +323,7 @@ impl GroupChat { @@ -302,7 +323,7 @@ impl GroupChat {
}
pub fn get_id(db: &DStorage, id: &i64) -> Result<Option<GroupChat>> {
let sql = format!("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime FROM groups WHERE id = {} AND is_deleted = false", id);
let sql = format!("SELECT id, height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_remote FROM groups WHERE id = {} AND is_deleted = false", id);
let mut matrix = db.query(&sql)?;
if matrix.len() > 0 {
let values = matrix.pop().unwrap(); // safe unwrap()
@ -319,7 +340,7 @@ impl GroupChat { @@ -319,7 +340,7 @@ impl GroupChat {
if unique_check.len() > 0 {
let id = unique_check.pop().unwrap().pop().unwrap().as_i64();
self.id = id;
let sql = format!("UPDATE groups SET height = {}, owner = '{}', gtype = {}, addr='{}', name = '{}', bio = '{}', is_ok = {}, is_need_agree = {}, is_closed = {}, key = '{}', datetime = {}, is_deleted = false WHERE id = {}",
let sql = format!("UPDATE groups SET height = {}, owner = '{}', gtype = {}, addr='{}', name = '{}', bio = '{}', is_ok = {}, is_need_agree = {}, is_closed = {}, key = '{}', datetime = {},is_remote = {}, is_deleted = false WHERE id = {}",
self.height,
self.owner.to_hex(),
self.g_type.to_u32(),
@ -331,11 +352,12 @@ impl GroupChat { @@ -331,11 +352,12 @@ impl GroupChat {
self.is_closed,
self.key.to_hex(),
self.datetime,
self.is_remote,
self.id
);
db.update(&sql)?;
} else {
let sql = format!("INSERT INTO groups (height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_deleted) VALUES ({}, '{}', '{}', {}, '{}', '{}', '{}', {}, {}, {}, '{}', {}, false)",
let sql = format!("INSERT INTO groups (height, owner, gcd, gtype, addr, name, bio, is_ok, is_need_agree, is_closed, key, datetime, is_deleted) VALUES ({}, '{}', '{}', {}, '{}', '{}', '{}', {}, {}, {}, '{}', {}, {}, false)",
self.height,
self.owner.to_hex(),
self.g_id.to_hex(),
@ -348,6 +370,7 @@ impl GroupChat { @@ -348,6 +370,7 @@ impl GroupChat {
self.is_closed,
self.key.to_hex(),
self.datetime,
self.is_remote,
);
let id = db.insert(&sql)?;
self.id = id;

47
src/apps/group_chat/rpc.rs

@ -2,7 +2,7 @@ use std::sync::Arc; @@ -2,7 +2,7 @@ use std::sync::Arc;
use tdn::types::{
group::GroupId,
message::SendType,
primitive::{HandleResult, PeerAddr},
primitive::{new_io_error, HandleResult, PeerAddr},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
};
use tdn_did::Proof;
@ -15,6 +15,7 @@ use crate::session::{Session, SessionType}; @@ -15,6 +15,7 @@ use crate::session::{Session, SessionType};
use crate::storage::{chat_db, group_chat_db, read_avatar, session_db, write_avatar};
use super::add_layer;
use super::common::handle_event;
use super::models::{to_network_message, GroupChat, GroupChatKey, Member, Message, Request};
#[inline]
@ -200,6 +201,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -200,6 +201,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
bio,
need_agree,
glocation == GroupLocation::Local,
glocation == GroupLocation::Remote,
);
let gcd = gc.g_id;
@ -433,18 +435,45 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -433,18 +435,45 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
"group-chat-message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gcd = GroupId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[2].as_str().ok_or(RpcError::ParseError)?;
let addr = state.layer.read().await.running(&gid)?.online(&gcd)?;
let id = params[1].as_i64().ok_or(RpcError::ParseError)?;
let m_type = MessageType::from_int(params[2].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[3].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let db = group_chat_db(state.layer.read().await.base(), &gid)?;
let gc = GroupChat::get_id(&db, &id)?.ok_or(RpcError::ParseError)?;
let base = state.group.read().await.base().clone();
let (nmsg, datetime) = to_network_message(&base, &gid, m_type, m_content).await?;
let event = Event::MessageCreate(gid, nmsg, datetime);
let data = postcard::to_allocvec(&LayerEvent::Sync(gcd, 0, event)).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
add_layer(&mut results, gid, msg);
let mut results = HandleResult::new();
if gc.is_remote {
let addr = state.layer.read().await.running(&gid)?.online(&gcd)?;
let data =
postcard::to_allocvec(&LayerEvent::Sync(gcd, 0, event)).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
add_layer(&mut results, gid, msg);
} else {
// 1. increase the consensus height in running layer.
let height = state.layer.write().await.running_mut(&gcd)?.increased();
// 2. update group chat height.
GroupChat::add_height(&db, id, height)?;
// 3. broadcast event bytes.
let event = LayerEvent::Sync(gcd, height, event);
let new_data = postcard::to_allocvec(&event)
.map_err(|_| new_io_error("serialize event error."))?;
// 4. handle event.
handle_event(db, base, id, gid, event, &mut results).await?;
// 5. broadcast event.
for (mid, maddr) in state.layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, new_data.clone());
add_layer(&mut results, *mid, s);
}
}
Ok(results)
},
);

30
src/layer.rs

@ -15,7 +15,7 @@ use crate::group::Group; @@ -15,7 +15,7 @@ use crate::group::Group;
use crate::session::{Session, SessionType};
use crate::storage::session_db;
/// ESSE app's BaseLayerEvent.
/// ESSE app's `BaseLayerEvent`.
/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS.
#[derive(Serialize, Deserialize)]
pub(crate) enum LayerEvent {
@ -29,8 +29,8 @@ pub(crate) enum LayerEvent { @@ -29,8 +29,8 @@ pub(crate) enum LayerEvent {
/// ESSE layers.
pub(crate) struct Layer {
/// account_gid => running_account.
pub runnings: HashMap<GroupId, RunningAccount>,
/// layer_gid (include account id, group chat id) => running_layer.
pub runnings: HashMap<GroupId, RunningLayer>,
/// message delivery tracking. uuid, me_gid, db_id.
pub delivery: HashMap<u64, (GroupId, i64)>,
/// storage base path.
@ -56,17 +56,17 @@ impl Layer { @@ -56,17 +56,17 @@ impl Layer {
&self.base
}
pub fn running(&self, gid: &GroupId) -> Result<&RunningAccount> {
pub fn running(&self, gid: &GroupId) -> Result<&RunningLayer> {
self.runnings.get(gid).ok_or(new_io_error("not online"))
}
pub fn running_mut(&mut self, gid: &GroupId) -> Result<&mut RunningAccount> {
pub fn running_mut(&mut self, gid: &GroupId) -> Result<&mut RunningLayer> {
self.runnings.get_mut(gid).ok_or(new_io_error("not online"))
}
pub fn add_running(&mut self, gid: &GroupId) -> Result<()> {
pub fn add_running(&mut self, gid: &GroupId, consensus: i64) -> Result<()> {
if !self.runnings.contains_key(gid) {
self.runnings.insert(*gid, RunningAccount::init());
self.runnings.insert(*gid, RunningLayer::init(consensus));
}
Ok(())
@ -203,18 +203,26 @@ impl OnlineSession { @@ -203,18 +203,26 @@ impl OnlineSession {
}
}
pub(crate) struct RunningAccount {
pub(crate) struct RunningLayer {
/// online group (friends/services) => (group's address, group's db id)
sessions: HashMap<GroupId, OnlineSession>,
/// layer current consensus height.
consensus: i64,
}
impl RunningAccount {
pub fn init() -> Self {
RunningAccount {
impl RunningLayer {
pub fn init(consensus: i64) -> Self {
RunningLayer {
consensus,
sessions: HashMap::new(),
}
}
pub fn increased(&mut self) -> i64 {
self.consensus += 1;
self.consensus
}
pub fn active(&mut self, gid: &GroupId, is_me: bool) -> Option<PeerAddr> {
if let Some(online) = self.sessions.get_mut(gid) {
if is_me {

1
src/migrate/group_chat.rs

@ -14,6 +14,7 @@ pub(super) const GROUP_CHAT_VERSIONS: [&str; 4] = [ @@ -14,6 +14,7 @@ pub(super) const GROUP_CHAT_VERSIONS: [&str; 4] = [
is_closed INTEGER NOT NULL,
key TEXT NOT NULL,
datetime INTEGER NOT NULL,
is_remote INTEGER NOT NULL,
is_deleted INTEGER NOT NULL);",
"CREATE TABLE IF NOT EXISTS requests(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,

28
src/rpc.rs

@ -14,12 +14,12 @@ use tokio::sync::{ @@ -14,12 +14,12 @@ use tokio::sync::{
use crate::apps::app_rpc_inject;
use crate::apps::chat::chat_conn;
use crate::apps::group_chat::{add_layer, group_chat_conn};
use crate::apps::group_chat::{add_layer, group_chat_conn, GroupChat};
use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, LayerEvent};
use crate::session::{Session, SessionType};
use crate::storage::session_db;
use crate::storage::{group_chat_db, session_db};
pub(crate) fn init_rpc(
addr: PeerAddr,
@ -261,7 +261,7 @@ fn new_rpc_handler( @@ -261,7 +261,7 @@ fn new_rpc_handler(
.await
.add_account(name, seed, lock, avatar_bytes, device_name, device_info)
.await?;
state.layer.write().await.add_running(&gid)?;
state.layer.write().await.add_running(&gid, 0)?;
let mut results = HandleResult::rpc(json!(vec![gid.to_hex()]));
results.networks.push(NetworkType::AddGroup(gid)); // add AddGroup to TDN.
@ -287,7 +287,7 @@ fn new_rpc_handler( @@ -287,7 +287,7 @@ fn new_rpc_handler(
.await
.add_account(name, seed, lock, vec![], device_name, device_info)
.await?;
state.layer.write().await.add_running(&gid)?;
state.layer.write().await.add_running(&gid, 0)?;
let mut results = HandleResult::rpc(json!(vec![gid.to_hex()]));
results.networks.push(NetworkType::AddGroup(gid)); // add AddGroup to TDN.
@ -359,15 +359,27 @@ fn new_rpc_handler( @@ -359,15 +359,27 @@ fn new_rpc_handler(
let gid = GroupId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let me_lock = params[1].as_str().ok_or(RpcError::ParseError)?;
state.group.write().await.add_running(&gid, me_lock)?;
state.layer.write().await.add_running(&gid)?;
let mut results = HandleResult::rpc(json!([gid.to_hex()]));
debug!("Account Logined: {}.", gid.to_hex());
state.group.write().await.add_running(&gid, me_lock)?;
// add AddGroup to TDN.
results.networks.push(NetworkType::AddGroup(gid));
let mut layer_lock = state.layer.write().await;
layer_lock.add_running(&gid, 0)?; // TODO account current state height.
// load all services layer created by this account.
// 1. group chat.
let group_db = group_chat_db(&layer_lock.base, &gid)?;
let group_chats = GroupChat::all_local(&group_db, &gid)?;
for (gcd, gheight) in group_chats {
layer_lock.add_running(&gcd, gheight)?;
results.networks.push(NetworkType::AddGroup(gcd));
}
drop(layer_lock);
debug!("Account Logined: {}.", gid.to_hex());
Ok(results)
},
);

Loading…
Cancel
Save