|
|
|
|
@ -5,7 +5,7 @@ use std::sync::Arc;
@@ -5,7 +5,7 @@ use std::sync::Arc;
|
|
|
|
|
use tdn::types::{ |
|
|
|
|
group::{EventId, GroupId}, |
|
|
|
|
message::{RecvType, SendMessage, SendType}, |
|
|
|
|
primitive::{HandleResult, PeerAddr, Result}, |
|
|
|
|
primitive::{HandleResult, Peer, PeerId, Result}, |
|
|
|
|
}; |
|
|
|
|
use tdn_did::{user::User, Proof}; |
|
|
|
|
use tokio::sync::{mpsc::Sender, RwLock}; |
|
|
|
|
@ -33,7 +33,7 @@ pub(crate) struct Group {
@@ -33,7 +33,7 @@ pub(crate) struct Group {
|
|
|
|
|
/// TDN network sender.
|
|
|
|
|
sender: Sender<SendMessage>, |
|
|
|
|
/// current address.
|
|
|
|
|
addr: PeerAddr, |
|
|
|
|
addr: PeerId, |
|
|
|
|
/// all accounts.
|
|
|
|
|
accounts: HashMap<GroupId, Account>, |
|
|
|
|
/// distributed devices.
|
|
|
|
|
@ -44,7 +44,7 @@ pub(crate) struct Group {
@@ -44,7 +44,7 @@ pub(crate) struct Group {
|
|
|
|
|
#[derive(Serialize, Deserialize)] |
|
|
|
|
enum GroupConnect { |
|
|
|
|
/// Params: User, consensus height, event_id, remote_name, remote_info, other_devices addr.
|
|
|
|
|
Create(Proof, User, u64, EventId, String, String, Vec<PeerAddr>), |
|
|
|
|
Create(Proof, User, u64, EventId, String, String, Vec<PeerId>), |
|
|
|
|
/// connected.
|
|
|
|
|
Connect(u64, EventId), |
|
|
|
|
} |
|
|
|
|
@ -57,9 +57,9 @@ pub(crate) enum GroupEvent {
@@ -57,9 +57,9 @@ pub(crate) enum GroupEvent {
|
|
|
|
|
/// Sync infomations.
|
|
|
|
|
Status(StatusEvent), |
|
|
|
|
/// device's info update.
|
|
|
|
|
DeviceUpdate(PeerAddr, String), |
|
|
|
|
DeviceUpdate(PeerId, String), |
|
|
|
|
/// device deleted.
|
|
|
|
|
DeviceDelete(PeerAddr), |
|
|
|
|
DeviceDelete(PeerId), |
|
|
|
|
/// offline.
|
|
|
|
|
DeviceOffline, |
|
|
|
|
/// Device status request.
|
|
|
|
|
@ -97,8 +97,8 @@ impl Group {
@@ -97,8 +97,8 @@ impl Group {
|
|
|
|
|
RecvType::Leave(addr) => { |
|
|
|
|
for (_, account) in &mut self.runnings { |
|
|
|
|
if let Some(device) = account.distributes.get_mut(&addr) { |
|
|
|
|
device.1 = false; |
|
|
|
|
results.rpcs.push(device_rpc::device_offline(gid, device.0)); |
|
|
|
|
device.2 = false; |
|
|
|
|
results.rpcs.push(device_rpc::device_offline(gid, device.1)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -128,11 +128,12 @@ impl Group {
@@ -128,11 +128,12 @@ impl Group {
|
|
|
|
|
&mut self, |
|
|
|
|
results: &mut HandleResult, |
|
|
|
|
gid: &GroupId, |
|
|
|
|
addr: PeerAddr, |
|
|
|
|
addr: Peer, |
|
|
|
|
data: Vec<u8>, |
|
|
|
|
is_connect: bool, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
let connect = bincode::deserialize(&data)?; |
|
|
|
|
let peer_id = addr.id; |
|
|
|
|
|
|
|
|
|
let (remote_height, remote_event, others) = match connect { |
|
|
|
|
GroupConnect::Create( |
|
|
|
|
@ -145,12 +146,14 @@ impl Group {
@@ -145,12 +146,14 @@ impl Group {
|
|
|
|
|
others, |
|
|
|
|
) => { |
|
|
|
|
// check remote addr is receive addr.
|
|
|
|
|
if remote.addr != addr { |
|
|
|
|
if remote.addr != peer_id { |
|
|
|
|
return Err(anyhow!("Address is invalid.")); |
|
|
|
|
} |
|
|
|
|
proof.verify(gid, &addr, &self.addr)?; |
|
|
|
|
proof.verify(gid, &peer_id, &self.addr)?; |
|
|
|
|
if is_connect { |
|
|
|
|
results.groups.push((*gid, self.agree_message(gid, addr)?)); |
|
|
|
|
results |
|
|
|
|
.groups |
|
|
|
|
.push((*gid, self.agree_message(gid, addr.clone())?)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// first init sync.
|
|
|
|
|
@ -174,21 +177,23 @@ impl Group {
@@ -174,21 +177,23 @@ impl Group {
|
|
|
|
|
let running = self.runnings.get_mut(gid).unwrap(); // safe unwrap. checked.
|
|
|
|
|
let mut new_addrs = vec![]; |
|
|
|
|
for a in others { |
|
|
|
|
if a != addr && a != self.addr && !running.distributes.contains_key(&a) { |
|
|
|
|
if a != peer_id && a != self.addr && !running.distributes.contains_key(&a) { |
|
|
|
|
new_addrs.push(a); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if let Some(v) = running.distributes.get_mut(&addr) { |
|
|
|
|
v.1 = true; |
|
|
|
|
results.rpcs.push(device_rpc::device_online(*gid, v.0)); |
|
|
|
|
if let Some(v) = running.distributes.get_mut(&peer_id) { |
|
|
|
|
v.2 = true; |
|
|
|
|
results.rpcs.push(device_rpc::device_online(*gid, v.1)); |
|
|
|
|
(remote_height, remote_event, new_addrs) |
|
|
|
|
} else { |
|
|
|
|
let mut device = Device::new(device_name, device_info, addr); |
|
|
|
|
let mut device = Device::new(device_name, device_info, peer_id); |
|
|
|
|
let db = consensus_db(&self.base, gid)?; |
|
|
|
|
device.insert(&db)?; |
|
|
|
|
db.close()?; |
|
|
|
|
running.distributes.insert(addr, (device.id, true)); |
|
|
|
|
running |
|
|
|
|
.distributes |
|
|
|
|
.insert(peer_id, (addr.clone(), device.id, true)); |
|
|
|
|
results.rpcs.push(device_rpc::device_create(*gid, &device)); |
|
|
|
|
results |
|
|
|
|
.rpcs |
|
|
|
|
@ -202,7 +207,7 @@ impl Group {
@@ -202,7 +207,7 @@ impl Group {
|
|
|
|
|
.get(gid) |
|
|
|
|
.unwrap() // safe, checked
|
|
|
|
|
.distributes |
|
|
|
|
.contains_key(&addr) |
|
|
|
|
.contains_key(&peer_id) |
|
|
|
|
{ |
|
|
|
|
if is_connect { |
|
|
|
|
results.groups.push((*gid, self.connect_result(gid, addr)?)); |
|
|
|
|
@ -215,7 +220,7 @@ impl Group {
@@ -215,7 +220,7 @@ impl Group {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let v = self.running_mut(gid)?; |
|
|
|
|
let did = v.add_online(&addr)?; |
|
|
|
|
let did = v.add_online(&peer_id)?; |
|
|
|
|
results.rpcs.push(device_rpc::device_online(*gid, did)); |
|
|
|
|
(remote_height, remote_event, vec![]) |
|
|
|
|
} |
|
|
|
|
@ -225,12 +230,14 @@ impl Group {
@@ -225,12 +230,14 @@ impl Group {
|
|
|
|
|
if account.height != remote_height || account.event != remote_event { |
|
|
|
|
results |
|
|
|
|
.groups |
|
|
|
|
.push((*gid, self.sync_message(gid, addr, 1, account.height)?)); |
|
|
|
|
.push((*gid, self.sync_message(gid, peer_id, 1, account.height)?)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// connect to others.
|
|
|
|
|
for addr in others { |
|
|
|
|
results.groups.push((*gid, self.create_message(gid, addr)?)); |
|
|
|
|
results |
|
|
|
|
.groups |
|
|
|
|
.push((*gid, self.create_message(gid, Peer::peer(addr))?)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Ok(()) |
|
|
|
|
@ -241,7 +248,7 @@ impl Group {
@@ -241,7 +248,7 @@ impl Group {
|
|
|
|
|
pub async fn init( |
|
|
|
|
secret: [u8; 32], |
|
|
|
|
sender: Sender<SendMessage>, |
|
|
|
|
addr: PeerAddr, |
|
|
|
|
addr: PeerId, |
|
|
|
|
accounts: HashMap<GroupId, Account>, |
|
|
|
|
base: PathBuf, |
|
|
|
|
) -> Result<Group> { |
|
|
|
|
@ -255,7 +262,7 @@ impl Group {
@@ -255,7 +262,7 @@ impl Group {
|
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn addr(&self) -> &PeerAddr { |
|
|
|
|
pub fn addr(&self) -> &PeerId { |
|
|
|
|
&self.addr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -311,7 +318,7 @@ impl Group {
@@ -311,7 +318,7 @@ impl Group {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn prove_addr(&self, mgid: &GroupId, raddr: &PeerAddr) -> Result<Proof> { |
|
|
|
|
pub fn prove_addr(&self, mgid: &GroupId, raddr: &PeerId) -> Result<Proof> { |
|
|
|
|
let running = self.running(mgid)?; |
|
|
|
|
Ok(Proof::prove(&running.keypair, &self.addr, raddr)) |
|
|
|
|
} |
|
|
|
|
@ -327,9 +334,9 @@ impl Group {
@@ -327,9 +334,9 @@ impl Group {
|
|
|
|
|
pub fn distribute_conns(&self, gid: &GroupId) -> Vec<SendType> { |
|
|
|
|
let mut vecs = vec![]; |
|
|
|
|
if let Some(running) = &self.runnings.get(gid) { |
|
|
|
|
for (addr, _) in &running.distributes { |
|
|
|
|
for (addr, (peer, _, _)) in &running.distributes { |
|
|
|
|
if addr != &self.addr { |
|
|
|
|
if let Ok(s) = self.connect_message(gid, *addr) { |
|
|
|
|
if let Ok(s) = self.connect_message(gid, peer.clone()) { |
|
|
|
|
vecs.push(s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -342,9 +349,9 @@ impl Group {
@@ -342,9 +349,9 @@ impl Group {
|
|
|
|
|
let mut conns = HashMap::new(); |
|
|
|
|
for (mgid, running) in &self.runnings { |
|
|
|
|
let mut vecs = vec![]; |
|
|
|
|
for (addr, _) in &running.distributes { |
|
|
|
|
for (addr, (peer, _, _)) in &running.distributes { |
|
|
|
|
if addr != &self.addr { |
|
|
|
|
if let Ok(s) = self.connect_message(mgid, *addr) { |
|
|
|
|
if let Ok(s) = self.connect_message(mgid, peer.clone()) { |
|
|
|
|
vecs.push(s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -356,7 +363,7 @@ impl Group {
@@ -356,7 +363,7 @@ impl Group {
|
|
|
|
|
|
|
|
|
|
pub fn online_devices(&self, gid: &GroupId, mut devices: Vec<Device>) -> Vec<Device> { |
|
|
|
|
if let Some(running) = self.runnings.get(gid) { |
|
|
|
|
for (addr, (_id, online)) in &running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in &running.distributes { |
|
|
|
|
if *online { |
|
|
|
|
for device in devices.iter_mut() { |
|
|
|
|
if device.addr == *addr { |
|
|
|
|
@ -370,10 +377,10 @@ impl Group {
@@ -370,10 +377,10 @@ impl Group {
|
|
|
|
|
devices |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn remove_all_running(&mut self) -> HashMap<PeerAddr, ()> { |
|
|
|
|
let mut addrs: HashMap<PeerAddr, ()> = HashMap::new(); |
|
|
|
|
pub fn remove_all_running(&mut self) -> HashMap<PeerId, ()> { |
|
|
|
|
let mut addrs: HashMap<PeerId, ()> = HashMap::new(); |
|
|
|
|
for (_, running) in self.runnings.drain() { |
|
|
|
|
for (addr, (_id, online)) in running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in running.distributes { |
|
|
|
|
if addr != self.addr && online { |
|
|
|
|
addrs.insert(addr, ()); |
|
|
|
|
} |
|
|
|
|
@ -382,11 +389,11 @@ impl Group {
@@ -382,11 +389,11 @@ impl Group {
|
|
|
|
|
addrs |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerAddr, ()> { |
|
|
|
|
pub fn remove_running(&mut self, gid: &GroupId) -> HashMap<PeerId, ()> { |
|
|
|
|
// check close the stable connection.
|
|
|
|
|
let mut addrs: HashMap<PeerAddr, ()> = HashMap::new(); |
|
|
|
|
let mut addrs: HashMap<PeerId, ()> = HashMap::new(); |
|
|
|
|
if let Some(running) = self.runnings.remove(gid) { |
|
|
|
|
for (addr, (_id, online)) in running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in running.distributes { |
|
|
|
|
if addr != self.addr && online { |
|
|
|
|
addrs.insert(addr, ()); |
|
|
|
|
} |
|
|
|
|
@ -394,7 +401,7 @@ impl Group {
@@ -394,7 +401,7 @@ impl Group {
|
|
|
|
|
|
|
|
|
|
// check if other stable connection.
|
|
|
|
|
for other_running in self.runnings.values() { |
|
|
|
|
for (addr, (_id, online)) in &other_running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in &other_running.distributes { |
|
|
|
|
if *online && addrs.contains_key(addr) { |
|
|
|
|
addrs.remove(addr); |
|
|
|
|
} |
|
|
|
|
@ -514,19 +521,17 @@ impl Group {
@@ -514,19 +521,17 @@ impl Group {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn create_message(&self, gid: &GroupId, addr: PeerAddr) -> Result<SendType> { |
|
|
|
|
pub fn create_message(&self, gid: &GroupId, addr: Peer) -> Result<SendType> { |
|
|
|
|
let user = self.clone_user(gid)?; |
|
|
|
|
let account = self.account(gid)?; |
|
|
|
|
let height = account.height; |
|
|
|
|
let event = account.event; |
|
|
|
|
let proof = self.prove_addr(gid, &addr)?; |
|
|
|
|
let proof = self.prove_addr(gid, &addr.id)?; |
|
|
|
|
let running = self.running(gid)?; |
|
|
|
|
|
|
|
|
|
Ok(SendType::Connect( |
|
|
|
|
0, |
|
|
|
|
addr, |
|
|
|
|
None, |
|
|
|
|
None, |
|
|
|
|
bincode::serialize(&GroupConnect::Create( |
|
|
|
|
proof, |
|
|
|
|
user, |
|
|
|
|
@ -540,15 +545,15 @@ impl Group {
@@ -540,15 +545,15 @@ impl Group {
|
|
|
|
|
)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn connect_message(&self, gid: &GroupId, addr: PeerAddr) -> Result<SendType> { |
|
|
|
|
pub fn connect_message(&self, gid: &GroupId, addr: Peer) -> Result<SendType> { |
|
|
|
|
let account = self.account(gid)?; |
|
|
|
|
let height = account.height; |
|
|
|
|
let event = account.event; |
|
|
|
|
let data = bincode::serialize(&GroupConnect::Connect(height, event)).unwrap_or(vec![]); |
|
|
|
|
Ok(SendType::Connect(0, addr, None, None, data)) |
|
|
|
|
Ok(SendType::Connect(0, addr, data)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn connect_result(&self, gid: &GroupId, addr: PeerAddr) -> Result<SendType> { |
|
|
|
|
pub fn connect_result(&self, gid: &GroupId, addr: Peer) -> Result<SendType> { |
|
|
|
|
let account = self.account(gid)?; |
|
|
|
|
let height = account.height; |
|
|
|
|
let event = account.event; |
|
|
|
|
@ -556,12 +561,12 @@ impl Group {
@@ -556,12 +561,12 @@ impl Group {
|
|
|
|
|
Ok(SendType::Result(0, addr, true, false, data)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn agree_message(&self, gid: &GroupId, addr: PeerAddr) -> Result<SendType> { |
|
|
|
|
pub fn agree_message(&self, gid: &GroupId, addr: Peer) -> Result<SendType> { |
|
|
|
|
let account = self.account(gid)?; |
|
|
|
|
let height = account.height; |
|
|
|
|
let event = account.event; |
|
|
|
|
let me = self.clone_user(gid)?; |
|
|
|
|
let proof = self.prove_addr(gid, &addr)?; |
|
|
|
|
let proof = self.prove_addr(gid, &addr.id)?; |
|
|
|
|
let running = self.running(gid)?; |
|
|
|
|
|
|
|
|
|
Ok(SendType::Result( |
|
|
|
|
@ -597,7 +602,7 @@ impl Group {
@@ -597,7 +602,7 @@ impl Group {
|
|
|
|
|
pub fn sync_message( |
|
|
|
|
&self, |
|
|
|
|
gid: &GroupId, |
|
|
|
|
addr: PeerAddr, |
|
|
|
|
addr: PeerId, |
|
|
|
|
from: u64, |
|
|
|
|
to: u64, |
|
|
|
|
) -> Result<SendType> { |
|
|
|
|
@ -616,7 +621,7 @@ impl Group {
@@ -616,7 +621,7 @@ impl Group {
|
|
|
|
|
Ok(SendType::Event(0, addr, data)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn event_message(&self, addr: PeerAddr, event: &GroupEvent) -> Result<SendType> { |
|
|
|
|
pub fn event_message(&self, addr: PeerId, event: &GroupEvent) -> Result<SendType> { |
|
|
|
|
let data = bincode::serialize(event).unwrap_or(vec![]); |
|
|
|
|
Ok(SendType::Event(0, addr, data)) |
|
|
|
|
} |
|
|
|
|
@ -647,7 +652,7 @@ impl Group {
@@ -647,7 +652,7 @@ impl Group {
|
|
|
|
|
let e = GroupEvent::Event(eheight, eid, pre_event, event); |
|
|
|
|
let data = bincode::serialize(&e).unwrap_or(vec![]); |
|
|
|
|
let running = self.running(gid)?; |
|
|
|
|
for (addr, (_id, online)) in &running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in &running.distributes { |
|
|
|
|
if *online { |
|
|
|
|
let msg = SendType::Event(0, *addr, data.clone()); |
|
|
|
|
results.groups.push((*gid, msg)) |
|
|
|
|
@ -664,7 +669,7 @@ impl Group {
@@ -664,7 +669,7 @@ impl Group {
|
|
|
|
|
) -> Result<()> { |
|
|
|
|
let running = self.running(gid)?; |
|
|
|
|
let data = bincode::serialize(&GroupEvent::Status(event)).unwrap_or(vec![]); |
|
|
|
|
for (addr, (_id, online)) in &running.distributes { |
|
|
|
|
for (addr, (_peer, _id, online)) in &running.distributes { |
|
|
|
|
if *online { |
|
|
|
|
let msg = SendType::Event(0, *addr, data.clone()); |
|
|
|
|
results.groups.push((*gid, msg)) |
|
|
|
|
@ -679,7 +684,7 @@ impl GroupEvent {
@@ -679,7 +684,7 @@ impl GroupEvent {
|
|
|
|
|
group: &mut Group, |
|
|
|
|
event: GroupEvent, |
|
|
|
|
gid: GroupId, |
|
|
|
|
addr: PeerAddr, |
|
|
|
|
addr: PeerId, |
|
|
|
|
layer: &Arc<RwLock<Layer>>, |
|
|
|
|
uid: u64, |
|
|
|
|
) -> Result<HandleResult> { |
|
|
|
|
|