Browse Source

update group chat sync

pull/18/head
Sun 4 years ago
parent
commit
e94f58ac02
  1. 19
      Cargo.toml
  2. 14
      lib/apps/group/detail.dart
  3. 28
      lib/apps/group/models.dart
  4. 62
      pubspec.lock
  5. 1
      src/apps.rs
  6. 4
      src/apps/chat/layer.rs
  7. 43
      src/apps/group/layer.rs
  8. 5
      src/apps/group/models/member.rs
  9. 5
      src/apps/group/models/message.rs
  10. 6
      src/apps/group/rpc.rs
  11. 4
      src/layer.rs

19
Cargo.toml

@ -39,13 +39,18 @@ web3 = { version = "0.17", default-features = false, features = ["http-tls", "si @@ -39,13 +39,18 @@ web3 = { version = "0.17", default-features = false, features = ["http-tls", "si
tdn = { version = "0.6", default-features = false, features = ["full"] }
tdn_did = { version = "0.6" }
tdn_storage = { git = "https://github.com/cypherlink/tdn_storage", branch="main" }
#group-chat_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
#domain_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
chat_types = { path = "../esse_types/chat" }
group_types = { path = "../esse_types/group" }
organization_types = { path = "../esse_types/organization" }
domain_types = { path = "../esse_types/domain" }
cloud_types = { path = "../esse_types/cloud" }
chat_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
group_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
cloud_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
domain_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
organization_types = { git = "https://github.com/cympletech/esse_types", branch="main" }
#chat_types = { path = "../esse_types/chat" }
#group_types = { path = "../esse_types/group" }
#organization_types = { path = "../esse_types/organization" }
#domain_types = { path = "../esse_types/domain" }
#cloud_types = { path = "../esse_types/cloud" }
openssl = { version = "0.10", features = ["vendored"] } # Add for cross-compile.
[target.'cfg(target_os="android")'.dependencies]

14
lib/apps/group/detail.dart

@ -229,11 +229,15 @@ class _GroupChatDetailState extends State<GroupChatDetail> { @@ -229,11 +229,15 @@ class _GroupChatDetailState extends State<GroupChatDetail> {
padding: EdgeInsets.symmetric(horizontal: 20.0),
itemCount: recentMessageKeys.length,
reverse: true,
itemBuilder: (BuildContext context, index) => ChatMessage(
fgid: this._group.gid,
name: this._group.name,
message: this._messages[recentMessageKeys[index]]!,
)
itemBuilder: (BuildContext context, index) {
final msg = this._messages[recentMessageKeys[index]]!;
return ChatMessage(
avatar: this._members[msg.mid]!.showAvatar(isOnline: false),
fgid: this._group.gid,
name: this._group.name,
message: msg,
);
}
)),
ChatInput(
sid: sid,

28
lib/apps/group/models.dart

@ -32,21 +32,25 @@ class GroupChat { @@ -32,21 +32,25 @@ class GroupChat {
}
class Member {
int id;
int fid;
String mid;
String addr;
String name;
bool leave;
int id = 0;
int fid = 0;
String mid = '';
String addr = '';
String name = '';
bool leave = false;
bool online = false;
Member.fromList(List params):
this.id = params[0],
this.fid = params[1],
this.mid = params[2],
this.addr = params[3],
this.name = params[4],
Member.fromList(List params) {
this.id = params[0];
this.fid = params[1];
this.mid = params[2];
this.addr = params[3];
this.name = params[4];
this.leave = params[5];
if (this.addr == Global.addr) {
this.online = true;
}
}
Avatar showAvatar({double width = 45.0, bool isOnline = true}) {
final avatar = Global.avatarPath + this.mid + '.png';

62
pubspec.lock

@ -7,21 +7,21 @@ packages: @@ -7,21 +7,21 @@ packages:
name: _fe_analyzer_shared
url: "https://pub.dartlang.org"
source: hosted
version: "30.0.0"
version: "31.0.0"
analyzer:
dependency: transitive
description:
name: analyzer
url: "https://pub.dartlang.org"
source: hosted
version: "2.7.0"
version: "2.8.0"
archive:
dependency: transitive
description:
name: archive
url: "https://pub.dartlang.org"
source: hosted
version: "3.1.2"
version: "3.1.6"
args:
dependency: transitive
description:
@ -35,7 +35,7 @@ packages: @@ -35,7 +35,7 @@ packages:
name: async
url: "https://pub.dartlang.org"
source: hosted
version: "2.8.1"
version: "2.8.2"
audio_session:
dependency: transitive
description:
@ -63,7 +63,7 @@ packages: @@ -63,7 +63,7 @@ packages:
name: characters
url: "https://pub.dartlang.org"
source: hosted
version: "1.1.0"
version: "1.2.0"
charcode:
dependency: transitive
description:
@ -182,7 +182,7 @@ packages: @@ -182,7 +182,7 @@ packages:
name: file_picker
url: "https://pub.dartlang.org"
source: hosted
version: "4.2.4"
version: "4.3.0"
file_selector:
dependency: "direct main"
description:
@ -242,7 +242,7 @@ packages: @@ -242,7 +242,7 @@ packages:
name: flutter_colorpicker
url: "https://pub.dartlang.org"
source: hosted
version: "0.6.1"
version: "1.0.3"
flutter_driver:
dependency: transitive
description: flutter
@ -308,7 +308,7 @@ packages: @@ -308,7 +308,7 @@ packages:
name: flutter_native_splash
url: "https://pub.dartlang.org"
source: hosted
version: "1.3.1"
version: "1.3.2"
flutter_plugin_android_lifecycle:
dependency: transitive
description:
@ -322,7 +322,7 @@ packages: @@ -322,7 +322,7 @@ packages:
name: flutter_quill
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.21"
version: "2.5.1"
flutter_test:
dependency: "direct dev"
description: flutter
@ -393,7 +393,7 @@ packages: @@ -393,7 +393,7 @@ packages:
name: image
url: "https://pub.dartlang.org"
source: hosted
version: "3.0.8"
version: "3.1.0"
image_picker:
dependency: "direct main"
description:
@ -475,7 +475,7 @@ packages: @@ -475,7 +475,7 @@ packages:
name: matcher
url: "https://pub.dartlang.org"
source: hosted
version: "0.12.10"
version: "0.12.11"
meta:
dependency: transitive
description:
@ -517,7 +517,7 @@ packages: @@ -517,7 +517,7 @@ packages:
name: path_provider
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.7"
version: "2.0.8"
path_provider_android:
dependency: transitive
description:
@ -538,14 +538,14 @@ packages: @@ -538,14 +538,14 @@ packages:
name: path_provider_linux
url: "https://pub.dartlang.org"
source: hosted
version: "2.1.2"
version: "2.1.4"
path_provider_macos:
dependency: transitive
description:
name: path_provider_macos
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.3"
version: "2.0.4"
path_provider_platform_interface:
dependency: transitive
description:
@ -608,7 +608,7 @@ packages: @@ -608,7 +608,7 @@ packages:
name: platform
url: "https://pub.dartlang.org"
source: hosted
version: "3.0.0"
version: "3.0.2"
plugin_platform_interface:
dependency: transitive
description:
@ -622,7 +622,7 @@ packages: @@ -622,7 +622,7 @@ packages:
name: process
url: "https://pub.dartlang.org"
source: hosted
version: "4.2.3"
version: "4.2.4"
provider:
dependency: "direct main"
description:
@ -699,7 +699,7 @@ packages: @@ -699,7 +699,7 @@ packages:
name: shared_preferences
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.9"
version: "2.0.11"
shared_preferences_android:
dependency: transitive
description:
@ -816,7 +816,7 @@ packages: @@ -816,7 +816,7 @@ packages:
name: test_api
url: "https://pub.dartlang.org"
source: hosted
version: "0.4.2"
version: "0.4.3"
tuple:
dependency: transitive
description:
@ -851,7 +851,21 @@ packages: @@ -851,7 +851,21 @@ packages:
name: url_launcher
url: "https://pub.dartlang.org"
source: hosted
version: "6.0.15"
version: "6.0.17"
url_launcher_android:
dependency: transitive
description:
name: url_launcher_android
url: "https://pub.dartlang.org"
source: hosted
version: "6.0.13"
url_launcher_ios:
dependency: transitive
description:
name: url_launcher_ios
url: "https://pub.dartlang.org"
source: hosted
version: "6.0.13"
url_launcher_linux:
dependency: transitive
description:
@ -879,7 +893,7 @@ packages: @@ -879,7 +893,7 @@ packages:
name: url_launcher_web
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.4"
version: "2.0.5"
url_launcher_windows:
dependency: transitive
description:
@ -900,14 +914,14 @@ packages: @@ -900,14 +914,14 @@ packages:
name: vector_math
url: "https://pub.dartlang.org"
source: hosted
version: "2.1.0"
version: "2.1.1"
video_player:
dependency: transitive
description:
name: video_player
url: "https://pub.dartlang.org"
source: hosted
version: "2.2.7"
version: "2.2.8"
video_player_platform_interface:
dependency: transitive
description:
@ -928,7 +942,7 @@ packages: @@ -928,7 +942,7 @@ packages:
name: vm_service
url: "https://pub.dartlang.org"
source: hosted
version: "7.1.1"
version: "7.3.0"
watcher:
dependency: transitive
description:
@ -987,4 +1001,4 @@ packages: @@ -987,4 +1001,4 @@ packages:
version: "8.0.0"
sdks:
dart: ">=2.14.0 <3.0.0"
flutter: ">=2.5.0"
flutter: ">=2.5.3"

1
src/apps.rs

@ -38,7 +38,6 @@ pub(crate) async fn app_layer_handle( @@ -38,7 +38,6 @@ pub(crate) async fn app_layer_handle(
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
println!("Handle Sync: fgid: {:?}, mgid: {:?}", fgid, mgid);
match (fgid, mgid) {
(group::GROUP_ID, _) => group::handle_peer(layer, mgid, msg).await,
(_, group::GROUP_ID) => group::handle_server(layer, fgid, msg).await,

4
src/apps/chat/layer.rs

@ -51,7 +51,7 @@ pub(crate) async fn handle( @@ -51,7 +51,7 @@ pub(crate) async fn handle(
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
println!("---------DEBUG--------- GOT CHAT EVENT");
debug!("---------DEBUG--------- GOT CHAT EVENT");
let mut results = HandleResult::new();
let mut layer = arc_layer.write().await;
@ -98,7 +98,7 @@ pub(crate) async fn handle( @@ -98,7 +98,7 @@ pub(crate) async fn handle(
// TODO stream
}
RecvType::Delivery(t, tid, is_ok) => {
println!("delivery: tid: {}, is_ok: {}", tid, is_ok);
debug!("delivery: tid: {}, is_ok: {}", tid, is_ok);
// TODO maybe send failure need handle.
if is_ok {
if let Some((gid, db_id)) = layer.delivery.remove(&tid) {

43
src/apps/group/layer.rs

@ -52,10 +52,10 @@ pub(crate) async fn handle_server( @@ -52,10 +52,10 @@ pub(crate) async fn handle_server(
// TODO
}
RecvType::Event(addr, bytes) => {
println!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT");
debug!("----------- DEBUG GROUP CHAT: SERVER GOT LAYER EVENT");
let event: LayerEvent = bincode::deserialize(&bytes)?;
handle_server_event(fgid, addr, event, layer, &mut results).await?;
println!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT");
debug!("----------- DEBUG GROUP CHAT: SERVER OVER LAYER EVENT");
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
@ -137,10 +137,10 @@ pub(crate) async fn handle_peer( @@ -137,10 +137,10 @@ pub(crate) async fn handle_peer(
}
}
RecvType::Event(addr, bytes) => {
println!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT");
debug!("----------- DEBUG GROUP CHAT: PEER GOT LAYER EVENT");
let event: LayerEvent = bincode::deserialize(&bytes)?;
handle_peer_event(ogid, addr, event, layer, &mut results).await?;
println!("----------- DEBUG GROUP CHAT: PEER OVER LAYER EVENT");
debug!("----------- DEBUG GROUP CHAT: PEER OVER LAYER EVENT");
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
@ -287,7 +287,7 @@ async fn handle_server_event( @@ -287,7 +287,7 @@ async fn handle_server_event(
broadcast(&LayerEvent::Sync(gcd, h, event), layer, &gcd, results).await?;
}
Event::MessageCreate(mgid, nmsg, mtime) => {
println!("Sync: create message start");
debug!("Sync: create message start");
let _mdid = Member::get_id(&db, &id, &mgid)?;
let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime);
@ -299,7 +299,7 @@ async fn handle_server_event( @@ -299,7 +299,7 @@ async fn handle_server_event(
new_h, id, mgid, &ogid, nmsg, mtime, &base, results,
)?;
results.rpcs.push(rpc::message_create(ogid, &msg));
println!("Sync: create message ok");
debug!("Sync: create message ok");
// UPDATE SESSION.
update_session(&base, &ogid, &id, &msg, results);
@ -322,7 +322,7 @@ async fn handle_server_event( @@ -322,7 +322,7 @@ async fn handle_server_event(
add_server_layer(results, fgid, s);
}
LayerEvent::SyncReq(gcd, from) => {
println!("Got sync request. height: {} from: {}", height, from);
debug!("Got sync request. height: {} from: {}", height, from);
if height >= from {
let to = if height - from > 20 {
@ -331,13 +331,13 @@ async fn handle_server_event( @@ -331,13 +331,13 @@ async fn handle_server_event(
height
};
let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &to).await?;
let messages = Message::sync(&base, &ogid, &db, &id, &to).await?;
let (members, leaves) = Member::sync(&base, &ogid, &db, &id, &from, &to).await?;
let messages = Message::sync(&base, &ogid, &db, &id, &from, &to).await?;
let event = LayerEvent::SyncRes(gcd, height, from, to, members, leaves, messages);
let data = bincode::serialize(&event).unwrap_or(vec![]);
let s = SendType::Event(0, addr, data);
add_server_layer(results, fgid, s);
println!("Sended sync request results. from: {}, to: {}", from, to);
debug!("Sended sync request results. from: {}, to: {}", from, to);
}
}
LayerEvent::Suspend(..) => {}
@ -412,7 +412,7 @@ async fn handle_peer_event( @@ -412,7 +412,7 @@ async fn handle_peer_event(
}
}
LayerEvent::Sync(gcd, height, event) => {
println!("Sync: handle height: {}", height);
debug!("Sync: handle height: {}", height);
match event {
Event::GroupTransfer(_addr) => {
@ -460,7 +460,7 @@ async fn handle_peer_event( @@ -460,7 +460,7 @@ async fn handle_peer_event(
GroupChat::add_height(&db, id, height)?;
}
Event::MessageCreate(mgid, nmsg, mtime) => {
println!("Sync: create message start");
debug!("Sync: create message start");
let _mdid = Member::get_id(&db, &id, &mgid)?;
let msg = handle_network_message(
@ -469,14 +469,14 @@ async fn handle_peer_event( @@ -469,14 +469,14 @@ async fn handle_peer_event(
results.rpcs.push(rpc::message_create(ogid, &msg));
GroupChat::add_height(&db, id, height)?;
println!("Sync: create message ok");
debug!("Sync: create message ok");
// UPDATE SESSION.
update_session(&base, &ogid, &id, &msg, results);
}
}
}
LayerEvent::SyncRes(gcd, height, mut from, to, adds, leaves, messages) => {
LayerEvent::SyncRes(gcd, height, from, to, adds, leaves, messages) => {
if to >= height {
// when last packed sync, start sync online members.
add_layer(results, ogid, sync_online(gcd, addr));
@ -517,11 +517,12 @@ async fn handle_peer_event( @@ -517,11 +517,12 @@ async fn handle_peer_event(
}
for (height, mgid, nm, time) in messages {
let msg =
handle_network_message(height, id, mgid, &ogid, nm, time, &base, results)?;
results.rpcs.push(rpc::message_create(ogid, &msg));
last_message = Some(msg);
from += 1;
if let Ok(msg) =
handle_network_message(height, id, mgid, &ogid, nm, time, &base, results)
{
results.rpcs.push(rpc::message_create(ogid, &msg));
last_message = Some(msg);
}
}
if to < height {
@ -535,6 +536,7 @@ async fn handle_peer_event( @@ -535,6 +536,7 @@ async fn handle_peer_event(
if let Some(msg) = last_message {
update_session(&base, &ogid, &id, &msg, results);
}
debug!("Over handle sync packed... {}, {}, {}", height, from, to);
}
_ => error!("group peer handle event nerver here"),
}
@ -553,7 +555,7 @@ pub(crate) async fn broadcast( @@ -553,7 +555,7 @@ pub(crate) async fn broadcast(
for (mgid, maddr) in layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, new_data.clone());
add_server_layer(results, *mgid, s);
println!("--- DEBUG broadcast to: {:?}", mgid);
debug!("--- DEBUG broadcast to: {:?}", mgid);
}
Ok(())
@ -596,7 +598,6 @@ pub(crate) fn group_conn(proof: Proof, addr: Peer, gid: GroupId) -> SendType { @@ -596,7 +598,6 @@ pub(crate) fn group_conn(proof: Proof, addr: Peer, gid: GroupId) -> SendType {
}
fn sync(gcd: GroupId, addr: PeerId, height: i64) -> SendType {
println!("Send sync request...");
let data = bincode::serialize(&LayerEvent::SyncReq(gcd, height + 1)).unwrap_or(vec![]);
SendType::Event(0, addr, data)
}

5
src/apps/group/models/member.rs

@ -189,12 +189,13 @@ impl Member { @@ -189,12 +189,13 @@ impl Member {
gid: &GroupId,
db: &DStorage,
fid: &i64,
height: &i64,
from: &i64,
to: &i64,
) -> Result<(
Vec<(i64, GroupId, PeerId, String, Vec<u8>)>,
Vec<(i64, GroupId)>,
)> {
let sql = format!("SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {} AND height >= {}", fid, height);
let sql = format!("SELECT id, height, fid, mid, addr, name, leave FROM members WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to);
let matrix = db.query(&sql)?;
let mut adds = vec![];
let mut leaves = vec![];

5
src/apps/group/models/message.rs

@ -142,7 +142,8 @@ impl Message { @@ -142,7 +142,8 @@ impl Message {
gid: &GroupId,
db: &DStorage,
fid: &i64,
height: &i64,
from: &i64,
to: &i64,
) -> Result<Vec<(i64, GroupId, NetworkMessage, i64)>> {
let sql = format!("SELECT id, mid FROM members WHERE fid = {}", fid);
let m = db.query(&sql)?;
@ -154,7 +155,7 @@ impl Message { @@ -154,7 +155,7 @@ impl Message {
members.insert(id, mid);
}
let sql = format!("SELECT id, height, fid, mid, is_me, m_type, content, is_delivery, datetime FROM messages WHERE fid = {} AND height >= {}", fid, height);
let sql = format!("SELECT id, height, fid, mid, is_me, m_type, content, is_delivery, datetime FROM messages WHERE fid = {} AND height BETWEEN {} AND {}", fid, from, to);
let matrix = db.query(&sql)?;
let mut messages = vec![];
for values in matrix {

6
src/apps/group/rpc.rs

@ -115,7 +115,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -115,7 +115,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let mut gc = GroupChat::new(addr, name);
let gcd = gc.g_id;
let gheight = gc.height;
let gheight = gc.height + 1; // add first member.
// save db
gc.insert(&db)?;
@ -145,7 +145,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -145,7 +145,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// Add frist member join.
let mut layer_lock = state.layer.write().await;
layer_lock.add_running(&gcd, gid, gdid, gheight)?;
let height = layer_lock.running_mut(&gcd)?.increased();
// Add online to layers.
layer_lock
@ -158,7 +157,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -158,7 +157,7 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
drop(layer_lock);
// Update consensus.
GroupChat::add_height(&db, gdid, height)?;
GroupChat::add_height(&db, gdid, gheight)?;
// Online local group.
results.networks.push(NetworkType::AddGroup(gcd));
@ -305,7 +304,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -305,7 +304,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
for (mgid, maddr) in state.layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, data.clone());
add_server_layer(&mut results, *mgid, s);
println!("--- DEBUG broadcast to: {:?}", mgid);
}
} else {
// leave group.

4
src/layer.rs

@ -114,7 +114,7 @@ impl Layer { @@ -114,7 +114,7 @@ impl Layer {
}
pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<(i64, i64)> {
println!("onlines: {:?}, find: {:?}", self.runnings.keys(), mgid);
debug!("onlines: {:?}, find: {:?}", self.runnings.keys(), mgid);
self.running(mgid)?.get_online_id(fgid)
}
@ -283,7 +283,7 @@ impl RunningLayer { @@ -283,7 +283,7 @@ impl RunningLayer {
}
pub fn get_online_id(&self, gid: &GroupId) -> Result<(i64, i64)> {
println!("onlines: {:?}, find: {:?}", self.sessions.keys(), gid);
debug!("onlines: {:?}, find: {:?}", self.sessions.keys(), gid);
self.sessions
.get(gid)
.map(|online| (online.db_id, online.db_fid))

Loading…
Cancel
Save