
add own handle message

pull/24/head
commit 956bd06af1
22 changed files:
  1. src/account.rs (2)
  2. src/apps/cloud/layer.rs (2)
  3. src/apps/device/models.rs (43)
  4. src/apps/device/rpc.rs (12)
  5. src/apps/file/mod.rs (2)
  6. src/apps/group/layer.rs (2)
  7. src/apps/group/models/group.rs (2)
  8. src/apps/group/rpc.rs (2)
  9. src/apps/wallet/models.rs (2)
  10. src/global.rs (2)
  11. src/group/handle.rs (11)
  12. src/group/mod.rs (25)
  13. src/group/models.rs (16)
  14. src/group/rpc.rs (11)
  15. src/layer.rs (15)
  16. src/lib.rs (4)
  17. src/migrate/consensus.rs (11)
  18. src/own/mod.rs (846)
  19. src/rpc.rs (4)
  20. src/server.rs (2)
  21. src/storage.rs (3)
  22. src/utils/crypto.rs (4)

src/account.rs (2)

@@ -347,7 +347,7 @@ impl Account {
db.delete(&sql)
}
pub fn update_consensus(&mut self, db: &DStorage, height: u64, eid: EventId) -> Result<usize> {
pub fn _update_consensus(&mut self, db: &DStorage, height: u64, eid: EventId) -> Result<usize> {
self.own_height = height;
self.event = eid;
let sql = format!(

src/apps/cloud/layer.rs (2)

@@ -7,7 +7,7 @@ use tdn::types::{
use crate::global::Global;
pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
pub(crate) async fn handle(msg: RecvType, _global: &Arc<Global>) -> Result<HandleResult> {
let results = HandleResult::new();
match msg {

src/apps/device/models.rs (43)

@@ -1,5 +1,5 @@
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::primitives::{Peer, Result};
use tdn::types::primitives::{Peer, PeerId, Result};
use tdn::types::rpc::{json, RpcParam};
use tdn_storage::local::{DStorage, DsValue};
@@ -7,25 +7,28 @@ pub(crate) struct Device {
pub id: i64,
pub name: String,
pub info: String,
pub assist: PeerId,
pub peer: Peer,
pub lasttime: i64,
pub online: bool,
}
impl Device {
pub fn new(name: String, info: String, peer: Peer) -> Self {
pub fn new(peer: Peer) -> Self {
let start = SystemTime::now();
let lasttime = start
.duration_since(UNIX_EPOCH)
.map(|s| s.as_secs())
.unwrap_or(0) as i64; // safe for all life.
let assist = peer.id;
Self {
lasttime,
info,
name,
assist,
peer,
id: 0,
name: String::new(),
info: String::new(),
online: true,
}
}
@@ -35,6 +38,7 @@ impl Device {
Device {
lasttime: v.pop().unwrap().as_i64(),
peer: Peer::from_string(v.pop().unwrap().as_str()).unwrap_or(Peer::default()),
assist: PeerId::from_hex(v.pop().unwrap().as_str()).unwrap_or(PeerId::default()),
info: v.pop().unwrap().as_string(),
name: v.pop().unwrap().as_string(),
id: v.pop().unwrap().as_i64(),
@@ -47,6 +51,7 @@ impl Device {
self.id,
self.name,
self.info,
self.assist.to_hex(),
self.peer.to_string(),
self.lasttime,
if self.online { "1" } else { "0" },
@@ -55,7 +60,7 @@ impl Device {
/// load account devices.
pub fn list(db: &DStorage) -> Result<Vec<Device>> {
let matrix = db.query("SELECT id, name, info, peer, lasttime FROM devices")?;
let matrix = db.query("SELECT id, name, info, assist, peer, lasttime FROM devices")?;
let mut devices = vec![];
for values in matrix {
devices.push(Device::from_values(values));
@@ -63,11 +68,24 @@ impl Device {
Ok(devices)
}
pub fn _get(db: &DStorage, aid: &PeerId) -> Result<Option<Device>> {
let mut matrix = db.query(&format!(
"SELECT id, name, info, assist, peer, lasttime FROM devices WHERE assist = '{}'",
aid.to_hex()
))?;
if let Some(values) = matrix.pop() {
Ok(Some(Device::from_values(values)))
} else {
Ok(None)
}
}
pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let sql = format!(
"INSERT INTO devices (name, info, peer, lasttime) VALUES ('{}', '{}', '{}', {})",
"INSERT INTO devices (name, info, assist, peer, lasttime) VALUES ('{}', '{}', '{}', '{}', {})",
self.name,
self.info,
self.assist.to_hex(),
self.peer.to_string(),
self.lasttime,
);
@@ -76,14 +94,11 @@ impl Device {
Ok(())
}
pub fn _update(db: &DStorage, id: i64, name: &str) -> Result<usize> {
let sql = format!("UPDATE devices SET name='{}' WHERE id = {}", name, id);
db.update(&sql)
}
/// used in rpc, when what to delete a friend.
pub fn _delete(&self, db: &DStorage) -> Result<usize> {
let sql = format!("DELETE FROM devices WHERE id = {}", self.id);
pub fn update(db: &DStorage, id: i64, name: &str, info: &str) -> Result<usize> {
let sql = format!(
"UPDATE devices SET name='{}', info = '{}' WHERE id = {}",
name, info, id
);
db.update(&sql)
}
}
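Note on the reworked constructor above: Device::new now takes only a Peer and derives the assist id from it, so callers fill in name and info afterwards. A minimal sketch of the new call pattern, mirroring the usage this same commit adds in src/own/mod.rs (device_info(), account_id and device_db come from that surrounding code, not from this snippet):

    // sketch only: how Device::new is used after this change
    let (device_name, device_info) = device_info();
    let mut device = Device::new(Peer::peer(account_id));
    device.name = device_name;
    device.info = device_info;
    device.insert(&device_db)?; // writes name, info, assist, peer, lasttime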

src/apps/device/rpc.rs (12)

@@ -15,11 +15,6 @@ pub(crate) fn device_create(device: &Device) -> RpcParam {
rpc_response(0, "device-create", json!(device.to_rpc()))
}
#[inline]
pub(crate) fn _device_remove(id: i64) -> RpcParam {
rpc_response(0, "device-remove", json!([id]))
}
#[inline]
pub(crate) fn device_online(id: i64) -> RpcParam {
rpc_response(0, "device-online", json!([id]))
@@ -32,6 +27,7 @@ pub(crate) fn device_offline(id: i64) -> RpcParam {
#[inline]
pub(crate) fn device_status(
id: i64,
cpu: u32,
memory: u32,
swap: u32,
@@ -45,7 +41,7 @@ pub(crate) fn device_status(
rpc_response(
0,
"device-status",
json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]),
json!([id, cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]),
)
}
@@ -77,7 +73,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let own_lock = state.own.read().await;
if id == own_lock.device()?.id {
if id == own_lock.current_device()?.0 {
let uptime = own_lock.uptime;
let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) =
local_device_status();
@@ -95,7 +91,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<Global>) {
handler.add_method(
"device-search",
|_params: Vec<RpcParam>, state: Arc<Global>| async move {
|_params: Vec<RpcParam>, _state: Arc<Global>| async move {
//let msg = state.own.read().await.create_message(&gid, Peer::peer(addr))?;
//Ok(HandleResult::group(gid, msg))
Ok(HandleResult::new())

src/apps/file/mod.rs (2)

@@ -1,5 +1,5 @@
mod models;
mod rpc;
pub(crate) use models::{FileDid, RootDirectory};
//pub(crate) use models::{FileDid, RootDirectory};
pub(crate) use rpc::new_rpc_handler;

src/apps/group/layer.rs (2)

@@ -278,7 +278,7 @@ async fn handle_event(
broadcast(&gid, global, &LayerEvent::GroupName(gid, name), results).await?;
}
}
LayerEvent::GroupClose(gid) => {
LayerEvent::GroupClose(_gid) => {
// PEER
let group = GroupChat::close(&db, &id)?;
let s_db = session_db(&global.base, &pid, &db_key)?;

src/apps/group/models/group.rs (2)

@@ -143,7 +143,7 @@ impl GroupChat {
}
pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let mut unique_check = db.query(&format!(
let unique_check = db.query(&format!(
"SELECT id from groups WHERE gid = {} AND addr = '{}'",
self.gid,
self.addr.to_hex()

src/apps/group/rpc.rs (2)

@@ -3,7 +3,7 @@ use group_types::{Event, LayerEvent, GROUP_CHAT_ID};
use std::sync::Arc;
use tdn::types::{
message::{RpcSendMessage, SendType},
primitives::{HandleResult, PeerId},
primitives::HandleResult,
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
};

src/apps/wallet/models.rs (2)

@@ -522,7 +522,7 @@ impl Balance {
}
/// use for common and erc20.
pub fn update(db: &DStorage, address: &i64, token: &i64, value: &str) -> Result<()> {
pub fn _update(db: &DStorage, address: &i64, token: &i64, value: &str) -> Result<()> {
let matrix = db.query(&format!(
"SELECT id FROM balances WHERE address = {} AND token = {}",
address, token,

src/global.rs (2)

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use tdn::{
prelude::{GroupId, P2pConfig, PeerId, PeerKey, ReceiveMessage, SendMessage},
prelude::{GroupId, P2pConfig, PeerId, ReceiveMessage, SendMessage},
types::message::RpcSendMessage,
};
use tokio::{sync::mpsc::Sender, sync::RwLock};

src/group/handle.rs (11)

@@ -1,9 +1,6 @@
use esse_primitives::{MessageType, NetworkMessage};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use esse_primitives::MessageType;
use std::sync::Arc;
use tdn::types::{
group::EventId,
message::{RecvType, SendType},
primitives::{DeliveryType, HandleResult, Peer, PeerId, Result},
};
@@ -19,9 +16,7 @@ use crate::session::{connect_session, Session, SessionType};
use crate::storage::{account_db, chat_db, session_db, write_avatar_sync};
use super::rpc;
use super::{
from_model, from_network_message, handle_nmsg, Friend, GroupEvent, InviteType, Message, Request,
};
use super::{handle_nmsg, Friend, GroupEvent, Message, Request};
pub(crate) async fn group_handle(msg: RecvType, global: &Arc<Global>) -> Result<HandleResult> {
debug!("---------DEBUG--------- GOT GROUP MESSAGE");
@@ -304,7 +299,7 @@ impl GroupEvent {
GroupEvent::Close => {
let mut group = global.group.write().await;
group.rm_online(&fpid);
let (sid, fid) = group.get(&fpid)?;
let (_sid, fid) = group.get(&fpid)?;
let keep = group.is_online(&fpid);
drop(group);

src/group/mod.rs (25)

@@ -1,22 +1,13 @@
use esse_primitives::{MessageType, NetworkMessage};
use esse_primitives::NetworkMessage;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tdn::types::{
group::EventId,
message::{RecvType, SendType},
primitives::{DeliveryType, HandleResult, Peer, PeerId, Result},
message::SendType,
primitives::{HandleResult, PeerId, Result},
};
use tdn_storage::local::DStorage;
use crate::account::{Account, User};
use crate::global::Global;
use crate::rpc::{
notice_menu, session_connect, session_create, session_last, session_lost, session_suspend,
session_update_name,
};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{account_db, chat_db, session_db, write_avatar_sync};
use crate::account::User;
mod handle;
mod models;
@@ -24,8 +15,8 @@ mod rpc;
pub(crate) use handle::{group_conn, group_handle, update_session};
pub(crate) use models::{
from_model, from_network_message, handle_nmsg, raw_to_network_message, to_network_message,
Friend, InviteType, Message, Request,
from_network_message, handle_nmsg, raw_to_network_message, to_network_message, Friend,
InviteType, Message, Request,
};
pub(crate) use rpc::group_rpc;
@@ -175,11 +166,11 @@ impl GroupSession {
}
}
pub fn info(&self) -> (i64, i64, i64) {
pub fn _info(&self) -> (i64, i64, i64) {
(self.height, self.sid, self.fid)
}
pub fn increased(&mut self) -> i64 {
pub fn _increased(&mut self) -> i64 {
self.height += 1;
self.height
}

src/group/models.rs (16)

@@ -3,7 +3,7 @@ mod message;
mod request;
pub(crate) use self::friend::Friend;
pub(crate) use self::message::{from_model, handle_nmsg, Message};
pub(crate) use self::message::{handle_nmsg, Message};
pub(crate) use self::request::Request;
use esse_primitives::{id_from_str, id_to_str, MessageType, NetworkMessage};
@@ -11,7 +11,7 @@ use group_types::GroupChatId;
use std::path::PathBuf;
use tdn::types::primitives::{HandleResult, PeerId, Result};
//use crate::apps::group::GroupChat;
use crate::apps::group::GroupChat;
use crate::rpc::session_create;
use crate::storage::{
chat_db, group_db, read_avatar, read_db_file, read_file, read_image, read_record, session_db,
@@ -57,14 +57,14 @@ pub(crate) async fn from_network_message(
InviteType::Group(gcd, addr, name) => {
// 1 add group chat.
let db = group_db(base, own, db_key)?;
//let mut g = GroupChat::from(gcd, 0, addr, name);
//g.insert(&db)?;
let mut g = GroupChat::from(gcd, 0, addr, name);
g.insert(&db)?;
// 2 add new session.
//let mut session = g.to_session();
//let s_db = session_db(base, own, db_key)?;
//session.insert(&s_db)?;
//results.rpcs.push(session_create(&session));
let mut session = g.to_session();
let s_db = session_db(base, own, db_key)?;
session.insert(&s_db)?;
results.rpcs.push(session_create(&session));
}
}

src/group/rpc.rs (11)

@@ -19,7 +19,7 @@ pub(crate) fn friend_info(friend: &Friend) -> RpcParam {
}
#[inline]
pub(crate) fn friend_update(fid: i64, remark: &str) -> RpcParam {
pub(crate) fn _friend_update(fid: i64, remark: &str) -> RpcParam {
rpc_response(0, "chat-friend-update", json!([fid, remark]))
}
@@ -29,7 +29,7 @@ pub(crate) fn friend_close(fid: i64) -> RpcParam {
}
#[inline]
pub(crate) fn friend_delete(fid: i64) -> RpcParam {
pub(crate) fn _friend_delete(fid: i64) -> RpcParam {
rpc_response(0, "chat-friend-delete", json!([fid]))
}
@@ -69,7 +69,7 @@ pub(crate) fn message_delivery(id: i64, is_d: bool) -> RpcParam {
}
#[inline]
pub(crate) fn message_delete(id: i64) -> RpcParam {
pub(crate) fn _message_delete(id: i64) -> RpcParam {
rpc_response(0, "chat-message-delete", json!([id]))
}
@@ -140,7 +140,7 @@ pub(crate) fn group_rpc(handler: &mut RpcHandler<Global>) {
let id = params[0].as_i64().ok_or(RpcError::ParseError)?;
let remark = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::new();
let results = HandleResult::new();
let pid = state.pid().await;
let db_key = state.own.read().await.db_key(&pid)?;
let db = chat_db(&state.base, &pid, &db_key)?;
@@ -342,7 +342,7 @@ pub(crate) fn group_rpc(handler: &mut RpcHandler<Global>) {
let data = bincode::serialize(&GroupEvent::Reject).unwrap_or(vec![]);
let msg = SendType::Event(0, req.pid, data);
let mut results = HandleResult::group(msg);
let results = HandleResult::group(msg);
// state.own.write().await.broadcast(
// &gid,
@@ -458,7 +458,6 @@ pub(crate) fn group_rpc(handler: &mut RpcHandler<Global>) {
let db_key = state.own.read().await.db_key(&pid)?;
let db = chat_db(&state.base, &pid, &db_key)?;
let msg = Message::get(&db, &id)?;
Message::delete(&db, &id)?;
drop(db);

src/layer.rs (15)

@@ -1,19 +1,6 @@
use esse_primitives::id_to_str;
use group_types::GroupChatId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tdn::types::{
message::SendType,
primitives::{HandleResult, Peer, PeerId, Result},
};
use tokio::sync::RwLock;
use crate::account::User;
use crate::group::GroupEvent;
use crate::own::Own;
use crate::session::{Session, SessionType};
use tdn::types::primitives::{PeerId, Result};
/// ESSE layers.
pub(crate) struct Layer {

src/lib.rs (4)

@@ -24,6 +24,8 @@ mod session;
mod storage;
mod utils;
const DEFAULT_LOG_FILE: &'static str = "esse.log.txt";
#[cfg(target_os = "android")]
#[allow(non_snake_case)]
pub mod android {
@@ -60,7 +62,7 @@ pub extern "C" fn start(db_path: *const c_char) {
}
// init log file.
let file_appender = tracing_appender::rolling::daily(&s_path, server::DEFAULT_LOG_FILE);
let file_appender = tracing_appender::rolling::daily(&s_path, DEFAULT_LOG_FILE);
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_writer(non_blocking)

src/migrate/consensus.rs (11)

@@ -1,8 +1,8 @@
pub(crate) const ACCOUNT_TABLE_PATH: i64 = 0;
pub(crate) const FRIEND_TABLE_PATH: i64 = 1;
pub(crate) const REQUEST_TABLE_PATH: i64 = 2;
pub(crate) const MESSAGE_TABLE_PATH: i64 = 3;
pub(crate) const FILE_TABLE_PATH: i64 = 4;
//pub(crate) const ACCOUNT_TABLE_PATH: i64 = 0;
//pub(crate) const FRIEND_TABLE_PATH: i64 = 1;
//pub(crate) const REQUEST_TABLE_PATH: i64 = 2;
//pub(crate) const MESSAGE_TABLE_PATH: i64 = 3;
//pub(crate) const FILE_TABLE_PATH: i64 = 4;
#[rustfmt::skip]
pub(super) const CONSENSUS_VERSIONS: [&str; 9] = [
@@ -10,6 +10,7 @@ pub(super) const CONSENSUS_VERSIONS: [&str; 9] = [
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
name TEXT NOT NULL,
info TEXT NOT NULL,
assist TEXT NOT NULL,
peer TEXT NOT NULL,
lasttime INTEGER NOT NULL);",
"CREATE TABLE IF NOT EXISTS db_tables(

src/own/mod.rs (846)

@@ -1,27 +1,25 @@
use esse_primitives::id_to_str;
//use esse_primitives::id_to_str;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
group::{EventId, GroupId},
message::{RecvType, SendMessage, SendType},
group::EventId,
message::{RecvType, SendType},
primitives::{HandleResult, Peer, PeerId, PeerKey, Result},
};
use tdn_storage::local::DStorage;
use tokio::sync::{mpsc::Sender, RwLock};
use crate::account::{Account, User};
use crate::global::Global;
//use crate::apps::device::rpc as device_rpc;
use crate::apps::device::rpc as device_rpc;
use crate::apps::device::Device;
use crate::global::Global;
//use crate::consensus::Event;
//use crate::event::{InnerEvent, StatusEvent, SyncEvent};
//use crate::layer::Layer;
//use crate::rpc;
use crate::storage::{account_db, account_init, consensus_db, wallet_db, write_avatar};
use crate::utils::crypto::{decrypt, encrypt};
//use crate::utils::crypto::{decrypt, encrypt};
use crate::utils::device_status::{device_info, device_status as local_device_status};
/// ESSE own distributed accounts.
@@ -51,15 +49,12 @@ pub(crate) enum OwnEvent {
/// Sync event.
Event(u64, EventId, EventId),
//Event(u64, EventId, EventId, InnerEvent),
/// Sync infomations.
Status,
//Status(StatusEvent),
/// device's info update.
DeviceUpdate(String),
/// Sync infomations (name, info).
Info(String, String),
/// update device's name.
DeviceUpdate(PeerId, String),
/// device deleted.
DeviceDelete,
/// offline.
DeviceOffline,
DeviceDelete(PeerId),
/// Device status request.
StatusRequest,
/// Device status response.
@@ -79,156 +74,46 @@ pub(crate) async fn handle(msg: RecvType, global: &Arc<Global>) -> Result<Handle
let mut results = HandleResult::new();
match msg {
RecvType::Connect(peer, data) => {
//self.hanlde_connect(&mut results, peer, data, true)?;
RecvType::Connect(peer, _) => {
let pid = global.pid().await;
let db_key = global.own.read().await.db_key(&pid)?;
let db = consensus_db(&global.base, &pid, &db_key)?;
if let Ok(id) = global.own.write().await.online(&peer.id) {
results.rpcs.push(device_rpc::device_online(id));
} else {
let aid = peer.id;
let mut device = Device::new(peer);
device.insert(&db)?;
let (_id, name, info) = global.own.read().await.current_device()?;
let own_event = OwnEvent::Info(name, info);
let data = bincode::serialize(&own_event)?;
let msg = SendType::Event(0, aid, data);
results.owns.push(msg);
results.rpcs.push(device_rpc::device_create(&device));
global.own.write().await.add_device(device);
};
}
RecvType::Leave(peer) => {
// check device leave.
//if let Ok(id) = account.offline(&peer) {
//results.rpcs.push(device_rpc::device_offline(peer.id, id));
//}
}
RecvType::Result(peer, is_ok, data) => {
if is_ok {
//self.hanlde_connect(&mut results, peer, data, false)?;
if let Ok(id) = global.own.write().await.offline(&peer.id) {
results.rpcs.push(device_rpc::device_offline(id));
}
}
RecvType::ResultConnect(peer, data) => {
//self.hanlde_connect(&mut results, peer, data, true)?;
}
RecvType::Event(addr, bytes) => {
//let event: OwnEvent = bincode::deserialize(&bytes)?;
//return OwnEvent::handle(self, event, pid, addr, uid).await;
RecvType::Event(aid, bytes) => {
let event: OwnEvent = bincode::deserialize(&bytes)?;
return OwnEvent::handle(aid, event, global).await;
}
RecvType::Stream(_uid, _stream, _bytes) => {
todo!();
// TODO stream
}
RecvType::Delivery(_t, _tid, _is_ok) => {}
_ => {
warn!("own message nerver here!");
}
}
Ok(results)
}
// fn hanlde_connect(
// &mut self,
// results: &mut HandleResult,
// peer: Peer,
// data: Vec<u8>,
// is_connect: bool,
// ) -> Result<()> {
// let connect = bincode::deserialize(&data)?;
// let pid = peer.id;
// let (remote_height, remote_event, others) = match connect {
// OwnConnect::Create(
// remote,
// remote_height,
// remote_event,
// device_name,
// device_info,
// others,
// ) => {
// // check remote addr is receive addr.
// if remote.addr != pid {
// return Err(anyhow!("Address is invalid."));
// }
// if is_connect {
// results
// .groups
// .push((pid, self.agree_message(peer.clone())?));
// }
// // first init sync.
// if remote.avatar.len() > 0 {
// let account_db = self.account_db()?;
// if let Some(u) = self.accounts.get_mut(pid) {
// if u.avatar.len() == 0 {
// u.name = remote.name;
// u.avatar = remote.avatar;
// u.update(&account_db)?;
// account_db.close()?;
// results.rpcs.push(rpc::account_update(
// *pid,
// &u.name,
// base64::encode(&u.avatar),
// ));
// }
// }
// }
// let db = self.consensus_db(pid)?;
// let running = self.runnings.get_mut(pid).unwrap(); // safe unwrap. checked.
// let mut new_addrs = vec![];
// for a in others {
// if a != peer_id && a != self.addr && !running.distributes.contains_key(&a) {
// new_addrs.push(a);
// }
// }
// if let Some(v) = running.distributes.get_mut(&peer_id) {
// v.2 = true;
// results.rpcs.push(device_rpc::device_online(*pid, v.1));
// (remote_height, remote_event, new_addrs)
// } else {
// let mut device = Device::new(device_name, device_info, peer_id);
// device.insert(&db)?;
// db.close()?;
// running
// .distributes
// .insert(peer_id, (addr.clone(), device.id, true));
// results.rpcs.push(device_rpc::device_create(*pid, &device));
// results
// .rpcs
// .push(device_rpc::device_online(*pid, device.id));
// (remote_height, remote_event, new_addrs)
// }
// }
// OwnConnect::Connect(remote_height, remote_event) => {
// if self
// .runnings
// .get(pid)
// .unwrap() // safe, checked
// .distributes
// .contains_key(&peer_id)
// {
// if is_connect {
// results.groups.push((*pid, self.connect_result(pid, addr)?));
// }
// } else {
// if is_connect {
// results.groups.push((*pid, self.create_message(pid, addr)?));
// }
// return Ok(());
// }
// let v = self.running_mut(pid)?;
// let did = v.add_online(&peer_id)?;
// results.rpcs.push(device_rpc::device_online(*pid, did));
// (remote_height, remote_event, vec![])
// }
// };
// let account = self.account(pid)?;
// if account.own_height != remote_height || account.event != remote_event {
// results.groups.push((
// *pid,
// self.sync_message(pid, peer_id, 1, account.own_height)?,
// ));
// }
// // connect to others.
// for addr in others {
// results
// .groups
// .push((*pid, self.create_message(pid, Peer::peer(addr))?));
// }
// Ok(())
// }
// }
impl Own {
pub fn init(accounts: HashMap<PeerId, Account>) -> Own {
Own {
@@ -248,25 +133,38 @@ impl Own {
Ok(self.account(pid)?.plainkey())
}
// pub fn online(&mut self, peer: &Peer) -> Result<i64> {
// for i in self.distributes.iter_mut() {
// if &i.0 == peer {
// i.2 = true;
// return Ok(i.1);
// }
// }
// Err(anyhow!("missing distribute device"))
// }
// pub fn offline(&mut self, peer: &Peer) -> Result<i64> {
// for i in self.distributes.iter_mut() {
// if &i.0 == peer {
// i.2 = false;
// return Ok(i.1);
// }
// }
// Err(anyhow!("missing distribute device"))
// }
pub fn online(&mut self, aid: &PeerId) -> Result<i64> {
for device in self.distributes.iter_mut() {
if &device.assist == aid {
device.online = true;
return Ok(device.id);
}
}
Err(anyhow!("missing distribute device"))
}
pub fn offline(&mut self, aid: &PeerId) -> Result<i64> {
for device in self.distributes.iter_mut() {
if &device.assist == aid {
device.online = false;
return Ok(device.id);
}
}
Err(anyhow!("missing distribute device"))
}
pub fn device_id(&self, aid: &PeerId) -> Result<i64> {
for device in self.distributes.iter() {
if &device.assist == aid {
return Ok(device.id);
}
}
Err(anyhow!("missing distribute device"))
}
pub fn add_device(&mut self, device: Device) {
self.distributes.push(device);
}
pub fn check_lock(&self, pid: &PeerId, lock: &str) -> bool {
if let Some(account) = self.accounts.get(pid) {
@@ -292,116 +190,6 @@ impl Own {
}
}
// pub fn running(&self, pid: &PeerId) -> Result<&RunningAccount> {
// if let Some(running) = self.runnings.get(pid) {
// Ok(running)
// } else {
// Err(anyhow!("user missing"))
// }
// }
// pub fn running_mut(&mut self, pid: &PeerId) -> Result<&mut RunningAccount> {
// if let Some(running) = self.runnings.get_mut(pid) {
// Ok(running)
// } else {
// Err(anyhow!("user missing"))
// }
// }
// pub fn prove_addr(&self, mpid: &PeerId, raddr: &PeerId) -> Result<Proof> {
// let running = self.running(mpid)?;
// Ok(Proof::prove(&running.keypair, &self.addr, raddr))
// }
// pub fn uptime(&self, pid: &PeerId) -> Result<u32> {
// self.running(pid).map(|v| v.uptime)
// }
// pub fn list_running_user(&self) -> Vec<PeerId> {
// self.runnings.keys().map(|d| *d).collect()
// }
// pub fn distribute_conns(&self, pid: &PeerId) -> Vec<SendType> {
// let mut vecs = vec![];
// if let Some(running) = &self.runnings.get(pid) {
// for (addr, (peer, _, _)) in &running.distributes {
// if addr != &self.addr {
// if let Ok(s) = self.connect_message(pid, peer.clone()) {
// vecs.push(s);
// }
// }
// }
// }
// vecs
// }
// pub fn all_distribute_conns(&self) -> HashMap<PeerId, Vec<SendType>> {
// let mut conns = HashMap::new();
// for (mpid, running) in &self.runnings {
// let mut vecs = vec![];
// for (addr, (peer, _, _)) in &running.distributes {
// if addr != &self.addr {
// if let Ok(s) = self.connect_message(mpid, peer.clone()) {
// vecs.push(s);
// }
// }
// }
// conns.insert(*mpid, vecs);
// }
// conns
// }
// pub fn online_devices(&self, pid: &PeerId, mut devices: Vec<Device>) -> Vec<Device> {
// if let Some(running) = self.runnings.get(pid) {
// for (addr, (_peer, _id, online)) in &running.distributes {
// if *online {
// for device in devices.iter_mut() {
// if device.addr == *addr {
// device.online = true;
// }
// }
// }
// }
// }
// devices
// }
// pub fn remove_all_running(&mut self) -> HashMap<PeerId, ()> {
// let mut addrs: HashMap<PeerId, ()> = HashMap::new();
// for (_, running) in self.runnings.drain() {
// for (addr, (_peer, _id, online)) in running.distributes {
// if addr != self.addr && online {
// addrs.insert(addr, ());
// }
// }
// }
// addrs
// }
// pub fn remove_running(&mut self, pid: &PeerId) -> HashMap<PeerId, ()> {
// // check close the stable connection.
// let mut addrs: HashMap<PeerId, ()> = HashMap::new();
// if let Some(running) = self.runnings.remove(pid) {
// for (addr, (_peer, _id, online)) in running.distributes {
// if addr != self.addr && online {
// addrs.insert(addr, ());
// }
// }
// // check if other stable connection.
// for other_running in self.runnings.values() {
// for (addr, (_peer, _id, online)) in &other_running.distributes {
// if *online && addrs.contains_key(addr) {
// addrs.remove(addr);
// }
// }
// }
// }
// addrs
// }
/// reset group info when change account.
pub fn reset(
&mut self,
@@ -410,10 +198,10 @@ impl Own {
base: &PathBuf,
secret: &[u8],
) -> Result<(u64, u64)> {
let (keypair, pheight, oheight, key) = if let Some(u) = self.accounts.get_mut(pid) {
let (keypair, pheight, oheight) = if let Some(u) = self.accounts.get_mut(pid) {
let keypair = u.secret(secret, lock)?;
u.cache_plainkey(secret, lock)?;
(keypair, u.pub_height, u.own_height, u.plainkey())
(keypair, u.pub_height, u.own_height)
} else {
return Err(anyhow!("user missing."));
};
@@ -464,7 +252,7 @@ impl Own {
secret: &[u8],
) -> Result<(i64, PeerId)> {
let account_index = self.accounts.len() as u32;
let (mut account, sk, mut wallet) = Account::generate(
let (mut account, _sk, mut wallet) = Account::generate(
account_index,
secret,
lang,
@@ -486,7 +274,7 @@ impl Own {
account.insert(&account_db)?;
account_db.close()?;
let account_did = account.id;
let key = account.plainkey();
let _key = account.plainkey();
let _ = write_avatar(base, &account_id, &account_id, &account.avatar).await;
self.accounts.insert(account.pid, account);
@@ -496,7 +284,9 @@ impl Own {
wallet_db.close()?;
let (device_name, device_info) = device_info();
let mut device = Device::new(device_name, device_info, Peer::peer(account_id));
let mut device = Device::new(Peer::peer(account_id));
device.name = device_name;
device.info = device_info;
let device_db = consensus_db(base, &account_id, &db_key)?;
device.insert(&device_db)?;
device_db.close()?;
@@ -543,332 +333,172 @@ impl Own {
account_db.close()
}
pub fn device(&self) -> Result<&Device> {
pub fn current_device(&self) -> Result<(i64, String, String)> {
if self.distributes.len() > 0 {
Ok(&self.distributes[0])
Ok((
self.distributes[0].id.clone(),
self.distributes[0].name.clone(),
self.distributes[0].info.clone(),
))
} else {
Err(anyhow!("no devices"))
}
}
// pub fn create_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let user = self.clone_user(pid)?;
// let account = self.account(pid)?;
// let height = account.own_height;
// let event = account.event;
// let proof = self.prove_addr(pid, &addr.id)?;
// let running = self.running(pid)?;
// Ok(SendType::Connect(
// 0,
// addr,
// bincode::serialize(&OwnConnect::Create(
// proof,
// user,
// height,
// event,
// running.device_name.clone(),
// running.device_info.clone(),
// running.distributes.keys().cloned().collect(),
// ))
// .unwrap_or(vec![]),
// ))
// }
// pub fn connect_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let account = self.account(pid)?;
// let height = account.own_height;
// let event = account.event;
// let data = bincode::serialize(&OwnConnect::Connect(height, event)).unwrap_or(vec![]);
// Ok(SendType::Connect(0, addr, data))
// }
// pub fn connect_result(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let account = self.account(pid)?;
// let height = account.own_height;
// let event = account.event;
// let data = bincode::serialize(&OwnConnect::Connect(height, event)).unwrap_or(vec![]);
// Ok(SendType::Result(0, addr, true, false, data))
// }
// pub fn agree_message(&self, pid: &PeerId, addr: Peer) -> Result<SendType> {
// let account = self.account(pid)?;
// let height = account.own_height;
// let event = account.event;
// let me = self.clone_user(pid)?;
// let proof = self.prove_addr(pid, &addr.id)?;
// let running = self.running(pid)?;
// Ok(SendType::Result(
// 0,
// addr,
// true,
// false,
// bincode::serialize(&OwnConnect::Create(
// proof,
// me,
// height,
// event,
// running.device_name.clone(),
// running.device_info.clone(),
// running.distributes.keys().cloned().collect(),
// ))
// .unwrap_or(vec![]),
// ))
// }
// fn ancestor(from: u64, to: u64) -> (Vec<u64>, bool) {
// let space = to - from;
// let step = space / 8;
// if step == 0 {
// ((from..to + 1).map(|i| i).collect(), true)
// } else {
// let mut vec: Vec<u64> = (1..8).map(|i| step * i + from).collect();
// vec.push(to);
// (vec, false)
// }
// }
// pub fn sync_message(&self, pid: &PeerId, addr: PeerId, from: u64, to: u64) -> Result<SendType> {
// let (ancestors, hashes, is_min) = if to >= from {
// let (ancestors, is_min) = Self::ancestor(from, to);
// let db = self.consensus_db(pid)?;
// let hashes = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
// db.close()?;
// (ancestors, hashes, is_min)
// } else {
// (vec![], vec![], true)
// };
// let event = OwnEvent::SyncCheck(ancestors, hashes, is_min);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// Ok(SendType::Event(0, addr, data))
// }
// pub fn event_message(&self, addr: PeerId, event: &OwnEvent) -> Result<SendType> {
// let data = bincode::serialize(event).unwrap_or(vec![]);
// Ok(SendType::Event(0, addr, data))
// }
// pub fn broadcast(
// &mut self,
// pid: &PeerId,
// event: InnerEvent,
// path: i64,
// row: i64,
// results: &mut HandleResult,
// ) -> Result<()> {
// let db = self.consensus_db(pid)?;
// let account_db = self.account_db()?;
// let account = self.account_mut(pid)?;
// let pre_event = account.event;
// let eheight = account.own_height + 1;
// let eid = event.generate_event_id();
// Event::merge(&db, eid, path, row, eheight)?;
// drop(db);
// account.update_consensus(&account_db, eheight, eid)?;
// account_db.close()?;
// drop(account);
// let e = OwnEvent::Event(eheight, eid, pre_event, event);
// let data = bincode::serialize(&e).unwrap_or(vec![]);
// let running = self.running(pid)?;
// for (addr, (_peer, _id, online)) in &running.distributes {
// if *online {
// let msg = SendType::Event(0, *addr, data.clone());
// results.groups.push((*pid, msg))
// }
// }
// Ok(())
// }
// pub fn _status(
// &mut self,
// pid: &PeerId,
// event: StatusEvent,
// results: &mut HandleResult,
// ) -> Result<()> {
// let running = self.running(pid)?;
// let data = bincode::serialize(&OwnEvent::Status(event)).unwrap_or(vec![]);
// for (addr, (_peer, _id, online)) in &running.distributes {
// if *online {
// let msg = SendType::Event(0, *addr, data.clone());
// results.groups.push((*pid, msg))
// }
// }
// Ok(())
// }
}
// impl OwnEvent {
// pub async fn handle(
// group: &mut Own,
// event: OwnEvent,
// pid: PeerId,
// addr: PeerId,
// //layer: &Arc<RwLock<Layer>>,
// uid: u64,
// ) -> Result<HandleResult> {
// let mut results = HandleResult::new();
// match event {
// OwnEvent::DeviceUpdate(_at, _name) => {
// // TODO
// }
// OwnEvent::DeviceDelete(_at) => {
// // TODO
// }
// OwnEvent::DeviceOffline => {
// let v = group.running_mut(&pid)?;
// let did = v.offline(&addr)?;
// results.rpcs.push(device_rpc::device_offline(pid, did));
// }
// OwnEvent::StatusRequest => {
// let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) =
// local_device_status();
// results.groups.push((
// pid,
// SendType::Event(
// 0,
// addr,
// bincode::serialize(&OwnEvent::StatusResponse(
// cpu_n,
// mem_s,
// swap_s,
// disk_s,
// cpu_p,
// mem_p,
// swap_p,
// disk_p,
// group.uptime(&pid)?,
// ))
// .unwrap_or(vec![]),
// ),
// ))
// }
// OwnEvent::StatusResponse(
// cpu_n,
// mem_s,
// swap_s,
// disk_s,
// cpu_p,
// mem_p,
// swap_p,
// disk_p,
// uptime,
// ) => results.rpcs.push(device_rpc::device_status(
// pid, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime,
// )),
// OwnEvent::Event(eheight, eid, pre) => {
// //inner_event.handle(group, pid, addr, eheight, eid, pre, &mut results, layer)?;
// }
// OwnEvent::Status => {
// //status_event.handle(group, pid, addr, &mut results, layer, uid)?;
// }
// OwnEvent::SyncCheck(ancestors, hashes, is_min) => {
// println!("sync check: {:?}", ancestors);
// let account = group.account(&pid)?;
// if ancestors.len() == 0 || hashes.len() == 0 {
// return Ok(results);
// }
// // remote is new need it handle.
// if hashes[0] == EventId::default() {
// return Ok(results);
// }
// let remote_height = ancestors.last().map(|v| *v).unwrap_or(0);
// let remote_event = hashes.last().map(|v| *v).unwrap_or(EventId::default());
// if account.own_height != remote_height || account.event != remote_event {
// // check ancestor and merge.
// let db = group.consensus_db(&pid)?;
// let ours = vec![];
// //let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
// drop(db);
// if ours.len() == 0 {
// let event = OwnEvent::SyncRequest(1, remote_height);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// return Ok(results);
// }
// let mut ancestor = 0u64;
// for i in 0..ancestors.len() {
// if hashes[i] != ours[i] {
// if i == 0 {
// ancestor = ancestors[0];
// break;
// }
// if ancestors[i - 1] == ancestors[i] + 1 {
// ancestor = ancestors[i - 1];
// } else {
// if is_min {
// ancestor = ancestors[i - 1];
// } else {
// results.groups.push((
// pid,
// group.sync_message(
// &pid,
// addr,
// ancestors[i - 1],
// ancestors[i],
// )?,
// ));
// return Ok(results);
// }
// }
// break;
// }
// }
// if ancestor != 0 {
// let event = OwnEvent::SyncRequest(ancestor, remote_height);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// } else {
// results.groups.push((
// pid,
// group.sync_message(&pid, addr, remote_height, account.own_height)?,
// ));
// }
// }
// }
// OwnEvent::SyncRequest(from, to) => {
// println!("====== DEBUG Sync Request: from: {} to {}", from, to);
// // every time sync MAX is 100.
// let last_to = if to - from > 100 { to - 100 } else { to };
// // let sync_events = SyncEvent::sync(
// // &group,
// // &group.base,
// // &pid,
// // group.account(&pid)?,
// // from,
// // last_to,
// // )
// // .await?;
// let event = OwnEvent::SyncResponse(from, last_to, to);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// }
// OwnEvent::SyncResponse(from, last_to, to) => {
// println!(
// "====== DEBUG Sync Response: from: {} last {}, to {}",
// from, last_to, to
// );
// if last_to < to {
// let event = OwnEvent::SyncRequest(last_to + 1, to);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// }
// //SyncEvent::handle(pid, from, last_to, events, group, layer, &mut results, addr)?;
// }
// }
// Ok(results)
// }
// }
impl OwnEvent {
pub async fn handle(
aid: PeerId,
event: OwnEvent,
global: &Arc<Global>,
) -> Result<HandleResult> {
let pid = global.pid().await;
let mut results = HandleResult::new();
match event {
OwnEvent::Info(name, info) => {
let id = global.own.read().await.device_id(&aid)?;
let db_key = global.own.read().await.db_key(&pid)?;
let db = consensus_db(&global.base, &pid, &db_key)?;
Device::update(&db, id, &name, &info)?;
}
OwnEvent::DeviceUpdate(_aid, _name) => {
// TODO
}
OwnEvent::DeviceDelete(_aid) => {
// TODO
}
OwnEvent::StatusRequest => {
let uptime = global.own.read().await.uptime;
let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) =
local_device_status();
let event = OwnEvent::StatusResponse(
cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime,
);
results
.owns
.push(SendType::Event(0, aid, bincode::serialize(&event)?))
}
OwnEvent::StatusResponse(
cpu_n,
mem_s,
swap_s,
disk_s,
cpu_p,
mem_p,
swap_p,
disk_p,
uptime,
) => {
let id = global.own.read().await.device_id(&aid)?;
results.rpcs.push(device_rpc::device_status(
id, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime,
));
}
OwnEvent::Event(_eheight, _eid, _pre) => {
//inner_event.handle(group, pid, addr, eheight, eid, pre, &mut results, layer)?;
}
OwnEvent::SyncCheck(_ancestors, _hashes, _is_min) => {
// println!("sync check: {:?}", ancestors);
// let account = group.account(&pid)?;
// if ancestors.len() == 0 || hashes.len() == 0 {
// return Ok(results);
// }
// // remote is new need it handle.
// if hashes[0] == EventId::default() {
// return Ok(results);
// }
// let remote_height = ancestors.last().map(|v| *v).unwrap_or(0);
// let remote_event = hashes.last().map(|v| *v).unwrap_or(EventId::default());
// if account.own_height != remote_height || account.event != remote_event {
// // check ancestor and merge.
// let db = group.consensus_db(&pid)?;
// let ours = vec![];
// //let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?;
// drop(db);
// if ours.len() == 0 {
// let event = OwnEvent::SyncRequest(1, remote_height);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// return Ok(results);
// }
// let mut ancestor = 0u64;
// for i in 0..ancestors.len() {
// if hashes[i] != ours[i] {
// if i == 0 {
// ancestor = ancestors[0];
// break;
// }
// if ancestors[i - 1] == ancestors[i] + 1 {
// ancestor = ancestors[i - 1];
// } else {
// if is_min {
// ancestor = ancestors[i - 1];
// } else {
// results.groups.push((
// pid,
// group.sync_message(
// &pid,
// addr,
// ancestors[i - 1],
// ancestors[i],
// )?,
// ));
// return Ok(results);
// }
// }
// break;
// }
// }
// if ancestor != 0 {
// let event = OwnEvent::SyncRequest(ancestor, remote_height);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// } else {
// results.groups.push((
// pid,
// group.sync_message(&pid, addr, remote_height, account.own_height)?,
// ));
// }
// }
}
OwnEvent::SyncRequest(_from, _to) => {
//println!("====== DEBUG Sync Request: from: {} to {}", from, to);
// every time sync MAX is 100.
//let last_to = if to - from > 100 { to - 100 } else { to };
// let sync_events = SyncEvent::sync(
// &group,
// &group.base,
// &pid,
// group.account(&pid)?,
// from,
// last_to,
// )
// .await?;
// let event = OwnEvent::SyncResponse(from, last_to, to);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
}
OwnEvent::SyncResponse(_from, _last_to, _to) => {
// println!(
// "====== DEBUG Sync Response: from: {} last {}, to {}",
// from, last_to, to
// );
// if last_to < to {
// let event = OwnEvent::SyncRequest(last_to + 1, to);
// let data = bincode::serialize(&event).unwrap_or(vec![]);
// results.groups.push((pid, SendType::Event(0, addr, data)));
// }
//SyncEvent::handle(pid, from, last_to, events, group, layer, &mut results, addr)?;
}
}
Ok(results)
}
}
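Taken together, the new handle() and OwnEvent code above add a small device handshake: on RecvType::Connect the receiver records or creates the Device row, pushes an OwnEvent::Info(name, info) back to the connecting peer, and on receipt of Info the other side updates the matching devices row via Device::update. A self-contained sketch of just that event round-trip, using the same serde/bincode pattern as the diff; the stand-in enum below mirrors only the Info variant and is not the real OwnEvent from src/own/mod.rs:

    use serde::{Deserialize, Serialize};

    // illustrative stand-in; the full OwnEvent has more variants.
    #[derive(Serialize, Deserialize)]
    enum OwnEventSketch {
        Info(String, String),
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // sender side: announce this device's name/info after a new connection.
        let bytes = bincode::serialize(&OwnEventSketch::Info("laptop".into(), "linux-x86_64".into()))?;

        // receiver side: decode and, in the real code, call Device::update(&db, id, &name, &info).
        match bincode::deserialize::<OwnEventSketch>(&bytes)? {
            OwnEventSketch::Info(name, info) => println!("update device row: {} / {}", name, info),
        }
        Ok(())
    }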

src/rpc.rs (4)

@@ -57,7 +57,7 @@ pub(crate) fn network_dht(peers: Vec<PeerId>) -> RpcParam {
}
#[inline]
pub(crate) fn account_update(pid: &PeerId, name: &str, avatar: String) -> RpcParam {
pub(crate) fn _account_update(pid: &PeerId, name: &str, avatar: String) -> RpcParam {
rpc_response(0, "account-update", json!([id_to_str(pid), name, avatar]))
}
@@ -82,7 +82,7 @@ pub(crate) fn session_update_name(id: &i64, name: &str) -> RpcParam {
}
#[inline]
pub(crate) fn session_update(id: &i64, addr: &PeerId, name: &str, is_top: bool) -> RpcParam {
pub(crate) fn _session_update(id: &i64, addr: &PeerId, name: &str, is_top: bool) -> RpcParam {
rpc_response(
0,
"session-update",

src/server.rs (2)

@@ -10,7 +10,6 @@ use tdn::{
},
};
use tdn_storage::local::DStorage;
use tokio::{sync::mpsc::Sender, sync::RwLock};
use crate::account::Account;
use crate::apps::app_layer_handle;
@@ -22,7 +21,6 @@ use crate::primitives::network_seeds;
use crate::rpc::{init_rpc, inner_rpc, session_lost};
pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:7366";
pub const DEFAULT_LOG_FILE: &'static str = "esse.log.txt";
pub static RPC_WS_UID: OnceCell<u64> = OnceCell::new();

src/storage.rs (4)

@@ -209,7 +209,7 @@ pub(crate) async fn read_avatar(base: &PathBuf, pid: &PeerId, remote: &PeerId) -
}
}
pub(crate) fn read_avatar_sync(base: &PathBuf, pid: &PeerId, remote: &PeerId) -> Result<Vec<u8>> {
pub(crate) fn _read_avatar_sync(base: &PathBuf, pid: &PeerId, remote: &PeerId) -> Result<Vec<u8>> {
let mut path = base.clone();
path.push(id_to_str(pid));
path.push(AVATAR_DIR);
@@ -266,7 +266,7 @@ pub(crate) async fn delete_avatar(base: &PathBuf, pid: &PeerId, remote: &PeerId)
}
}
pub(crate) fn delete_avatar_sync(base: &PathBuf, pid: &PeerId, remote: &PeerId) -> Result<()> {
pub(crate) fn _delete_avatar_sync(base: &PathBuf, pid: &PeerId, remote: &PeerId) -> Result<()> {
let mut path = base.clone();
path.push(id_to_str(pid));
path.push(AVATAR_DIR);

src/utils/crypto.rs (3)

@@ -7,7 +7,6 @@ use argon2::{
Argon2,
};
use sha2::{Digest, Sha256};
use tdn::types::primitives::{PeerId, PeerKey};
const FIX_PADDING: [u8; 19] = [
69, 83, 83, 69, 70, 111, 114, 68, 97, 116, 97, 83, 101, 99, 117, 114, 105, 116, 121,
@@ -198,6 +197,6 @@ pub fn _decrypt_multiple(
/// Compute the session key in the cloud.
#[inline]
pub fn cloud_key(key: &[u8; 32]) -> Aes256Gcm {
pub fn _cloud_key(key: &[u8; 32]) -> Aes256Gcm {
Aes256Gcm::new(GenericArray::from_slice(key))
}
