Browse Source

refs #14 encrypted local database

pull/18/head
Sun 4 years ago
parent
commit
bd24715b4d
  1. 30
      pubspec.lock
  2. 18
      src/account.rs
  3. 93
      src/apps/chat/layer.rs
  4. 4
      src/apps/chat/mod.rs
  5. 18
      src/apps/chat/models.rs
  6. 9
      src/apps/chat/models/message.rs
  7. 59
      src/apps/chat/rpc.rs
  8. 3
      src/apps/device/rpc.rs
  9. 3
      src/apps/domain/layer.rs
  10. 12
      src/apps/domain/rpc.rs
  11. 26
      src/apps/file/rpc.rs
  12. 143
      src/apps/group/layer.rs
  13. 14
      src/apps/group/models/message.rs
  14. 61
      src/apps/group/rpc.rs
  15. 10
      src/apps/jarvis/rpc.rs
  16. 32
      src/apps/wallet/rpc.rs
  17. 112
      src/event.rs
  18. 160
      src/group.rs
  19. 12
      src/group/running.rs
  20. 3
      src/layer.rs
  21. 32
      src/migrate.rs
  22. 26
      src/rpc.rs
  23. 13
      src/server.rs
  24. 8
      src/session.rs
  25. 105
      src/storage.rs

30
pubspec.lock

@ -119,7 +119,7 @@ packages: @@ -119,7 +119,7 @@ packages:
name: crop
url: "https://pub.dartlang.org"
source: hosted
version: "0.5.1+1"
version: "0.5.2"
cross_file:
dependency: transitive
description:
@ -202,7 +202,7 @@ packages: @@ -202,7 +202,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_linux"
ref: HEAD
resolved-ref: "03d957e8b5c99fc83cd4a781031b154ab3de8753"
resolved-ref: c339fe7dd2e17a80f7bb839bfad89d21a6e084ba
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.2+1"
@ -211,7 +211,7 @@ packages: @@ -211,7 +211,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_macos"
ref: HEAD
resolved-ref: "03d957e8b5c99fc83cd4a781031b154ab3de8753"
resolved-ref: c339fe7dd2e17a80f7bb839bfad89d21a6e084ba
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.4+1"
@ -221,7 +221,7 @@ packages: @@ -221,7 +221,7 @@ packages:
name: file_selector_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.2"
version: "2.0.3"
file_selector_web:
dependency: transitive
description:
@ -234,7 +234,7 @@ packages: @@ -234,7 +234,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_windows"
ref: HEAD
resolved-ref: "03d957e8b5c99fc83cd4a781031b154ab3de8753"
resolved-ref: c339fe7dd2e17a80f7bb839bfad89d21a6e084ba
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.2+1"
@ -315,7 +315,7 @@ packages: @@ -315,7 +315,7 @@ packages:
name: flutter_native_splash
url: "https://pub.dartlang.org"
source: hosted
version: "1.3.2"
version: "1.3.3"
flutter_plugin_android_lifecycle:
dependency: transitive
description:
@ -329,7 +329,7 @@ packages: @@ -329,7 +329,7 @@ packages:
name: flutter_quill
url: "https://pub.dartlang.org"
source: hosted
version: "3.0.1"
version: "3.2.0"
flutter_test:
dependency: "direct dev"
description: flutter
@ -421,7 +421,7 @@ packages: @@ -421,7 +421,7 @@ packages:
name: image_picker_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "2.4.1"
version: "2.4.2"
image_save:
dependency: "direct main"
description:
@ -503,7 +503,7 @@ packages: @@ -503,7 +503,7 @@ packages:
name: msix
url: "https://pub.dartlang.org"
source: hosted
version: "2.6.7"
version: "2.8.0"
nested:
dependency: transitive
description:
@ -573,7 +573,7 @@ packages: @@ -573,7 +573,7 @@ packages:
name: path_provider_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.1"
version: "2.0.2"
path_provider_windows:
dependency: transitive
description:
@ -636,7 +636,7 @@ packages: @@ -636,7 +636,7 @@ packages:
name: plugin_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.2"
version: "2.1.0"
process:
dependency: transitive
description:
@ -650,7 +650,7 @@ packages: @@ -650,7 +650,7 @@ packages:
name: provider
url: "https://pub.dartlang.org"
source: hosted
version: "6.0.1"
version: "6.0.2"
pub_semver:
dependency: transitive
description:
@ -907,7 +907,7 @@ packages: @@ -907,7 +907,7 @@ packages:
name: url_launcher_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.4"
version: "2.0.5"
url_launcher_web:
dependency: transitive
description:
@ -949,7 +949,7 @@ packages: @@ -949,7 +949,7 @@ packages:
name: video_player_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "5.0.0"
version: "5.0.1"
video_player_web:
dependency: transitive
description:
@ -1021,5 +1021,5 @@ packages: @@ -1021,5 +1021,5 @@ packages:
source: hosted
version: "8.0.0"
sdks:
dart: ">=2.14.0 <3.0.0"
dart: ">=2.15.1 <3.0.0"
flutter: ">=2.5.3"

18
src/account.rs

@ -60,6 +60,7 @@ pub(crate) struct Account { @@ -60,6 +60,7 @@ pub(crate) struct Account {
pub own_height: u64, // own data consensus height.
pub event: EventId,
pub datetime: i64,
plainkey: Vec<u8>,
}
impl Account {
@ -74,6 +75,7 @@ impl Account { @@ -74,6 +75,7 @@ impl Account {
mnemonic: Vec<u8>,
secret: Vec<u8>,
encrypt: Vec<u8>,
plainkey: Vec<u8>,
) -> Self {
let start = SystemTime::now();
let datetime = start
@ -96,6 +98,7 @@ impl Account { @@ -96,6 +98,7 @@ impl Account {
mnemonic,
secret,
encrypt,
plainkey,
avatar,
datetime,
}
@ -143,6 +146,7 @@ impl Account { @@ -143,6 +146,7 @@ impl Account {
mnemonic,
secret,
ckey,
key.to_vec(),
),
sk,
))
@ -156,11 +160,22 @@ impl Account { @@ -156,11 +160,22 @@ impl Account {
}
}
// when success login, cache plain encrypt key for database use.
pub fn cache_plainkey(&mut self, salt: &[u8], lock: &str) -> Result<()> {
self.plainkey = decrypt_key(salt, lock, &self.encrypt)?;
Ok(())
}
pub fn plainkey(&self) -> String {
hex::encode(&self.plainkey)
}
pub fn pin(&mut self, salt: &[u8], old: &str, new: &str) -> Result<()> {
self.check_lock(old)?;
self.lock = hash_pin(new)?;
let key = decrypt_key(salt, old, &self.encrypt)?;
self.encrypt = encrypt_key(salt, new, &key)?;
self.plainkey = key;
self.encrypt = encrypt_key(salt, new, &self.plainkey)?;
Ok(())
}
@ -196,6 +211,7 @@ impl Account { @@ -196,6 +211,7 @@ impl Account {
index: v.pop().unwrap().as_i64(),
gid: GroupId::from_hex(v.pop().unwrap().as_str()).unwrap_or(GroupId::default()),
id: v.pop().unwrap().as_i64(),
plainkey: vec![],
}
}

93
src/apps/chat/layer.rs

@ -7,12 +7,14 @@ use tdn::types::{ @@ -7,12 +7,14 @@ use tdn::types::{
primitive::{DeliveryType, HandleResult, Peer, PeerId, Result},
};
use tdn_did::Proof;
use tdn_storage::local::DStorage;
use tokio::sync::RwLock;
use chat_types::{MessageType, NetworkMessage};
use crate::account::{Account, User};
use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, Online};
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::rpc::{
@ -20,7 +22,7 @@ use crate::rpc::{ @@ -20,7 +22,7 @@ use crate::rpc::{
session_update_name,
};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{account_db, chat_db, session_db, write_avatar_sync};
use crate::storage::write_avatar_sync;
use super::models::{handle_nmsg, raw_to_network_message, Friend, Message, Request};
use super::rpc;
@ -77,7 +79,8 @@ pub(crate) async fn handle( @@ -77,7 +79,8 @@ pub(crate) async fn handle(
}
RecvType::Connect(addr, data) | RecvType::ResultConnect(addr, data) => {
// ESSE chat layer connect date structure.
if let Ok(height) = handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)
if let Ok(height) =
handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results).await
{
let peer_id = addr.id;
let proof = layer.group.read().await.prove_addr(&mgid, &addr.id)?;
@ -98,7 +101,7 @@ pub(crate) async fn handle( @@ -98,7 +101,7 @@ pub(crate) async fn handle(
// ESSE chat layer result date structure.
if is_ok {
if let Ok(height) =
handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results)
handle_connect(&mgid, &fgid, &addr, data, &mut layer, &mut results).await
{
let info = LayerEvent::InfoReq(height);
let data = bincode::serialize(&info).unwrap_or(vec![]);
@ -109,7 +112,7 @@ pub(crate) async fn handle( @@ -109,7 +112,7 @@ pub(crate) async fn handle(
results.layers.push((mgid, fgid, msg));
}
} else {
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
let friend = Friend::get_id(&db, &fgid)?;
if friend.contains_addr(&addr.id) {
results.rpcs.push(rpc::friend_close(mgid, friend.id));
@ -128,7 +131,7 @@ pub(crate) async fn handle( @@ -128,7 +131,7 @@ pub(crate) async fn handle(
// TODO maybe send failure need handle.
if is_ok {
if let Some((gid, db_id)) = layer.delivery.remove(&tid) {
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
let resp = match t {
DeliveryType::Event => {
Message::delivery(&db, db_id, true)?;
@ -155,7 +158,7 @@ pub(crate) async fn handle( @@ -155,7 +158,7 @@ pub(crate) async fn handle(
Ok(results)
}
fn handle_connect(
async fn handle_connect(
mgid: &GroupId,
fgid: &GroupId,
addr: &Peer,
@ -170,14 +173,19 @@ fn handle_connect( @@ -170,14 +173,19 @@ fn handle_connect(
proof.verify(fgid, &addr.id, &layer.addr)?;
// 2. check friendship.
let friend = update_friend(&layer.base, mgid, fgid, &addr.id);
let friend = update_friend(&layer.group.read().await.chat_db(mgid)?, fgid, &addr.id);
if friend.is_err() {
return Err(anyhow!("not friend"));
}
let f = friend.unwrap(); // safe.
// 3. get session.
let session_some = connect_session(&layer.base, mgid, &SessionType::Chat, &f.id, &addr.id)?;
let session_some = connect_session(
&layer.group.read().await.session_db(mgid)?,
&SessionType::Chat,
&f.id,
&addr.id,
)?;
if session_some.is_none() {
return Err(anyhow!("not friend"));
}
@ -225,9 +233,9 @@ impl LayerEvent { @@ -225,9 +233,9 @@ impl LayerEvent {
// 1. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
if load_friend(&layer.base, &mgid, &fgid).is_err() {
if load_friend(&layer.group.read().await.chat_db(&mgid)?, &fgid).is_err() {
// check if exist request.
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
if let Ok(req) = Request::get_id(&db, &fgid) {
Request::delete(&db, &req.id)?; // delete the old request.
results.rpcs.push(rpc::request_delete(mgid, req.id));
@ -250,9 +258,9 @@ impl LayerEvent { @@ -250,9 +258,9 @@ impl LayerEvent {
// 0. check verify.
proof.verify(&fgid, &addr, &layer.addr)?;
// 1. check friendship.
if load_friend(&layer.base, &mgid, &fgid).is_err() {
if load_friend(&layer.group.read().await.chat_db(&mgid)?, &fgid).is_err() {
// 2. agree request for friend.
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
if let Ok(mut r) = Request::get_id(&db, &fgid) {
r.is_over = true;
r.is_ok = true;
@ -261,7 +269,7 @@ impl LayerEvent { @@ -261,7 +269,7 @@ impl LayerEvent {
results.rpcs.push(rpc::request_agree(mgid, r.id, &friend));
// ADD NEW SESSION.
let s_db = session_db(&layer.base, &mgid)?;
let s_db = layer.group.read().await.session_db(&mgid)?;
let mut session = friend.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
@ -270,7 +278,7 @@ impl LayerEvent { @@ -270,7 +278,7 @@ impl LayerEvent {
}
}
LayerEvent::Reject => {
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
if let Ok(mut request) = Request::get_id(&db, &fgid) {
layer.group.write().await.broadcast(
&mgid,
@ -288,9 +296,10 @@ impl LayerEvent { @@ -288,9 +296,10 @@ impl LayerEvent {
}
LayerEvent::Message(hash, m) => {
let (_sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
if !Message::exist(&db, &hash)? {
let msg = handle_nmsg(
&layer.group,
m.clone(),
false,
mgid,
@ -299,7 +308,8 @@ impl LayerEvent { @@ -299,7 +308,8 @@ impl LayerEvent {
fid,
hash,
&mut results,
)?;
)
.await?;
layer.group.write().await.broadcast(
&mgid,
InnerEvent::SessionMessageCreate(fgid, false, hash, m),
@ -310,12 +320,14 @@ impl LayerEvent { @@ -310,12 +320,14 @@ impl LayerEvent {
results.rpcs.push(rpc::message_create(mgid, &msg));
// UPDATE SESSION.
update_session(&layer.base, &mgid, &fid, &msg, &mut results);
if let Ok(s_db) = layer.group.read().await.session_db(&mgid) {
update_session(&s_db, &mgid, &fid, &msg, &mut results);
}
}
}
LayerEvent::InfoReq(height) => {
// check sync remote height.
if let Ok(account) = Account::get(&account_db(layer.base())?, &mgid) {
if let Ok(account) = Account::get(&layer.group.read().await.account_db()?, &mgid) {
if account.pub_height > height {
let info = LayerEvent::InfoRes(User::info(
account.name,
@ -332,7 +344,7 @@ impl LayerEvent { @@ -332,7 +344,7 @@ impl LayerEvent {
LayerEvent::InfoRes(remote) => {
let (sid, fid) = layer.get_running_remote_id(&mgid, &fgid)?;
let avatar = remote.avatar.clone();
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
let mut f = Friend::get(&db, &fid)?;
let name = remote.name.clone();
f.name = remote.name;
@ -342,7 +354,8 @@ impl LayerEvent { @@ -342,7 +354,8 @@ impl LayerEvent {
drop(db);
write_avatar_sync(&layer.base, &mgid, &remote.id, remote.avatar)?;
results.rpcs.push(rpc::friend_info(mgid, &f));
let _ = Session::update_name(&session_db(&layer.base, &mgid)?, &sid, &name);
let _ =
Session::update_name(&layer.group.read().await.session_db(&mgid)?, &sid, &name);
results.rpcs.push(session_update_name(mgid, &sid, &name));
layer.group.write().await.broadcast(
@ -363,7 +376,7 @@ impl LayerEvent { @@ -363,7 +376,7 @@ impl LayerEvent {
&mut results,
)?;
layer.remove_online(&mgid, &fgid);
let db = chat_db(&layer.base, &mgid)?;
let db = layer.group.read().await.chat_db(&mgid)?;
Friend::id_close(&db, fid)?;
drop(db);
results.rpcs.push(rpc::friend_close(mgid, fid));
@ -379,31 +392,29 @@ impl LayerEvent { @@ -379,31 +392,29 @@ impl LayerEvent {
}
pub async fn from_message(
group: &Arc<RwLock<Group>>,
base: &PathBuf,
mgid: GroupId,
fid: i64,
m_type: MessageType,
content: &str,
) -> std::result::Result<(Message, NetworkMessage), tdn::types::rpc::RpcError> {
let db = chat_db(&base, &mgid)?;
// handle message's type.
let (nm_type, raw) = raw_to_network_message(base, &mgid, &m_type, content).await?;
let db = group.read().await.chat_db(&mgid)?;
let (nm_type, raw) = raw_to_network_message(group, base, &mgid, &m_type, content).await?;
let mut msg = Message::new(&mgid, fid, true, m_type, raw, false);
msg.insert(&db)?;
drop(db);
Ok((msg, nm_type))
}
}
#[inline]
fn load_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId) -> Result<Friend> {
let db = chat_db(base, mgid)?;
fn load_friend(db: &DStorage, fgid: &GroupId) -> Result<Friend> {
Friend::get_id(&db, fgid)
}
#[inline]
fn update_friend(base: &PathBuf, mgid: &GroupId, fgid: &GroupId, addr: &PeerId) -> Result<Friend> {
let db = chat_db(base, mgid)?;
fn update_friend(db: &DStorage, fgid: &GroupId, addr: &PeerId) -> Result<Friend> {
let friend = Friend::get_id(&db, fgid)?;
if &friend.addr != addr {
let _ = Friend::addr_update(&db, friend.id, addr);
@ -463,7 +474,7 @@ fn _res_reject() -> Vec<u8> { @@ -463,7 +474,7 @@ fn _res_reject() -> Vec<u8> {
// UPDATE SESSION.
pub(crate) fn update_session(
base: &PathBuf,
s_db: &DStorage,
gid: &GroupId,
id: &i64,
msg: &Message,
@ -476,18 +487,16 @@ pub(crate) fn update_session( @@ -476,18 +487,16 @@ pub(crate) fn update_session(
_ => format!("{}:", msg.m_type.to_int()),
};
if let Ok(s_db) = session_db(base, gid) {
if let Ok(sid) = Session::last(
&s_db,
id,
&SessionType::Chat,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(*gid, &sid, &msg.datetime, &scontent, false));
}
if let Ok(sid) = Session::last(
&s_db,
id,
&SessionType::Chat,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(*gid, &sid, &msg.datetime, &scontent, false));
}
}

4
src/apps/chat/mod.rs

@ -6,7 +6,7 @@ pub(crate) use layer::handle; @@ -6,7 +6,7 @@ pub(crate) use layer::handle;
pub(crate) use layer::LayerEvent;
pub(crate) use layer::{chat_conn, event_message, update_session};
pub(crate) use models::{
from_model, from_network_message, handle_nmsg, raw_to_network_message, to_network_message,
Friend, InviteType, Message, Request,
from_model, from_network_message, raw_to_network_message, to_network_message, Friend,
InviteType, Message, Request,
};
pub(crate) use rpc::new_rpc_handler;

18
src/apps/chat/models.rs

@ -8,20 +8,23 @@ pub(crate) use self::request::Request; @@ -8,20 +8,23 @@ pub(crate) use self::request::Request;
use chat_types::{MessageType, NetworkMessage};
use std::path::PathBuf;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
primitive::{HandleResult, PeerId, Result},
};
use tokio::sync::RwLock;
use crate::apps::group::GroupChat;
use crate::group::Group;
use crate::rpc::session_create;
use crate::storage::{
chat_db, group_db, read_avatar, read_db_file, read_file, read_image, read_record, session_db,
write_avatar_sync, write_file, write_file_sync, write_image, write_image_sync,
write_record_sync,
read_avatar, read_db_file, read_file, read_image, read_record, write_avatar_sync, write_file,
write_file_sync, write_image, write_image_sync, write_record_sync,
};
pub(crate) fn from_network_message(
pub(crate) async fn from_network_message(
group: &Arc<RwLock<Group>>,
nmsg: NetworkMessage,
base: &PathBuf,
ogid: &GroupId,
@ -58,13 +61,13 @@ pub(crate) fn from_network_message( @@ -58,13 +61,13 @@ pub(crate) fn from_network_message(
match itype {
InviteType::Group(gcd, addr, name) => {
// 1 add group chat.
let db = group_db(&base, &ogid)?;
let db = group.read().await.group_db(&ogid)?;
let mut g = GroupChat::from(gcd, 0, addr, name);
g.insert(&db)?;
// 2 add new session.
let mut session = g.to_session();
let s_db = session_db(&base, &ogid)?;
let s_db = group.read().await.session_db(&ogid)?;
session.insert(&s_db)?;
results.rpcs.push(session_create(*ogid, &session));
}
@ -84,6 +87,7 @@ pub(crate) fn from_network_message( @@ -84,6 +87,7 @@ pub(crate) fn from_network_message(
}
pub(crate) async fn raw_to_network_message(
group: &Arc<RwLock<Group>>,
base: &PathBuf,
ogid: &GroupId,
mtype: &MessageType,
@ -112,7 +116,7 @@ pub(crate) async fn raw_to_network_message( @@ -112,7 +116,7 @@ pub(crate) async fn raw_to_network_message(
}
MessageType::Contact => {
let cid: i64 = content.parse()?;
let db = chat_db(base, ogid)?;
let db = group.read().await.chat_db(ogid)?;
let contact = Friend::get(&db, &cid)?;
drop(db);
let avatar_bytes = read_avatar(base, ogid, &contact.gid).await?;

9
src/apps/chat/models/message.rs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
group::{EventId, GroupId},
@ -6,12 +7,16 @@ use tdn::types::{ @@ -6,12 +7,16 @@ use tdn::types::{
rpc::{json, RpcParam},
};
use tdn_storage::local::{DStorage, DsValue};
use tokio::sync::RwLock;
use chat_types::{MessageType, NetworkMessage};
use crate::group::Group;
use super::{from_network_message, to_network_message};
pub(crate) fn handle_nmsg(
pub(crate) async fn handle_nmsg(
group: &Arc<RwLock<Group>>,
nmsg: NetworkMessage,
is_me: bool,
gid: GroupId,
@ -22,7 +27,7 @@ pub(crate) fn handle_nmsg( @@ -22,7 +27,7 @@ pub(crate) fn handle_nmsg(
results: &mut HandleResult,
) -> Result<Message> {
// handle event.
let (m_type, raw) = from_network_message(nmsg, base, &gid, results)?;
let (m_type, raw) = from_network_message(group, nmsg, base, &gid, results).await?;
let mut msg = Message::new_with_id(hash, fid, is_me, m_type, raw, true);
msg.insert(db)?;
Ok(msg)

59
src/apps/chat/rpc.rs

@ -12,7 +12,7 @@ use chat_types::MessageType; @@ -12,7 +12,7 @@ use chat_types::MessageType;
use crate::event::InnerEvent;
use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH};
use crate::rpc::{session_create, sleep_waiting_close_stable, RpcState};
use crate::storage::{chat_db, delete_avatar, session_db};
use crate::storage::delete_avatar;
use super::layer::{agree_message, reject_message, req_message, update_session, LayerEvent};
use super::{Friend, Message, Request};
@ -114,11 +114,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -114,11 +114,11 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let need_online = params[0].as_bool().ok_or(RpcError::ParseError)?;
let layer_lock = state.layer.read().await;
let db = chat_db(&layer_lock.base, &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let friends = Friend::list(&db)?;
let mut results = vec![];
let layer_lock = state.layer.read().await;
if need_online {
for friend in friends {
let online = layer_lock.is_online(&gid, &friend.gid);
@ -142,7 +142,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -142,7 +142,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let remark = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let db = chat_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let mut f = Friend::get(&db, &id)?;
f.remark = remark.to_owned();
f.me_update(&db)?;
@ -164,15 +164,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -164,15 +164,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = chat_db(layer_lock.base(), &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let friend = Friend::get(&db, &id)?;
friend.close(&db)?;
drop(db);
let online = layer_lock.remove_online(&gid, &friend.gid);
drop(layer_lock);
let online = state.layer.write().await.remove_online(&gid, &friend.gid);
if let Some(faddr) = online {
let mut addrs: HashMap<PeerId, GroupId> = HashMap::new();
@ -203,13 +201,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -203,13 +201,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let mut layer_lock = state.layer.write().await;
let db = chat_db(layer_lock.base(), &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let friend = Friend::get(&db, &id)?;
Friend::delete(&db, &id)?;
drop(db);
let mut layer_lock = state.layer.write().await;
let online = layer_lock.remove_online(&gid, &friend.gid);
delete_avatar(layer_lock.base(), &gid, &friend.gid).await?;
drop(layer_lock);
@ -240,9 +238,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -240,9 +238,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"chat-request-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let layer_lock = state.layer.read().await;
let db = chat_db(layer_lock.base(), &gid)?;
drop(layer_lock);
let db = state.group.read().await.chat_db(&gid)?;
let requests = Request::list(&db)?;
drop(db);
Ok(HandleResult::rpc(request_list(requests)))
@ -271,8 +267,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -271,8 +267,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let proof = group_lock.prove_addr(&gid, &remote_addr)?;
drop(group_lock);
let db = state.group.read().await.chat_db(&gid)?;
let mut layer_lock = state.layer.write().await;
let db = chat_db(layer_lock.base(), &gid)?;
if Friend::is_friend(&db, &request.gid)? {
debug!("had friend.");
drop(layer_lock);
@ -306,7 +302,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -306,7 +302,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut group_lock = state.group.write().await;
let db = chat_db(group_lock.base(), &gid)?;
let db = group_lock.chat_db(&gid)?;
let mut request = Request::get(&db, &id)?;
let mut results = HandleResult::new();
@ -326,7 +322,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -326,7 +322,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
results.rpcs.push(json!([id, friend.to_rpc()]));
// ADD NEW SESSION.
let s_db = session_db(group_lock.base(), &gid)?;
let s_db = group_lock.session_db(&gid)?;
let mut session = friend.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(gid, &session));
@ -344,13 +340,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -344,13 +340,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut layer_lock = state.layer.write().await;
let db = chat_db(layer_lock.base(), &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let mut req = Request::get(&db, &id)?;
req.is_ok = false;
req.is_over = true;
req.update(&db)?;
drop(db);
let mut layer_lock = state.layer.write().await;
let msg = reject_message(&mut layer_lock, id, req.addr, gid);
drop(layer_lock);
@ -371,10 +367,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -371,10 +367,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let layer_lock = state.layer.read().await;
let db = chat_db(layer_lock.base(), &gid)?;
let base = layer_lock.base().clone();
drop(layer_lock);
let group_lock = state.group.read().await;
let db = group_lock.chat_db(&gid)?;
let base = group_lock.base().clone();
drop(group_lock);
let req = Request::get(&db, &id)?;
Request::delete(&db, &id)?;
@ -401,7 +397,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -401,7 +397,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = chat_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.chat_db(&gid)?;
let friend = Friend::get(&db, &id)?;
let messages = Message::get_by_fid(&db, &id)?;
drop(db);
@ -415,10 +411,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -415,10 +411,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64().ok_or(RpcError::ParseError)?;
let layer_lock = state.layer.read().await;
let db = chat_db(layer_lock.base(), &gid)?;
drop(layer_lock);
let db = state.group.read().await.chat_db(&gid)?;
let messages = Message::get_by_fid(&db, &fid)?;
drop(db);
Ok(HandleResult::rpc(message_list(messages)))
@ -437,7 +430,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -437,7 +430,8 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let base = layer_lock.base().clone();
let faddr = layer_lock.running(&gid)?.online(&fgid)?;
let (msg, nw) = LayerEvent::from_message(&base, gid, fid, m_type, content).await?;
let (msg, nw) =
LayerEvent::from_message(&state.group, &base, gid, fid, m_type, content).await?;
let event = LayerEvent::Message(msg.hash, nw);
let s = super::layer::event_message(&mut layer_lock, msg.id, gid, faddr, &event);
drop(layer_lock);
@ -446,7 +440,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -446,7 +440,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
results.layers.push((gid, fgid, s));
// UPDATE SESSION.
update_session(&base, &gid, &fid, &msg, &mut results);
if let Ok(s_db) = state.group.read().await.session_db(&gid) {
update_session(&s_db, &gid, &fid, &msg, &mut results);
}
match event {
LayerEvent::Message(hash, nw) => {
@ -470,10 +466,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -470,10 +466,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let layer_lock = state.layer.read().await;
let db = chat_db(&layer_lock.base(), &gid)?;
drop(layer_lock);
let db = state.group.read().await.chat_db(&gid)?;
let msg = Message::get(&db, &id)?;
Message::delete(&db, &id)?;
drop(db);

3
src/apps/device/rpc.rs

@ -7,7 +7,6 @@ use tdn::types::{ @@ -7,7 +7,6 @@ use tdn::types::{
use crate::group::GroupEvent;
use crate::rpc::RpcState;
use crate::storage::consensus_db;
use crate::utils::device_status::device_status as local_device_status;
use super::Device;
@ -70,7 +69,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -70,7 +69,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"device-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = consensus_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.consensus_db(&gid)?;
let devices = Device::list(&db)?;
drop(db);
let online_devices = state.group.read().await.online_devices(&gid, devices);

3
src/apps/domain/layer.rs

@ -9,7 +9,6 @@ use tokio::sync::RwLock; @@ -9,7 +9,6 @@ use tokio::sync::RwLock;
use domain_types::{LayerServerEvent, ServerEvent};
use crate::layer::Layer;
use crate::storage::domain_db;
use super::models::{Name, Provider};
use super::rpc;
@ -33,7 +32,7 @@ pub(crate) async fn handle( @@ -33,7 +32,7 @@ pub(crate) async fn handle(
// server & client handle it.
let LayerServerEvent(event, _proof) = bincode::deserialize(&bytes)?;
let db = domain_db(layer.read().await.base(), &ogid)?;
let db = layer.read().await.group.read().await.domain_db(&ogid)?;
match event {
ServerEvent::Status(name, support_request) => {

12
src/apps/domain/rpc.rs

@ -11,7 +11,7 @@ use super::{ @@ -11,7 +11,7 @@ use super::{
add_layer,
models::{Name, Provider},
};
use crate::{rpc::RpcState, storage::domain_db};
use crate::rpc::RpcState;
#[inline]
pub(crate) fn add_provider(mgid: GroupId, provider: &Provider) -> RpcParam {
@ -71,7 +71,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -71,7 +71,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"domain-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = domain_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.domain_db(&gid)?;
// list providers.
let providers: Vec<RpcParam> =
@ -90,7 +90,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -90,7 +90,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let provider = PeerId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let mut results = HandleResult::new();
let db = domain_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.domain_db(&gid)?;
let mut p = Provider::prepare(provider);
p.insert(&db)?;
@ -104,7 +104,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -104,7 +104,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = domain_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.domain_db(&gid)?;
let provider = Provider::get(&db, &id)?;
if let Ok(default) = Provider::get_default(&db) {
if default.id == provider.id {
@ -123,7 +123,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -123,7 +123,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = domain_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.domain_db(&gid)?;
let names = Name::get_by_provider(&db, &id)?;
if names.len() == 0 {
Provider::delete(&db, &id)?;
@ -145,7 +145,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -145,7 +145,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// save to db.
let mut results = HandleResult::new();
let db = domain_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.domain_db(&gid)?;
let mut u = Name::prepare(name, bio, provider);
u.insert(&db)?;

26
src/apps/file/rpc.rs

@ -7,7 +7,7 @@ use tdn::types::{ @@ -7,7 +7,7 @@ use tdn::types::{
};
use crate::rpc::RpcState;
use crate::storage::{copy_file, file_db, write_file};
use crate::storage::{copy_file, write_file};
use super::models::{File, RootDirectory};
@ -22,7 +22,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -22,7 +22,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let root = RootDirectory::from_i64(params[0].as_i64().ok_or(RpcError::ParseError)?);
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
let files: Vec<RpcParam> = File::list(&db, &root, &parent)?
.iter()
.map(|p| p.to_rpc())
@ -39,8 +39,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -39,8 +39,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let parent = params[1].as_i64().ok_or(RpcError::ParseError)?;
let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
let base = state.group.read().await.base().clone();
let db = file_db(&base, &gid)?;
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let db = group_lock.file_db(&gid)?;
drop(group_lock);
// genereate new file.
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
@ -66,8 +68,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -66,8 +68,10 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
.ok_or(RpcError::ParseError)?
.to_owned();
let base = state.group.read().await.base().clone();
let db = file_db(&base, &gid)?;
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let db = group_lock.file_db(&gid)?;
drop(group_lock);
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
copy_file(&file_path, &base, &gid, &file.storage_name()).await?;
@ -84,7 +88,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -84,7 +88,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let name = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
// create new folder.
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
let mut file = File::generate(root, parent, name);
file.insert(&db)?;
@ -100,7 +104,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -100,7 +104,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let parent = params[2].as_i64().ok_or(RpcError::ParseError)?;
let name = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned();
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
let mut file = File::get(&db, &id)?;
file.root = root;
file.parent = parent;
@ -117,7 +121,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -117,7 +121,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let starred = params[1].as_bool().ok_or(RpcError::ParseError)?;
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
File::star(&db, &id, starred)?;
Ok(HandleResult::new())
},
@ -130,7 +134,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -130,7 +134,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// TODO trash a directory.
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
File::trash(&db, &id)?;
Ok(HandleResult::new())
},
@ -143,7 +147,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -143,7 +147,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// TODO deleted file & directory.
let db = file_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.file_db(&gid)?;
File::delete(&db, &id)?;
Ok(HandleResult::new())
},

143
src/apps/group/layer.rs

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
use std::path::PathBuf;
use std::sync::Arc;
use tdn::types::{
group::GroupId,
@ -10,6 +9,7 @@ use tokio::sync::RwLock; @@ -10,6 +9,7 @@ use tokio::sync::RwLock;
use chat_types::MessageType;
use group_types::{Event, LayerConnect, LayerEvent, LayerResult};
use tdn_did::Proof;
use tdn_storage::local::DStorage;
use crate::apps::chat::Friend;
use crate::layer::{Layer, Online};
@ -18,7 +18,7 @@ use crate::rpc::{ @@ -18,7 +18,7 @@ use crate::rpc::{
session_update_name,
};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{chat_db, delete_avatar, group_db, session_db, write_avatar_sync};
use crate::storage::{delete_avatar, write_avatar_sync};
use super::models::{handle_network_message, GroupChat, Member, Message};
use super::{add_layer, add_server_layer, rpc};
@ -80,7 +80,7 @@ async fn handle_server_connect( @@ -80,7 +80,7 @@ async fn handle_server_connect(
) -> Result<()> {
let (ogid, height, id) = layer.read().await.running(&gcd)?.owner_height_id();
// check is member.
let db = group_db(&layer.read().await.base, &ogid)?;
let db = layer.read().await.group.read().await.group_db(&ogid)?;
let g = GroupChat::get(&db, &id)?;
let mdid = Member::get_id(&db, &id, &fgid)?;
@ -121,21 +121,29 @@ pub(crate) async fn handle_peer( @@ -121,21 +121,29 @@ pub(crate) async fn handle_peer(
RecvType::Result(addr, is_ok, data) => {
if is_ok {
let mut layer_lock = layer.write().await;
handle_connect(ogid, &addr, data, &mut layer_lock, &mut results)?;
handle_connect(ogid, &addr, data, &mut layer_lock, &mut results).await?;
} else {
// close the group chat.
let gcd: GroupId = bincode::deserialize(&data)?;
let base = layer.read().await.base().clone();
let db = group_db(&base, &ogid)?;
let layer_lock = layer.read().await;
let group_lock = layer_lock.group.read().await;
let db = group_lock.group_db(&ogid)?;
let s_db = group_lock.session_db(&ogid)?;
drop(group_lock);
drop(layer_lock);
let group = GroupChat::close(&db, &gcd)?;
let sid =
Session::close(&session_db(&base, &ogid)?, &group.id, &SessionType::Group)?;
let sid = Session::close(&s_db, &group.id, &SessionType::Group)?;
results.rpcs.push(session_close(ogid, &sid));
}
}
RecvType::ResultConnect(addr, data) => {
let mut layer_lock = layer.write().await;
if handle_connect(ogid, &addr, data, &mut layer_lock, &mut results).is_err() {
if handle_connect(ogid, &addr, data, &mut layer_lock, &mut results)
.await
.is_err()
{
let msg = SendType::Result(0, addr, true, false, vec![]);
add_layer(&mut results, ogid, msg);
}
@ -160,7 +168,7 @@ pub(crate) async fn handle_peer( @@ -160,7 +168,7 @@ pub(crate) async fn handle_peer(
Ok(results)
}
fn handle_connect(
async fn handle_connect(
ogid: GroupId,
addr: &Peer,
data: Vec<u8>,
@ -171,7 +179,7 @@ fn handle_connect( @@ -171,7 +179,7 @@ fn handle_connect(
let LayerResult(gcd, gname, height) = bincode::deserialize(&data)?;
// 1. check group.
let db = group_db(layer.base(), &ogid)?;
let db = layer.group.read().await.group_db(&ogid)?;
let group = GroupChat::get_id(&db, &gcd)?;
// 1.0 check address.
@ -183,19 +191,14 @@ fn handle_connect( @@ -183,19 +191,14 @@ fn handle_connect(
results.rpcs.push(rpc::group_name(ogid, &group.id, &gname));
// 1.1 get session.
let session_some = connect_session(
layer.base(),
&ogid,
&SessionType::Group,
&group.id,
&addr.id,
)?;
let s_db = layer.group.read().await.session_db(&ogid)?;
let session_some = connect_session(&s_db, &SessionType::Group, &group.id, &addr.id)?;
if session_some.is_none() {
return Err(anyhow!("invalid group chat address."));
}
let sid = session_some.unwrap().id;
let _ = Session::update_name(&session_db(layer.base(), &ogid)?, &sid, &gname);
let _ = Session::update_name(&s_db, &sid, &gname);
results.rpcs.push(session_update_name(ogid, &sid, &gname));
// 1.2 online this group.
@ -236,7 +239,7 @@ async fn handle_server_event( @@ -236,7 +239,7 @@ async fn handle_server_event(
let gcd = event.gcd();
let base = layer.read().await.base().clone();
let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id();
let db = group_db(&base, &ogid)?;
let db = layer.read().await.group.read().await.group_db(&ogid)?;
match event {
LayerEvent::Offline(gcd) => {
@ -259,7 +262,7 @@ async fn handle_server_event( @@ -259,7 +262,7 @@ async fn handle_server_event(
// 2. UI: update
results.rpcs.push(rpc::group_name(ogid, &id, &name));
if let Ok(sid) = Session::update_name_by_id(
&session_db(&base, &ogid)?,
&layer.read().await.group.read().await.session_db(&ogid)?,
&id,
&SessionType::Group,
&name,
@ -302,7 +305,7 @@ async fn handle_server_event( @@ -302,7 +305,7 @@ async fn handle_server_event(
Member::leave(&db, &mdid, &h)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
if Friend::get_id(&s_db, &mgid).is_err() {
let _ = delete_avatar(&base, &ogid, &mgid).await;
}
@ -322,13 +325,24 @@ async fn handle_server_event( @@ -322,13 +325,24 @@ async fn handle_server_event(
GroupChat::add_height(&db, id, new_h)?;
let msg = handle_network_message(
new_h, id, mgid, &ogid, nmsg, mtime, &base, results,
)?;
&layer.read().await.group,
new_h,
id,
mgid,
&ogid,
nmsg,
mtime,
&base,
results,
)
.await?;
results.rpcs.push(rpc::message_create(ogid, &msg));
debug!("Sync: create message ok");
// UPDATE SESSION.
update_session(&base, &ogid, &id, &msg, results);
if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
update_session(&s_db, &ogid, &id, &msg, results);
}
}
}
}
@ -391,7 +405,7 @@ async fn handle_peer_event( @@ -391,7 +405,7 @@ async fn handle_peer_event(
let base = layer.read().await.base().clone();
let gcd = event.gcd();
let (sid, id) = layer.read().await.get_running_remote_id(&ogid, gcd)?;
let db = group_db(&base, &ogid)?;
let db = layer.read().await.group.read().await.group_db(&ogid)?;
match event {
LayerEvent::Offline(gcd) => {
@ -439,12 +453,20 @@ async fn handle_peer_event( @@ -439,12 +453,20 @@ async fn handle_peer_event(
LayerEvent::GroupName(_gcd, name) => {
let _ = GroupChat::update_name(&db, &id, &name)?;
results.rpcs.push(rpc::group_name(ogid, &id, &name));
let _ = Session::update_name(&session_db(&base, &ogid)?, &sid, &name);
let _ = Session::update_name(
&layer.read().await.group.read().await.session_db(&ogid)?,
&sid,
&name,
);
results.rpcs.push(session_update_name(ogid, &sid, &name));
}
LayerEvent::GroupClose(_gcd) => {
let group = GroupChat::close(&db, &gcd)?;
let sid = Session::close(&session_db(&base, &ogid)?, &group.id, &SessionType::Group)?;
let sid = Session::close(
&layer.read().await.group.read().await.session_db(&ogid)?,
&group.id,
&SessionType::Group,
)?;
results.rpcs.push(session_close(ogid, &sid));
}
LayerEvent::Sync(_gcd, height, event) => {
@ -477,7 +499,7 @@ async fn handle_peer_event( @@ -477,7 +499,7 @@ async fn handle_peer_event(
Member::leave(&db, &height, &mdid)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
if Friend::get_id(&s_db, &mgid).is_err() {
let _ = delete_avatar(&base, &ogid, &mgid).await;
}
@ -491,15 +513,26 @@ async fn handle_peer_event( @@ -491,15 +513,26 @@ async fn handle_peer_event(
let _mdid = Member::get_id(&db, &id, &mgid)?;
let msg = handle_network_message(
height, id, mgid, &ogid, nmsg, mtime, &base, results,
)?;
&layer.read().await.group,
height,
id,
mgid,
&ogid,
nmsg,
mtime,
&base,
results,
)
.await?;
results.rpcs.push(rpc::message_create(ogid, &msg));
GroupChat::add_height(&db, id, height)?;
debug!("Sync: create message ok");
// UPDATE SESSION.
update_session(&base, &ogid, &id, &msg, results);
if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
update_session(&s_db, &ogid, &id, &msg, results);
}
}
}
}
@ -535,7 +568,7 @@ async fn handle_peer_event( @@ -535,7 +568,7 @@ async fn handle_peer_event(
if let Ok(mdid) = Member::get_id(&db, &id, &mgid) {
Member::leave(&db, &height, &mdid)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
let s_db = &layer.read().await.group.read().await.chat_db(&ogid)?;
if Friend::get_id(&s_db, &mgid).is_err() {
let _ = delete_avatar(&base, &ogid, &mgid).await;
}
@ -544,8 +577,18 @@ async fn handle_peer_event( @@ -544,8 +577,18 @@ async fn handle_peer_event(
}
for (height, mgid, nm, time) in messages {
if let Ok(msg) =
handle_network_message(height, id, mgid, &ogid, nm, time, &base, results)
if let Ok(msg) = handle_network_message(
&layer.read().await.group,
height,
id,
mgid,
&ogid,
nm,
time,
&base,
results,
)
.await
{
results.rpcs.push(rpc::message_create(ogid, &msg));
last_message = Some(msg);
@ -561,7 +604,9 @@ async fn handle_peer_event( @@ -561,7 +604,9 @@ async fn handle_peer_event(
// UPDATE SESSION.
if let Some(msg) = last_message {
update_session(&base, &ogid, &id, &msg, results);
if let Ok(s_db) = layer.read().await.group.read().await.session_db(&ogid) {
update_session(&s_db, &ogid, &id, &msg, results);
}
}
debug!("Over handle sync packed... {}, {}, {}", height, from, to);
}
@ -590,7 +635,7 @@ pub(crate) async fn broadcast( @@ -590,7 +635,7 @@ pub(crate) async fn broadcast(
// UPDATE SESSION.
pub(crate) fn update_session(
base: &PathBuf,
s_db: &DStorage,
gid: &GroupId,
id: &i64,
msg: &Message,
@ -603,19 +648,17 @@ pub(crate) fn update_session( @@ -603,19 +648,17 @@ pub(crate) fn update_session(
_ => format!("{}:", msg.m_type.to_int()),
};
if let Ok(s_db) = session_db(base, gid) {
if let Ok(sid) = Session::last(
&s_db,
id,
&SessionType::Group,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(*gid, &sid, &msg.datetime, &scontent, false));
}
if let Ok(sid) = Session::last(
&s_db,
id,
&SessionType::Group,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(*gid, &sid, &msg.datetime, &scontent, false));
}
}

14
src/apps/group/models/message.rs

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
group::GroupId,
@ -7,11 +8,12 @@ use tdn::types::{ @@ -7,11 +8,12 @@ use tdn::types::{
rpc::{json, RpcParam},
};
use tdn_storage::local::{DStorage, DsValue};
use tokio::sync::RwLock;
use chat_types::{MessageType, NetworkMessage};
use crate::apps::chat::{from_network_message, raw_to_network_message, to_network_message as tnm};
use crate::storage::group_db;
use crate::group::Group;
use super::Member;
@ -171,6 +173,7 @@ impl Message { @@ -171,6 +173,7 @@ impl Message {
}
pub(crate) async fn to_network_message(
group: &Arc<RwLock<Group>>,
base: &PathBuf,
gid: &GroupId,
mtype: MessageType,
@ -182,11 +185,12 @@ pub(crate) async fn to_network_message( @@ -182,11 +185,12 @@ pub(crate) async fn to_network_message(
.map(|s| s.as_secs())
.unwrap_or(0) as i64; // safe for all life.
let (nmsg, raw) = raw_to_network_message(base, gid, &mtype, content).await?;
let (nmsg, raw) = raw_to_network_message(group, base, gid, &mtype, content).await?;
Ok((nmsg, datetime, raw))
}
pub(crate) fn handle_network_message(
pub(crate) async fn handle_network_message(
group: &Arc<RwLock<Group>>,
height: i64,
gdid: i64,
mid: GroupId,
@ -196,10 +200,10 @@ pub(crate) fn handle_network_message( @@ -196,10 +200,10 @@ pub(crate) fn handle_network_message(
base: &PathBuf,
results: &mut HandleResult,
) -> Result<Message> {
let db = group_db(base, mgid)?;
let db = group.read().await.group_db(mgid)?;
let mdid = Member::get_id(&db, &gdid, &mid)?;
let is_me = &mid == mgid;
let (m_type, raw) = from_network_message(msg, base, mgid, results)?;
let (m_type, raw) = from_network_message(group, msg, base, mgid, results).await?;
let mut msg = Message::new_with_time(height, gdid, mdid, is_me, m_type, raw, datetime);
msg.insert(&db)?;
Ok(msg)

61
src/apps/group/rpc.rs

@ -13,7 +13,7 @@ use crate::apps::chat::{Friend, InviteType}; @@ -13,7 +13,7 @@ use crate::apps::chat::{Friend, InviteType};
use crate::layer::Online;
use crate::rpc::{session_create, session_delete, session_update_name, RpcState};
use crate::session::{Session, SessionType};
use crate::storage::{chat_db, group_db, read_avatar, session_db, write_avatar};
use crate::storage::{read_avatar, write_avatar};
use super::layer::{broadcast, update_session};
use super::models::{to_network_message, GroupChat, Member, Message};
@ -83,8 +83,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -83,8 +83,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"group-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let layer_lock = state.layer.read().await;
let db = group_db(&layer_lock.base, &gid)?;
let db = state.group.read().await.group_db(&gid)?;
Ok(HandleResult::rpc(group_list(GroupChat::all(&db)?)))
},
);
@ -93,7 +92,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -93,7 +92,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
"group-detail",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = group_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.group_db(&gid)?;
let group = GroupChat::get(&db, &id)?;
let members = Member::list(&db, &id)?;
let messages = Message::list(&db, &id)?;
@ -111,8 +110,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -111,8 +110,9 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let addr = group_lock.addr().clone();
let sender = group_lock.sender();
let me = group_lock.clone_user(&gid)?;
let db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock);
let db = group_db(&base, &gid)?;
let mut gc = GroupChat::new(addr, name);
let gcd = gc.g_id;
@ -130,7 +130,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -130,7 +130,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let _ = write_avatar(&base, &gid, &gid, &me.avatar).await;
// Add new session.
let s_db = session_db(&base, &gid)?;
let mut session = gc.to_session();
session.insert(&s_db)?;
let sid = session.id;
@ -173,10 +172,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -173,10 +172,13 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let fid = params[1].as_i64().ok_or(RpcError::ParseError)?;
let base = state.layer.read().await.base().clone();
let chat_db = chat_db(&base, &gid)?;
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let chat_db = group_lock.chat_db(&gid)?;
let group_db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock);
let f = Friend::get(&chat_db, &fid)?;
let group_db = group_db(&base, &gid)?;
let g = GroupChat::get(&group_db, &id)?;
let gcd = g.g_id;
let mut results = HandleResult::new();
@ -184,6 +186,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -184,6 +186,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// handle invite message
let contact_values = InviteType::Group(gcd, g.g_addr, g.g_name).serialize();
let (msg, nw) = crate::apps::chat::LayerEvent::from_message(
&state.group,
&base,
gid,
fid,
@ -196,7 +199,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -196,7 +199,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let s = crate::apps::chat::event_message(&mut layer_lock, msg.id, gid, f.addr, &event);
drop(layer_lock);
results.layers.push((gid, f.gid, s));
crate::apps::chat::update_session(&base, &gid, &id, &msg, &mut results);
crate::apps::chat::update_session(&s_db, &gid, &id, &msg, &mut results);
// handle group member
let avatar = read_avatar(&base, &gid, &f.gid).await.unwrap_or(vec![]);
@ -237,14 +240,19 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -237,14 +240,19 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[2].as_str().ok_or(RpcError::ParseError)?;
let base = state.layer.read().await.base().clone();
let db = group_db(&base, &gid)?;
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock);
let group = GroupChat::get(&db, &id)?;
let gcd = group.g_id;
let mid = Member::get_id(&db, &id, &gid)?;
let mut results = HandleResult::new();
let (nmsg, datetime, raw) = to_network_message(&base, &gid, m_type, m_content).await?;
let (nmsg, datetime, raw) =
to_network_message(&state.group, &base, &gid, m_type, m_content).await?;
let event = Event::MessageCreate(gid, nmsg, datetime);
if group.local {
@ -257,7 +265,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -257,7 +265,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
GroupChat::add_height(&db, id, new_h)?;
// UPDATE SESSION.
update_session(&base, &gid, &id, &msg, &mut results);
update_session(&s_db, &gid, &id, &msg, &mut results);
// broadcast.
broadcast(
@ -285,18 +293,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -285,18 +293,17 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let name = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let base = state.layer.read().await.base().clone();
let db = group_db(&base, &gid)?;
let group_lock = state.group.read().await;
let db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock);
let g = GroupChat::get(&db, &id)?;
let d = bincode::serialize(&LayerEvent::GroupName(g.g_id, name.to_owned()))?;
if g.local {
if let Ok(sid) = Session::update_name_by_id(
&session_db(&base, &gid)?,
&id,
&SessionType::Group,
&name,
) {
if let Ok(sid) = Session::update_name_by_id(&s_db, &id, &SessionType::Group, &name)
{
results.rpcs.push(session_update_name(gid, &sid, &name));
}
@ -322,11 +329,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -322,11 +329,15 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let base = state.layer.read().await.base().clone();
let db = group_db(&base, &gid)?;
let group_lock = state.group.read().await;
let db = group_lock.group_db(&gid)?;
let s_db = group_lock.session_db(&gid)?;
drop(group_lock);
let g = GroupChat::delete(&db, &id)?;
let sid = Session::delete(&session_db(&base, &gid)?, &id, &SessionType::Group)?;
let sid = Session::delete(&s_db, &id, &SessionType::Group)?;
results.rpcs.push(session_delete(gid, &sid));
if g.local {

10
src/apps/jarvis/rpc.rs

@ -15,7 +15,6 @@ use chat_types::MessageType; @@ -15,7 +15,6 @@ use chat_types::MessageType;
use crate::account::lang_from_i64;
use crate::apps::chat::raw_to_network_message;
use crate::rpc::RpcState;
use crate::storage::jarvis_db;
use crate::utils::answer::load_answer;
use super::models::Message;
@ -53,7 +52,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -53,7 +52,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"jarvis-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = jarvis_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.jarvis_db(&gid)?;
let devices = Message::list(&db)?;
db.close()?;
let mut results = vec![];
@ -74,11 +73,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -74,11 +73,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let group_lock = state.group.read().await;
let base = group_lock.base().clone();
let sender = group_lock.sender();
let db = group_lock.jarvis_db(&gid)?;
drop(group_lock);
let (_, raw) = raw_to_network_message(&base, &gid, &m_type, content).await?;
let (_, raw) =
raw_to_network_message(&state.group, &base, &gid, &m_type, content).await?;
let mut msg = Message::new(m_type, raw, true);
let db = jarvis_db(&base, &gid)?;
msg.insert(&db)?;
let results = HandleResult::rpc(msg.to_rpc());
@ -92,7 +92,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -92,7 +92,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
"jarvis-delete",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = jarvis_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.jarvis_db(&gid)?;
Message::delete(&db, id)?;
db.close()?;
Ok(HandleResult::new())

32
src/apps/wallet/rpc.rs

@ -16,10 +16,7 @@ use web3::{ @@ -16,10 +16,7 @@ use web3::{
Web3,
};
use crate::{
rpc::RpcState,
storage::{account_db, wallet_db},
};
use crate::rpc::RpcState;
use super::{
models::{Address, Balance, ChainToken, Network, Token},
@ -331,7 +328,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -331,7 +328,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
handler.add_method(
"wallet-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = wallet_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.wallet_db(&gid)?;
let addresses = Address::list(&db)?;
Ok(HandleResult::rpc(wallet_list(addresses)))
},
@ -349,7 +346,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -349,7 +346,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let lang = account.lang();
let pass = account.pass.to_string();
let account_index = account.index as u32;
let db = wallet_db(group_lock.base(), &gid)?;
let db = group_lock.wallet_db(&gid)?;
drop(group_lock);
let mut results = HandleResult::new();
@ -377,7 +374,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -377,7 +374,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
results.rpcs.push(address.to_rpc());
if address.main {
let mut group_lock = state.group.write().await;
let a_db = account_db(group_lock.base())?;
let a_db = group_lock.account_db()?;
let account = group_lock.account_mut(&gid)?;
account.wallet = address.chain.update_main(&address.address, &account.wallet);
account.pub_height = account.pub_height + 1;
@ -404,7 +401,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -404,7 +401,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let group_lock = state.group.read().await;
let cbytes = group_lock.encrypt(&gid, lock, sk.as_ref())?;
let db = wallet_db(group_lock.base(), &gid)?;
let db = group_lock.wallet_db(&gid)?;
drop(group_lock);
let mut address = Address::import(chain, addr, cbytes);
@ -420,7 +417,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -420,7 +417,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let address = params[1].as_str().ok_or(RpcError::ParseError)?.to_owned();
let group_lock = state.group.read().await;
let db = wallet_db(group_lock.base(), &gid)?;
let db = group_lock.wallet_db(&gid)?;
let sender = group_lock.sender();
drop(group_lock);
@ -447,7 +444,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -447,7 +444,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let c_str = params[3].as_str().ok_or(RpcError::ParseError)?.to_owned();
let group_lock = state.group.read().await;
let db = wallet_db(group_lock.base(), &gid)?;
let db = group_lock.wallet_db(&gid)?;
let sender = group_lock.sender();
drop(group_lock);
@ -487,7 +484,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -487,7 +484,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
if !group_lock.check_lock(&gid, &lock) {
return Err(RpcError::Custom("Lock is invalid!".to_owned()));
}
let db = wallet_db(group_lock.base(), &gid)?;
let db = group_lock.wallet_db(&gid)?;
let address = Address::get(&db, &from)?;
let (mnemonic, pbytes) = if address.is_gen() {
@ -560,7 +557,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -560,7 +557,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let address = params[0].as_i64().ok_or(RpcError::ParseError)?;
let token = params[1].as_i64().ok_or(RpcError::ParseError)?;
let db = wallet_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.wallet_db(&gid)?;
let nfts = Balance::list(&db, &address, &token)?;
let mut results = vec![];
@ -578,7 +575,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -578,7 +575,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let token = params[1].as_i64().ok_or(RpcError::ParseError)?;
let hash = params[2].as_str().ok_or(RpcError::ParseError)?.to_owned();
let db = wallet_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.wallet_db(&gid)?;
let t = Token::get(&db, &token)?;
let a = Address::get(&db, &address)?;
@ -603,9 +600,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -603,9 +600,12 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
"wallet-main",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let base = state.layer.read().await.base().clone();
let db = wallet_db(&base, &gid)?;
let a_db = account_db(&base)?;
let group_lock = state.group.read().await;
let db = group_lock.wallet_db(&gid)?;
let a_db = group_lock.account_db()?;
drop(group_lock);
let address = Address::get(&db, &id)?;
Address::main(&db, &id)?;

112
src/event.rs

@ -15,7 +15,7 @@ use chat_types::NetworkMessage; @@ -15,7 +15,7 @@ use chat_types::NetworkMessage;
use crate::account::{Account, User};
use crate::apps::chat::LayerEvent;
use crate::consensus::Event;
use crate::consensus::Event as OldEvent;
use crate::group::{Group, GroupEvent};
use crate::layer::Layer;
use crate::migrate::consensus::{
@ -23,12 +23,36 @@ use crate::migrate::consensus::{ @@ -23,12 +23,36 @@ use crate::migrate::consensus::{
};
use crate::apps::chat::rpc as chat_rpc;
use crate::apps::chat::{from_model, handle_nmsg, Friend, Message, Request};
use crate::apps::chat::{from_model, Friend, Message, Request};
use crate::apps::file::{FileDid, RootDirectory};
use crate::rpc;
use crate::storage::{
account_db, chat_db, consensus_db, delete_avatar_sync, read_avatar_sync, write_avatar_sync,
};
use crate::storage::{delete_avatar_sync, read_avatar_sync, write_avatar_sync};
/// Online state synchronization.
#[derive(Serialize, Deserialize)]
pub(crate) enum State {
Account,
Session,
ChatMessage,
GroupMessage,
}
pub(crate) async fn _handle_state(
_gid: GroupId,
_addr: PeerId,
state: State,
_group: &Arc<RwLock<Group>>,
_layer: &Arc<RwLock<Layer>>,
_results: &mut HandleResult,
) -> Result<()> {
match state {
State::Account => {}
State::Session => {}
State::ChatMessage => {}
State::GroupMessage => {}
}
Ok(())
}
/// Event that will update data.
#[derive(Serialize, Deserialize)]
@ -168,7 +192,7 @@ impl InnerEvent { @@ -168,7 +192,7 @@ impl InnerEvent {
if our_height >= event_height {
// load current event_hegiht.
let events = Event::get_nexts(db, event_height)?;
let events = OldEvent::get_nexts(db, event_height)?;
for event in events {
let our_event_time = Self::event_time(&event.hash);
let remote_event_time = Self::event_time(&event_id);
@ -210,8 +234,8 @@ impl InnerEvent { @@ -210,8 +234,8 @@ impl InnerEvent {
layer: &Arc<RwLock<Layer>>,
) -> Result<()> {
let account = group.account(&gid)?;
let db = consensus_db(group.base(), &gid)?;
if Event::contains_hash(&db, &eid)? {
let db = group.consensus_db(&gid)?;
if OldEvent::contains_hash(&db, &eid)? {
return Ok(());
}
@ -240,7 +264,7 @@ impl InnerEvent { @@ -240,7 +264,7 @@ impl InnerEvent {
(ACCOUNT_TABLE_PATH, 0)
}
InnerEvent::SessionRequestCreate(is_me, remote, remark) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
// check if exist request.
if Friend::get_id(&db, &remote.id).is_ok() {
return Ok(());
@ -260,7 +284,7 @@ impl InnerEvent { @@ -260,7 +284,7 @@ impl InnerEvent {
(REQUEST_TABLE_PATH, request.id)
}
InnerEvent::SessionRequestHandle(rgid, is_ok, avatar) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if Friend::get_id(&db, &rgid).is_ok() {
return Ok(());
}
@ -293,7 +317,7 @@ impl InnerEvent { @@ -293,7 +317,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionRequestDelete(rgid) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(request) = Request::get_id(&db, &rgid) {
let rid = request.id;
Request::delete(&db, &request.id)?;
@ -308,7 +332,7 @@ impl InnerEvent { @@ -308,7 +332,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionMessageCreate(rgid, is_me, hash, m) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if Message::exist(&db, &hash)? {
return Ok(());
}
@ -329,15 +353,18 @@ impl InnerEvent { @@ -329,15 +353,18 @@ impl InnerEvent {
));
}
let msg = handle_nmsg(m, is_me, gid, group.base(), &db, f.id, hash, results)?;
results.rpcs.push(chat_rpc::message_create(gid, &msg));
(MESSAGE_TABLE_PATH, msg.id)
// Need Refactor
// let msg =
// handle_nmsg(group, m, is_me, gid, group.base(), &db, f.id, hash, results)
// .await?;
// results.rpcs.push(chat_rpc::message_create(gid, &msg));
(MESSAGE_TABLE_PATH, 0)
} else {
return Ok(());
}
}
InnerEvent::SessionMessageDelete(hash) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(m) = Message::get_by_hash(&db, &hash) {
Message::delete(&db, &m.id)?;
results.rpcs.push(chat_rpc::message_delete(gid, m.id));
@ -347,7 +374,7 @@ impl InnerEvent { @@ -347,7 +374,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionFriendInfo(rgid, raddr, rname, ravatar) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(mut f) = Friend::get_id(&db, &rgid) {
f.addr = raddr;
f.name = rname;
@ -362,7 +389,7 @@ impl InnerEvent { @@ -362,7 +389,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionFriendUpdate(rgid, remark) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(mut f) = Friend::get_id(&db, &rgid) {
f.remark = remark;
f.me_update(&db)?;
@ -375,7 +402,7 @@ impl InnerEvent { @@ -375,7 +402,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionFriendClose(rgid) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(f) = Friend::get_id(&db, &rgid) {
f.close(&db)?;
results.rpcs.push(chat_rpc::friend_close(gid, f.id));
@ -402,7 +429,7 @@ impl InnerEvent { @@ -402,7 +429,7 @@ impl InnerEvent {
}
}
InnerEvent::SessionFriendDelete(rgid) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(f) = Friend::get_id(&db, &rgid) {
Friend::delete(&db, &f.id)?;
results.rpcs.push(chat_rpc::friend_delete(gid, f.id));
@ -452,11 +479,11 @@ impl InnerEvent { @@ -452,11 +479,11 @@ impl InnerEvent {
}
};
Event::merge(&db, eid, path, id, merge_height)?;
OldEvent::merge(&db, eid, path, id, merge_height)?;
drop(db);
drop(layer);
let account_db = account_db(group.base())?;
let account_db = group.account_db()?;
let account = group.account_mut(&gid)?;
account.update_consensus(&account_db, next_height, next_eid)?;
account_db.close()?;
@ -477,13 +504,13 @@ impl StatusEvent { @@ -477,13 +504,13 @@ impl StatusEvent {
) -> Result<()> {
match self {
StatusEvent::SessionFriendOnline(rgid) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(_f) = Friend::get_id(&db, &rgid) {
// TODO
}
}
StatusEvent::SessionFriendOffline(rgid) => {
let db = chat_db(group.base(), &gid)?;
let db = group.chat_db(&gid)?;
if let Ok(f) = Friend::get_id(&db, &rgid) {
let layer_lock = layer.clone();
let rgid = f.gid;
@ -506,13 +533,14 @@ impl StatusEvent { @@ -506,13 +533,14 @@ impl StatusEvent {
impl SyncEvent {
pub async fn sync(
group: &Group,
base: &PathBuf,
gid: &GroupId,
account: &Account,
from: u64,
to: u64,
) -> Result<Vec<Self>> {
let db = consensus_db(base, gid)?;
let db = group.consensus_db(gid)?;
let sql = format!(
"SELECT id, hash, db_table, row from events WHERE id BETWEEN {} AND {}",
from, to
@ -546,7 +574,7 @@ impl SyncEvent { @@ -546,7 +574,7 @@ impl SyncEvent {
events.push(SyncEvent::Account(hash, name, avatar));
}
REQUEST_TABLE_PATH => {
let db = chat_db(base, gid)?;
let db = group.chat_db(gid)?;
let event = if let Ok(request) = Request::get(&db, &row) {
if pre_keys.contains(&(path, row)) {
events.push(SyncEvent::RequestHad(hash, request.gid));
@ -580,7 +608,7 @@ impl SyncEvent { @@ -580,7 +608,7 @@ impl SyncEvent {
events.push(event);
}
FRIEND_TABLE_PATH => {
let db = chat_db(base, gid)?;
let db = group.chat_db(gid)?;
let event = if let Ok(friend) = Friend::get(&db, &row) {
if pre_keys.contains(&(path, row)) {
events.push(SyncEvent::FriendHad(hash, friend.gid));
@ -611,7 +639,7 @@ impl SyncEvent { @@ -611,7 +639,7 @@ impl SyncEvent {
events.push(event);
}
MESSAGE_TABLE_PATH => {
let db = chat_db(base, gid)?;
let db = group.chat_db(gid)?;
let event = if let Ok(msg) = Message::get(&db, &row) {
let fgid = if let Ok(f) = Friend::get(&db, &msg.fid) {
f.gid
@ -658,7 +686,7 @@ impl SyncEvent { @@ -658,7 +686,7 @@ impl SyncEvent {
return Ok(());
}
let base = group.base().clone();
let consensus_db = consensus_db(&base, &gid)?;
let consensus_db = group.consensus_db(&gid)?;
let mut next = from;
for event in events {
@ -673,7 +701,7 @@ impl SyncEvent { @@ -673,7 +701,7 @@ impl SyncEvent {
| SyncEvent::Friend(eid, ..)
| SyncEvent::FriendHad(eid, ..)
| SyncEvent::Message(eid, ..) => {
if Event::contains_hash(&consensus_db, eid)? {
if OldEvent::contains_hash(&consensus_db, eid)? {
continue;
}
}
@ -702,7 +730,7 @@ impl SyncEvent { @@ -702,7 +730,7 @@ impl SyncEvent {
is_ok,
is_over,
) => {
let chat_db = chat_db(&base, &gid)?;
let chat_db = group.chat_db(&gid)?;
let request = if let Ok(mut req) = Request::get_id(&chat_db, &rgid) {
req.is_ok = is_ok;
req.is_over = is_over;
@ -742,7 +770,7 @@ impl SyncEvent { @@ -742,7 +770,7 @@ impl SyncEvent {
(eid, REQUEST_TABLE_PATH, rid)
}
SyncEvent::RequestHad(eid, rgid) => {
let chat_db = chat_db(&base, &gid)?;
let chat_db = group.chat_db(&gid)?;
let id = if let Ok(req) = Request::get_id(&chat_db, &rgid) {
req.id
} else {
@ -751,7 +779,7 @@ impl SyncEvent { @@ -751,7 +779,7 @@ impl SyncEvent {
(eid, REQUEST_TABLE_PATH, id)
}
SyncEvent::Friend(eid, fgid, faddr, fname, avatar, remark, is_closed) => {
let chat_db = chat_db(&base, &gid)?;
let chat_db = group.chat_db(&gid)?;
let id = if let Ok(mut friend) = Friend::get_id(&chat_db, &fgid) {
friend.addr = faddr;
friend.name = fname;
@ -791,7 +819,7 @@ impl SyncEvent { @@ -791,7 +819,7 @@ impl SyncEvent {
(eid, FRIEND_TABLE_PATH, id)
}
SyncEvent::FriendHad(eid, fgid) => {
let chat_db = chat_db(&base, &gid)?;
let chat_db = group.chat_db(&gid)?;
let id = if let Ok(friend) = Friend::get_id(&chat_db, &fgid) {
friend.id
} else {
@ -799,16 +827,16 @@ impl SyncEvent { @@ -799,16 +827,16 @@ impl SyncEvent {
};
(eid, FRIEND_TABLE_PATH, id)
}
SyncEvent::Message(eid, fgid, meid, is_me, m) => {
let chat_db = chat_db(&base, &gid)?;
SyncEvent::Message(eid, fgid, meid, _is_me, _m) => {
let chat_db = group.chat_db(&gid)?;
if Message::exist(&chat_db, &meid)? {
continue;
}
let id = if let Ok(f) = Friend::get_id(&chat_db, &fgid) {
let msg = handle_nmsg(m, is_me, gid, &base, &chat_db, f.id, eid, results)?;
results.rpcs.push(chat_rpc::message_create(gid, &msg));
msg.id
let id = if let Ok(_f) = Friend::get_id(&chat_db, &fgid) {
//let msg = handle_nmsg(m, is_me, gid, &base, &chat_db, f.id, eid, results)?;
//results.rpcs.push(chat_rpc::message_create(gid, &msg));
-1
} else {
-1
};
@ -820,6 +848,7 @@ impl SyncEvent { @@ -820,6 +848,7 @@ impl SyncEvent {
}
};
let account_db = group.account_db()?;
let account = group.account_mut(&gid)?;
let (merge_height, next_height, next_eid) = InnerEvent::merge_event(
&consensus_db,
@ -832,11 +861,10 @@ impl SyncEvent { @@ -832,11 +861,10 @@ impl SyncEvent {
None,
)?;
let account_db = account_db(&base)?;
account.update_consensus(&account_db, next_height, next_eid)?;
account_db.close()?;
Event::merge(&consensus_db, eid, path, id, merge_height)?;
OldEvent::merge(&consensus_db, eid, path, id, merge_height)?;
}
consensus_db.close()?;

160
src/group.rs

@ -10,14 +10,20 @@ use tdn::types::{ @@ -10,14 +10,20 @@ use tdn::types::{
use tdn_did::Proof;
use tokio::sync::{mpsc::Sender, RwLock};
use tdn_storage::local::DStorage;
use crate::account::{Account, User};
use crate::apps::device::rpc as device_rpc;
use crate::apps::device::Device;
use crate::consensus::Event;
use crate::event::{InnerEvent, StatusEvent, SyncEvent};
use crate::layer::Layer;
use crate::migrate::{
ACCOUNT_DB, CHAT_DB, CLOUD_DB, CONSENSUS_DB, DAO_DB, DOMAIN_DB, FILE_DB, GROUP_DB, JARVIS_DB,
SERVICE_DB, SESSION_DB, WALLET_DB,
};
use crate::rpc;
use crate::storage::{account_db, account_init, consensus_db, write_avatar};
use crate::storage::{account_init, write_avatar};
use crate::utils::crypto::{decrypt, encrypt};
use crate::utils::device_status::{device_info, device_status as local_device_status};
@ -159,11 +165,11 @@ impl Group { @@ -159,11 +165,11 @@ impl Group {
// first init sync.
if remote.avatar.len() > 0 {
let account_db = self.account_db()?;
if let Some(u) = self.accounts.get_mut(gid) {
if u.avatar.len() == 0 {
u.name = remote.name;
u.avatar = remote.avatar;
let account_db = account_db(&self.base)?;
u.update(&account_db)?;
account_db.close()?;
results.rpcs.push(rpc::account_update(
@ -175,6 +181,7 @@ impl Group { @@ -175,6 +181,7 @@ impl Group {
}
}
let db = self.consensus_db(gid)?;
let running = self.runnings.get_mut(gid).unwrap(); // safe unwrap. checked.
let mut new_addrs = vec![];
for a in others {
@ -189,7 +196,6 @@ impl Group { @@ -189,7 +196,6 @@ impl Group {
(remote_height, remote_event, new_addrs)
} else {
let mut device = Device::new(device_name, device_info, peer_id);
let db = consensus_db(&self.base, gid)?;
device.insert(&db)?;
db.close()?;
running
@ -411,18 +417,21 @@ impl Group { @@ -411,18 +417,21 @@ impl Group {
}
pub fn add_running(&mut self, gid: &GroupId, lock: &str) -> Result<(i64, bool)> {
if let Some(u) = self.accounts.get(gid) {
let (keypair, id, key) = if let Some(u) = self.accounts.get_mut(gid) {
let keypair = u.secret(&self.secret, lock)?;
if !self.runnings.contains_key(gid) {
// load devices to runnings.
let running = RunningAccount::init(keypair, &self.base, gid)?;
self.runnings.insert(gid.clone(), running);
Ok((u.id, false))
} else {
Ok((u.id, true))
}
u.cache_plainkey(&self.secret, lock)?;
(keypair, u.id, u.plainkey())
} else {
Err(anyhow!("user missing."))
return Err(anyhow!("user missing."));
};
if !self.runnings.contains_key(gid) {
// load devices to runnings.
let running = RunningAccount::init(keypair, &self.base, &key, gid)?;
self.runnings.insert(gid.clone(), running);
Ok((id, false))
} else {
Ok((id, true))
}
}
@ -476,36 +485,37 @@ impl Group { @@ -476,36 +485,37 @@ impl Group {
let account_id = account.gid;
if let Some(u) = self.accounts.get(&account_id) {
let running = RunningAccount::init(sk, &self.base, &account_id)?;
let running = RunningAccount::init(sk, &self.base, &account.plainkey(), &account_id)?;
self.runnings.insert(account_id, running);
return Ok((u.id, account_id));
}
account_init(&self.base, &account.gid).await?;
account_init(&self.base, &account.plainkey(), &account.gid).await?;
let account_db = account_db(&self.base)?;
let account_db = self.account_db()?;
account.insert(&account_db)?;
account_db.close()?;
let account_did = account.id;
let key = account.plainkey();
let _ = write_avatar(&self.base, &account_id, &account_id, &account.avatar).await;
self.accounts.insert(account.gid, account);
let (device_name, device_info) = device_info();
let mut device = Device::new(device_name, device_info, self.addr);
let db = consensus_db(&self.base, &account_id)?;
let db = self.consensus_db(&account_id)?;
device.insert(&db)?;
db.close()?;
self.runnings.insert(
account_id,
RunningAccount::init(sk, &self.base, &account_id)?,
RunningAccount::init(sk, &self.base, &key, &account_id)?,
);
Ok((account_did, account_id))
}
pub fn update_account(&mut self, gid: GroupId, name: &str, avatar: Vec<u8>) -> Result<()> {
let account_db = account_db(&self.base)?;
let account_db = self.account_db()?;
let account = self.account_mut(&gid)?;
account.name = name.to_owned();
if avatar.len() > 0 {
@ -525,9 +535,9 @@ impl Group { @@ -525,9 +535,9 @@ impl Group {
}
pub fn pin(&mut self, gid: &GroupId, lock: &str, new: &str) -> Result<()> {
let account_db = self.account_db()?;
if let Some(u) = self.accounts.get_mut(gid) {
u.pin(&self.secret, lock, new)?;
let account_db = account_db(&self.base)?;
u.update(&account_db)?;
account_db.close()
} else {
@ -631,7 +641,7 @@ impl Group { @@ -631,7 +641,7 @@ impl Group {
) -> Result<SendType> {
let (ancestors, hashes, is_min) = if to >= from {
let (ancestors, is_min) = Self::ancestor(from, to);
let db = consensus_db(&self.base, gid)?;
let db = self.consensus_db(gid)?;
let hashes = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
db.close()?;
(ancestors, hashes, is_min)
@ -657,17 +667,17 @@ impl Group { @@ -657,17 +667,17 @@ impl Group {
row: i64,
results: &mut HandleResult,
) -> Result<()> {
let base = self.base.clone();
let db = self.consensus_db(gid)?;
let account_db = self.account_db()?;
let account = self.account_mut(gid)?;
let pre_event = account.event;
let eheight = account.own_height + 1;
let eid = event.generate_event_id();
let db = consensus_db(&base, gid)?;
Event::merge(&db, eid, path, row, eheight)?;
drop(db);
let account_db = account_db(&base)?;
account.update_consensus(&account_db, eheight, eid)?;
account_db.close()?;
drop(account);
@ -702,6 +712,95 @@ impl Group { @@ -702,6 +712,95 @@ impl Group {
}
}
impl Group {
fn db_key(&self, gid: &GroupId) -> Result<String> {
Ok(self.account(gid)?.plainkey())
}
pub(crate) fn account_db(&self) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(ACCOUNT_DB);
DStorage::open(db_path, &hex::encode(&self.secret))
}
pub(crate) fn consensus_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(CONSENSUS_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn session_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(SESSION_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn chat_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(CHAT_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn file_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(FILE_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn _service_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(SERVICE_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn jarvis_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(JARVIS_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn group_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(GROUP_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn _dao_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(DAO_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn domain_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(DOMAIN_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn wallet_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(WALLET_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
pub(crate) fn _cloud_db(&self, gid: &GroupId) -> Result<DStorage> {
let mut db_path = self.base.clone();
db_path.push(gid.to_hex());
db_path.push(CLOUD_DB);
DStorage::open(db_path, &self.db_key(gid)?)
}
}
impl GroupEvent {
pub async fn handle(
group: &mut Group,
@ -782,7 +881,7 @@ impl GroupEvent { @@ -782,7 +881,7 @@ impl GroupEvent {
let remote_event = hashes.last().map(|v| *v).unwrap_or(EventId::default());
if account.own_height != remote_height || account.event != remote_event {
// check ancestor and merge.
let db = consensus_db(&group.base, &gid)?;
let db = group.consensus_db(&gid)?;
let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
drop(db);
@ -840,8 +939,15 @@ impl GroupEvent { @@ -840,8 +939,15 @@ impl GroupEvent {
println!("====== DEBUG Sync Request: from: {} to {}", from, to);
// every time sync MAX is 100.
let last_to = if to - from > 100 { to - 100 } else { to };
let sync_events =
SyncEvent::sync(&group.base, &gid, group.account(&gid)?, from, last_to).await?;
let sync_events = SyncEvent::sync(
&group,
&group.base,
&gid,
group.account(&gid)?,
from,
last_to,
)
.await?;
let event = GroupEvent::SyncResponse(from, last_to, to, sync_events);
let data = bincode::serialize(&event).unwrap_or(vec![]);
results.groups.push((gid, SendType::Event(0, addr, data)));

12
src/group/running.rs

@ -5,11 +5,11 @@ use tdn::types::{ @@ -5,11 +5,11 @@ use tdn::types::{
group::GroupId,
primitive::{Peer, PeerId, Result},
};
use tdn_did::Keypair;
use tdn_storage::local::DStorage;
use crate::apps::device::Device;
use crate::storage::consensus_db;
use crate::migrate::CONSENSUS_DB;
pub(crate) struct RunningAccount {
/// secret keypair.
@ -25,9 +25,11 @@ pub(crate) struct RunningAccount { @@ -25,9 +25,11 @@ pub(crate) struct RunningAccount {
}
impl RunningAccount {
pub fn init(keypair: Keypair, base: &PathBuf, gid: &GroupId) -> Result<Self> {
// load devices to runnings.
let db = consensus_db(base, gid)?;
pub fn init(keypair: Keypair, base: &PathBuf, key: &str, gid: &GroupId) -> Result<Self> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(CONSENSUS_DB);
let db = DStorage::open(db_path, key)?;
let distributes = Device::distributes(&db)?;
let (device_name, device_info) = Device::device_info(&db)?;
db.close()?;

3
src/layer.rs

@ -14,7 +14,6 @@ use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent}; @@ -14,7 +14,6 @@ use crate::apps::chat::{chat_conn, LayerEvent as ChatLayerEvent};
use crate::apps::group::{group_conn, GROUP_ID};
use crate::group::Group;
use crate::session::{Session, SessionType};
use crate::storage::session_db;
/// ESSE app's `BaseLayerEvent`.
/// EVERY LAYER APP MUST EQUAL THE FIRST THREE FIELDS.
@ -129,7 +128,7 @@ impl Layer { @@ -129,7 +128,7 @@ impl Layer {
for mgid in self.runnings.keys() {
let mut vecs = vec![];
let db = session_db(&self.base, &mgid)?;
let db = group_lock.session_db(&mgid)?;
let sessions = Session::list(&db)?;
drop(db);

32
src/migrate.rs

@ -65,12 +65,12 @@ pub(crate) const WALLET_DB: &'static str = "wallet.db"; @@ -65,12 +65,12 @@ pub(crate) const WALLET_DB: &'static str = "wallet.db";
/// Account's cloud database name
pub(crate) const CLOUD_DB: &'static str = "cloud.db";
pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> {
pub(crate) fn main_migrate(path: &PathBuf, key: &str) -> Result<()> {
let mut db_path = path.clone();
db_path.push(ACCOUNT_DB);
if db_path.exists() {
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
// 1. get current version.
let first_matrix =
@ -147,7 +147,7 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> { @@ -147,7 +147,7 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> {
let mut account_path = path.clone();
account_path.push(matrix.pop().unwrap().pop().unwrap().as_str());
account_path.push(&db_name);
let account_db = DStorage::open(account_path)?;
let account_db = DStorage::open(account_path, key)?;
// migrate
for i in &current_versions[db_version..] {
account_db.execute(i)?;
@ -165,7 +165,7 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> { @@ -165,7 +165,7 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> {
db.close()?;
} else {
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
// migrate all.
for i in ACCOUNT_VERSIONS.iter() {
db.execute(i)?;
@ -249,10 +249,10 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> { @@ -249,10 +249,10 @@ pub(crate) fn main_migrate(path: &PathBuf) -> Result<()> {
Ok(())
}
pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
pub(crate) fn account_init_migrate(path: &PathBuf, key: &str) -> Result<()> {
let mut db_path = path.clone();
db_path.push(CONSENSUS_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &CONSENSUS_VERSIONS {
db.execute(i)?;
}
@ -260,7 +260,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -260,7 +260,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(SESSION_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &SESSION_VERSIONS {
db.execute(i)?;
}
@ -268,7 +268,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -268,7 +268,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(FILE_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &FILE_VERSIONS {
db.execute(i)?;
}
@ -276,7 +276,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -276,7 +276,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(SERVICE_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &SERVICE_VERSIONS {
db.execute(i)?;
}
@ -284,7 +284,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -284,7 +284,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(JARVIS_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &JARVIS_VERSIONS {
db.execute(i)?;
}
@ -292,7 +292,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -292,7 +292,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(GROUP_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &GROUP_VERSIONS {
db.execute(i)?;
}
@ -300,7 +300,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -300,7 +300,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(DAO_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &DAO_VERSIONS {
db.execute(i)?;
}
@ -308,7 +308,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -308,7 +308,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(CHAT_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &CHAT_VERSIONS {
db.execute(i)?;
}
@ -316,7 +316,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -316,7 +316,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(DOMAIN_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &DOMAIN_VERSIONS {
db.execute(i)?;
}
@ -324,7 +324,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -324,7 +324,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(WALLET_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &WALLET_VERSIONS {
db.execute(i)?;
}
@ -332,7 +332,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> { @@ -332,7 +332,7 @@ pub(crate) fn account_init_migrate(path: &PathBuf) -> Result<()> {
let mut db_path = path.clone();
db_path.push(CLOUD_DB);
let db = DStorage::open(db_path)?;
let db = DStorage::open(db_path, key)?;
for i in &CLOUD_VERSIONS {
db.execute(i)?;
}

26
src/rpc.rs

@ -21,7 +21,6 @@ use crate::event::InnerEvent; @@ -21,7 +21,6 @@ use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, LayerEvent, Online};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{group_db, session_db};
pub(crate) fn init_rpc(
addr: PeerId,
@ -420,20 +419,19 @@ fn new_rpc_handler( @@ -420,20 +419,19 @@ fn new_rpc_handler(
// load all services layer created by this account.
// 1. group chat.
let self_addr = layer_lock.addr.clone();
let group_db = group_db(&layer_lock.base, &ogid)?;
let group_lock = state.group.read().await;
let group_db = group_lock.group_db(&ogid)?;
let s_db = group_lock.session_db(&ogid)?;
drop(group_lock);
let group_chats = GroupChat::local(&group_db)?;
for g in group_chats {
layer_lock.add_running(&g.g_id, ogid, g.id, g.height)?;
results.networks.push(NetworkType::AddGroup(g.g_id));
// 2. online group to self group onlines.
if let Some(session) = connect_session(
&layer_lock.base,
&ogid,
&SessionType::Group,
&g.id,
&self_addr,
)? {
if let Some(session) =
connect_session(&s_db, &SessionType::Group, &g.id, &self_addr)?
{
layer_lock.running_mut(&ogid)?.check_add_online(
g.g_id,
Online::Direct(self_addr),
@ -541,7 +539,7 @@ fn new_rpc_handler( @@ -541,7 +539,7 @@ fn new_rpc_handler(
handler.add_method(
"session-list",
|gid: GroupId, _params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let db = session_db(state.layer.read().await.base(), &gid)?;
let db = state.group.read().await.session_db(&gid)?;
Ok(HandleResult::rpc(session_list(Session::list(&db)?)))
},
);
@ -553,7 +551,7 @@ fn new_rpc_handler( @@ -553,7 +551,7 @@ fn new_rpc_handler(
let remote = GroupId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?;
let group_lock = state.group.read().await;
let db = session_db(group_lock.base(), &gid)?;
let db = group_lock.session_db(&gid)?;
Session::readed(&db, &id)?;
let mut layer_lock = state.layer.write().await;
@ -595,7 +593,7 @@ fn new_rpc_handler( @@ -595,7 +593,7 @@ fn new_rpc_handler(
let remote = GroupId::from_hex(params[1].as_str().ok_or(RpcError::ParseError)?)?;
let must = params[2].as_bool().ok_or(RpcError::ParseError)?; // if need must suspend.
let db = session_db(state.group.read().await.base(), &gid)?;
let db = state.group.read().await.session_db(&gid)?;
let s = Session::get(&db, &id)?;
drop(db);
@ -637,7 +635,7 @@ fn new_rpc_handler( @@ -637,7 +635,7 @@ fn new_rpc_handler(
"session-readed",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let db = session_db(state.group.read().await.base(), &gid)?;
let db = state.group.read().await.session_db(&gid)?;
Session::readed(&db, &id)?;
Ok(HandleResult::new())
},
@ -650,7 +648,7 @@ fn new_rpc_handler( @@ -650,7 +648,7 @@ fn new_rpc_handler(
let is_top = params[1].as_bool().ok_or(RpcError::ParseError)?;
let is_close = params[2].as_bool().ok_or(RpcError::ParseError)?;
let db = session_db(state.group.read().await.base(), &gid)?;
let db = state.group.read().await.session_db(&gid)?;
Session::update(&db, &id, is_top, is_close)?;
Ok(HandleResult::new())
},

13
src/server.rs

@ -12,14 +12,15 @@ use tokio::{ @@ -12,14 +12,15 @@ use tokio::{
sync::RwLock,
};
use tdn_storage::local::DStorage;
use crate::account::Account;
use crate::apps::app_layer_handle;
use crate::group::Group;
use crate::layer::Layer;
use crate::migrate::main_migrate;
use crate::migrate::{main_migrate, ACCOUNT_DB};
use crate::primitives::network_seeds;
use crate::rpc::{init_rpc, inner_rpc};
use crate::storage::account_db;
pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:8080";
pub const DEFAULT_LOG_FILE: &'static str = "esse.log.txt";
@ -33,8 +34,6 @@ pub async fn start(db_path: String) -> Result<()> { @@ -33,8 +34,6 @@ pub async fn start(db_path: String) -> Result<()> {
}
init_log(db_path.clone());
main_migrate(&db_path)?;
info!("Core storage path {:?}", db_path);
let mut config = Config::load_save(db_path.clone()).await;
config.db_path = Some(db_path.clone());
@ -54,8 +53,12 @@ pub async fn start(db_path: String) -> Result<()> { @@ -54,8 +53,12 @@ pub async fn start(db_path: String) -> Result<()> {
);
let rand_secret = config.secret.clone();
main_migrate(&db_path, &hex::encode(&rand_secret))?;
info!("Core storage path {:?}", db_path);
let account_db = account_db(&db_path)?;
let mut account_db_path = db_path.clone();
account_db_path.push(ACCOUNT_DB);
let account_db = DStorage::open(account_db_path, &hex::encode(&rand_secret))?;
let accounts = Account::all(&account_db)?;
account_db.close()?;
let mut me: HashMap<GroupId, Account> = HashMap::new();

8
src/session.rs

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
use std::path::PathBuf;
use tdn::types::{
group::GroupId,
primitive::{PeerId, Result},
@ -6,8 +5,6 @@ use tdn::types::{ @@ -6,8 +5,6 @@ use tdn::types::{
};
use tdn_storage::local::{DStorage, DsValue};
use crate::storage::session_db;
pub(crate) enum SessionType {
Chat,
Group,
@ -266,14 +263,11 @@ impl Session { @@ -266,14 +263,11 @@ impl Session {
#[inline]
pub(crate) fn connect_session(
base: &PathBuf,
mgid: &GroupId,
db: &DStorage,
s_type: &SessionType,
fid: &i64,
addr: &PeerId,
) -> Result<Option<Session>> {
let db = session_db(base, mgid)?;
let sql = format!("SELECT id, fid, gid, addr, s_type, name, is_top, is_close, last_datetime, last_content, last_readed FROM sessions WHERE s_type = {} AND fid = {}", s_type.to_int(), fid);
let mut matrix = db.query(&sql)?;

105
src/storage.rs

@ -5,12 +5,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; @@ -5,12 +5,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::fs;
use tdn::types::{group::GroupId, primitive::Result};
use tdn_storage::local::DStorage;
use crate::migrate::{
account_init_migrate, ACCOUNT_DB, CHAT_DB, CLOUD_DB, CONSENSUS_DB, DAO_DB, DOMAIN_DB, FILE_DB,
GROUP_DB, JARVIS_DB, SERVICE_DB, SESSION_DB, WALLET_DB,
};
use crate::migrate::account_init_migrate;
const FILES_DIR: &'static str = "files";
const IMAGE_DIR: &'static str = "images";
@ -328,107 +324,12 @@ pub(crate) fn _write_emoji(base: &PathBuf, gid: &GroupId) -> Result<()> { @@ -328,107 +324,12 @@ pub(crate) fn _write_emoji(base: &PathBuf, gid: &GroupId) -> Result<()> {
Ok(())
}
#[inline]
pub(crate) fn account_db(base: &PathBuf) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(ACCOUNT_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn consensus_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(CONSENSUS_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn session_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(SESSION_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn chat_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(CHAT_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn file_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(FILE_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn _service_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(SERVICE_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn jarvis_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(JARVIS_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn group_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(GROUP_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn dao_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(DAO_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn domain_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(DOMAIN_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn wallet_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(WALLET_DB);
DStorage::open(db_path)
}
#[inline]
pub(crate) fn cloud_db(base: &PathBuf, gid: &GroupId) -> Result<DStorage> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
db_path.push(CLOUD_DB);
DStorage::open(db_path)
}
/// account independent db and storage directory.
pub(crate) async fn account_init(base: &PathBuf, gid: &GroupId) -> Result<()> {
pub(crate) async fn account_init(base: &PathBuf, key: &str, gid: &GroupId) -> Result<()> {
let mut db_path = base.clone();
db_path.push(gid.to_hex());
init_local_files(&db_path).await?;
// Inner Database.
account_init_migrate(&db_path)
account_init_migrate(&db_path, key)
}

Loading…
Cancel
Save