Browse Source

update local group chat

pull/18/head
Sun 4 years ago
parent
commit
02733e67fe
  1. 22
      lib/apps/group_chat/provider.dart
  2. 16
      lib/utils/emoji_picker.dart
  3. 8
      lib/widgets/avatar.dart
  4. 20
      lib/widgets/chat_message.dart
  5. 2
      lib/widgets/emoji.dart
  6. 1
      src/apps.rs
  7. 1
      src/apps/chat/layer.rs
  8. 90
      src/apps/group_chat/common.rs
  9. 511
      src/apps/group_chat/layer.rs
  10. 2
      src/apps/group_chat/mod.rs
  11. 10
      src/apps/group_chat/models/consensus.rs
  12. 2
      src/apps/group_chat/models/group.rs
  13. 40
      src/apps/group_chat/models/member.rs
  14. 2
      src/apps/group_chat/models/message.rs
  15. 7
      src/apps/group_chat/models/request.rs
  16. 95
      src/apps/group_chat/rpc.rs
  17. 2
      src/layer.rs
  18. 52
      src/rpc.rs
  19. 2
      src/server.rs

22
lib/apps/group_chat/provider.dart

@ -142,9 +142,7 @@ class GroupChatProvider extends ChangeNotifier { @@ -142,9 +142,7 @@ class GroupChatProvider extends ChangeNotifier {
messageCreate(MessageType mtype, String content) {
final gcd = this.activedGroup!.gid;
rpc.send('group-chat-message-create', [
gcd, this.actived!, this.activedGroup!.isRemote, mtype.toInt(), content
]);
rpc.send('group-chat-message-create', [gcd, mtype.toInt(), content]);
}
close(int id) {
@ -235,10 +233,10 @@ class GroupChatProvider extends ChangeNotifier { @@ -235,10 +233,10 @@ class GroupChatProvider extends ChangeNotifier {
}
_requestHandle(List params) {
final id = params[0];
final ok = params[1];
//final _efficacy = params[2];
if (this.requests.containsKey(id)) {
final id = params[1];
final ok = params[2];
//final _efficacy = params[3];
if (this.actived == params[0] && this.requests.containsKey(id)) {
this.requests[id]!.overIt(ok);
notifyListeners();
}
@ -263,17 +261,17 @@ class GroupChatProvider extends ChangeNotifier { @@ -263,17 +261,17 @@ class GroupChatProvider extends ChangeNotifier {
_memberLeave(List params) {
final id = params[0];
if (this.activedMembers.containsKey(id)) {
if (this.actived == params[0] && this.activedMembers.containsKey(id)) {
this.activedMembers.remove(id);
notifyListeners();
}
}
_memberInfo(List params) {
final id = params[0];
if (this.activedMembers.containsKey(id)) {
this.activedMembers[id]!.addr = params[1];
this.activedMembers[id]!.name = params[2];
final id = params[1];
if (this.actived == params[0] && this.activedMembers.containsKey(id)) {
this.activedMembers[id]!.addr = params[2];
this.activedMembers[id]!.name = params[3];
notifyListeners();
}
}

16
lib/utils/emoji_picker.dart

@ -252,7 +252,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -252,7 +252,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: TextButton(
child: Text(
emojiTxt,
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -290,7 +290,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -290,7 +290,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
animalMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -328,7 +328,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -328,7 +328,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
foodMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -366,7 +366,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -366,7 +366,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
travelMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -405,7 +405,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -405,7 +405,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
activityMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -443,7 +443,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -443,7 +443,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
objectMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -481,7 +481,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -481,7 +481,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
symbolMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(
@ -519,7 +519,7 @@ class _EmojiPickerState extends State<EmojiPicker> { @@ -519,7 +519,7 @@ class _EmojiPickerState extends State<EmojiPicker> {
child: Text(
flagMap.values.toList()[
index + (widget.columns * widget.rows * i)],
style: TextStyle(fontSize: 18.0),
style: TextStyle(fontSize: 16.0),
),
onPressed: () {
widget.onEmojiSelected(

8
lib/widgets/avatar.dart

@ -50,11 +50,11 @@ class Avatar extends StatelessWidget { @@ -50,11 +50,11 @@ class Avatar extends StatelessWidget {
decoration: showAvatar != null
? BoxDecoration(
image: DecorationImage(image: showAvatar, fit: BoxFit.cover),
borderRadius: BorderRadius.circular(15.0)
borderRadius: BorderRadius.circular(10.0)
)
: BoxDecoration(
color: this.colorSurface ? color.surface : color.background,
borderRadius: BorderRadius.circular(15.0)
borderRadius: BorderRadius.circular(10.0)
),
child: Stack(
alignment: Alignment.center,
@ -63,7 +63,7 @@ class Avatar extends StatelessWidget { @@ -63,7 +63,7 @@ class Avatar extends StatelessWidget {
Text(this.name.length > 0 ? this.name[0].toUpperCase() : "A"),
if (this.hasNew)
Positioned(top: 0.0, right: 0.0,
child: Container(width: 9.0, height: 9.0,
child: Container(width: 8.0, height: 8.0,
decoration: BoxDecoration(color: this.hasNewColor, shape: BoxShape.circle),
),
),
@ -74,7 +74,7 @@ class Avatar extends StatelessWidget { @@ -74,7 +74,7 @@ class Avatar extends StatelessWidget {
decoration: ShapeDecoration(color: color.background, shape: CircleBorder()),
child: this.loading
? CupertinoActivityIndicator(radius: 5.0, animating: true)
: Container(width: 9.0, height: 9.0,
: Container(width: 6.0, height: 6.0,
decoration: BoxDecoration(color: this.onlineColor, shape: BoxShape.circle),
),
),

20
lib/widgets/chat_message.dart

@ -32,9 +32,9 @@ class ChatMessage extends StatelessWidget { @@ -32,9 +32,9 @@ class ChatMessage extends StatelessWidget {
Widget _showContactCard(Widget avatar, String gid, String name, String title, ColorScheme color) {
return Container(
padding: const EdgeInsets.symmetric(vertical: 10.0, horizontal: 10.0),
padding: const EdgeInsets.only(top: 10, bottom: 6.0, left: 10.0, right: 10.0),
width: 200.0,
decoration: BoxDecoration(color: const Color(0x40ADB0BB), borderRadius: BorderRadius.circular(15.0)),
decoration: BoxDecoration(color: const Color(0x40ADB0BB), borderRadius: BorderRadius.circular(10.0)),
child: Column(crossAxisAlignment: CrossAxisAlignment.start,
children: [
Row(children: [
@ -57,10 +57,10 @@ class ChatMessage extends StatelessWidget { @@ -57,10 +57,10 @@ class ChatMessage extends StatelessWidget {
// text
return Container(
constraints: BoxConstraints(minWidth: 50, maxWidth: maxWidth),
padding: const EdgeInsets.symmetric(vertical: 10.0, horizontal: 14.0),
padding: const EdgeInsets.symmetric(vertical: 6.0, horizontal: 10.0),
decoration: BoxDecoration(
color: message.isMe ? Color(0xFF6174FF) : color.primaryVariant,
borderRadius: BorderRadius.circular(15.0),
borderRadius: BorderRadius.circular(10.0),
),
child: Text(message.content,
style: TextStyle(
@ -272,7 +272,7 @@ class ChatMessage extends StatelessWidget { @@ -272,7 +272,7 @@ class ChatMessage extends StatelessWidget {
),
child: _showContactCard(
Container(width: 40.0, height: 40.0,
decoration: BoxDecoration(color: color.surface, borderRadius: BorderRadius.circular(15.0)),
decoration: BoxDecoration(color: color.surface, borderRadius: BorderRadius.circular(10.0)),
child: Icon(Icons.groups_rounded, color: color.primary, size: 20.0),
),
gid, infos[3], lang.groupChat, color)
@ -313,24 +313,24 @@ class ChatMessage extends StatelessWidget { @@ -313,24 +313,24 @@ class ChatMessage extends StatelessWidget {
final isAvatar = avatar != null && !message.isMe;
final timeWidget = Container(
padding: EdgeInsets.only(top: 6.0),
padding: EdgeInsets.only(top: 4.0),
child: Row(children: [
if (message.isMe) Spacer(),
if (isAvatar)
Container(
width: 50.0,
child: Text(name, maxLines: 1, overflow: TextOverflow.ellipsis,
style: TextStyle(color: color.onPrimary.withOpacity(0.5), fontSize: 12.0)
style: TextStyle(color: color.onPrimary.withOpacity(0.5), fontSize: 10.0)
)),
const SizedBox(width: 4.0),
Text(message.time.toString(), style: TextStyle(
color: color.onPrimary.withOpacity(0.5),
fontSize: 12.0)),
fontSize: 10.0)),
const SizedBox(width: 4.0),
Icon(
message.isDelivery == null ? Icons.hourglass_top
: (message.isDelivery! ? Icons.done : Icons.error),
size: 12.0,
size: 10.0,
color: message.isDelivery == null ? color.primaryVariant
: (message.isDelivery! ? color.primary : Colors.red)
),
@ -346,7 +346,7 @@ class ChatMessage extends StatelessWidget { @@ -346,7 +346,7 @@ class ChatMessage extends StatelessWidget {
]);
return Padding(
padding: const EdgeInsets.symmetric(vertical: 5.0),
padding: const EdgeInsets.symmetric(vertical: 4.0),
child:
isAvatar
? Row(

2
lib/widgets/emoji.dart

@ -25,7 +25,7 @@ class Emoji extends StatelessWidget { @@ -25,7 +25,7 @@ class Emoji extends StatelessWidget {
child: SingleChildScrollView(
child: EmojiPicker(
rows: 3,
columns: maxWidth ~/ 32,
columns: maxWidth ~/ 36,
maxWidth: maxWidth,
bgColor: color.background,
onEmojiSelected: (emoji, category) {

1
src/apps.rs

@ -32,6 +32,7 @@ pub(crate) async fn app_layer_handle( @@ -32,6 +32,7 @@ pub(crate) async fn app_layer_handle(
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
println!("Handle Sync: fgid: {:?}, mgid: {:?}", fgid, mgid);
match (fgid, mgid) {
(group_chat::GROUP_ID, _) => group_chat::layer_handle(layer, fgid, mgid, false, msg).await,
(_, group_chat::GROUP_ID) => group_chat::layer_handle(layer, fgid, mgid, true, msg).await,

1
src/apps/chat/layer.rs

@ -52,6 +52,7 @@ pub(crate) async fn handle( @@ -52,6 +52,7 @@ pub(crate) async fn handle(
mgid: GroupId,
msg: RecvType,
) -> Result<HandleResult> {
println!("---------DEBUG--------- GOT CHAT EVENT");
let mut results = HandleResult::new();
let mut layer = arc_layer.write().await;

90
src/apps/group_chat/common.rs

@ -1,90 +0,0 @@ @@ -1,90 +0,0 @@
use group_chat_types::{Event, LayerEvent};
use std::path::PathBuf;
use tdn::types::{
group::GroupId,
primitive::{HandleResult, Result},
};
use tdn_storage::local::DStorage;
use crate::apps::chat::Friend;
use crate::rpc::session_last;
use crate::session::{Session, SessionType};
use crate::storage::{chat_db, delete_avatar, session_db, write_avatar_sync};
use super::models::{from_network_message, Member};
use super::rpc;
pub async fn handle_event(
db: DStorage,
base: PathBuf,
gid: i64, // group chat database id.
mgid: GroupId, // me account(group_id)
event: LayerEvent,
results: &mut HandleResult,
) -> Result<()> {
match event {
LayerEvent::Sync(_gcd, height, event) => {
match event {
Event::GroupInfo => {}
Event::GroupTransfer => {}
Event::GroupManagerAdd => {}
Event::GroupManagerDel => {}
Event::GroupClose => {}
Event::MemberInfo(mid, maddr, mname, mavatar) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::update(&db, &id, &maddr, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
results.rpcs.push(rpc::member_info(mgid, id, maddr, mname));
}
Event::MemberJoin(mid, maddr, mname, mavatar, mtime) => {
if Member::get_id(&db, &gid, &mid).is_err() {
let mut member = Member::new(gid, mid, maddr, mname, false, mtime);
member.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
results.rpcs.push(rpc::member_join(mgid, member));
}
}
Event::MemberLeave(mid) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::leave(&db, &id)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
if Friend::get(&s_db, &mid)?.is_none() {
let _ = delete_avatar(&base, &mgid, &mid).await;
}
results.rpcs.push(rpc::member_leave(mgid, id));
}
Event::MessageCreate(mid, nmsg, mtime) => {
println!("Sync: create message start");
let (msg, scontent) =
from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?;
results.rpcs.push(rpc::message_create(mgid, &msg));
println!("Sync: create message ok");
// UPDATE SESSION.
let s_db = session_db(&base, &mgid)?;
if let Ok(id) = Session::last(
&s_db,
&gid,
&SessionType::Group,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(mgid, &id, &msg.datetime, &scontent, false));
}
}
}
}
_ => {} // TODO
}
Ok(())
}

511
src/apps/group_chat/layer.rs

@ -22,13 +22,20 @@ use crate::storage::{ @@ -22,13 +22,20 @@ use crate::storage::{
chat_db, delete_avatar, group_chat_db, read_avatar, session_db, write_avatar, write_avatar_sync,
};
use super::models::{from_network_message, GroupChat, Member, Request};
use super::models::{from_network_message, Consensus, ConsensusType, GroupChat, Member, Request};
use super::{add_layer, add_server_layer, rpc};
// variable statement:
// gcd: Group Chat ID.
// fgid: where is event come from.
// ogid: my account ID. if server is group owner. if client is my.
// mgid: member account ID.
// id: Group Chat database Id.
// mid: member database Id.
pub(crate) async fn handle(
layer: &Arc<RwLock<Layer>>,
fgid: GroupId, // when as client, `fgid` is GROUP_ID
mgid: GroupId, // when as server, `mgid` is GROUP_ID
tgid: GroupId, // when as server, `tgid` is GROUP_ID
is_server: bool,
msg: RecvType,
) -> Result<HandleResult> {
@ -36,7 +43,7 @@ pub(crate) async fn handle( @@ -36,7 +43,7 @@ pub(crate) async fn handle(
match msg {
RecvType::Connect(addr, data) => {
// only server handle it.
// only server handle it. IMPORTANT !!! fgid IS mgid.
if !is_server {
let s = SendType::Result(0, addr, false, false, vec![]);
add_server_layer(&mut results, fgid, s);
@ -51,21 +58,21 @@ pub(crate) async fn handle( @@ -51,21 +58,21 @@ pub(crate) async fn handle(
// check is member.
let db = group_chat_db(&layer.read().await.base, &ogid)?;
if let Ok(mid) = Member::get_id(&db, &id, &fgid) {
if let Ok((mid, _)) = Member::get_id(&db, &id, &fgid) {
let res = LayerResult(gcd, height);
let data = bincode::serialize(&res).unwrap_or(vec![]);
let s = SendType::Result(0, addr, true, false, data);
add_server_layer(&mut results, fgid, s);
layer.write().await.running_mut(&gcd)?.check_add_online(
mgid,
fgid,
Online::Direct(addr),
id,
mid,
)?;
let _ = Member::addr_update(&db, &id, &fgid, &addr);
results.rpcs.push(rpc::member_online(mgid, id, fgid, addr));
results.rpcs.push(rpc::member_online(ogid, id, fgid, addr));
let new_data =
bincode::serialize(&LayerEvent::MemberOnline(gcd, fgid, addr))?;
@ -85,36 +92,38 @@ pub(crate) async fn handle( @@ -85,36 +92,38 @@ pub(crate) async fn handle(
}
}
RecvType::Leave(_addr) => {
// only server handle it.
// only server handle it. IMPORTANT !!! fgid IS mgid.
// TODO
}
RecvType::Result(addr, is_ok, data) => {
// only client handle it.
// only cleint handle it. IMPORTANT !!! tgid IS ogid.
if !is_server && is_ok {
let mut layer_lock = layer.write().await;
handle_connect(mgid, addr, data, &mut layer_lock, &mut results)?;
handle_connect(tgid, addr, data, &mut layer_lock, &mut results)?;
} else {
let msg = SendType::Result(0, addr, false, false, vec![]);
add_layer(&mut results, mgid, msg);
add_layer(&mut results, tgid, msg);
}
}
RecvType::ResultConnect(addr, data) => {
// only client handle it.
// only cleint handle it. IMPORTANT !!! tgid IS ogid.
if is_server {
let msg = SendType::Result(0, addr, false, false, vec![]);
add_layer(&mut results, mgid, msg);
add_layer(&mut results, tgid, msg);
}
let mut layer_lock = layer.write().await;
if handle_connect(mgid, addr, data, &mut layer_lock, &mut results)? {
if handle_connect(tgid, addr, data, &mut layer_lock, &mut results)? {
let msg = SendType::Result(0, addr, true, false, vec![]);
add_layer(&mut results, mgid, msg);
add_layer(&mut results, tgid, msg);
}
}
RecvType::Event(addr, bytes) => {
println!("----------- DEBUG GROUP CHAT: GOT LAYER EVENT");
// server & client handle it.
let event: LayerEvent = bincode::deserialize(&bytes)?;
handle_event(fgid, mgid, is_server, addr, event, layer, &mut results).await?;
handle_event(fgid, tgid, is_server, addr, event, layer, &mut results).await?;
println!("----------- DEBUG GROUP CHAT: OVER LAYER EVENT");
}
RecvType::Stream(_uid, _stream, _bytes) => {
// TODO stream
@ -128,7 +137,7 @@ pub(crate) async fn handle( @@ -128,7 +137,7 @@ pub(crate) async fn handle(
}
fn handle_connect(
mgid: GroupId,
ogid: GroupId,
addr: PeerAddr,
data: Vec<u8>,
layer: &mut Layer,
@ -138,7 +147,7 @@ fn handle_connect( @@ -138,7 +147,7 @@ fn handle_connect(
let LayerResult(gcd, height) = bincode::deserialize(&data)?;
// 1. check group.
let db = group_chat_db(layer.base(), &mgid)?;
let db = group_chat_db(layer.base(), &ogid)?;
if let Some(group) = GroupChat::get(&db, &gcd)? {
// 1.0 check address.
if group.g_addr != addr {
@ -147,7 +156,7 @@ fn handle_connect( @@ -147,7 +156,7 @@ fn handle_connect(
// 1.1 get session.
let session_some =
connect_session(layer.base(), &mgid, &SessionType::Group, &group.id, &addr)?;
connect_session(layer.base(), &ogid, &SessionType::Group, &group.id, &addr)?;
if session_some.is_none() {
return Ok(false);
}
@ -155,19 +164,19 @@ fn handle_connect( @@ -155,19 +164,19 @@ fn handle_connect(
// 1.2 online this group.
layer
.running_mut(&mgid)?
.running_mut(&ogid)?
.check_add_online(gcd, Online::Direct(addr), sid, group.id)?;
// 1.3 online to UI.
results.rpcs.push(session_connect(mgid, &sid, &addr));
results.rpcs.push(session_connect(ogid, &sid, &addr));
println!("will sync remote: {}, my: {}", height, group.height);
// 1.4 sync group height.
if group.height < height {
add_layer(results, mgid, sync(gcd, addr, group.height));
add_layer(results, ogid, sync(gcd, addr, group.height));
} else {
// sync online members.
add_layer(results, mgid, sync_online(gcd, addr));
add_layer(results, ogid, sync_online(gcd, addr));
}
Ok(true)
} else {
@ -175,16 +184,54 @@ fn handle_connect( @@ -175,16 +184,54 @@ fn handle_connect(
}
}
// variable statement:
// gcd: Group Chat ID.
// fgid: where is event come from.
// ogid: my account ID. if server is group owner. if client is my.
// mgid: member account ID.
// id: Group Chat database Id.
// mid: member database Id.
// sid: session Id.
async fn handle_event(
fgid: GroupId, // server use fgid is remote account.
mgid: GroupId, // client user mgid is my account.
tgid: GroupId, // client user tgid is my account.
is_server: bool,
addr: PeerAddr,
event: LayerEvent,
layer: &Arc<RwLock<Layer>>,
results: &mut HandleResult,
) -> Result<()> {
println!("Got event.......");
println!("Got event.......is server: {:?}", is_server);
let base = layer.read().await.base().clone();
let (sid, db, id, height, ogid, fgid) = if let Some(gcd) = event.gcd() {
if is_server {
let (ogid, height, id) = layer.read().await.running(gcd)?.owner_height_id();
println!("--- DEBUG server:--- online info ok");
let db = group_chat_db(&base, &ogid)?;
println!("--- DEBUG server:--- db ok");
(0, db, id, height, ogid, fgid)
} else {
let (sid, id) = if event.need_online() {
println!("--- DEBUG client: --- need online info start");
let (sid, id) = layer.read().await.get_running_remote_id(&tgid, gcd)?;
(sid, id)
} else {
println!("--- DEBUG client: --- not need online info");
(0, 0)
};
println!("--- DEBUG client:--- online info ok");
let db = group_chat_db(&base, &tgid)?;
println!("--- DEBUG client:--- db ok");
(sid, db, id, 0, tgid, *gcd)
}
} else {
println!("--- DEBUG --- no group id");
let db = group_chat_db(&base, &tgid)?;
(0, db, 0, 0, tgid, fgid)
};
println!("Handle variable statement ok.");
match event {
LayerEvent::Offline(gcd) => {
if is_server {
@ -193,82 +240,81 @@ async fn handle_event( @@ -193,82 +240,81 @@ async fn handle_event(
return Ok(());
}
// 2. offline this member.
let (ogid, _, id) = layer.read().await.running(&gcd)?.owner_height_id();
// 2. UI: offline the member.
results.rpcs.push(rpc::member_offline(ogid, id, fgid));
// 3. broadcast offline event.
let new_data = bincode::serialize(&LayerEvent::MemberOffline(gcd, fgid))?;
for (mid, maddr) in layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, new_data.clone());
add_layer(results, *mid, s);
}
broadcast(&LayerEvent::MemberOffline(gcd, fgid), layer, &gcd, results).await?;
} else {
let mut layer_lock = layer.write().await;
let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?;
layer_lock.running_mut(&mgid)?.check_offline(&gcd, &addr);
drop(layer_lock);
results.rpcs.push(session_lost(mgid, &sid));
// 1. offline group chat.
layer
.write()
.await
.running_mut(&ogid)?
.check_offline(&gcd, &addr);
// 2. UI: offline the session.
results.rpcs.push(session_lost(ogid, &sid));
}
}
LayerEvent::Suspend(gcd) => {
let mut layer_lock = layer.write().await;
let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?;
if layer_lock.running_mut(&mgid)?.suspend(&gcd, false, true)? {
results.rpcs.push(session_suspend(mgid, &sid));
// TODO if only client handle it ???
if layer
.write()
.await
.running_mut(&ogid)?
.suspend(&gcd, false, true)?
{
results.rpcs.push(session_suspend(ogid, &sid));
}
drop(layer_lock);
}
LayerEvent::Actived(gcd) => {
let mut layer_lock = layer.write().await;
let (sid, _gid) = layer_lock.get_running_remote_id(&mgid, &gcd)?;
let _ = layer_lock.running_mut(&mgid)?.active(&gcd, false);
drop(layer_lock);
results.rpcs.push(session_connect(mgid, &sid, &addr));
// TODO if only client handle it ???
let _ = layer.write().await.running_mut(&ogid)?.active(&gcd, false);
results.rpcs.push(session_connect(ogid, &sid, &addr));
}
LayerEvent::CheckResult(ct, supported) => {
// only client handle it.
println!("check: {:?}, supported: {:?}", ct, supported);
results.rpcs.push(rpc::create_check(mgid, ct, supported))
results.rpcs.push(rpc::create_check(ogid, ct, supported))
}
LayerEvent::CreateResult(gcd, ok) => {
// only client handle it.
println!("Create result: {}", ok);
if ok {
// get gc by gcd.
let db = group_chat_db(layer.read().await.base(), &mgid)?;
if let Some(mut gc) = GroupChat::get(&db, &gcd)? {
gc.ok(&db)?;
results.rpcs.push(rpc::create_result(mgid, gc.id, ok));
results.rpcs.push(rpc::create_result(ogid, gc.id, ok));
// ADD NEW SESSION.
let s_db = session_db(layer.read().await.base(), &mgid)?;
let s_db = session_db(&base, &ogid)?;
let mut session = gc.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
results.rpcs.push(session_create(ogid, &session));
}
}
}
LayerEvent::Agree(gcd, info) => {
// only client handle it.
println!("Agree..........");
let base = layer.read().await.base.clone();
let db = group_chat_db(&base, &mgid)?;
let (rid, key) = Request::over(&db, &gcd, true)?;
// 1. add group chat.
let mut group = GroupChat::from_info(key, info, 0, addr, &base, &mgid, true)?;
let mut group = GroupChat::from_info(key, info, 0, addr, &base, &ogid, true)?;
group.insert(&db)?;
// 2. ADD NEW SESSION.
let s_db = session_db(&base, &mgid)?;
let s_db = session_db(&base, &ogid)?;
let mut session = group.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(mgid, &session));
results.rpcs.push(session_create(ogid, &session));
// 3. update UI.
results
.rpcs
.push(rpc::request_handle(mgid, rid, true, false));
results.rpcs.push(rpc::group_create(mgid, group));
.push(rpc::request_handle(ogid, id, rid, true, false));
results.rpcs.push(rpc::group_create(ogid, group));
// 4. try connect.
let proof = layer
@ -277,39 +323,35 @@ async fn handle_event( @@ -277,39 +323,35 @@ async fn handle_event(
.group
.read()
.await
.prove_addr(&mgid, &addr)?;
add_layer(results, mgid, group_chat_conn(proof, addr, gcd));
.prove_addr(&ogid, &addr)?;
add_layer(results, ogid, group_chat_conn(proof, addr, gcd));
}
LayerEvent::Reject(gcd, efficacy) => {
// only client handle it.
println!("Reject..........");
let db = group_chat_db(layer.read().await.base(), &mgid)?;
let (rid, _key) = Request::over(&db, &gcd, true)?;
results
.rpcs
.push(rpc::request_handle(mgid, rid, false, efficacy));
}
LayerEvent::MemberOnline(gcd, mid, maddr) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
let db = group_chat_db(layer.read().await.base(), &mgid)?;
let _ = Member::addr_update(&db, &gid, &mid, &maddr);
results.rpcs.push(rpc::member_online(mgid, gid, mid, maddr));
}
LayerEvent::MemberOffline(gcd, mid) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
results.rpcs.push(rpc::member_offline(mgid, gid, mid));
}
LayerEvent::MemberOnlineSyncResult(gcd, onlines) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
for (mid, maddr) in onlines {
results.rpcs.push(rpc::member_online(mgid, gid, mid, maddr));
.push(rpc::request_handle(ogid, id, rid, false, efficacy));
}
LayerEvent::MemberOnline(_gcd, mgid, maddr) => {
// only client handle it.
let _ = Member::addr_update(&db, &id, &mgid, &maddr);
results.rpcs.push(rpc::member_online(ogid, id, mgid, maddr));
}
LayerEvent::MemberOffline(_gcd, mgid) => {
// only client handle it.
results.rpcs.push(rpc::member_offline(ogid, id, mgid));
}
LayerEvent::MemberOnlineSyncResult(_gcd, onlines) => {
// only client handle it.
for (mgid, maddr) in onlines {
results.rpcs.push(rpc::member_online(ogid, id, mgid, maddr));
}
}
LayerEvent::Sync(gcd, height, event) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
println!("Sync: height: {}", height);
let base = layer.read().await.base().clone();
let db = group_chat_db(&base, &mgid)?;
// all server & client handle it.
println!("Sync: handle height: {}", height);
match event {
Event::GroupInfo => {}
@ -317,90 +359,115 @@ async fn handle_event( @@ -317,90 +359,115 @@ async fn handle_event(
Event::GroupManagerAdd => {}
Event::GroupManagerDel => {}
Event::GroupClose => {}
Event::MemberInfo(mid, maddr, mname, mavatar) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::update(&db, &id, &maddr, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
}
results.rpcs.push(rpc::member_info(mgid, id, maddr, mname));
}
Event::MemberJoin(mid, maddr, mname, mavatar, mtime) => {
if Member::get_id(&db, &gid, &mid).is_err() {
let mut member = Member::new(gid, mid, maddr, mname, false, mtime);
Event::MemberJoin(mgid, maddr, mname, mavatar, mtime) => {
// only client handle it.
if Member::get_id(&db, &id, &mgid).is_err() {
let mut member = Member::new(id, mgid, maddr, mname, false, mtime);
member.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
}
results.rpcs.push(rpc::member_join(mgid, member));
results.rpcs.push(rpc::member_join(ogid, &member));
}
// save consensus.
GroupChat::add_height(&db, id, height)?;
}
Event::MemberLeave(mid) => {
let id = Member::get_id(&db, &gid, &mid)?;
Member::leave(&db, &id)?;
Event::MemberInfo(mgid, maddr, mname, mavatar) => {
// TOOD server & client all handlt it.
let (mid, _) = Member::get_id(&db, &id, &mgid)?;
Member::update(&db, &mid, &maddr, &mname)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &ogid, &mgid, mavatar)?;
}
results
.rpcs
.push(rpc::member_info(ogid, id, mid, maddr, mname));
// save consensus.
GroupChat::add_height(&db, id, height)?;
}
Event::MemberLeave(mgid) => {
// TODO server & client all handle it.
let (mid, _) = Member::get_id(&db, &id, &mgid)?;
Member::leave(&db, &mid)?;
// check mid is my chat friend. if not, delete avatar.
let s_db = chat_db(&base, &mgid)?;
if Friend::get(&s_db, &mid)?.is_none() {
let _ = delete_avatar(&base, &mgid, &mid).await;
if Friend::get(&s_db, &mgid)?.is_none() {
let _ = delete_avatar(&base, &ogid, &mgid).await;
}
results.rpcs.push(rpc::member_leave(mgid, id));
results.rpcs.push(rpc::member_leave(ogid, id, mid));
// save consensus.
GroupChat::add_height(&db, id, height)?;
}
Event::MessageCreate(mid, nmsg, mtime) => {
Event::MessageCreate(mgid, nmsg, mtime) => {
// server & client all handle it.
println!("Sync: create message start");
let base = layer.read().await.base.clone();
let (mid, _) = Member::get_id(&db, &id, &mgid)?;
let new_height = if is_server {
let new_e = Event::MessageCreate(mgid, nmsg.clone(), mtime);
let height = layer.write().await.running_mut(&gcd)?.increased();
Consensus::insert(&db, &id, &height, &mid, &ConsensusType::MessageCreate)?;
broadcast(&LayerEvent::Sync(gcd, height, new_e), layer, &gcd, results)
.await?;
height
} else {
height
};
GroupChat::add_height(&db, id, new_height)?;
let (msg, scontent) =
from_network_message(height, gid, mid, &mgid, nmsg, mtime, &base)?;
results.rpcs.push(rpc::message_create(mgid, &msg));
from_network_message(new_height, id, mgid, &ogid, nmsg, mtime, &base)?;
results.rpcs.push(rpc::message_create(ogid, &msg));
println!("Sync: create message ok");
// UPDATE SESSION.
let s_db = session_db(&base, &mgid)?;
if let Ok(id) = Session::last(
let s_db = session_db(&base, &ogid)?;
if let Ok(sid) = Session::last(
&s_db,
&gid,
&id,
&SessionType::Group,
&msg.datetime,
&scontent,
true,
) {
results
.rpcs
.push(session_last(mgid, &id, &msg.datetime, &scontent, false));
results.rpcs.push(session_last(
ogid,
&sid,
&msg.datetime,
&scontent,
false,
));
}
}
}
// save event.
GroupChat::add_height(&db, gid, height)?;
}
LayerEvent::Packed(gcd, height, from, to, events) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
// only client handle it.
if to >= height {
// when last packed sync, start sync online members.
add_layer(results, mgid, sync_online(gcd, addr));
add_layer(results, ogid, sync_online(gcd, addr));
}
println!("Start handle sync packed... {}, {}, {}", height, from, to);
let base = layer.read().await.base().clone();
handle_sync(
mgid, gid, gcd, addr, height, from, to, events, base, results,
&db, ogid, id, gcd, addr, height, from, to, events, base, results,
)?;
}
LayerEvent::RequestHandle(gcd, rgid, raddr, join_proof, rid, time) => {
let (_sid, gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
LayerEvent::RequestHandle(_gcd, rgid, raddr, join_proof, rid, time) => {
// only client handle it.
match join_proof {
JoinProof::Invite(i, _proof, mname, mavatar) => {
let mut req =
Request::new_by_remote(gid, rid, rgid, raddr, mname, i.to_hex(), time);
let base = layer.read().await.base().clone();
let db = group_chat_db(&base, &mgid)?;
Request::new_by_remote(id, rid, rgid, raddr, mname, i.to_hex(), time);
req.insert(&db)?;
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &rgid, mavatar)?;
write_avatar_sync(&base, &ogid, &rgid, mavatar)?;
}
results.rpcs.push(rpc::request_create(mgid, &req));
results.rpcs.push(rpc::request_create(ogid, &req));
}
JoinProof::Zkp(_proof) => {
//
@ -408,19 +475,31 @@ async fn handle_event( @@ -408,19 +475,31 @@ async fn handle_event(
JoinProof::Open(..) => {} // nerver here.
}
}
LayerEvent::RequestResult(gcd, rid, ok) => {
let (_sid, _gid) = layer.read().await.get_running_remote_id(&mgid, &gcd)?;
let db = group_chat_db(layer.read().await.base(), &mgid)?;
let id = Request::over_rid(&db, &gcd, &rid, ok)?;
results.rpcs.push(rpc::request_handle(mgid, id, ok, false));
LayerEvent::RequestResult(_gcd, rrid, ok) => {
// only client handle it.
let rid = Request::over_rid(&db, &id, &rrid, ok)?;
results
.rpcs
.push(rpc::request_handle(ogid, id, rid, ok, false));
}
LayerEvent::MemberOnlineSync(..) => {
// TODO
LayerEvent::MemberOnlineSync(gcd) => {
// only server handle it.
let onlines = layer
.read()
.await
.running(&gcd)?
.onlines()
.iter()
.map(|(g, a)| (**g, **a))
.collect();
let event = LayerEvent::MemberOnlineSyncResult(gcd, onlines);
let data = bincode::serialize(&event).unwrap_or(vec![]);
let s = SendType::Event(0, addr, data);
add_server_layer(results, fgid, s);
}
LayerEvent::Request(gcd, join_proof) => {
let (ogid, height, id) = layer.read().await.running(&gcd)?.owner_height_id();
let base = layer.read().await.base.clone();
let db = group_chat_db(&base, &ogid)?;
// only server handle it.
println!("----------- PRINTLN GROUP CHAT: GOT REQUEST: {}", id);
let group = GroupChat::get_id(&db, &id)?.ok_or(anyhow!("missing group"))?;
// 1. check account is online, if not online, nothing.
@ -458,58 +537,87 @@ async fn handle_event( @@ -458,58 +537,87 @@ async fn handle_event(
}
}
JoinProof::Invite(invite_gid, proof, mname, mavatar) => {
println!("----------- PRINTLN GROUP CHAT: GOT REQUEST INVITE: {}", id);
// check is member.
if Member::get_id(&db, &id, &fgid).is_ok() {
let s = agree(&base, &ogid, &gcd, group, addr).await;
add_server_layer(results, fgid, s);
println!("----------- PRINTLN GROUP CHAT: GOT REQUEST HAD MEMBER");
return Ok(());
}
// TODO check if request had or is blocked by manager.
// check if inviter is member.
if Member::get_id(&db, &id, &invite_gid).is_err() {
let inv_mid = Member::get_id(&db, &id, &invite_gid);
if inv_mid.is_err() {
add_server_layer(results, fgid, reject(gcd, addr, false));
println!("----------- PRINTLN GROUP CHAT: Inviter not exists");
return Ok(());
}
// TODO check proof.
// proof.verify(&invite_gid, &addr, &layer.addr)?;
// if group.is_need_agree {
// if !Member::is_manager(fid, &invite_gid).await? {
// let mut request = Request::new();
// request.insert().await?;
// self.broadcast_request(
// &gcd,
// request,
// JoinProof::Invite(invite_gid, proof, mname, mavatar),
// results,
// );
// return Ok(());
// }
// }
//let mut m = Member::new(*fid, fmid, addr, mname, false);
//m.insert().await?;
//proof.verify(&invite_gid, &addr, &layer.addr)?;
if group.is_need_agree {
let (_inv_id, inv_is_manager) = inv_mid.unwrap();
if !inv_is_manager {
// let mut request = Request::new();
// request.insert().await?;
// self.broadcast_request(
// &gcd,
// request,
// JoinProof::Invite(invite_gid, proof, mname, mavatar),
// results,
// );
return Ok(());
}
}
let mut m = Member::new_notime(id, fgid, addr, mname, false);
m.insert(&db)?;
// save avatar.
//let _ = write_avatar(&self.base, &gcd, &m.m_id, &mavatar).await;
let _ = write_avatar(&base, &ogid, &m.m_id, &mavatar).await;
//self.add_member(&gcd, fmid, addr);
//self.broadcast_join(&gcd, m, mavatar, results).await?;
// add consensuse and storage.
let height = layer.write().await.running_mut(&gcd)?.increased();
Consensus::insert(&db, &id, &height, &m.id, &ConsensusType::MemberJoin)?;
GroupChat::add_height(&db, id, height)?;
// UI: update.
results.rpcs.push(rpc::member_join(fgid, &m));
// broadcast join event.
let event = Event::MemberJoin(m.m_id, m.m_addr, m.m_name, mavatar, m.datetime);
broadcast(&LayerEvent::Sync(gcd, height, event), layer, &gcd, results).await?;
// return join result.
let s = agree(&base, &ogid, &gcd, group, addr).await;
add_server_layer(results, fgid, s);
println!("----------- PRINTLN GROUP CHAT: GOT REQUEST INVITE OVER");
}
JoinProof::Zkp(_proof) => {
// TOOD zkp join.
}
}
}
LayerEvent::SyncReq(..) => {
// TODO
LayerEvent::SyncReq(gcd, from) => {
// only server handle it.
println!("Got sync request. height: {} from: {}", height, from);
if height >= from {
let to = if height - from > 100 {
from + 100
} else {
height
};
let packed = Consensus::pack(&db, &base, &gcd, &id, &from, &to).await?;
let event = LayerEvent::Packed(gcd, height, from, to, packed);
let data = bincode::serialize(&event).unwrap_or(vec![]);
let s = SendType::Event(0, addr, data);
add_server_layer(results, fgid, s);
println!("Sended sync request results. from: {}, to: {}", from, to);
}
}
LayerEvent::Check => {} // nerver here.
LayerEvent::Create(..) => {} // nerver here.
@ -518,6 +626,23 @@ async fn handle_event( @@ -518,6 +626,23 @@ async fn handle_event(
Ok(())
}
async fn broadcast(
event: &LayerEvent,
layer: &Arc<RwLock<Layer>>,
gcd: &GroupId,
results: &mut HandleResult,
) -> Result<()> {
let new_data = bincode::serialize(&event)?;
for (mgid, maddr) in layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, new_data.clone());
add_server_layer(results, *mgid, s);
println!("--- DEBUG broadcast to: {:?}", mgid);
}
Ok(())
}
pub(crate) fn group_chat_conn(proof: Proof, addr: PeerAddr, gid: GroupId) -> SendType {
let data =
bincode::serialize(&LayerConnect(gid, ConnectProof::Common(proof))).unwrap_or(vec![]);
@ -554,42 +679,10 @@ fn reject(gcd: GroupId, addr: PeerAddr, lost: bool) -> SendType { @@ -554,42 +679,10 @@ fn reject(gcd: GroupId, addr: PeerAddr, lost: bool) -> SendType {
SendType::Event(0, addr, d)
}
// fn broadcast_join(
// gcd: &GroupId,
// member: Member,
// avatar: Vec<u8>,
// results: &mut HandleResult,
// ) -> Result<()> {
// println!("start broadcast join...");
// let height = self
// .add_height(gcd, &member.id, ConsensusType::MemberJoin)
// .await?;
// let datetime = member.datetime;
// let event = Event::MemberJoin(
// member.m_id,
// member.m_addr,
// member.m_name,
// avatar,
// member.datetime,
// );
// let new_data = bincode::serialize(&LayerEvent::Sync(*gcd, height, event)).unwrap_or(vec![]);
// if let Some((members, _, _)) = self.groups.get(gcd) {
// for (mid, maddr, _) in members {
// let s = SendType::Event(0, *maddr, new_data.clone());
// add_layer(results, *mid, s);
// }
// }
// println!("over broadcast join...");
// Ok(())
// }
fn handle_sync(
mgid: GroupId,
fid: i64,
db: &DStorage,
ogid: GroupId,
id: i64,
gcd: GroupId,
addr: PeerAddr,
height: i64,
@ -599,29 +692,27 @@ fn handle_sync( @@ -599,29 +692,27 @@ fn handle_sync(
base: PathBuf,
results: &mut HandleResult,
) -> Result<()> {
let db = group_chat_db(&base, &mgid)?;
let mut last_scontent: Option<(String, i64)> = None;
for event in events {
if let Ok(scontent) = handle_sync_event(&mgid, &fid, from, event, &base, &db, results) {
if let Ok(scontent) = handle_sync_event(&ogid, &id, from, event, &base, db, results) {
last_scontent = scontent;
}
from += 1;
}
if to < height {
add_layer(results, mgid, sync(gcd, addr, to + 1));
add_layer(results, ogid, sync(gcd, addr, to + 1));
}
// update group chat height.
GroupChat::add_height(&db, fid, to)?;
GroupChat::add_height(db, id, to)?;
// UPDATE SESSION.
if let Some((sc, t)) = last_scontent {
let s_db = session_db(&base, &mgid)?;
if let Ok(id) = Session::last(&s_db, &fid, &SessionType::Group, &t, &sc, true) {
results.rpcs.push(session_last(mgid, &id, &t, &sc, false));
let s_db = session_db(&base, &ogid)?;
if let Ok(sid) = Session::last(&s_db, &id, &SessionType::Group, &t, &sc, true) {
results.rpcs.push(session_last(ogid, &sid, &t, &sc, false));
}
}
@ -629,7 +720,7 @@ fn handle_sync( @@ -629,7 +720,7 @@ fn handle_sync(
}
fn handle_sync_event(
mgid: &GroupId,
ogid: &GroupId,
fid: &i64,
height: i64,
event: PackedEvent,
@ -665,11 +756,11 @@ fn handle_sync_event( @@ -665,11 +756,11 @@ fn handle_sync_event(
PackedEvent::MemberJoin(mid, maddr, mname, mavatar, mtime) => {
if Member::get_id(db, fid, &mid).is_err() {
if mavatar.len() > 0 {
write_avatar_sync(&base, &mgid, &mid, mavatar)?;
write_avatar_sync(&base, &ogid, &mid, mavatar)?;
}
let mut member = Member::new(*fid, mid, maddr, mname, false, mtime);
member.insert(&db)?;
results.rpcs.push(rpc::member_join(*mgid, member));
results.rpcs.push(rpc::member_join(*ogid, &member));
}
None
}
@ -678,8 +769,8 @@ fn handle_sync_event( @@ -678,8 +769,8 @@ fn handle_sync_event(
None
}
PackedEvent::MessageCreate(mid, nmsg, time) => {
let (msg, scontent) = from_network_message(height, *fid, mid, mgid, nmsg, time, base)?;
results.rpcs.push(rpc::message_create(*mgid, &msg));
let (msg, scontent) = from_network_message(height, *fid, mid, ogid, nmsg, time, base)?;
results.rpcs.push(rpc::message_create(*ogid, &msg));
Some((scontent, time))
}
PackedEvent::None => None,

2
src/apps/group_chat/mod.rs

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
mod common;
mod layer;
mod models;
@ -21,4 +20,5 @@ pub(crate) mod rpc; @@ -21,4 +20,5 @@ pub(crate) mod rpc;
pub(crate) use layer::group_chat_conn;
pub(crate) use layer::handle as layer_handle;
pub(crate) use models::GroupChat;
pub(crate) use models::Member;
pub(crate) use rpc::new_rpc_handler;

10
src/apps/group_chat/models/consensus.rs

@ -140,20 +140,20 @@ impl Consensus { @@ -140,20 +140,20 @@ impl Consensus {
Ok(packed)
}
pub async fn insert(
pub fn insert(
db: &DStorage,
fid: &i64,
height: &i64,
cid: &i64,
ctype: &ConsensusType,
) -> Result<()> {
let unique_check = db.query(&format!(
let mut unique_check = db.query(&format!(
"SELECT id from consensus WHERE fid = {} AND height = {}",
fid, height
));
))?;
if let Ok(mut rec) = unique_check {
let id = rec.pop().unwrap().pop().unwrap().as_i64();
if unique_check.len() > 0 {
let id = unique_check.pop().unwrap().pop().unwrap().as_i64();
let _ = db.query(&format!(
"UPDATE consensus SET ctype = {}, cid = {} WHERE id = {}",
ctype.to_i64(),

2
src/apps/group_chat/models/group.rs

@ -38,7 +38,7 @@ pub(crate) struct GroupChat { @@ -38,7 +38,7 @@ pub(crate) struct GroupChat {
/// group chat is closed.
is_closed: bool,
/// group chat need manager agree.
is_need_agree: bool,
pub is_need_agree: bool,
/// group chat encrypted-key.
pub key: GroupChatKey,
/// group chat created time.

40
src/apps/group_chat/models/member.rs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
use rand::Rng;
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
group::GroupId,
primitive::{PeerAddr, Result},
@ -9,7 +10,7 @@ use tdn_storage::local::{DStorage, DsValue}; @@ -9,7 +10,7 @@ use tdn_storage::local::{DStorage, DsValue};
/// Group Member Model.
pub(crate) struct Member {
/// db auto-increment id.
id: i64,
pub id: i64,
/// group's db id.
fid: i64,
/// member's Did(GroupId)
@ -29,6 +30,32 @@ pub(crate) struct Member { @@ -29,6 +30,32 @@ pub(crate) struct Member {
}
impl Member {
pub fn new_notime(
fid: i64,
m_id: GroupId,
m_addr: PeerAddr,
m_name: String,
is_manager: bool,
) -> Self {
let start = SystemTime::now();
let datetime = start
.duration_since(UNIX_EPOCH)
.map(|s| s.as_secs())
.unwrap_or(0) as i64; // safe for all life.
Self {
fid,
m_id,
m_addr,
m_name,
is_manager,
datetime,
id: 0,
is_block: false,
is_deleted: false,
}
}
pub fn new(
fid: i64,
m_id: GroupId,
@ -127,7 +154,7 @@ impl Member { @@ -127,7 +154,7 @@ impl Member {
pub fn get(db: &DStorage, id: &i64) -> Result<Member> {
let mut matrix = db.query(&format!(
"SELECT id, fid, m_id, m_addr, m_name, is_manager, is_block, datetime FROM members WHERE id = {}",
"SELECT id, fid, mid, addr, name, is_manager, is_block, datetime FROM members WHERE id = {}",
id,
))?;
if matrix.len() > 0 {
@ -137,14 +164,17 @@ impl Member { @@ -137,14 +164,17 @@ impl Member {
}
}
pub fn get_id(db: &DStorage, fid: &i64, mid: &GroupId) -> Result<i64> {
pub fn get_id(db: &DStorage, fid: &i64, mid: &GroupId) -> Result<(i64, bool)> {
let mut matrix = db.query(&format!(
"SELECT id FROM members WHERE fid = {} AND mid = '{}' AND is_delete = false",
"SELECT id, is_manager FROM members WHERE fid = {} AND mid = '{}' AND is_deleted = false",
fid,
mid.to_hex()
))?;
if matrix.len() > 0 {
Ok(matrix.pop().unwrap().pop().unwrap().as_i64()) // safe unwrap.
let mut values = matrix.pop().unwrap();
let is_manager = values.pop().unwrap().as_bool(); // safe unwrap.
let id = values.pop().unwrap().as_i64(); // safe unwrap.
Ok((id, is_manager)) // safe unwrap.
} else {
Err(anyhow!("missing member"))
}

2
src/apps/group_chat/models/message.rs

@ -21,7 +21,7 @@ use super::Member; @@ -21,7 +21,7 @@ use super::Member;
/// Group Chat Message Model.
pub(crate) struct Message {
/// db auto-increment id.
id: i64,
pub id: i64,
/// group message consensus height.
height: i64,
/// group's db id.

7
src/apps/group_chat/models/request.rs

@ -163,11 +163,10 @@ impl Request { @@ -163,11 +163,10 @@ impl Request {
}
}
pub fn over_rid(db: &DStorage, gcd: &GroupId, rid: &i64, is_ok: bool) -> Result<i64> {
pub fn over_rid(db: &DStorage, gid: &i64, rid: &i64, is_ok: bool) -> Result<i64> {
let mut matrix = db.query(&format!(
"SELECT id from requests WHERE gid = '{}' AND rid = {} AND is_over = 0",
gcd.to_hex(),
rid
"SELECT id from requests WHERE fid = {} AND rid = {} AND is_over = 0",
gid, rid
))?;
if matrix.len() == 0 {
return Err(anyhow!("request is missing"));

95
src/apps/group_chat/rpc.rs

@ -15,8 +15,9 @@ use crate::session::{Session, SessionType}; @@ -15,8 +15,9 @@ use crate::session::{Session, SessionType};
use crate::storage::{chat_db, group_chat_db, read_avatar, session_db, write_avatar};
use super::add_layer;
use super::common::handle_event;
use super::models::{to_network_message, GroupChat, GroupChatKey, Member, Message, Request};
use super::models::{
to_network_message, Consensus, ConsensusType, GroupChat, GroupChatKey, Member, Message, Request,
};
#[inline]
pub(crate) fn create_check(mgid: GroupId, ct: CheckType, supported: Vec<GroupType>) -> RpcParam {
@ -40,26 +41,43 @@ pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam { @@ -40,26 +41,43 @@ pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam {
}
#[inline]
pub(crate) fn request_handle(mgid: GroupId, id: i64, ok: bool, efficacy: bool) -> RpcParam {
rpc_response(0, "group-chat-join-handle", json!([id, ok, efficacy]), mgid)
pub(crate) fn request_handle(
mgid: GroupId,
id: i64,
rid: i64,
ok: bool,
efficacy: bool,
) -> RpcParam {
rpc_response(
0,
"group-chat-join-handle",
json!([id, rid, ok, efficacy]),
mgid,
)
}
#[inline]
pub(crate) fn member_join(mgid: GroupId, member: Member) -> RpcParam {
pub(crate) fn member_join(mgid: GroupId, member: &Member) -> RpcParam {
rpc_response(0, "group-chat-member-join", json!(member.to_rpc()), mgid)
}
#[inline]
pub(crate) fn member_leave(mgid: GroupId, id: i64) -> RpcParam {
rpc_response(0, "group-chat-member-leave", json!([id]), mgid)
pub(crate) fn member_leave(mgid: GroupId, id: i64, mid: i64) -> RpcParam {
rpc_response(0, "group-chat-member-leave", json!([id, mid]), mgid)
}
#[inline]
pub(crate) fn member_info(mgid: GroupId, id: i64, addr: PeerAddr, name: String) -> RpcParam {
pub(crate) fn member_info(
mgid: GroupId,
id: i64,
mid: i64,
addr: PeerAddr,
name: String,
) -> RpcParam {
rpc_response(
0,
"group-chat-member-info",
json!([id, addr.to_hex(), name]),
json!([id, mid, addr.to_hex(), name]),
mgid,
)
}
@ -211,7 +229,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -211,7 +229,6 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
// save db
let me = state.group.read().await.clone_user(&gid)?;
gc.insert(&db)?;
Member::new(gc.id, gid, me.addr, me.name, true, gc.datetime).insert(&db)?;
// save avatar
let _ = write_avatar(&base, &gid, &gcd, &avatar_bytes).await;
@ -229,18 +246,25 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -229,18 +246,25 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
let s = SendType::Event(0, addr, data);
add_layer(&mut results, gid, s);
} else {
let mut m = Member::new(gc.id, gid, me.addr, me.name, true, gc.datetime);
m.insert(&db)?;
let _ = write_avatar(&base, &gid, &gid, &me.avatar).await;
// ADD NEW SESSION.
let s_db = session_db(state.layer.read().await.base(), &gid)?;
let mut session = gc.to_session();
session.insert(&s_db)?;
results.rpcs.push(session_create(gid, &session));
// Add frist member join.
let mut layer_lock = state.layer.write().await;
layer_lock.add_running(&gcd, gid, gdid, gheight)?;
let height = layer_lock.running_mut(&gcd)?.increased();
drop(layer_lock);
Consensus::insert(&db, &gdid, &height, &m.id, &ConsensusType::MemberJoin)?;
GroupChat::add_height(&db, gdid, height)?;
// online local group.
state
.layer
.write()
.await
.add_running(&gcd, gid, gdid, gheight)?;
results.networks.push(NetworkType::AddGroup(gcd));
}
@ -443,43 +467,18 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -443,43 +467,18 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
"group-chat-message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gcd = GroupId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let id = params[1].as_i64().ok_or(RpcError::ParseError)?;
let is_remote = params[2].as_bool().ok_or(RpcError::ParseError)?;
let m_type = MessageType::from_int(params[3].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[4].as_str().ok_or(RpcError::ParseError)?;
let m_type = MessageType::from_int(params[1].as_i64().ok_or(RpcError::ParseError)?);
let m_content = params[2].as_str().ok_or(RpcError::ParseError)?;
let addr = state.layer.read().await.running(&gid)?.online(&gcd)?;
let mut results = HandleResult::new();
let base = state.group.read().await.base().clone();
let (nmsg, datetime) = to_network_message(&base, &gid, m_type, m_content).await?;
let event = Event::MessageCreate(gid, nmsg, datetime);
let mut results = HandleResult::new();
if is_remote {
let addr = state.layer.read().await.running(&gid)?.online(&gcd)?;
let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?;
let msg = SendType::Event(0, addr, data);
add_layer(&mut results, gid, msg);
} else {
let db = group_chat_db(&base, &gid)?;
// 1. increase the consensus height in running layer.
let height = state.layer.write().await.running_mut(&gcd)?.increased();
// 2. update group chat height.
GroupChat::add_height(&db, id, height)?;
// 3. broadcast event bytes.
let event = LayerEvent::Sync(gcd, height, event);
let new_data = bincode::serialize(&event)?;
// 4. handle event.
handle_event(db, base, id, gid, event, &mut results).await?;
// 5. broadcast event.
for (mid, maddr) in state.layer.read().await.running(&gcd)?.onlines() {
let s = SendType::Event(0, *maddr, new_data.clone());
add_layer(&mut results, *mid, s);
}
}
let data = bincode::serialize(&LayerEvent::Sync(gcd, 0, event))?;
let msg = SendType::Event(0, addr, data);
add_layer(&mut results, gid, msg);
Ok(results)
},

2
src/layer.rs

@ -114,6 +114,7 @@ impl Layer { @@ -114,6 +114,7 @@ impl Layer {
}
pub fn get_running_remote_id(&self, mgid: &GroupId, fgid: &GroupId) -> Result<(i64, i64)> {
println!("onlines: {:?}, find: {:?}", self.runnings.keys(), mgid);
self.running(mgid)?.get_online_id(fgid)
}
@ -281,6 +282,7 @@ impl RunningLayer { @@ -281,6 +282,7 @@ impl RunningLayer {
}
pub fn get_online_id(&self, gid: &GroupId) -> Result<(i64, i64)> {
println!("onlines: {:?}, find: {:?}", self.sessions.keys(), gid);
self.sessions
.get(gid)
.map(|online| (online.db_id, online.db_fid))

52
src/rpc.rs

@ -14,11 +14,11 @@ use tokio::sync::{ @@ -14,11 +14,11 @@ use tokio::sync::{
use crate::apps::app_rpc_inject;
use crate::apps::chat::chat_conn;
use crate::apps::group_chat::{add_layer, group_chat_conn, GroupChat};
use crate::apps::group_chat::{add_layer, group_chat_conn, GroupChat, Member};
use crate::event::InnerEvent;
use crate::group::Group;
use crate::layer::{Layer, LayerEvent};
use crate::session::{Session, SessionType};
use crate::layer::{Layer, LayerEvent, Online};
use crate::session::{connect_session, Session, SessionType};
use crate::storage::{group_chat_db, session_db};
pub(crate) fn init_rpc(
@ -360,29 +360,55 @@ fn new_rpc_handler( @@ -360,29 +360,55 @@ fn new_rpc_handler(
handler.add_method(
"account-login",
|_gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gid = GroupId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let ogid = GroupId::from_hex(params[0].as_str().ok_or(RpcError::ParseError)?)?;
let me_lock = params[1].as_str().ok_or(RpcError::ParseError)?;
let mut results = HandleResult::rpc(json!([gid.to_hex()]));
let mut results = HandleResult::rpc(json!([ogid.to_hex()]));
let id = state.group.write().await.add_running(&gid, me_lock)?;
let id = state.group.write().await.add_running(&ogid, me_lock)?;
// add AddGroup to TDN.
results.networks.push(NetworkType::AddGroup(gid));
results.networks.push(NetworkType::AddGroup(ogid));
let mut layer_lock = state.layer.write().await;
layer_lock.add_running(&gid, gid, id, 0)?; // TODO account current state height.
layer_lock.add_running(&ogid, ogid, id, 0)?; // TODO account current state height.
// load all services layer created by this account.
// 1. group chat.
let group_db = group_chat_db(&layer_lock.base, &gid)?;
let group_chats = GroupChat::all_local(&group_db, &gid)?;
for (id, gcd, gheight) in group_chats {
layer_lock.add_running(&gcd, gid, id, gheight)?;
let self_addr = layer_lock.addr.clone();
let group_db = group_chat_db(&layer_lock.base, &ogid)?;
let group_chats = GroupChat::all_local(&group_db, &ogid)?;
for (gid, gcd, gheight) in group_chats {
layer_lock.add_running(&gcd, ogid, gid, gheight)?;
results.networks.push(NetworkType::AddGroup(gcd));
// 2. online self-hold owner to group.
let (mid, _) = Member::get_id(&group_db, &gid, &ogid)?;
layer_lock.running_mut(&gcd)?.check_add_online(
ogid,
Online::Direct(self_addr),
gid,
mid,
)?;
// 3. online group to self group onlines.
if let Some(session) = connect_session(
&layer_lock.base,
&ogid,
&SessionType::Group,
&gid,
&self_addr,
)? {
layer_lock.running_mut(&ogid)?.check_add_online(
gcd,
Online::Direct(self_addr),
session.id,
gid,
)?;
}
}
drop(layer_lock);
debug!("Account Logined: {}.", gid.to_hex());
debug!("Account Logined: {}.", ogid.to_hex());
Ok(results)
},

2
src/server.rs

@ -273,7 +273,7 @@ pub fn init_log(mut db_path: PathBuf) { @@ -273,7 +273,7 @@ pub fn init_log(mut db_path: PathBuf) {
#[cfg(not(debug_assertions))]
CombinedLogger::init(vec![simplelog::WriteLogger::new(
LevelFilter::Debug,
LevelFilter::Info,
LogConfig::default(),
std::fs::File::create(db_path).unwrap(),
)])

Loading…
Cancel
Save