Browse Source

group chat message send and receive

pull/18/head
Sun 4 years ago
parent
commit
373b59fb42
  1. 2
      lib/apps/chat/provider.dart
  2. 2
      lib/apps/group_chat/detail.dart
  3. 6
      lib/apps/group_chat/models.dart
  4. 39
      lib/apps/group_chat/provider.dart
  5. 36
      pubspec.lock
  6. 8
      src/apps/chat/layer.rs
  7. 4
      src/apps/chat/rpc.rs
  8. 12
      src/apps/group_chat/layer.rs
  9. 141
      src/apps/group_chat/models.rs
  10. 43
      src/apps/group_chat/rpc.rs

2
lib/apps/chat/provider.dart

@ -215,10 +215,12 @@ class ChatProvider extends ChangeNotifier { @@ -215,10 +215,12 @@ class ChatProvider extends ChangeNotifier {
_friendOffline(List params) {
final id = params[0];
if (this.friends.containsKey(id)) {
if (this.friends[id].gid == params[1]) {
this.friends[id].online = false;
notifyListeners();
}
}
}
_friendInfo(List params) {
final id = params[0];

2
lib/apps/group_chat/detail.dart

@ -76,7 +76,7 @@ class _GroupChatDetailState extends State<GroupChatDetail> { @@ -76,7 +76,7 @@ class _GroupChatDetailState extends State<GroupChatDetail> {
return;
}
//context.read<GroupChatProvider>().messageCreate(Message(group.id, MessageType.String, textController.text));
context.read<GroupChatProvider>().messageCreate(MessageType.String, textController.text);
setState(() {
textController.text = '';
textFocus.requestFocus();

6
lib/apps/group_chat/models.dart

@ -122,6 +122,12 @@ class GroupChat { @@ -122,6 +122,12 @@ class GroupChat {
hasNew: !this.lastReaded,
);
}
updateLastMessage(Message msg, bool isReaded) {
this.lastTime = msg.time;
this.lastContent = msg.shortShow();
this.lastReaded = isReaded;
}
}
class Member {

39
lib/apps/group_chat/provider.dart

@ -48,9 +48,9 @@ class GroupChatProvider extends ChangeNotifier { @@ -48,9 +48,9 @@ class GroupChatProvider extends ChangeNotifier {
// rpc.addListener('group-chat-member-join', _memberJoin, false);
// rpc.addListener('group-chat-member-info', _memberInfo, false);
// rpc.addListener('group-chat-member-leave', _memberLeave, false);
// rpc.addListener('group-chat-member-online', _memberOnline, false);
// rpc.addListener('group-chat-member-offline', _memberOffline, false);
// rpc.addListener('group-chat-message-create', _messageCreate, true);
rpc.addListener('group-chat-member-online', _memberOnline, false);
rpc.addListener('group-chat-member-offline', _memberOffline, false);
rpc.addListener('group-chat-message-create', _messageCreate, true);
// rpc.addListener('group-chat-message-delete', _messageDelete, false);
// rpc.addListener('group-chat-message-delivery', _messageDelivery, false);
}
@ -93,6 +93,11 @@ class GroupChatProvider extends ChangeNotifier { @@ -93,6 +93,11 @@ class GroupChatProvider extends ChangeNotifier {
//
}
messageCreate(MessageType mtype, String content) {
final gid = this.activedGroup.gid;
rpc.send('group-chat-message-create', [gid, mtype.toInt(), content]);
}
_list(List params) {
this.clear();
params.forEach((params) {
@ -161,8 +166,36 @@ class GroupChatProvider extends ChangeNotifier { @@ -161,8 +166,36 @@ class GroupChatProvider extends ChangeNotifier {
_offline(List params) {
final id = params[0];
if (this.groups.containsKey(id)) {
if (this.groups[id].gid == params[1]) {
this.groups[id].online = false;
notifyListeners();
}
}
}
_memberOnline(List params) {
//
}
_memberOffline(List params) {
//
}
_messageCreate(List params) {
final msg = Message.fromList(params);
if (msg.fid == this.actived) {
if (!msg.isDelivery) {
msg.isDelivery = null; // When message create, set is is none;
}
this.groups[msg.fid].updateLastMessage(msg, true);
this.activedMessages[msg.id] = msg;
rpc.send('group-chat-readed', [this.actived]);
} else {
if (this.groups.containsKey(msg.fid)) {
this.groups[msg.fid].updateLastMessage(msg, false);
}
}
//orderGroups(msg.fid);
notifyListeners();
}
}

36
pubspec.lock

@ -21,7 +21,7 @@ packages: @@ -21,7 +21,7 @@ packages:
name: async
url: "https://pub.dartlang.org"
source: hosted
version: "2.5.0"
version: "2.6.0"
audio_session:
dependency: transitive
description:
@ -77,7 +77,7 @@ packages: @@ -77,7 +77,7 @@ packages:
name: crop
url: "https://pub.dartlang.org"
source: hosted
version: "0.5.0"
version: "0.5.1"
cross_file:
dependency: transitive
description:
@ -98,42 +98,42 @@ packages: @@ -98,42 +98,42 @@ packages:
name: device_info_plus
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
version: "1.0.1"
device_info_plus_linux:
dependency: transitive
description:
name: device_info_plus_linux
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
version: "1.0.1"
device_info_plus_macos:
dependency: transitive
description:
name: device_info_plus_macos
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
version: "1.0.1"
device_info_plus_platform_interface:
dependency: transitive
description:
name: device_info_plus_platform_interface
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.1"
version: "1.0.2"
device_info_plus_web:
dependency: transitive
description:
name: device_info_plus_web
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
version: "1.0.1"
device_info_plus_windows:
dependency: transitive
description:
name: device_info_plus_windows
url: "https://pub.dartlang.org"
source: hosted
version: "1.0.0"
version: "1.0.1"
esse_core:
dependency: "direct main"
description:
@ -181,7 +181,7 @@ packages: @@ -181,7 +181,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_linux"
ref: HEAD
resolved-ref: "1d3bff412f8e93087243995570db3a56eba415e5"
resolved-ref: f2d8aa3820fb87316516670bf4d51a74de8ac0dd
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.2"
@ -190,7 +190,7 @@ packages: @@ -190,7 +190,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_macos"
ref: HEAD
resolved-ref: "1d3bff412f8e93087243995570db3a56eba415e5"
resolved-ref: f2d8aa3820fb87316516670bf4d51a74de8ac0dd
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.4"
@ -213,7 +213,7 @@ packages: @@ -213,7 +213,7 @@ packages:
description:
path: "plugins/file_selector/file_selector_windows"
ref: HEAD
resolved-ref: "1d3bff412f8e93087243995570db3a56eba415e5"
resolved-ref: f2d8aa3820fb87316516670bf4d51a74de8ac0dd
url: "git://github.com/google/flutter-desktop-embedding.git"
source: git
version: "0.0.2"
@ -274,7 +274,7 @@ packages: @@ -274,7 +274,7 @@ packages:
name: http
url: "https://pub.dartlang.org"
source: hosted
version: "0.13.1"
version: "0.13.3"
http_parser:
dependency: transitive
description:
@ -302,7 +302,7 @@ packages: @@ -302,7 +302,7 @@ packages:
name: image_picker
url: "https://pub.dartlang.org"
source: hosted
version: "0.7.4"
version: "0.7.5"
image_picker_for_web:
dependency: transitive
description:
@ -342,7 +342,7 @@ packages: @@ -342,7 +342,7 @@ packages:
name: just_audio
url: "https://pub.dartlang.org"
source: hosted
version: "0.7.4"
version: "0.7.4+1"
just_audio_platform_interface:
dependency: transitive
description:
@ -384,7 +384,7 @@ packages: @@ -384,7 +384,7 @@ packages:
name: open_file
url: "https://pub.dartlang.org"
source: hosted
version: "3.2.0"
version: "3.2.1"
path:
dependency: transitive
description:
@ -447,7 +447,7 @@ packages: @@ -447,7 +447,7 @@ packages:
name: permission_handler
url: "https://pub.dartlang.org"
source: hosted
version: "7.0.0"
version: "7.1.0"
permission_handler_platform_interface:
dependency: transitive
description:
@ -697,14 +697,14 @@ packages: @@ -697,14 +697,14 @@ packages:
name: vm_service
url: "https://pub.dartlang.org"
source: hosted
version: "6.1.0+1"
version: "6.2.0"
web_socket_channel:
dependency: "direct main"
description:
name: web_socket_channel
url: "https://pub.dartlang.org"
source: hosted
version: "2.0.0"
version: "2.1.0"
webdriver:
dependency: transitive
description:

8
src/apps/chat/layer.rs

@ -186,7 +186,13 @@ pub(crate) async fn handle( @@ -186,7 +186,13 @@ pub(crate) async fn handle(
for (mgid, running) in &mut layer.runnings {
let peers = running.peer_leave(&addr);
for (fgid, fid) in peers {
results.rpcs.push(rpc::friend_offline(*mgid, fid));
results.rpcs.push(rpc::friend_offline(*mgid, fid, &fgid));
results
.rpcs
.push(crate::apps::group_chat::rpc::group_offline(
*mgid, fid, &fgid,
));
group_lock.status(
&mgid,
StatusEvent::SessionFriendOffline(fgid),

4
src/apps/chat/rpc.rs

@ -22,8 +22,8 @@ pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam @@ -22,8 +22,8 @@ pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam
}
#[inline]
pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam {
rpc_response(0, "chat-friend-offline", json!([fid]), mgid)
pub(crate) fn friend_offline(mgid: GroupId, fid: i64, gid: &GroupId) -> RpcParam {
rpc_response(0, "chat-friend-offline", json!([fid, gid.to_hex()]), mgid)
}
#[inline]

12
src/apps/group_chat/layer.rs

@ -9,13 +9,13 @@ use tdn::{ @@ -9,13 +9,13 @@ use tdn::{
},
};
use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent};
use group_chat_types::{Event, GroupConnect, GroupResult, JoinProof, LayerEvent, NetworkMessage};
use tdn_did::Proof;
use crate::layer::{Layer, Online};
use crate::storage::group_chat_db;
use super::models::GroupChat;
use super::models::{from_network_message, GroupChat};
use super::{add_layer, rpc};
pub(crate) async fn handle(
@ -144,10 +144,12 @@ async fn handle_event( @@ -144,10 +144,12 @@ async fn handle_event(
LayerEvent::OnlinePong(_) => {
results.rpcs.push(rpc::group_online(mgid, gid));
}
LayerEvent::Sync(_gcd, _, event) => {
LayerEvent::Sync(_, height, event) => {
match event {
Event::Message => {
//
Event::Message(mid, nmsg) => {
let base = layer.read().await.base.clone();
let msg = from_network_message(height as i64, gid, mid, mgid, nmsg, base)?;
results.rpcs.push(rpc::message_create(mgid, msg));
}
Event::GroupUpdate => {
//

141
src/apps/group_chat/models.rs

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
use group_chat_types::{GroupInfo, GroupType};
use rand::Rng;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use tdn::types::{
group::GroupId,
@ -8,7 +8,12 @@ use tdn::types::{ @@ -8,7 +8,12 @@ use tdn::types::{
};
use tdn_storage::local::{DStorage, DsValue};
use group_chat_types::{GroupInfo, GroupType, NetworkMessage};
use crate::apps::chat::MessageType;
use crate::storage::{
group_chat_db, write_avatar_sync, write_file_sync, write_image_sync, write_record_sync,
};
pub(super) struct GroupChatKey(Vec<u8>);
@ -268,14 +273,21 @@ impl GroupChat { @@ -268,14 +273,21 @@ impl GroupChat {
db.update(&sql)
}
pub fn addr_update(db: &DStorage, id: i64, addr: &PeerAddr) -> Result<usize> {
pub fn update_last_message(db: &DStorage, id: i64, msg: &Message, read: bool) -> Result<usize> {
let sql = format!(
"UPDATE groups SET addr='{}' WHERE id = {}",
addr.to_hex(),
"UPDATE groups SET last_datetime={}, last_content='{}', last_readed={} WHERE id = {}",
msg.datetime,
msg.content,
if read { 1 } else { 0 },
id,
);
db.update(&sql)
}
pub fn readed(db: &DStorage, id: i64) -> Result<usize> {
let sql = format!("UPDATE groups SET last_readed=1 WHERE id = {}", id);
db.update(&sql)
}
}
/// Group Member Model.
@ -372,10 +384,23 @@ impl Member { @@ -372,10 +384,23 @@ impl Member {
self.id = id;
Ok(())
}
pub fn get_id(db: &DStorage, fid: &i64, mid: &GroupId) -> Result<i64> {
let mut matrix = db.query(&format!(
"SELECT id FROM members WHERE fid = {} AND mid = '{}'",
fid,
mid.to_hex()
))?;
if matrix.len() > 0 {
Ok(matrix.pop().unwrap().pop().unwrap().as_i64()) // safe unwrap.
} else {
Err(new_io_error("missing member"))
}
}
}
/// Group Chat Message Model.
pub(super) struct Message {
pub(crate) struct Message {
/// db auto-increment id.
id: i64,
/// group message consensus height.
@ -399,6 +424,33 @@ pub(super) struct Message { @@ -399,6 +424,33 @@ pub(super) struct Message {
}
impl Message {
pub(crate) fn new(
height: i64,
fid: i64,
mid: i64,
is_me: bool,
m_type: MessageType,
content: String,
) -> Message {
let start = SystemTime::now();
let datetime = start
.duration_since(UNIX_EPOCH)
.map(|s| s.as_secs())
.unwrap_or(0) as i64; // safe for all life.
Self {
fid,
mid,
m_type,
content,
datetime,
height,
is_me,
is_deleted: false,
is_delivery: true,
id: 0,
}
}
/// here is zero-copy and unwrap is safe. checked.
fn from_values(mut v: Vec<DsValue>, contains_deleted: bool) -> Message {
let is_deleted = if contains_deleted {
@ -443,4 +495,83 @@ impl Message { @@ -443,4 +495,83 @@ impl Message {
}
Ok(groups)
}
pub fn insert(&mut self, db: &DStorage) -> Result<()> {
let sql = format!("INSERT INTO messages (height, fid, mid, is_me, m_type, content, is_delivery, datetime, is_deleted) VALUES ({}, {}, {}, {}, {}, '{}', {}, {}, false)",
self.height,
self.fid,
self.mid,
if self.is_me { 1 } else { 0 },
self.m_type.to_int(),
self.content,
if self.is_delivery { 1 } else { 0 },
self.datetime,
);
let id = db.insert(&sql)?;
self.id = id;
Ok(())
}
}
pub(super) fn from_network_message(
height: i64,
gdid: i64,
mid: GroupId,
mgid: GroupId,
msg: NetworkMessage,
base: PathBuf,
) -> Result<Message> {
let db = group_chat_db(&base, &mgid)?;
let mdid = Member::get_id(&db, &gdid, &mid)?;
let is_me = mid == mgid;
// handle event.
let (m_type, raw) = match msg {
NetworkMessage::String(content) => (MessageType::String, content),
NetworkMessage::Image(bytes) => {
let image_name = write_image_sync(&base, &mgid, bytes)?;
(MessageType::Image, image_name)
}
NetworkMessage::File(old_name, bytes) => {
let filename = write_file_sync(&base, &mgid, &old_name, bytes)?;
(MessageType::File, filename)
}
NetworkMessage::Contact(name, rgid, addr, avatar_bytes) => {
write_avatar_sync(&base, &mgid, &rgid, avatar_bytes)?;
let tmp_name = name.replace(";", "-;");
let contact_values = format!("{};;{};;{}", tmp_name, rgid.to_hex(), addr.to_hex());
(MessageType::Contact, contact_values)
}
NetworkMessage::Emoji => {
// TODO
(MessageType::Emoji, "".to_owned())
}
NetworkMessage::Record(bytes, time) => {
let record_name = write_record_sync(&base, &mgid, gdid, time, bytes)?;
(MessageType::Record, record_name)
}
NetworkMessage::Phone => {
// TODO
(MessageType::Phone, "".to_owned())
}
NetworkMessage::Video => {
// TODO
(MessageType::Video, "".to_owned())
}
NetworkMessage::None => {
return Ok(Message::new(
height,
gdid,
mdid,
is_me,
MessageType::String,
"".to_owned(),
));
}
};
let mut msg = Message::new(height, gdid, mdid, is_me, m_type, raw);
msg.insert(&db)?;
GroupChat::update_last_message(&db, gdid, &msg, false)?;
Ok(msg)
}

43
src/apps/group_chat/rpc.rs

@ -7,8 +7,9 @@ use tdn::types::{ @@ -7,8 +7,9 @@ use tdn::types::{
};
use tdn_did::Proof;
use group_chat_types::{CheckType, GroupConnect, GroupType};
use group_chat_types::{CheckType, Event, GroupConnect, GroupType, LayerEvent, NetworkMessage};
use crate::apps::chat::MessageType;
use crate::rpc::RpcState;
use crate::storage::group_chat_db;
@ -32,8 +33,8 @@ pub(crate) fn group_online(mgid: GroupId, gid: i64) -> RpcParam { @@ -32,8 +33,8 @@ pub(crate) fn group_online(mgid: GroupId, gid: i64) -> RpcParam {
}
#[inline]
pub(crate) fn group_offline(mgid: GroupId, gid: i64) -> RpcParam {
rpc_response(0, "group-chat-offline", json!([gid]), mgid)
pub(crate) fn group_offline(mgid: GroupId, fid: i64, gid: &GroupId) -> RpcParam {
rpc_response(0, "group-chat-offline", json!([fid, gid.to_hex()]), mgid)
}
#[inline]
@ -56,6 +57,11 @@ pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: GroupId, maddr: PeerA @@ -56,6 +57,11 @@ pub(crate) fn member_offline(mgid: GroupId, gid: i64, mid: GroupId, maddr: PeerA
)
}
#[inline]
pub(crate) fn message_create(mgid: GroupId, msg: Message) -> RpcParam {
rpc_response(0, "group-chat-message-create", json!(msg.to_rpc()), mgid)
}
#[inline]
fn group_list(groups: Vec<GroupChat>) -> RpcParam {
let mut results = vec![];
@ -166,4 +172,35 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) { @@ -166,4 +172,35 @@ pub(crate) fn new_rpc_handler(handler: &mut RpcHandler<RpcState>) {
Ok(results)
},
);
handler.add_method(
"group-chat-message-create",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let gcd = GroupId::from_hex(params[0].as_str()?)?;
let m_type = MessageType::from_int(params[1].as_i64()?);
let m_content = params[2].as_str()?;
let addr = state.layer.read().await.running(&gid)?.online(&gcd)?;
let mut results = HandleResult::new();
let event = Event::Message(gid, NetworkMessage::String(m_content.to_owned()));
let data = postcard::to_allocvec(&LayerEvent::Sync(gcd, 0, event)).unwrap_or(vec![]);
let msg = SendType::Event(0, addr, data);
add_layer(&mut results, gid, msg);
Ok(results)
},
);
handler.add_method(
"group-chat-readed",
|gid: GroupId, params: Vec<RpcParam>, state: Arc<RpcState>| async move {
let fid = params[0].as_i64()?;
let db = group_chat_db(state.layer.read().await.base(), &gid)?;
GroupChat::readed(&db, fid)?;
drop(db);
Ok(HandleResult::new())
},
);
}

Loading…
Cancel
Save