Browse Source

upgrade wallet & group chat

pull/18/head
Sun 3 years ago
parent
commit
845b722b1b
  1. 7
      lib/widgets/user_info.dart
  2. 50
      src/apps.rs
  3. 2
      src/apps/chat/layer.rs
  4. 1045
      src/apps/group/layer.rs
  5. 19
      src/apps/group/mod.rs
  6. 88
      src/apps/group/models/group.rs
  7. 106
      src/apps/group/models/member.rs
  8. 41
      src/apps/group/models/message.rs
  9. 278
      src/apps/group/rpc.rs
  10. 2
      src/apps/wallet/models.rs
  11. 160
      src/apps/wallet/rpc.rs
  12. 36
      src/group.rs
  13. 95
      src/layer.rs
  14. 5
      src/migrate/group.rs

7
lib/widgets/user_info.dart

@ -106,9 +106,6 @@ class _UserInfoState extends State<UserInfo> {
), ),
if (widget.title != null) if (widget.title != null)
Text(widget.title!, style: TextStyle(fontSize: 16.0, fontStyle: FontStyle.italic)), Text(widget.title!, style: TextStyle(fontSize: 16.0, fontStyle: FontStyle.italic)),
const SizedBox(height: 10),
const Divider(height: 1.0, color: Color(0x40ADB0BB)),
const SizedBox(height: 20),
InkWell( InkWell(
onTap: () { onTap: () {
Clipboard.setData(ClipboardData(text: pidText(widget.id))); Clipboard.setData(ClipboardData(text: pidText(widget.id)));
@ -117,11 +114,12 @@ class _UserInfoState extends State<UserInfo> {
}); });
}, },
child: Container( child: Container(
width: 250.0, padding: const EdgeInsets.symmetric(vertical: 10.0),
child: Row( child: Row(
children: [ children: [
Expanded( Expanded(
child: Text(pidText(widget.id, widget.pre), child: Text(pidText(widget.id, widget.pre),
textAlign: TextAlign.center,
style: TextStyle(fontSize: 14, color: idColor))), style: TextStyle(fontSize: 14, color: idColor))),
Padding( Padding(
padding: const EdgeInsets.symmetric(horizontal: 10.0), padding: const EdgeInsets.symmetric(horizontal: 10.0),
@ -132,6 +130,7 @@ class _UserInfoState extends State<UserInfo> {
), ),
) )
), ),
const Divider(height: 1.0, color: Color(0x40ADB0BB)),
const SizedBox(height: 16), const SizedBox(height: 16),
if (widget.remark != null) if (widget.remark != null)
Container( Container(

50
src/apps.rs

@ -2,7 +2,8 @@ use chat_types::CHAT_ID;
use cloud_types::CLOUD_ID; use cloud_types::CLOUD_ID;
use dao_types::DAO_ID; use dao_types::DAO_ID;
use domain_types::DOMAIN_ID; use domain_types::DOMAIN_ID;
use group_types::GROUP_CHAT_ID; use group_types::{GroupChatId, GROUP_CHAT_ID};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
group::GroupId, group::GroupId,
@ -13,16 +14,17 @@ use tdn::types::{
use crate::global::Global; use crate::global::Global;
use crate::rpc::session_lost; use crate::rpc::session_lost;
use crate::storage::group_db;
pub(crate) mod chat; pub(crate) mod chat;
//pub(crate) mod cloud; //pub(crate) mod cloud;
pub(crate) mod device; pub(crate) mod device;
//pub(crate) mod domain; //pub(crate) mod domain;
//pub(crate) mod file; //pub(crate) mod file;
//pub(crate) mod group; pub(crate) mod group;
pub(crate) mod jarvis; pub(crate) mod jarvis;
//pub(crate) mod dao; //pub(crate) mod dao;
//pub(crate) mod wallet; pub(crate) mod wallet;
pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) { pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) {
//device::new_rpc_handler(handler); //device::new_rpc_handler(handler);
@ -30,8 +32,8 @@ pub(crate) fn app_rpc_inject(handler: &mut RpcHandler<Global>) {
jarvis::new_rpc_handler(handler); jarvis::new_rpc_handler(handler);
//domain::new_rpc_handler(handler); //domain::new_rpc_handler(handler);
//file::new_rpc_handler(handler); //file::new_rpc_handler(handler);
//group::new_rpc_handler(handler); group::new_rpc_handler(handler);
//wallet::new_rpc_handler(handler); wallet::new_rpc_handler(handler);
//dao::new_rpc_handler(handler); //dao::new_rpc_handler(handler);
//cloud::new_rpc_handler(handler); //cloud::new_rpc_handler(handler);
} }
@ -45,7 +47,7 @@ pub(crate) async fn app_layer_handle(
debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid); debug!("TODO GOT LAYER MESSAGE: ====== {} -> {} ===== ", fgid, tgid);
match (fgid, tgid) { match (fgid, tgid) {
(CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await, (CHAT_ID, 0) | (0, CHAT_ID) => chat::handle(msg, global).await,
(GROUP_CHAT_ID, 0) => chat::handle(msg, global).await, (GROUP_CHAT_ID, 0) | (0, GROUP_CHAT_ID) => group::handle(msg, global).await,
(DAO_ID, 0) => chat::handle(msg, global).await, (DAO_ID, 0) => chat::handle(msg, global).await,
(DOMAIN_ID, 0) => chat::handle(msg, global).await, (DOMAIN_ID, 0) => chat::handle(msg, global).await,
(CLOUD_ID, 0) => chat::handle(msg, global).await, (CLOUD_ID, 0) => chat::handle(msg, global).await,
@ -59,16 +61,40 @@ pub(crate) async fn app_layer_handle(
results.rpcs.push(session_lost(&session.s_id)); results.rpcs.push(session_lost(&session.s_id));
} }
let mut delete = vec![]; let mut delete: HashMap<GroupChatId, Vec<usize>> = HashMap::new();
let pid = global.pid().await;
let db_key = global.group.read().await.db_key(&pid)?;
let db = group_db(&global.base, &pid, &db_key)?;
for (gid, session) in &layer.groups { for (gid, session) in &layer.groups {
if session.addr == peer.id { for (index, addr) in session.addrs.iter().enumerate() {
delete.push(*gid); if addr == &peer.id {
results.rpcs.push(session_lost(&session.s_id)); delete
.entry(*gid)
.and_modify(|f| f.push(index))
.or_insert(vec![index]);
if index == 0 {
results.rpcs.push(session_lost(&session.s_id));
} else {
if let Ok(mid) = group::Member::get_id(&db, &session.db_id, addr) {
results
.rpcs
.push(group::rpc::member_offline(session.db_id, mid));
}
}
}
} }
} }
for gid in delete { for (gid, mut indexs) in delete {
let _ = layer.groups.remove(&gid); if indexs[0] == 0 {
let _ = layer.groups.remove(&gid);
} else {
indexs.reverse();
for i in indexs {
let _ = layer.group_del_member(&gid, i);
}
}
} }
Ok(results) Ok(results)

2
src/apps/chat/layer.rs

@ -159,7 +159,7 @@ async fn handle_connect(
results.rpcs.push(session_connect(&sid, &peer.id)); results.rpcs.push(session_connect(&sid, &peer.id));
// 4. active this session. // 4. active this session.
global.layer.write().await.chat_add(peer.id, sid, f.id); global.layer.write().await.chat_add(peer.id, sid, f.id, 0);
Ok(f.height as u64) Ok(f.height as u64)
} }

1045
src/apps/group/layer.rs

File diff suppressed because it is too large Load Diff

19
src/apps/group/mod.rs

@ -1,22 +1,7 @@
mod layer; mod layer;
mod models; mod models;
pub use group_types::GROUP_CHAT_ID as GROUP_ID;
use tdn::types::{group::GroupId, message::SendType, primitive::HandleResult};
/// Send to group chat service.
#[inline]
pub(crate) fn add_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) {
results.layers.push((gid, GROUP_ID, msg));
}
/// Send to group chat member.
#[inline]
pub fn add_server_layer(results: &mut HandleResult, gid: GroupId, msg: SendType) {
results.layers.push((GROUP_ID, gid, msg));
}
pub(crate) mod rpc; pub(crate) mod rpc;
pub(crate) use layer::{group_conn, handle_peer, handle_server}; pub(crate) use layer::{group_conn, handle};
pub(crate) use models::GroupChat; pub(crate) use models::{GroupChat, Member};
pub(crate) use rpc::new_rpc_handler; pub(crate) use rpc::new_rpc_handler;

88
src/apps/group/models/group.rs

@ -1,8 +1,8 @@
use group_types::GroupChatId;
use rand::Rng; use rand::Rng;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{ use tdn::types::{
group::GroupId, primitives::{PeerId, Result},
primitive::{PeerId, Result},
rpc::{json, RpcParam}, rpc::{json, RpcParam},
}; };
use tdn_storage::local::{DStorage, DsValue}; use tdn_storage::local::{DStorage, DsValue};
@ -18,11 +18,11 @@ pub(crate) struct GroupChat {
/// consensus height. /// consensus height.
pub height: i64, pub height: i64,
/// group chat id. /// group chat id.
pub g_id: GroupId, pub gid: GroupChatId,
/// group chat server addresse. /// group chat server addresse.
pub g_addr: PeerId, pub addr: PeerId,
/// group chat name. /// group chat name.
pub g_name: String, pub name: String,
/// group is delete by owner. /// group is delete by owner.
pub close: bool, pub close: bool,
/// group is in my device. /// group is in my device.
@ -30,13 +30,13 @@ pub(crate) struct GroupChat {
} }
impl GroupChat { impl GroupChat {
pub fn new(g_addr: PeerId, g_name: String) -> Self { pub fn new(addr: PeerId, name: String) -> Self {
let g_id = GroupId(rand::thread_rng().gen::<[u8; 32]>()); let gid = rand::thread_rng().gen::<GroupChatId>();
Self { Self {
g_id, gid,
g_addr, addr,
g_name, name,
id: 0, id: 0,
height: 0, height: 0,
close: false, close: false,
@ -44,11 +44,11 @@ impl GroupChat {
} }
} }
pub fn from(g_id: GroupId, height: i64, g_addr: PeerId, g_name: String) -> Self { pub fn from(gid: GroupChatId, height: i64, addr: PeerId, name: String) -> Self {
Self { Self {
g_id, gid,
g_addr, addr,
g_name, name,
height, height,
close: false, close: false,
local: false, local: false,
@ -65,10 +65,10 @@ impl GroupChat {
Session::new( Session::new(
self.id, self.id,
self.g_id, self.gid.to_string(),
self.g_addr, self.addr,
SessionType::Group, SessionType::Group,
self.g_name.clone(), self.name.clone(),
datetime, datetime,
) )
} }
@ -76,9 +76,9 @@ impl GroupChat {
pub fn to_rpc(&self) -> RpcParam { pub fn to_rpc(&self) -> RpcParam {
json!([ json!([
self.id, self.id,
self.g_id.to_hex(), self.gid,
self.g_addr.to_hex(), self.addr.to_hex(),
self.g_name, self.name,
self.close, self.close,
self.local, self.local,
]) ])
@ -88,9 +88,9 @@ impl GroupChat {
Self { Self {
local: v.pop().unwrap().as_bool(), local: v.pop().unwrap().as_bool(),
close: v.pop().unwrap().as_bool(), close: v.pop().unwrap().as_bool(),
g_name: v.pop().unwrap().as_string(), name: v.pop().unwrap().as_string(),
g_addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()),
g_id: GroupId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), gid: v.pop().unwrap().as_i64() as GroupChatId,
height: v.pop().unwrap().as_i64(), height: v.pop().unwrap().as_i64(),
id: v.pop().unwrap().as_i64(), id: v.pop().unwrap().as_i64(),
} }
@ -98,7 +98,7 @@ impl GroupChat {
pub fn local(db: &DStorage) -> Result<Vec<GroupChat>> { pub fn local(db: &DStorage) -> Result<Vec<GroupChat>> {
let matrix = db.query( let matrix = db.query(
"SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE is_local = true", "SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE is_local = true",
)?; )?;
let mut groups = vec![]; let mut groups = vec![];
for values in matrix { for values in matrix {
@ -109,7 +109,7 @@ impl GroupChat {
pub fn all(db: &DStorage) -> Result<Vec<GroupChat>> { pub fn all(db: &DStorage) -> Result<Vec<GroupChat>> {
let matrix = let matrix =
db.query("SELECT id, height, gcd, addr, name, is_close, is_local FROM groups")?; db.query("SELECT id, height, gid, addr, name, is_close, is_local FROM groups")?;
let mut groups = vec![]; let mut groups = vec![];
for values in matrix { for values in matrix {
groups.push(Self::from_values(values)); groups.push(Self::from_values(values));
@ -119,7 +119,7 @@ impl GroupChat {
pub fn get(db: &DStorage, id: &i64) -> Result<GroupChat> { pub fn get(db: &DStorage, id: &i64) -> Result<GroupChat> {
let sql = format!( let sql = format!(
"SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE id = {}", "SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE id = {}",
id id
); );
let mut matrix = db.query(&sql)?; let mut matrix = db.query(&sql)?;
@ -131,11 +131,8 @@ impl GroupChat {
} }
} }
pub fn get_id(db: &DStorage, gid: &GroupId) -> Result<GroupChat> { pub fn get_id(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result<GroupChat> {
let sql = format!( let sql = format!("SELECT id, height, gid, addr, name, is_close, is_local FROM groups WHERE gid = {} AND addr = '{}'", gid, addr.to_hex());
"SELECT id, height, gcd, addr, name, is_close, is_local FROM groups WHERE gcd = '{}'",
gid.to_hex()
);
let mut matrix = db.query(&sql)?; let mut matrix = db.query(&sql)?;
if matrix.len() > 0 { if matrix.len() > 0 {
let values = matrix.pop().unwrap(); // safe unwrap() let values = matrix.pop().unwrap(); // safe unwrap()
@ -147,27 +144,20 @@ impl GroupChat {
pub fn insert(&mut self, db: &DStorage) -> Result<()> { pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let mut unique_check = db.query(&format!( let mut unique_check = db.query(&format!(
"SELECT id from groups WHERE gcd = '{}'", "SELECT id from groups WHERE gid = {} AND addr = '{}'",
self.g_id.to_hex() self.gid,
self.addr.to_hex()
))?; ))?;
if unique_check.len() > 0 { if unique_check.len() > 0 {
let id = unique_check.pop().unwrap().pop().unwrap().as_i64(); self.gid += 1;
self.id = id; return self.insert(db);
let sql = format!(
"UPDATE groups SET height = {}, addr='{}', name = '{}' WHERE id = {}",
self.height,
self.g_addr.to_hex(),
self.g_name,
self.id
);
db.update(&sql)?;
} else { } else {
let sql = format!( let sql = format!(
"INSERT INTO groups (height, gcd, addr, name, is_close, is_local) VALUES ({}, '{}', '{}', '{}', {}, {})", "INSERT INTO groups (height, gid, addr, name, is_close, is_local) VALUES ({}, {}, '{}', '{}', {}, {})",
self.height, self.height,
self.g_id.to_hex(), self.gid,
self.g_addr.to_hex(), self.addr.to_hex(),
self.g_name, self.name,
self.close, self.close,
self.local, self.local,
); );
@ -187,8 +177,8 @@ impl GroupChat {
db.update(&sql) db.update(&sql)
} }
pub fn close(db: &DStorage, gcd: &GroupId) -> Result<GroupChat> { pub fn close(db: &DStorage, gid: &GroupChatId, addr: &PeerId) -> Result<GroupChat> {
let group = Self::get_id(db, gcd)?; let group = Self::get_id(db, gid, addr)?;
let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", group.id); let sql = format!("UPDATE groups SET is_close = true WHERE id = {}", group.id);
db.update(&sql)?; db.update(&sql)?;
Ok(group) Ok(group)

106
src/apps/group/models/member.rs

@ -1,7 +1,7 @@
use esse_primitives::{id_from_str, id_to_str};
use std::path::PathBuf; use std::path::PathBuf;
use tdn::types::{ use tdn::types::{
group::GroupId, primitives::{PeerId, Result},
primitive::{PeerId, Result},
rpc::{json, RpcParam}, rpc::{json, RpcParam},
}; };
use tdn_storage::local::{DStorage, DsValue}; use tdn_storage::local::{DStorage, DsValue};
@ -16,36 +16,32 @@ pub(crate) struct Member {
pub height: i64, pub height: i64,
/// group's db id. /// group's db id.
pub fid: i64, pub fid: i64,
/// member's Did(GroupId) /// member's Did(PeerId)
pub m_id: GroupId, pub pid: PeerId,
/// member's addresse.
pub m_addr: PeerId,
/// member's name. /// member's name.
pub m_name: String, pub name: String,
/// if leave from group. /// if leave from group.
pub leave: bool, pub leave: bool,
} }
impl Member { impl Member {
pub fn new(height: i64, fid: i64, m_id: GroupId, m_addr: PeerId, m_name: String) -> Self { pub fn new(height: i64, fid: i64, pid: PeerId, name: String) -> Self {
Self { Self {
height, height,
fid, fid,
m_id, pid,
m_addr, name,
m_name,
leave: false, leave: false,
id: 0, id: 0,
} }
} }
pub fn info(id: i64, fid: i64, m_id: GroupId, m_addr: PeerId, m_name: String) -> Self { pub fn info(id: i64, fid: i64, pid: PeerId, name: String) -> Self {
Self { Self {
id, id,
fid, fid,
m_id, pid,
m_addr, name,
m_name,
leave: false, leave: false,
height: 0, height: 0,
} }
@ -55,9 +51,8 @@ impl Member {
json!([ json!([
self.id, self.id,
self.fid, self.fid,
self.m_id.to_hex(), id_to_str(&self.pid),
self.m_addr.to_hex(), self.name,
self.m_name,
self.leave, self.leave,
]) ])
} }
@ -65,9 +60,8 @@ impl Member {
fn from_values(mut v: Vec<DsValue>) -> Self { fn from_values(mut v: Vec<DsValue>) -> Self {
Self { Self {
leave: v.pop().unwrap().as_bool(), leave: v.pop().unwrap().as_bool(),
m_name: v.pop().unwrap().as_string(), name: v.pop().unwrap().as_string(),
m_addr: PeerId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()), pid: id_from_str(v.pop().unwrap().as_str()).unwrap_or(Default::default()),
m_id: GroupId::from_hex(v.pop().unwrap().as_string()).unwrap_or(Default::default()),
fid: v.pop().unwrap().as_i64(), fid: v.pop().unwrap().as_i64(),
height: v.pop().unwrap().as_i64(), height: v.pop().unwrap().as_i64(),
id: v.pop().unwrap().as_i64(), id: v.pop().unwrap().as_i64(),
@ -76,7 +70,7 @@ impl Member {
pub fn list(db: &DStorage, fid: &i64) -> Result<Vec<Member>> { pub fn list(db: &DStorage, fid: &i64) -> Result<Vec<Member>> {
let matrix = db.query(&format!( let matrix = db.query(&format!(
"SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {}", "SELECT id, height, fid, pid, name, leave FROM members WHERE fid = {}",
fid fid
))?; ))?;
let mut groups = vec![]; let mut groups = vec![];
@ -88,28 +82,24 @@ impl Member {
pub fn insert(&mut self, db: &DStorage) -> Result<()> { pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let mut unique_check = db.query(&format!( let mut unique_check = db.query(&format!(
"SELECT id from members WHERE fid = {} AND mid = '{}'", "SELECT id from members WHERE fid = {} AND pid = '{}'",
self.fid, self.fid,
self.m_id.to_hex() id_to_str(&self.pid)
))?; ))?;
if unique_check.len() > 0 { if unique_check.len() > 0 {
let id = unique_check.pop().unwrap().pop().unwrap().as_i64(); let id = unique_check.pop().unwrap().pop().unwrap().as_i64();
self.id = id; self.id = id;
let sql = format!( let sql = format!(
"UPDATE members SET height = {}, addr='{}', name = '{}', leave = false WHERE id = {}", "UPDATE members SET height = {}, name = '{}', leave = false WHERE id = {}",
self.height, self.height, self.name, self.id,
self.m_addr.to_hex(),
self.m_name,
self.id,
); );
db.update(&sql)?; db.update(&sql)?;
} else { } else {
let sql = format!("INSERT INTO members (height, fid, mid, addr, name, leave) VALUES ({}, {}, '{}', '{}', '{}', false)", let sql = format!("INSERT INTO members (height, fid, pid, name, leave) VALUES ({}, {}, '{}', '{}', false)",
self.height, self.height,
self.fid, self.fid,
self.m_id.to_hex(), id_to_str(&self.pid),
self.m_addr.to_hex(), self.name,
self.m_name,
); );
let id = db.insert(&sql)?; let id = db.insert(&sql)?;
self.id = id; self.id = id;
@ -119,7 +109,7 @@ impl Member {
pub fn _get(db: &DStorage, id: &i64) -> Result<Member> { pub fn _get(db: &DStorage, id: &i64) -> Result<Member> {
let mut matrix = db.query(&format!( let mut matrix = db.query(&format!(
"SELECT id, height, fid, mid, addr, name, leave FROM members WHERE id = {}", "SELECT id, height, fid, pid, name, leave FROM members WHERE id = {}",
id, id,
))?; ))?;
if matrix.len() > 0 { if matrix.len() > 0 {
@ -129,11 +119,11 @@ impl Member {
} }
} }
pub fn get_id(db: &DStorage, fid: &i64, gid: &GroupId) -> Result<i64> { pub fn get_id(db: &DStorage, fid: &i64, pid: &PeerId) -> Result<i64> {
let mut matrix = db.query(&format!( let mut matrix = db.query(&format!(
"SELECT id FROM members WHERE fid = {} AND mid = '{}'", "SELECT id FROM members WHERE fid = {} AND pid = '{}'",
fid, fid,
gid.to_hex() id_to_str(pid)
))?; ))?;
if matrix.len() > 0 { if matrix.len() > 0 {
Ok(matrix.pop().unwrap().pop().unwrap().as_i64()) // safe unwrap. Ok(matrix.pop().unwrap().pop().unwrap().as_i64()) // safe unwrap.
@ -142,31 +132,10 @@ impl Member {
} }
} }
pub fn addr_update(db: &DStorage, fid: &i64, mid: &GroupId, addr: &PeerId) -> Result<i64> { pub fn update(db: &DStorage, id: &i64, height: &i64, name: &str) -> Result<usize> {
let mdid = Self::get_id(db, fid, mid)?;
let sql = format!( let sql = format!(
"UPDATE members SET addr='{}' WHERE fid = {} AND mid = '{}'", "UPDATE members SET height = {}, name='{}' WHERE id = {}",
addr.to_hex(), height, name, id,
fid,
mid.to_hex(),
);
db.update(&sql)?;
Ok(mdid)
}
pub fn update(
db: &DStorage,
id: &i64,
height: &i64,
addr: &PeerId,
name: &str,
) -> Result<usize> {
let sql = format!(
"UPDATE members SET height = {}, addr='{}', name='{}' WHERE id = {}",
height,
addr.to_hex(),
name,
id,
); );
db.update(&sql) db.update(&sql)
} }
@ -186,26 +155,23 @@ impl Member {
pub async fn sync( pub async fn sync(
base: &PathBuf, base: &PathBuf,
gid: &GroupId, gid: &PeerId,
db: &DStorage, db: &DStorage,
fid: &i64, fid: &i64,
from: &i64, from: &i64,
to: &i64, to: &i64,
) -> Result<( ) -> Result<(Vec<(i64, PeerId, String, Vec<u8>)>, Vec<(i64, PeerId)>)> {
Vec<(i64, GroupId, PeerId, String, Vec<u8>)>, let sql = format!("SELECT id, height, fid, pid, name, leave FROM members WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to);
Vec<(i64, GroupId)>,
)> {
let sql = format!("SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to);
let matrix = db.query(&sql)?; let matrix = db.query(&sql)?;
let mut adds = vec![]; let mut adds = vec![];
let mut leaves = vec![]; let mut leaves = vec![];
for values in matrix { for values in matrix {
let m = Self::from_values(values); let m = Self::from_values(values);
if m.leave { if m.leave {
leaves.push((m.height, m.m_id)); leaves.push((m.height, m.pid));
} else { } else {
let mavatar = read_avatar(base, gid, &m.m_id).await.unwrap_or(vec![]); let mavatar = read_avatar(base, gid, &m.pid).await.unwrap_or(vec![]);
adds.push((m.height, m.m_id, m.m_addr, m.m_name, mavatar)) adds.push((m.height, m.pid, m.name, mavatar))
} }
} }
Ok((adds, leaves)) Ok((adds, leaves))

41
src/apps/group/models/message.rs

@ -1,19 +1,17 @@
use esse_primitives::id_from_str;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{ use tdn::types::{
group::GroupId, primitives::{HandleResult, PeerId, Result},
primitive::{HandleResult, Result},
rpc::{json, RpcParam}, rpc::{json, RpcParam},
}; };
use tdn_storage::local::{DStorage, DsValue}; use tdn_storage::local::{DStorage, DsValue};
use tokio::sync::RwLock;
use chat_types::{MessageType, NetworkMessage}; use chat_types::{MessageType, NetworkMessage};
use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm}; use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm};
use crate::group::Group; use crate::storage::group_db;
use super::Member; use super::Member;
@ -140,19 +138,18 @@ impl Message {
} }
pub async fn sync( pub async fn sync(
own: &PeerId,
base: &PathBuf, base: &PathBuf,
gid: &GroupId,
db: &DStorage, db: &DStorage,
fid: &i64, fid: &i64,
from: &i64, from: &i64,
to: &i64, to: &i64,
) -> Result<Vec<(i64, GroupId, NetworkMessage, i64)>> { ) -> Result<Vec<(i64, PeerId, NetworkMessage, i64)>> {
let sql = format!("SELECT id, mid FROM members WHERE fid = {}", fid); let sql = format!("SELECT id, pid FROM members WHERE fid = {}", fid);
let m = db.query(&sql)?; let m = db.query(&sql)?;
let mut members = HashMap::new(); let mut members = HashMap::new();
for mut v in m { for mut v in m {
let m_s = v.pop().unwrap().as_string(); let mid = id_from_str(v.pop().unwrap().as_str()).unwrap_or(Default::default());
let mid = GroupId::from_hex(m_s).unwrap_or(Default::default());
let id = v.pop().unwrap().as_i64(); let id = v.pop().unwrap().as_i64();
members.insert(id, mid); members.insert(id, mid);
} }
@ -162,8 +159,8 @@ impl Message {
let mut messages = vec![]; let mut messages = vec![];
for values in matrix { for values in matrix {
let msg = Message::from_values(values); let msg = Message::from_values(values);
if let Ok(nmsg) = tnm(base, gid, msg.m_type, msg.content).await { if let Ok(nmsg) = tnm(own, base, msg.m_type, msg.content).await {
let mid = members.get(&msg.mid).cloned().unwrap_or(GroupId::default()); let mid = members.get(&msg.mid).cloned().unwrap_or(PeerId::default());
messages.push((msg.height, mid, nmsg, msg.datetime)) messages.push((msg.height, mid, nmsg, msg.datetime))
} }
} }
@ -173,9 +170,9 @@ impl Message {
} }
pub(crate) async fn to_network_message( pub(crate) async fn to_network_message(
group: &Arc<RwLock<Group>>, own: &PeerId,
base: &PathBuf, base: &PathBuf,
gid: &GroupId, db_key: &str,
mtype: MessageType, mtype: MessageType,
content: &str, content: &str,
) -> Result<(NetworkMessage, i64, String)> { ) -> Result<(NetworkMessage, i64, String)> {
@ -185,25 +182,25 @@ pub(crate) async fn to_network_message(
.map(|s| s.as_secs()) .map(|s| s.as_secs())
.unwrap_or(0) as i64; // safe for all life. .unwrap_or(0) as i64; // safe for all life.
let (nmsg, raw) = raw_to_network_message(group, base, gid, &mtype, content).await?; let (nmsg, raw) = raw_to_network_message(own, base, db_key, &mtype, content).await?;
Ok((nmsg, datetime, raw)) Ok((nmsg, datetime, raw))
} }
pub(crate) async fn handle_network_message( pub(crate) async fn handle_network_message(
group: &Arc<RwLock<Group>>, own: &PeerId,
base: &PathBuf,
db_key: &str,
height: i64, height: i64,
gdid: i64, gdid: i64,
mid: GroupId, mid: PeerId,
mgid: &GroupId,
msg: NetworkMessage, msg: NetworkMessage,
datetime: i64, datetime: i64,
base: &PathBuf,
results: &mut HandleResult, results: &mut HandleResult,
) -> Result<Message> { ) -> Result<Message> {
let db = group.read().await.group_db(mgid)?; let db = group_db(base, own, db_key)?;
let mdid = Member::get_id(&db, &gdid, &mid)?; let mdid = Member::get_id(&db, &gdid, &mid)?;
let is_me = &mid == mgid; let is_me = &mid == own;
let (m_type, raw) = from_network_message(group, msg, base, mgid, results).await?; let (m_type, raw) = from_network_message(own, base, db_key, msg, results).await?;
let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime); let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime);
msg.insert(&db)?; msg.insert(&db)?;
Ok(msg) Ok(msg)

278
src/apps/group/rpc.rs

@ -1,57 +1,49 @@
use chat_types::{MessageType, CHAT_ID};
use group_types::{Event, LayerEvent, GROUP_CHAT_ID};
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
group::GroupId, message::{RpcSendMessage, SendType},
message::{NetworkType, SendMessage, SendType}, primitives::{HandleResult, PeerId},
primitive::{HandleResult, PeerId},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
}; };
use chat_types::MessageType; use crate::apps::chat::{raw_to_network_message, Friend, InviteType};
use group_types::{Event, LayerEvent}; use crate::global::Global;
use crate::rpc::{session_create, session_delete, session_update_name};
use crate::apps::chat::{Friend, InviteType};
use crate::layer::Online;
use crate::rpc::{session_create, session_delete, session_update_name, RpcState};
use crate::session::{Session, SessionType}; use crate::session::{Session, SessionType};
use crate::storage::{read_avatar, write_avatar}; use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar};
use super::layer::{broadcast, update_session}; use super::layer::{broadcast, update_session};
use super::models::{to_network_message, GroupChat, Member, Message}; use super::models::{to_network_message, GroupChat, Member, Message};
use super::{add_layer, add_server_layer};
#[inline] #[inline]
pub(crate) fn member_join(mgid: GroupId, member: &Member) -> RpcParam { pub(crate) fn member_join(member: &Member) -> RpcParam {
rpc_response(0, "group-member-join", json!(member.to_rpc()), mgid) rpc_response(0, "group-member-join", json!(member.to_rpc()))
} }
#[inline] #[inline]
pub(crate) fn member_leave(mgid: GroupId, id: i64, mid: i64) -> RpcParam { pub(crate) fn member_leave(id: i64, mid: i64) -> RpcParam {
rpc_response(0, "group-member-leave", json!([id, mid]), mgid) rpc_response(0, "group-member-leave", json!([id, mid]))
} }
#[inline] #[inline]
pub(crate) fn member_online(mgid: GroupId, id: i64, mid: i64, maddr: &PeerId) -> RpcParam { pub(crate) fn member_online(id: i64, mid: i64) -> RpcParam {
rpc_response( rpc_response(0, "group-member-online", json!([id, mid]))
0,
"group-member-online",
json!([id, mid, maddr.to_hex()]),
mgid,
)
} }
#[inline] #[inline]
pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: i64) -> RpcParam { pub(crate) fn member_offline(id: i64, mid: i64) -> RpcParam {
rpc_response(0, "group-member-offline", json!([gid, mid]), mgid) rpc_response(0, "group-member-offline", json!([id, mid]))
} }
#[inline] #[inline]
pub(crate) fn group_name(mgid: GroupId, gid: &i64, name: &str) -> RpcParam { pub(crate) fn group_name(id: &i64, name: &str) -> RpcParam {
rpc_response(0, "group-name", json!([gid, name]), mgid) rpc_response(0, "group-name", json!([id, name]))
} }
#[inline] #[inline]
pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { pub(crate) fn message_create(msg: &Message) -> RpcParam {
rpc_response(0, "group-message-create", json!(msg.to_rpc()), mgid) rpc_response(0, "group-message-create", json!(msg.to_rpc()))
} }
#[inline] #[inline]
@ -79,20 +71,26 @@ fn detail_list(group: GroupChat, members: Vec<Member>, messages: Vec<Message>) -
json!([group.to_rpc(), member_results, message_results]) json!([group.to_rpc(), member_results, message_results])
} }
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method( handler.add_method(
"group-list", "group-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move { |_params: Vec<RpcParam>, state: Arc<Global>| async move {
let db = state.group.read().await.group_db(&gid)?; let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = group_db(&state.base, &pid, &db_key)?;
Ok(HandleResult::rpc(group_list(GroupChat::all(&db)?))) Ok(HandleResult::rpc(group_list(GroupChat::all(&db)?)))
}, },
); );
handler.add_method( handler.add_method(
"group-detail", "group-detail",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.group_db(&gid)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = group_db(&state.base, &pid, &db_key)?;
let group = GroupChat::get(&db, &id)?; let group = GroupChat::get(&db, &id)?;
let members = Member::list(&db, &id)?; let members = Member::list(&db, &id)?;
let messages = Message::list(&db, &id)?; let messages = Message::list(&db, &id)?;
@ -102,65 +100,52 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-create", "group-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned(); let name = params[0].as_str().ok_or(RpcError::ParseError)?.to_owned();
let pid = state.pid().await;
let group_lock = state.group.read().await; let group_lock = state.group.read().await;
let base = group_lock.base().clone(); let db_key = group_lock.db_key(&pid)?;
let addr = group_lock.addr().clone(); let me = group_lock.clone_user(&pid)?;
let sender = group_lock.sender();
let me = group_lock.clone_user(&gid)?;
let db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock); drop(group_lock);
let mut gc = GroupChat::new(addr, name); let db = group_db(&state.base, &pid, &db_key)?;
let gcd = gc.g_id; let s_db = session_db(&state.base, &pid, &db_key)?;
let gheight = gc.height + 1; // add first member.
let mut gc = GroupChat::new(pid, name);
let gh = gc.height + 1; // add first member.
// save db // save db
gc.insert(&db)?; gc.insert(&db)?;
let gdid = gc.id; let id = gc.id;
let gid = gc.gid;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let mut m = Member::new(gheight, gc.id, gid, me.addr, me.name); let mut m = Member::new(gh, id, pid, me.name);
m.insert(&db)?; m.insert(&db)?;
let mid = m.id; let mid = m.id;
let _ = write_avatar(&base, &gid, &gid, &me.avatar).await; let _ = write_avatar(&state.base, &pid, &pid, &me.avatar).await;
// Add new session. // Add new session.
let mut session = gc.to_session(); let mut session = gc.to_session();
session.insert(&s_db)?; session.insert(&s_db)?;
let sid = session.id; let sid = session.id;
let sender = state.rpc_send.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = sender let _ = sender
.send(SendMessage::Rpc(0, session_create(gid, &session), true)) .send(RpcSendMessage(0, session_create(&session), true))
.await; .await;
}); });
// add to rpcs. // add to rpcs.
results.rpcs.push(json!([sid, gdid])); results.rpcs.push(json!([sid, id]));
// Add frist member join. // Add frist member join.
let mut layer_lock = state.layer.write().await; state.layer.write().await.group_add(gid, pid, sid, id, gh);
layer_lock.add_running(&gcd, gid, gdid, gheight)?;
// Add online to layers.
layer_lock
.running_mut(&gcd)?
.check_add_online(gid, Online::Direct(addr), gdid, mid)?;
layer_lock
.running_mut(&gid)?
.check_add_online(gcd, Online::Direct(addr), sid, gdid)?;
drop(layer_lock);
// Update consensus. // Update consensus.
GroupChat::add_height(&db, gdid, gheight)?; GroupChat::add_height(&db, id, gh)?;
// Online local group.
results.networks.push(NetworkType::AddGroup(gcd));
Ok(results) Ok(results)
}, },
@ -168,65 +153,60 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-member-join", "group-member-join",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let fid = params[1].as_i64().ok_or(RpcError::ParseError)?; let fid = params[1].as_i64().ok_or(RpcError::ParseError)?;
let group_lock = state.group.read().await; let pid = state.pid().await;
let base = group_lock.base().clone(); let db_key = state.group.read().await.db_key(&pid)?;
let chat_db = group_lock.chat_db(&gid)?; let group_db = group_db(&state.base, &pid, &db_key)?;
let group_db = group_lock.group_db(&gid)?; let chat_db = chat_db(&state.base, &pid, &db_key)?;
let s_db = group_lock.session_db(&gid)?; let s_db = session_db(&state.base, &pid, &db_key)?;
drop(group_lock);
let f = Friend::get(&chat_db, &fid)?; let f = Friend::get(&chat_db, &fid)?;
let g = GroupChat::get(&group_db, &id)?; let g = GroupChat::get(&group_db, &id)?;
let gcd = g.g_id; let gid = g.gid;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
// handle invite message // handle invite message
let contact_values = InviteType::Group(gcd, g.g_addr, g.g_name).serialize(); let contact = InviteType::Group(gid, g.addr, g.name).serialize();
let (msg, nw) = crate::apps::chat::LayerEvent::from_message( let m_type = MessageType::Invite;
&state.group, let (nm, raw) =
&base, raw_to_network_message(&pid, &state.base, &db_key, &m_type, &contact).await?;
gid, let mut msg = crate::apps::chat::Message::new(&pid, f.id, true, m_type, raw, false);
fid, msg.insert(&chat_db)?;
MessageType::Invite, let event = crate::apps::chat::LayerEvent::Message(msg.hash, nm);
&contact_values, let tid = state.layer.write().await.delivery(msg.id);
) let data = bincode::serialize(&event).unwrap_or(vec![]);
.await?; let lmsg = SendType::Event(tid, f.pid, data);
let event = crate::apps::chat::LayerEvent::Message(msg.hash, nw); results.layers.push((CHAT_ID, lmsg));
let mut layer_lock = state.layer.write().await;
let s = crate::apps::chat::event_message(&mut layer_lock, msg.id, gid, f.addr, &event); // update session.
drop(layer_lock); crate::apps::chat::update_session(&s_db, &id, &msg, &mut results);
results.layers.push((gid, f.gid, s));
crate::apps::chat::update_session(&s_db, &gid, &id, &msg, &mut results);
// handle group member // handle group member
let avatar = read_avatar(&base, &gid, &f.gid).await.unwrap_or(vec![]); let avatar = read_avatar(&state.base, &pid, &f.pid)
let event = Event::MemberJoin(f.gid, f.addr, f.name.clone(), avatar); .await
.unwrap_or(vec![]);
let event = Event::MemberJoin(f.pid, f.name.clone(), avatar);
if g.local { if g.local {
// local save. // local save.
let new_h = state.layer.write().await.running_mut(&gcd)?.increased(); let new_h = state.layer.write().await.group_increased(&gid)?;
let mut mem = Member::new(new_h, g.id, f.gid, f.addr, f.name); let mut mem = Member::new(new_h, g.id, f.pid, f.name);
mem.insert(&group_db)?; mem.insert(&group_db)?;
results.rpcs.push(mem.to_rpc()); results.rpcs.push(mem.to_rpc());
GroupChat::add_height(&group_db, id, new_h)?; GroupChat::add_height(&group_db, id, new_h)?;
// broadcast. // broadcast.
broadcast( let data = LayerEvent::Sync(gid, new_h, event);
&LayerEvent::Sync(gcd, new_h, event), broadcast(&gid, &state, &data, &mut results).await?;
&state.layer,
&gcd,
&mut results,
)
.await?;
} else { } else {
// send to server. // send to server.
let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?; let data = bincode::serialize(&LayerEvent::Sync(gid, 0, event))?;
let msg = SendType::Event(0, g.g_addr, data); let msg = SendType::Event(0, g.addr, data);
add_layer(&mut results, gid, msg); results.layers.push((GROUP_CHAT_ID, msg));
} }
Ok(results) Ok(results)
@ -235,29 +215,28 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-message-create", "group-message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?); let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[2].as_str().ok_or(RpcError::ParseError)?; let m_content = params[2].as_str().ok_or(RpcError::ParseError)?;
let group_lock = state.group.read().await; let pid = state.pid().await;
let base = group_lock.base().clone(); let db_key = state.group.read().await.db_key(&pid)?;
let db = group_lock.group_db(&gid)?; let db = group_db(&state.base, &pid, &db_key)?;
let s_db = group_lock.session_db(&gid)?; let s_db = session_db(&state.base, &pid, &db_key)?;
drop(group_lock);
let group = GroupChat::get(&db, &id)?; let group = GroupChat::get(&db, &id)?;
let gcd = group.g_id; let gid = group.gid;
let mid = Member::get_id(&db, &id, &gid)?; let mid = Member::get_id(&db, &id, &pid)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let (nmsg, datetime, raw) = let (nmsg, datetime, raw) =
to_network_message(&state.group, &base, &gid, m_type, m_content).await?; to_network_message(&pid, &state.base, &db_key, m_type, m_content).await?;
let event = Event::MessageCreate(gid, nmsg, datetime); let event = Event::MessageCreate(pid, nmsg, datetime);
if group.local { if group.local {
// local save. // local save.
let new_h = state.layer.write().await.running_mut(&gcd)?.increased(); let new_h = state.layer.write().await.group_increased(&gid)?;
let mut msg = Message::new_with_time(new_h, id, mid, true, m_type, raw, datetime); let mut msg = Message::new_with_time(new_h, id, mid, true, m_type, raw, datetime);
msg.insert(&db)?; msg.insert(&db)?;
@ -265,21 +244,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
GroupChat::add_height(&db, id, new_h)?; GroupChat::add_height(&db, id, new_h)?;
// UPDATE SESSION. // UPDATE SESSION.
update_session(&s_db, &gid, &id, &msg, &mut results); update_session(&s_db, &id, &msg, &mut results);
// broadcast. // broadcast.
broadcast( let data = LayerEvent::Sync(gid, new_h, event);
&LayerEvent::Sync(gcd, new_h, event), broadcast(&gid, &state, &data, &mut results).await?;
&state.layer,
&gcd,
&mut results,
)
.await?;
} else { } else {
// send to server. // send to server.
let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?; let data = bincode::serialize(&LayerEvent::Sync(gid, 0, event))?;
let msg = SendType::Event(0, group.g_addr, data); let msg = SendType::Event(0, group.addr, data);
add_layer(&mut results, gid, msg); results.layers.push((GROUP_CHAT_ID, msg));
} }
Ok(results) Ok(results)
@ -288,35 +262,31 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-name", "group-name",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let name = params[1].as_str().ok_or(RpcError::ParseError)?; let name = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let group_lock = state.group.read().await; let pid = state.pid().await;
let db = group_lock.group_db(&gid)?; let db_key = state.group.read().await.db_key(&pid)?;
let s_db = group_lock.session_db(&gid)?; let db = group_db(&state.base, &pid, &db_key)?;
drop(group_lock); let s_db = session_db(&state.base, &pid, &db_key)?;
let g = GroupChat::get(&db, &id)?; let g = GroupChat::get(&db, &id)?;
let d = bincode::serialize(&LayerEvent::GroupName(g.g_id, name.to_owned()))?; let data = LayerEvent::GroupName(g.gid, name.to_owned());
if g.local { if g.local {
if let Ok(sid) = Session::update_name_by_id(&s_db, &id, &SessionType::Group, &name) if let Ok(sid) = Session::update_name_by_id(&s_db, &id, &SessionType::Group, &name)
{ {
results.rpcs.push(session_update_name(gid, &sid, &name)); results.rpcs.push(session_update_name(&sid, &name));
} }
results.rpcs.push(json!([id, name])); results.rpcs.push(json!([id, name]));
// dissolve group. broadcast(&g.gid, &state, &data, &mut results).await?;
for (mgid, maddr) in state.layer.read().await.running(&g.g_id)?.onlines() {
let s = SendType::Event(0, *maddr, d.clone());
add_server_layer(&mut results, *mgid, s);
}
} else { } else {
// leave group. let d = bincode::serialize(&data)?;
let msg = SendType::Event(0, g.g_addr, d); let msg = SendType::Event(0, g.addr, d);
add_layer(&mut results, gid, msg); results.layers.push((GROUP_CHAT_ID, msg));
} }
Ok(results) Ok(results)
@ -325,33 +295,33 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"group-delete", "group-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let pid = state.pid().await;
let group_lock = state.group.read().await; let db_key = state.group.read().await.db_key(&pid)?;
let db = group_lock.group_db(&gid)?; let db = group_db(&state.base, &pid, &db_key)?;
let s_db = group_lock.session_db(&gid)?; let s_db = session_db(&state.base, &pid, &db_key)?;
drop(group_lock);
let g = GroupChat::delete(&db, &id)?; let g = GroupChat::delete(&db, &id)?;
let sid = Session::delete(&s_db, &id, &SessionType::Group)?; let sid = Session::delete(&s_db, &id, &SessionType::Group)?;
results.rpcs.push(session_delete(gid, &sid)); results.rpcs.push(session_delete(&sid));
if g.local { if g.local {
// dissolve group. // dissolve group.
let d = bincode::serialize(&LayerEvent::GroupClose(g.g_id))?; let data = bincode::serialize(&LayerEvent::GroupClose(g.gid))?;
for (mgid, maddr) in state.layer.read().await.running(&g.g_id)?.onlines() { if let Some(addrs) = state.layer.write().await.group_rm_online(&g.gid) {
let s = SendType::Event(0, *maddr, d.clone()); for addr in addrs {
add_server_layer(&mut results, *mgid, s); let s = SendType::Event(0, addr, data.clone());
results.layers.push((GROUP_CHAT_ID, s));
}
} }
} else { } else {
// leave group. // leave group.
let d = bincode::serialize(&LayerEvent::Sync(g.g_id, 0, Event::MemberLeave(gid)))?; let d = bincode::serialize(&LayerEvent::Sync(g.gid, 0, Event::MemberLeave(pid)))?;
let msg = SendType::Event(0, g.g_addr, d); let msg = SendType::Event(0, g.addr, d);
add_layer(&mut results, gid, msg); results.layers.push((GROUP_CHAT_ID, msg));
} }
Ok(results) Ok(results)

2
src/apps/wallet/models.rs

@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use tdn::types::{ use tdn::types::{
primitive::Result, primitives::Result,
rpc::{json, RpcParam}, rpc::{json, RpcParam},
}; };

160
src/apps/wallet/rpc.rs

@ -1,8 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::types::{
group::GroupId, message::RpcSendMessage,
message::SendMessage, primitives::{HandleResult, Result},
primitive::{HandleResult, Result},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
}; };
use tdn_did::{generate_btc_account, generate_eth_account, secp256k1::SecretKey}; use tdn_did::{generate_btc_account, generate_eth_account, secp256k1::SecretKey};
@ -16,7 +15,9 @@ use web3::{
Web3, Web3,
}; };
use crate::rpc::RpcState; use crate::global::Global;
use crate::storage::{account_db, wallet_db};
use crate::utils::crypto::{decrypt, encrypt};
use super::{ use super::{
models::{Address, Balance, ChainToken, Network, Token}, models::{Address, Balance, ChainToken, Network, Token},
@ -42,34 +43,25 @@ fn token_list(network: Network, tokens: Vec<Token>) -> RpcParam {
} }
#[inline] #[inline]
fn res_balance( fn res_balance(address: &str, network: &Network, balance: &str, token: Option<&Token>) -> RpcParam {
gid: GroupId,
address: &str,
network: &Network,
balance: &str,
token: Option<&Token>,
) -> RpcParam {
if let Some(t) = token { if let Some(t) = token {
rpc_response( rpc_response(
0, 0,
"wallet-balance", "wallet-balance",
json!([address, network.to_i64(), balance, t.to_rpc()]), json!([address, network.to_i64(), balance, t.to_rpc()]),
gid,
) )
} else { } else {
rpc_response( rpc_response(
0, 0,
"wallet-balance", "wallet-balance",
json!([address, network.to_i64(), balance]), json!([address, network.to_i64(), balance]),
gid,
) )
} }
} }
async fn loop_token( async fn loop_token(
sender: Sender<SendMessage>, sender: Sender<RpcSendMessage>,
db: DStorage, db: DStorage,
gid: GroupId,
network: Network, network: Network,
address: String, address: String,
c_token: Option<Token>, c_token: Option<Token>,
@ -83,8 +75,8 @@ async fn loop_token(
let transport = Http::new(node)?; let transport = Http::new(node)?;
let web3 = Web3::new(transport); let web3 = Web3::new(transport);
let balance = token_balance(&web3, &token.contract, &address, &token.chain).await?; let balance = token_balance(&web3, &token.contract, &address, &token.chain).await?;
let res = res_balance(gid, &address, &network, &balance, Some(&token)); let res = res_balance(&address, &network, &balance, Some(&token));
sender.send(SendMessage::Rpc(0, res, true)).await?; sender.send(RpcSendMessage(0, res, true)).await?;
} else { } else {
match chain { match chain {
ChainToken::ETH => { ChainToken::ETH => {
@ -93,19 +85,19 @@ async fn loop_token(
let balance = web3.eth().balance(address.parse()?, None).await?; let balance = web3.eth().balance(address.parse()?, None).await?;
let balance = balance.to_string(); let balance = balance.to_string();
let _ = Address::update_balance(&db, &address, &network, &balance); let _ = Address::update_balance(&db, &address, &network, &balance);
let res = res_balance(gid, &address, &network, &balance, None); let res = res_balance(&address, &network, &balance, None);
sender.send(SendMessage::Rpc(0, res, true)).await?; sender.send(RpcSendMessage(0, res, true)).await?;
for token in tokens { for token in tokens {
//tokio::time::sleep(std::time::Duration::from_secs(1)).await; //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let balance = let balance =
token_balance(&web3, &token.contract, &address, &token.chain).await?; token_balance(&web3, &token.contract, &address, &token.chain).await?;
let res = res_balance(gid, &address, &network, &balance, Some(&token)); let res = res_balance(&address, &network, &balance, Some(&token));
// update & clean balances. // update & clean balances.
// TODO // TODO
sender.send(SendMessage::Rpc(0, res, true)).await?; sender.send(RpcSendMessage(0, res, true)).await?;
} }
} }
ChainToken::BTC => { ChainToken::BTC => {
@ -119,9 +111,8 @@ async fn loop_token(
} }
async fn token_check( async fn token_check(
sender: Sender<SendMessage>, sender: Sender<RpcSendMessage>,
db: DStorage, db: DStorage,
gid: GroupId,
chain: ChainToken, chain: ChainToken,
network: Network, network: Network,
address: String, address: String,
@ -160,8 +151,8 @@ async fn token_check(
.query("balanceOf", (account,), None, Default::default(), None) .query("balanceOf", (account,), None, Default::default(), None)
.await?; .await?;
let balance = balance.to_string(); let balance = balance.to_string();
let res = res_balance(gid, &address, &network, &balance, Some(&token)); let res = res_balance(&address, &network, &balance, Some(&token));
sender.send(SendMessage::Rpc(0, res, true)).await?; sender.send(RpcSendMessage(0, res, true)).await?;
Ok(()) Ok(())
} }
@ -320,15 +311,18 @@ async fn nft_check(node: &str, c_str: &str, hash: &str) -> Result<String> {
Ok(format!("{:?}", owner)) Ok(format!("{:?}", owner))
} }
pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method("wallet-echo", |_, params, _| async move { handler.add_method("wallet-echo", |params, _| async move {
Ok(HandleResult::rpc(json!(params))) Ok(HandleResult::rpc(json!(params)))
}); });
handler.add_method( handler.add_method(
"wallet-list", "wallet-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move { |_params: Vec<RpcParam>, state: Arc<Global>| async move {
let db = state.group.read().await.wallet_db(&gid)?; let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?;
let addresses = Address::list(&db)?; let addresses = Address::list(&db)?;
Ok(HandleResult::rpc(wallet_list(addresses))) Ok(HandleResult::rpc(wallet_list(addresses)))
}, },
@ -336,17 +330,20 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-generate", "wallet-generate",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let lock = params[1].as_str().ok_or(RpcError::ParseError)?; let lock = params[1].as_str().ok_or(RpcError::ParseError)?;
let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?;
let group_lock = state.group.read().await; let group_lock = state.group.read().await;
let mnemonic = group_lock.mnemonic(&gid, lock)?; let mnemonic = group_lock.mnemonic(&pid, lock, &state.secret)?;
let account = group_lock.account(&gid)?; let account = group_lock.account(&pid)?;
let lang = account.lang(); let lang = account.lang();
let pass = account.pass.to_string(); let pass = account.pass.to_string();
let account_index = account.index as u32; let account_index = account.index as u32;
let db = group_lock.wallet_db(&gid)?;
drop(group_lock); drop(group_lock);
let mut results = HandleResult::new(); let mut results = HandleResult::new();
@ -373,16 +370,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
address.insert(&db)?; address.insert(&db)?;
results.rpcs.push(address.to_rpc()); results.rpcs.push(address.to_rpc());
if address.main { if address.main {
let a_db = account_db(&state.base, &state.secret)?;
let mut group_lock = state.group.write().await; let mut group_lock = state.group.write().await;
let a_db = group_lock.account_db()?; let account = group_lock.account_mut(&pid)?;
let account = group_lock.account_mut(&gid)?;
account.wallet = address.chain.update_main(&address.address, &account.wallet); account.wallet = address.chain.update_main(&address.address, &account.wallet);
account.pub_height = account.pub_height + 1; account.pub_height = account.pub_height + 1;
account.update_info(&a_db)?; account.update_info(&a_db)?;
let user = group_lock.clone_user(&gid)?; let user = group_lock.clone_user(&pid)?;
drop(group_lock); drop(group_lock);
// broadcast all friends. // broadcast to all friends.
state.layer.read().await.broadcast(user, &mut results); state.layer.read().await.broadcast(user, &mut results);
} }
Ok(results) Ok(results)
@ -391,7 +388,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-import", "wallet-import",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let secret = params[1].as_str().ok_or(RpcError::ParseError)?; let secret = params[1].as_str().ok_or(RpcError::ParseError)?;
let lock = params[2].as_str().ok_or(RpcError::ParseError)?; let lock = params[2].as_str().ok_or(RpcError::ParseError)?;
@ -399,11 +396,16 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let sk: SecretKey = secret.parse().or(Err(RpcError::ParseError))?; let sk: SecretKey = secret.parse().or(Err(RpcError::ParseError))?;
let addr = format!("{:?}", (&sk).address()); let addr = format!("{:?}", (&sk).address());
let pid = state.pid().await;
let group_lock = state.group.read().await; let group_lock = state.group.read().await;
let cbytes = group_lock.encrypt(&gid, lock, sk.as_ref())?; let ckey = &group_lock.account(&pid)?.encrypt;
let db = group_lock.wallet_db(&gid)?; let db_key = group_lock.db_key(&pid)?;
let cbytes = encrypt(&state.secret, lock, ckey, sk.as_ref())?;
drop(group_lock); drop(group_lock);
let db = wallet_db(&state.base, &pid, &db_key)?;
let mut address = Address::import(chain, addr, cbytes); let mut address = Address::import(chain, addr, cbytes);
address.insert(&db)?; address.insert(&db)?;
Ok(HandleResult::rpc(address.to_rpc())) Ok(HandleResult::rpc(address.to_rpc()))
@ -412,14 +414,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-token", "wallet-token",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let network = Network::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let net = Network::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let address = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned(); let address = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned();
let group_lock = state.group.read().await; let pid = state.pid().await;
let db = group_lock.wallet_db(&gid)?; let db_key = state.group.read().await.db_key(&pid)?;
let sender = group_lock.sender(); let db = wallet_db(&state.base, &pid, &db_key)?;
drop(group_lock);
let c_str = if params.len() == 4 { let c_str = if params.len() == 4 {
let cid = params[2].as_i64().ok_or(RpcError::ParseError)?; let cid = params[2].as_i64().ok_or(RpcError::ParseError)?;
@ -429,34 +430,32 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
None None
}; };
let tokens = Token::list(&db, &network)?; let tokens = Token::list(&db, &net)?;
tokio::spawn(loop_token(sender, db, gid, network, address, c_str)); tokio::spawn(loop_token(state.rpc_send.clone(), db, net, address, c_str));
Ok(HandleResult::rpc(token_list(network, tokens))) Ok(HandleResult::rpc(token_list(net, tokens)))
}, },
); );
handler.add_method( handler.add_method(
"wallet-token-import", "wallet-token-import",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let net = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?);
let address = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); let addr = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
let c_str = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned(); let c = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned();
let group_lock = state.group.read().await; let pid = state.pid().await;
let db = group_lock.wallet_db(&gid)?; let db_key = state.group.read().await.db_key(&pid)?;
let sender = group_lock.sender(); let db = wallet_db(&state.base, &pid, &db_key)?;
drop(group_lock);
tokio::spawn(token_check(sender, db, gid, chain, network, address, c_str));
tokio::spawn(token_check(state.rpc_send.clone(), db, chain, net, addr, c));
Ok(HandleResult::new()) Ok(HandleResult::new())
}, },
); );
handler.add_method( handler.add_method(
"wallet-gas-price", "wallet-gas-price",
|_gid: GroupId, params: Vec<RpcParam>, _state: Arc<RpcState>| async move { |params: Vec<RpcParam>, _state: Arc<Global>| async move {
let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?);
let from = params[2].as_str().ok_or(RpcError::ParseError)?; let from = params[2].as_str().ok_or(RpcError::ParseError)?;
@ -471,7 +470,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-transfer", "wallet-transfer",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?); let chain = ChainToken::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?); let network = Network::from_i64(params[1].as_i64().ok_or(RpcError::ParseError)?);
let from = params[2].as_i64().ok_or(RpcError::ParseError)?; let from = params[2].as_i64().ok_or(RpcError::ParseError)?;
@ -480,20 +479,23 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let c_str = params[5].as_str().ok_or(RpcError::ParseError)?; let c_str = params[5].as_str().ok_or(RpcError::ParseError)?;
let lock = params[6].as_str().ok_or(RpcError::ParseError)?; let lock = params[6].as_str().ok_or(RpcError::ParseError)?;
let pid = state.pid().await;
let group_lock = state.group.read().await; let group_lock = state.group.read().await;
if !group_lock.check_lock(&gid, &lock) { if !group_lock.check_lock(&pid, &lock) {
return Err(RpcError::Custom("Lock is invalid!".to_owned())); return Err(RpcError::Custom("Lock is invalid!".to_owned()));
} }
let db = group_lock.wallet_db(&gid)?; let db_key = group_lock.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?;
let address = Address::get(&db, &from)?; let address = Address::get(&db, &from)?;
let (mnemonic, pbytes) = if address.is_gen() { let (mnemonic, pbytes) = if address.is_gen() {
(group_lock.mnemonic(&gid, lock)?, vec![]) (group_lock.mnemonic(&pid, lock, &state.secret)?, vec![])
} else { } else {
let pbytes = group_lock.decrypt(&gid, lock, address.secret.as_ref())?; let ckey = &group_lock.account(&pid)?.encrypt;
let pbytes = decrypt(&state.secret, lock, ckey, address.secret.as_ref())?;
(String::new(), pbytes) (String::new(), pbytes)
}; };
let account = group_lock.account(&gid)?; let account = group_lock.account(&pid)?;
let lang = account.lang(); let lang = account.lang();
let pass = account.pass.to_string(); let pass = account.pass.to_string();
let account_index = account.index as u32; let account_index = account.index as u32;
@ -553,11 +555,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-nft", "wallet-nft",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let address = params[0].as_i64().ok_or(RpcError::ParseError)?; let address = params[0].as_i64().ok_or(RpcError::ParseError)?;
let token = params[1].as_i64().ok_or(RpcError::ParseError)?; let token = params[1].as_i64().ok_or(RpcError::ParseError)?;
let db = state.group.read().await.wallet_db(&gid)?; let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?;
let nfts = Balance::list(&db, &address, &token)?; let nfts = Balance::list(&db, &address, &token)?;
let mut results = vec![]; let mut results = vec![];
@ -570,12 +574,14 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-nft-add", "wallet-nft-add",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let address = params[0].as_i64().ok_or(RpcError::ParseError)?; let address = params[0].as_i64().ok_or(RpcError::ParseError)?;
let token = params[1].as_i64().ok_or(RpcError::ParseError)?; let token = params[1].as_i64().ok_or(RpcError::ParseError)?;
let hash = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned(); let hash = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
let db = state.group.read().await.wallet_db(&gid)?; let pid = state.pid().await;
let db_key = state.group.read().await.db_key(&pid)?;
let db = wallet_db(&state.base, &pid, &db_key)?;
let t = Token::get(&db, &token)?; let t = Token::get(&db, &token)?;
let a = Address::get(&db, &address)?; let a = Address::get(&db, &address)?;
@ -598,13 +604,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method( handler.add_method(
"wallet-main", "wallet-main",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move { |params: Vec<RpcParam>, state: Arc<Global>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?; let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let group_lock = state.group.read().await; let pid = state.pid().await;
let db = group_lock.wallet_db(&gid)?; let db_key = state.group.read().await.db_key(&pid)?;
let a_db = group_lock.account_db()?; let db = wallet_db(&state.base, &pid, &db_key)?;
drop(group_lock); let a_db = account_db(&state.base, &state.secret)?;
let address = Address::get(&db, &id)?; let address = Address::get(&db, &id)?;
Address::main(&db, &id)?; Address::main(&db, &id)?;
@ -612,11 +618,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let mut results = HandleResult::new(); let mut results = HandleResult::new();
let mut group_lock = state.group.write().await; let mut group_lock = state.group.write().await;
let account = group_lock.account_mut(&gid)?; let account = group_lock.account_mut(&pid)?;
account.wallet = address.chain.update_main(&address.address, &account.wallet); account.wallet = address.chain.update_main(&address.address, &account.wallet);
account.pub_height = account.pub_height + 1; account.pub_height = account.pub_height + 1;
account.update_info(&a_db)?; account.update_info(&a_db)?;
let user = group_lock.clone_user(&gid)?; let user = group_lock.clone_user(&pid)?;
drop(group_lock); drop(group_lock);
// broadcast all friends. // broadcast all friends.

36
src/group.rs

@ -443,20 +443,19 @@ impl Group {
Ok((pheight, oheight)) Ok((pheight, oheight))
} }
// pub fn clone_user(&self, pid: &PeerId) -> Result<User> { pub fn clone_user(&self, pid: &PeerId) -> Result<User> {
// if let Some(u) = self.accounts.get(pid) { if let Some(u) = self.accounts.get(pid) {
// Ok(User::new( Ok(User::new(
// u.pid, u.pid,
// self.addr, u.name.clone(),
// u.name.clone(), u.avatar.clone(),
// u.avatar.clone(), u.wallet.clone(),
// u.wallet.clone(), u.pub_height,
// u.pub_height, ))
// )) } else {
// } else { Err(anyhow!("user missing."))
// Err(anyhow!("user missing.")) }
// } }
// }
pub fn list_accounts(&self) -> &HashMap<PeerId, Account> { pub fn list_accounts(&self) -> &HashMap<PeerId, Account> {
&self.accounts &self.accounts
@ -548,15 +547,6 @@ impl Group {
account_db.close() account_db.close()
} }
// pub fn encrypt(&self, pid: &PeerId, lock: &str, bytes: &[u8]) -> Result<Vec<u8>> {
// let ckey = &self.account(pid)?.encrypt;
// encrypt(&self.secret, lock, ckey, bytes)
// }
// pub fn decrypt(&self, pid: &PeerId, lock: &str, bytes: &[u8]) -> Result<Vec<u8>> {
// let ckey = &self.account(pid)?.encrypt;
// decrypt(&self.secret, lock, ckey, bytes)
// }
// pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> { // pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let user = self.clone_user(pid)?; // let user = self.clone_user(pid)?;
// let account = self.account(pid)?; // let account = self.account(pid)?;

95
src/layer.rs

@ -1,3 +1,4 @@
use chat_types::CHAT_ID;
use esse_primitives::id_to_str; use esse_primitives::id_to_str;
use group_types::GroupChatId; use group_types::GroupChatId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -11,7 +12,7 @@ use tdn::types::{
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::account::User; use crate::account::User;
//use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; use crate::apps::chat::LayerEvent as ChatLayerEvent;
//use crate::apps::group::{group_conn, GROUP_ID}; //use crate::apps::group::{group_conn, GROUP_ID};
use crate::group::Group; use crate::group::Group;
use crate::session::{Session, SessionType}; use crate::session::{Session, SessionType};
@ -56,7 +57,7 @@ impl Layer {
return true; return true;
} else { } else {
for (_, session) in &self.groups { for (_, session) in &self.groups {
if session.addr == *addr { if session.addrs.contains(addr) {
return true; return true;
} }
} }
@ -101,13 +102,12 @@ impl Layer {
} }
pub fn chat_rm_online(&mut self, pid: &PeerId) -> Option<PeerId> { pub fn chat_rm_online(&mut self, pid: &PeerId) -> Option<PeerId> {
self.chats.remove(pid).map(|session| session.addr) self.chats.remove(pid).map(|session| session.addrs[0])
} }
pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64) { pub fn chat_add(&mut self, pid: PeerId, sid: i64, fid: i64, h: i64) {
if !self.chats.contains_key(&pid) { if !self.chats.contains_key(&pid) {
self.chats self.chats.insert(pid, LayerSession::new(pid, sid, fid, h));
.insert(pid, LayerSession::new(id_to_str(&pid), pid, sid, fid));
} }
} }
@ -119,6 +119,44 @@ impl Layer {
} }
} }
pub fn group_add(&mut self, gid: GroupChatId, pid: PeerId, sid: i64, fid: i64, h: i64) {
if !self.groups.contains_key(&gid) {
self.groups.insert(gid, LayerSession::new(pid, sid, fid, h));
}
}
pub fn group(&self, gid: &GroupChatId) -> Result<&LayerSession> {
if let Some(session) = self.groups.get(gid) {
Ok(session)
} else {
Err(anyhow!("session missing!"))
}
}
pub fn group_rm_online(&mut self, gid: &GroupChatId) -> Option<Vec<PeerId>> {
self.groups.remove(gid).map(|session| session.addrs)
}
pub fn group_increased(&mut self, gid: &GroupChatId) -> Result<i64> {
if let Some(session) = self.groups.get_mut(gid) {
Ok(session.increased())
} else {
Err(anyhow!("session missing!"))
}
}
pub fn group_add_member(&mut self, gid: &GroupChatId, addr: PeerId) {
if let Some(session) = self.groups.get_mut(gid) {
session.addrs.push(addr);
}
}
pub fn group_del_member(&mut self, gid: &GroupChatId, index: usize) {
if let Some(session) = self.groups.get_mut(gid) {
session.addrs.remove(index);
}
}
// pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerId, GroupId> { // pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerId, GroupId> {
// // check close the stable connection. // // check close the stable connection.
// let mut addrs: HashMap<PeerId, GroupId> = HashMap::new(); // let mut addrs: HashMap<PeerId, GroupId> = HashMap::new();
@ -209,17 +247,16 @@ impl Layer {
// } // }
// } // }
// pub fn broadcast(&self, user: User, results: &mut HandleResult) { pub fn broadcast(&self, user: User, results: &mut HandleResult) {
// let gid = user.id; let gid = user.id;
// let info = ChatLayerEvent::InfoRes(user); let info = ChatLayerEvent::InfoRes(user);
// let data = bincode::serialize(&info).unwrap_or(vec![]); let data = bincode::serialize(&info).unwrap_or(vec![]);
// if let Some(running) = self.runnings.get(&gid) {
// for (fgid, online) in &running.sessions { for fpid in self.chats.keys() {
// let msg = SendType::Event(0, *online.online.addr(), data.clone()); let msg = SendType::Event(0, *fpid, data.clone());
// results.layers.push((gid, *fgid, msg)); results.layers.push((CHAT_ID, msg));
// } }
// } }
// }
} }
// pub(crate) struct OnlineSession { // pub(crate) struct OnlineSession {
@ -261,10 +298,9 @@ impl Layer {
/// online connected layer session. /// online connected layer session.
pub(crate) struct LayerSession { pub(crate) struct LayerSession {
/// session refs symbol id (Chat is friend's pid, Group is GroupChatId) pub height: i64,
pub pid: String,
/// session network addr. /// session network addr.
pub addr: PeerId, pub addrs: Vec<PeerId>,
/// session database id. /// session database id.
pub s_id: i64, pub s_id: i64,
/// layer service database id. /// layer service database id.
@ -278,18 +314,27 @@ pub(crate) struct LayerSession {
} }
impl LayerSession { impl LayerSession {
fn new(pid: String, addr: PeerId, s_id: i64, db_id: i64) -> Self { fn new(addr: PeerId, s_id: i64, db_id: i64, height: i64) -> Self {
Self { Self {
pid,
addr,
s_id, s_id,
db_id, db_id,
height,
addrs: vec![addr],
suspend_me: false, suspend_me: false,
suspend_remote: false, suspend_remote: false,
remain: 0, remain: 0,
} }
} }
pub fn info(&self) -> (i64, i64, i64) {
(self.height, self.s_id, self.db_id)
}
pub fn increased(&mut self) -> i64 {
self.height += 1;
self.height
}
pub fn active(&mut self, is_me: bool) -> PeerId { pub fn active(&mut self, is_me: bool) -> PeerId {
if is_me { if is_me {
self.suspend_me = false; self.suspend_me = false;
@ -297,7 +342,7 @@ impl LayerSession {
self.suspend_remote = false; self.suspend_remote = false;
} }
self.remain = 0; self.remain = 0;
self.addr self.addrs[0]
} }
pub fn suspend(&mut self, is_me: bool, must: bool) -> Option<PeerId> { pub fn suspend(&mut self, is_me: bool, must: bool) -> Option<PeerId> {
@ -314,7 +359,7 @@ impl LayerSession {
if self.suspend_remote && self.suspend_me { if self.suspend_remote && self.suspend_me {
self.remain = 6; // keep-alive 10~11 minutes 120s/time self.remain = 6; // keep-alive 10~11 minutes 120s/time
Some(self.addr) Some(self.addrs[0])
} else { } else {
None None
} }

5
src/migrate/group.rs

@ -3,7 +3,7 @@ pub(super) const GROUP_VERSIONS: [&str; 3] = [
"CREATE TABLE IF NOT EXISTS groups( "CREATE TABLE IF NOT EXISTS groups(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
height INTEGER NOT NULL, height INTEGER NOT NULL,
gcd TEXT NOT NULL, gid INTEGER NOT NULL,
addr TEXT NOT NULL, addr TEXT NOT NULL,
name TEXT NOT NULL, name TEXT NOT NULL,
is_close INTEGER NOT NULL, is_close INTEGER NOT NULL,
@ -12,8 +12,7 @@ pub(super) const GROUP_VERSIONS: [&str; 3] = [
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
height INTEGER NOT NULL, height INTEGER NOT NULL,
fid INTEGER NOT NULL, fid INTEGER NOT NULL,
mid TEXT NOT NULL, pid TEXT NOT NULL,
addr TEXT NOT NULL,
name TEXT NOT NULL, name TEXT NOT NULL,
leave INTEGER NOT NULL);", leave INTEGER NOT NULL);",
"CREATE TABLE IF NOT EXISTS messages( "CREATE TABLE IF NOT EXISTS messages(

Loading…
Cancel
Save