Browse Source

add start/stop network

pull/18/head
Sun 4 years ago
parent
commit
e79a15bc74
  1. 2
      .cargo/config.toml
  2. 1
      Cargo.toml
  3. 2
      src/daemon.rs
  4. 61
      src/global.rs
  5. 5
      src/group.rs
  6. 66
      src/rpc.rs
  7. 101
      src/server.rs

2
.cargo/config.toml

@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

1
Cargo.toml

@ -60,6 +60,7 @@ dao_types = { version = "0.1", path = "./types/dao" }
data = { version = "0.1", path = "./types/data" } data = { version = "0.1", path = "./types/data" }
openssl = { version = "0.10", features = ["vendored"] } # Add for cross-compile. openssl = { version = "0.10", features = ["vendored"] } # Add for cross-compile.
console-subscriber = "0.1" # only use in bin daemon.
[target.'cfg(target_os="android")'.dependencies] [target.'cfg(target_os="android")'.dependencies]
jni = { version = "0.19", default-features = false } jni = { version = "0.19", default-features = false }

2
src/daemon.rs

@ -23,6 +23,8 @@ mod utils;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
console_subscriber::init();
let db_path = args().nth(1).unwrap_or("./.tdn".to_owned()); let db_path = args().nth(1).unwrap_or("./.tdn".to_owned());
if std::fs::metadata(&db_path).is_err() { if std::fs::metadata(&db_path).is_err() {

61
src/global.rs

@ -1,7 +1,9 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use tdn::{
use tdn::prelude::{GroupId, PeerId, SendMessage}; prelude::{GroupId, P2pConfig, PeerId, PeerKey, ReceiveMessage, SendMessage},
types::message::RpcSendMessage,
};
use tokio::{sync::mpsc::Sender, sync::RwLock}; use tokio::{sync::mpsc::Sender, sync::RwLock};
use crate::account::Account; use crate::account::Account;
@ -20,32 +22,48 @@ pub(crate) struct Global {
pub group: RwLock<Group>, pub group: RwLock<Group>,
/// current layer. /// current layer.
pub layer: RwLock<Layer>, pub layer: RwLock<Layer>,
/// TDN network sender.
pub sender: RwLock<Sender<SendMessage>>,
/// message delivery tracking. uuid, me_gid, db_id. /// message delivery tracking. uuid, me_gid, db_id.
pub _delivery: RwLock<HashMap<u64, (GroupId, i64)>>, pub _delivery: RwLock<HashMap<u64, (GroupId, i64)>>,
/// storage base path. /// storage base path.
pub base: PathBuf, pub base: PathBuf,
/// random secret seed. /// random secret seed.
pub secret: [u8; 32], pub secret: [u8; 32],
/// supported layers.
pub gids: Vec<GroupId>,
/// inner network params.
pub p2p_config: P2pConfig,
/// inner services channel sender.
pub self_send: Sender<ReceiveMessage>,
/// inner p2p network sender.
pub p2p_send: RwLock<Option<Sender<SendMessage>>>,
/// inner rpc channel sender.
pub rpc_send: Sender<RpcSendMessage>,
} }
impl Global { impl Global {
pub fn init( pub fn init(
accounts: HashMap<PeerId, Account>, accounts: HashMap<PeerId, Account>,
tdn_send: Sender<SendMessage>,
base: PathBuf, base: PathBuf,
secret: [u8; 32], secret: [u8; 32],
p2p_config: P2pConfig,
self_send: Sender<ReceiveMessage>,
rpc_send: Sender<RpcSendMessage>,
) -> Self { ) -> Self {
let gids = vec![];
Global { Global {
base, base,
secret, secret,
p2p_config,
self_send,
rpc_send,
gids,
peer_id: RwLock::new(PeerId::default()), peer_id: RwLock::new(PeerId::default()),
peer_pub_height: RwLock::new(0), peer_pub_height: RwLock::new(0),
peer_own_height: RwLock::new(0), peer_own_height: RwLock::new(0),
group: RwLock::new(Group::init(accounts)), group: RwLock::new(Group::init(accounts)),
layer: RwLock::new(Layer::init()), layer: RwLock::new(Layer::init()),
sender: RwLock::new(tdn_send), p2p_send: RwLock::new(None),
_delivery: RwLock::new(HashMap::new()), _delivery: RwLock::new(HashMap::new()),
} }
} }
@ -54,13 +72,20 @@ impl Global {
self.peer_id.read().await.clone() self.peer_id.read().await.clone()
} }
pub async fn send(&self, msg: SendMessage) -> anyhow::Result<()> { pub async fn sender(&self) -> anyhow::Result<Sender<SendMessage>> {
self.sender self.p2p_send
.read() .read()
.await .await
.send(msg) .clone()
.await .ok_or(anyhow!("network lost!"))
.map_err(|_e| anyhow!("network lost!")) }
pub async fn send(&self, msg: SendMessage) -> anyhow::Result<()> {
if let Some(sender) = &*self.p2p_send.read().await {
Ok(sender.send(msg).await?)
} else {
Err(anyhow!("network lost!"))
}
} }
pub async fn clear(&self) { pub async fn clear(&self) {
@ -68,9 +93,14 @@ impl Global {
self.layer.write().await.clear(); self.layer.write().await.clear();
} }
pub async fn reset(&self, pid: &PeerId, lock: &str) -> anyhow::Result<bool> { pub async fn reset(
&self,
pid: &PeerId,
lock: &str,
send: Sender<SendMessage>,
) -> anyhow::Result<bool> {
if *self.peer_id.read().await == *pid { if *self.peer_id.read().await == *pid {
return Ok(false); return Ok(true);
} }
let (pheight, oheight) = let (pheight, oheight) =
@ -80,13 +110,12 @@ impl Global {
.reset(pid, lock, &self.base, &self.secret)?; .reset(pid, lock, &self.base, &self.secret)?;
self.layer.write().await.clear(); self.layer.write().await.clear();
*self.p2p_send.write().await = Some(send);
*self.peer_id.write().await = *pid; *self.peer_id.write().await = *pid;
*self.peer_pub_height.write().await = pheight; *self.peer_pub_height.write().await = pheight;
*self.peer_own_height.write().await = oheight; *self.peer_own_height.write().await = oheight;
self._delivery.write().await.clear(); self._delivery.write().await.clear();
// TODO change sender. Ok(false)
Ok(true)
} }
} }

5
src/group.rs

@ -245,6 +245,11 @@ impl Group {
} }
} }
pub fn keypair(&self) -> PeerKey {
let bytes = self.keypair.to_db_bytes();
PeerKey::from_db_bytes(&bytes).unwrap()
}
pub fn db_key(&self, pid: &PeerId) -> Result<String> { pub fn db_key(&self, pid: &PeerId) -> Result<String> {
Ok(self.account(pid)?.plainkey()) Ok(self.account(pid)?.plainkey())
} }

66
src/rpc.rs

@ -2,11 +2,14 @@ use esse_primitives::{id_from_str, id_to_str};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tdn::types::{ use tdn::{
group::GroupId, prelude::{new_send_channel, start_main},
message::{NetworkType, SendMessage, SendType, StateRequest, StateResponse}, types::{
primitives::{HandleResult, Peer, PeerId, Result}, group::GroupId,
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, message::{NetworkType, SendMessage, SendType, StateRequest, StateResponse},
primitives::{HandleResult, Peer, PeerId, Result},
rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam},
},
}; };
use tdn_did::{generate_mnemonic, Count}; use tdn_did::{generate_mnemonic, Count};
use tokio::sync::{ use tokio::sync::{
@ -167,12 +170,8 @@ pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc<Global>) -> R
let (s, mut r) = mpsc::channel::<StateResponse>(128); let (s, mut r) = mpsc::channel::<StateResponse>(128);
let _ = global let _ = global
.sender
.read()
.await
.send(SendMessage::Network(NetworkType::NetworkState(req, s))) .send(SendMessage::Network(NetworkType::NetworkState(req, s)))
.await .await?;
.expect("TDN channel closed");
let param = match r.recv().await { let param = match r.recv().await {
Some(StateResponse::Stable(peers)) => network_stable(peers), Some(StateResponse::Stable(peers)) => network_stable(peers),
@ -182,14 +181,7 @@ pub(crate) async fn inner_rpc(uid: u64, method: &str, global: &Arc<Global>) -> R
} }
}; };
global global.send(SendMessage::Rpc(uid, param, false)).await?;
.sender
.read()
.await
.send(SendMessage::Rpc(uid, param, false))
.await
.expect("TDN channel closed");
return Ok(()); return Ok(());
} }
@ -385,7 +377,8 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
let mut results = HandleResult::rpc(json!([id_to_str(&pid)])); let mut results = HandleResult::rpc(json!([id_to_str(&pid)]));
let running = state.reset(&pid, me_lock).await?; let (tdn_send, tdn_recv) = new_send_channel();
let running = state.reset(&pid, me_lock, tdn_send).await?;
if running { if running {
return Ok(results); return Ok(results);
} }
@ -416,7 +409,18 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
// } // }
// drop(layer_lock); // drop(layer_lock);
debug!("Account Logined: {}.", id_to_str(&pid)); let key = state.group.read().await.keypair();
let peer_id = start_main(
state.gids.clone(),
state.p2p_config.clone(),
state.self_send.clone(),
tdn_recv,
None,
Some(key),
)
.await?;
debug!("Account Logined: {}.", id_to_str(&peer_id));
Ok(results) Ok(results)
}, },
@ -426,29 +430,9 @@ fn new_rpc_handler(global: Arc<Global>) -> RpcHandler<Global> {
"account-logout", "account-logout",
|_params: Vec<RpcParam>, state: Arc<Global>| async move { |_params: Vec<RpcParam>, state: Arc<Global>| async move {
let mut results = HandleResult::new(); let mut results = HandleResult::new();
results.networks.push(NetworkType::NetworkStop);
// TODO broadcast to inner-group.
let group_lock = state.group.read().await;
drop(group_lock);
// TODO broadcast to layers.
let layer_lock = state.layer.read().await;
for (gid, sessions) in layer_lock.sessions.iter() {
for (pid, _) in sessions {
// send a event that is offline.
let data = bincode::serialize(&LayerEvent::Offline(*gid))?;
let msg = SendType::Event(0, *pid, data);
results.layers.push((*gid, msg));
}
}
drop(layer_lock);
debug!("Account Offline: {}.", id_to_str(&state.pid().await)); debug!("Account Offline: {}.", id_to_str(&state.pid().await));
//let sender = state.sender.read().await.clone();
state.clear().await; state.clear().await;
//tokio::spawn(sleep_waiting_close_stable(sender, groups, layers));
Ok(results) Ok(results)
}, },
); );

101
src/server.rs

@ -69,27 +69,19 @@ pub async fn start(db_path: String) -> Result<()> {
for account in accounts { for account in accounts {
me.insert(account.pid, account); me.insert(account.pid, account);
} }
let gids: Vec<GroupId> = vec![]; // TODO add apps inject GROUP_ID
let peer_id = PeerId::default();
let (_, _, p2p_config, rpc_config) = config.split(); let (_, _, p2p_config, rpc_config) = config.split();
let (tdn_send, tdn_recv) = new_send_channel();
let (self_send, mut self_recv) = new_receive_channel(); let (self_send, mut self_recv) = new_receive_channel();
let rpc_send = start_rpc(rpc_config, self_send).await?; let rpc_send = start_rpc(rpc_config, self_send.clone()).await?;
let global = Arc::new(Global::init(me, tdn_send, db_path, rand_secret)); let global = Arc::new(Global::init(
me,
//let peer_id = start_main(gids, p2p_config, self_send, tdn_recv, rpc_send, Some(key)).await; db_path,
// TODO CHECK peer_id is equal. rand_secret,
p2p_config,
// info!("Network Peer id : {}", peer_id.to_hex()); self_send,
rpc_send,
// let group = Arc::new(RwLock::new( ));
// Group::init(rand_secret, sender.clone(), peer_id, me, db_path.clone()).await?,
// ));
// let layer = Arc::new(RwLock::new(
// Layer::init(db_path, peer_id, group.clone()).await?,
// ));
let rpc = init_rpc(global.clone()); let rpc = init_rpc(global.clone());
// //let mut group_rpcs: HashMap<u64, GroupId> = HashMap::new(); // //let mut group_rpcs: HashMap<u64, GroupId> = HashMap::new();
@ -126,7 +118,7 @@ pub async fn start(db_path: String) -> Result<()> {
} }
if let Ok(handle_result) = rpc.handle(params).await { if let Ok(handle_result) = rpc.handle(params).await {
handle(handle_result, uid, is_ws, &global, &rpc_send).await; handle(handle_result, uid, is_ws, &global).await;
} }
} }
ReceiveMessage::NetworkLost => { ReceiveMessage::NetworkLost => {
@ -223,13 +215,7 @@ pub async fn start(db_path: String) -> Result<()> {
// } // }
#[inline] #[inline]
async fn handle( async fn handle(handle_result: HandleResult, uid: u64, is_ws: bool, global: &Arc<Global>) {
handle_result: HandleResult,
uid: u64,
is_ws: bool,
global: &Arc<Global>,
rpc_sender: &Sender<RpcSendMessage>,
) {
let HandleResult { let HandleResult {
mut rpcs, mut rpcs,
mut groups, mut groups,
@ -240,7 +226,8 @@ async fn handle(
loop { loop {
if rpcs.len() != 0 { if rpcs.len() != 0 {
let msg = rpcs.remove(0); let msg = rpcs.remove(0);
rpc_sender global
.rpc_send
.send(RpcSendMessage(uid, msg, is_ws)) .send(RpcSendMessage(uid, msg, is_ws))
.await .await
.expect("TDN channel closed"); .expect("TDN channel closed");
@ -249,40 +236,42 @@ async fn handle(
} }
} }
let sender = global.sender.read().await; if let Ok(sender) = global.sender().await {
loop { loop {
if networks.len() != 0 { if groups.len() != 0 {
let msg = networks.remove(0); let msg = groups.remove(0);
sender sender
.send(SendMessage::Network(msg)) .send(SendMessage::Group(msg))
.await .await
.expect("TDN channel closed"); .expect("TDN channel closed");
} else { } else {
break; break;
}
} }
}
loop { loop {
if groups.len() != 0 { if layers.len() != 0 {
let msg = groups.remove(0); let (tgid, msg) = layers.remove(0);
sender sender
.send(SendMessage::Group(msg)) .send(SendMessage::Layer(tgid, msg))
.await .await
.expect("TDN channel closed"); .expect("TDN channel closed");
} else { } else {
break; break;
}
} }
}
loop { // must last send, because it will has stop type.
if layers.len() != 0 { loop {
let (tgid, msg) = layers.remove(0); if networks.len() != 0 {
sender let msg = networks.remove(0);
.send(SendMessage::Layer(tgid, msg)) sender
.await .send(SendMessage::Network(msg))
.expect("TDN channel closed"); .await
} else { .expect("TDN channel closed");
break; } else {
break;
}
} }
} }
} }

Loading…
Cancel
Save