From cc00d5498f22e44ff06bd1098ac24a6ff93fed7a Mon Sep 17 00:00:00 2001 From: Sun Date: Thu, 22 Apr 2021 13:07:22 +0800 Subject: [PATCH] refactor rpc to different apps --- assets/logo/logo_40.jpg | Bin 13517 -> 16651 bytes src/{models => }/account.rs | 0 src/apps.rs | 38 + .../service.rs => apps/assistant/layer.rs} | 0 src/apps/assistant/migrate.rs | 0 src/apps/assistant/mod.rs | 12 + src/apps/assistant/models.rs | 0 src/apps/assistant/rpc.rs | 0 src/apps/chat/mod.rs | 5 + .../session.rs => apps/chat/models.rs} | 0 src/apps/chat/rpc.rs | 493 +++++++++++++ src/apps/device/mod.rs | 5 + .../device.rs => apps/device/models.rs} | 0 src/apps/device/rpc.rs | 138 ++++ src/apps/domain/mod.rs | 12 + src/apps/file/mod.rs | 5 + src/{models/file.rs => apps/file/models.rs} | 0 src/apps/file/rpc.rs | 22 + src/{models => }/consensus.rs | 0 src/daemon.rs | 4 +- src/event.rs | 63 +- src/group.rs | 25 +- src/group/running.rs | 2 +- src/layer.rs | 5 +- src/layer/running.rs | 2 +- src/lib.rs | 4 +- src/models.rs | 6 - src/rpc.rs | 666 +----------------- src/server.rs | 6 +- 29 files changed, 831 insertions(+), 682 deletions(-) rename src/{models => }/account.rs (100%) create mode 100644 src/apps.rs rename src/{models/service.rs => apps/assistant/layer.rs} (100%) create mode 100644 src/apps/assistant/migrate.rs create mode 100644 src/apps/assistant/mod.rs create mode 100644 src/apps/assistant/models.rs create mode 100644 src/apps/assistant/rpc.rs create mode 100644 src/apps/chat/mod.rs rename src/{models/session.rs => apps/chat/models.rs} (100%) create mode 100644 src/apps/chat/rpc.rs create mode 100644 src/apps/device/mod.rs rename src/{models/device.rs => apps/device/models.rs} (100%) create mode 100644 src/apps/device/rpc.rs create mode 100644 src/apps/domain/mod.rs create mode 100644 src/apps/file/mod.rs rename src/{models/file.rs => apps/file/models.rs} (100%) create mode 100644 src/apps/file/rpc.rs rename src/{models => }/consensus.rs (100%) delete mode 100644 src/models.rs diff --git a/assets/logo/logo_40.jpg b/assets/logo/logo_40.jpg index 0f76537ca8beb9f4280d3b26a8c34fe60aa00ce8..58f028ee8b815c7b2b5bc536c56952b279d25a76 100644 GIT binary patch delta 5536 zcmeHLcT`i$)<2PE0lW%G2L(j|Lk)x;y$W2E9*Tm1^d_KG4dvJiRhkHhARr-$5FsQe zEh5rEq}PB-Qy>8&#Xv}2biHpq@49P!>wW)!bIze_nhYI^E0fRnfiAOIKV3E^Bo!481?R~Z1T3%LGMPMx%Z{76@Uyb%%t zgt#Dbz>a+ou6+B*Yq?4lPw9t^SeKVVU9lg3cH=9opuIfkFu%!M%I2I z;UQsuA(szos;B`vf2P0T+6CP!1#tqnB#)R%?EnDI&8auD;6B1B0KgyJJ)5zmtqr?) zE=QK*d73N)<>^uHNU?qFEB zt+$V!SGtRk4k1hw6cMrpthpfjexK;O2|Q2&1Y9SPh=EE24nyYy&^B1?v$401JC^zm zcy5~=&i8nqfJ)j}o1)h`vVlBJu$Q$xRI@QP40cf6SmB(BX1cZEx;O>8umNQv$moY} z%Pv!*@v+-c%<@4tFmFD!3X&se^?w&J{JRM7zm!qj>o7$0mdhFI)Uw+hK4>R@uH*XX zqZ1wAGwyxMln^}o!%!cY#pl2V23)MRgF%k!dJldo6V1_Za(lSO4>KVqp;uCf>;9}5 zXvZ6%l$a@Hf5QswO;YY6d_@df)YI!&hNp^Z$wD2}l z!GASnv1D@l)YsvfsEzXa{zrTHURAq(9*>6mP|F(YqeyELV%^UxBTf2REOIHiJ&`{1 zTbHy)$8`cirrN#f$O*Zd70nWL{RWp!4?p;Pf;-(<%F3Am3K~kM zg$!1Gq)Wr*zG7KW%gE`?UNue6{iELdm!f=2*NL%OhtJ1$*O#v3g6ijASJJz#3~JP? zuKakEeLGE)zd`4ka9pOJp@mL9p`Wql>mtnexde4sSawA(fmQgGycilCjg4UU{$?uh zH(OzT#2AHz`l?Af2VT@1uZ;>`T2~HO+Ri6lu|H28u7kUxFn*Vv-2y+B+AM~NFExAK z|9DfqM`c#tG~!zUclrsvDJjU%EZB)-3Gonzd2Ucc=4`UcCvx)-W{m1t#pi|9C|z+D z3PiT0c0HaPUy3Cn&GNyepf~OErp_ntVp~5R^!`vedpc}dHvs+Uxw^3hi{Mf+KS8kavK;=OzWQO`?r z&O1$1tUG-_Gn1A2U<(zal)PJSBZ#T?BWX%UX}fxYEMB8rc|{ylihPc}7a0-R-7)aO zBF>K<=7EU!L_6nc^mhy6!hB?y>1Xa@^0!?3gYdgD@uG&xEGKcuq znLP|X1`2hoYIk)jOR%b%6(C*hI0<$U=0!aRyhN3aK~@*<{9+lw32iG-)GvJXeev7= z6I1jTgiB)&GU6m65w@|p&I$=v9XGYMUw=J*w|suS_do#~xG~_iCd%2=ozus+k$86B zZ;<*8QDq;BJfZq zGITxebK#43HVy~P{MRQ9Z(B@v4B&)=;k<eS=B zp^I&ngx1O$e57{`qD-w2VJ^Ov|t3jhND=pt#CK{s=v($UWl5 zr?}5`Y=R3%ZXelW+h=1es8AwKzay%dWgIV}98hNjYHkS_HO)8L`Tu#o+Y8ZI7C)LP zl6~La`uyWupx{Cp2tcTNts=g4P{ccudh4S_!w1{w1GI)-Ds&i@R&hfA&@Nz13r;X)MN4{6&c z*Saf9pmJq^IC%OZ_zef>aoLZ^JbwxL*QAy~?Y_^!(G-DW&x=rZ3m zxJS-~9WgRVGn%KkoUQQ4GY`DAD3jFbrPuF&5#xVL>}vt4OXZZZQmM?DC8Bv*UR;FRqy2VLej&+T{%X0@D17da&OLq);P%f;FrYlyGtKE9pTd zO&@nrdJhUp9hB_^vG6jH1TAt~wWuFWNfonQlhlipzBf*d_XxeOkI(Hoe`{S7F~OoA9Z%*M^Ju2=>4~fnnwAsp(EvQ-#`?K{X(2 zs@4WO`PPmUb4|AFG>pbqv^15V43h}QW_Vn){HbHU$i}_AGRX?uGQqpIc2x_lulnc+ zHaN?6-J3RI17AQkFy~K?p1yn3ix%5n8`aYm(?b2SolUAB%x}2)Qm0b0$5?L{b7J$8 zpIeXC-HAI3fB5dzpYmvI(_r?P7-lYfzjGTtNb2_W9rLs1G6|HwBAu>H<21O+$q(T! z3+^xVag2#5hc~Owzf@Nv*#MM=o~$|@pv?y4CtQkNm4XfE^@S*7^GMkclH!AoqqK$) zb)UT>7dYV(Q#OQ8edih{f>9yP_V`xIP$#0Zt}PpO=1-1KM+fsirA}^5Mb6B12Y1ic zY9eD>JVR+0noXCzG)8xoVO~@i?=My>zHBPxSI0RX+n7T75t^+dHbn*0E@5SFPg-1~+=m;*wk!B`*l6q^pY{k;4{=D?4{OIoD@ zZ-mb$w{2Xp=_)m0XkzQs>=3k4T4L?ts#ZX-Gh9n(! zPM)Ih!xDgq0 zpX;X22KF|3d?{5dX}9imFo5_;lw&K;sf%bfrMm<{!u@?bNVwK{x@-_1{rktR6!1?G zo}Y`WR)Yno;x^~fXP8h=2jv?ymrJ=&-oz7$4ZR_O)Dbb}u*UOJi>__PR77j1M8Xy@Lyqs1UX#*)~!N7^8< z1052^?SVhWtOT>1&QV?|F2;Z}-wiR(*T55HCQi zCQWLGJ#1NnBdxt|bWH290g=p9EGJcYMe&}7x;VY7u<5f&bD6L$Yn-Q2Rogygb!X(adg~R{T(_MEp8+H?b0Nv*};|;SHF;mGdc2C zwW>c&Lc*ZTTQcL305PQwH{H=kz>b$KSTynTtB%issw>-haD5h}wWkcaox7Ta3IFXp z406H)_Rv3JWI*od$0%*}Ffy3Zi>R2(9leM)C~m|#C$apwOHn^;RE5$#XkKtrF?IdI zAA~rG2w7XhiqJ9WTmVZVZX)7pvkPBOSwAOgiuztYo)3^_-CFS}_R(N;%*14EoL)zO zQ`@s{Dk2$_w9?vB#^<{pyQA$;TX4*6z2dPgSCjLhmB(JZ*f%Y&P+a)F>wS!w`c4*j zh=pXiW}ZD}+53$R+`{f=nM`$`{RDqYrA&w;9_Pw~{V_=lp*juyTb1O;u8}zC#5I@Y zl=hhx2LaqDOgKYJvK6=TIH!#@hmA2j5-G22TVV&<=Y&)j-^`AnR`qkQJiZ#BR;Rd* zTJ5q`8g_gavq`DN82QAQEk6~0-J)l6m(D|ELa=wbjHSw`lbGwyJ<#VG}M$ zKWdO8LZYHC3;o`6h^Zu3Y$l=jinFK|NGnXIx0bMhxJ8${0r!j)itAJrp6(yD@6-O!M*NZQh~ zrzEL{nMpe_DEWTjxX#sQmtyVu?AgE{nTn5#6T9~&?-OHX2ERR}j zuu*DjZIsOHWOPsH{08aA(r|d5FJ@;P4P5jyC^C8O6Oe4rSi*5=xy=JO1##0sV7l RZjU->bb$px`Y#@0^k29(_NM>< delta 2358 zcmcIkcTm%L7XO8!CxR{lDk6e_ih)2HBoIhM4qa+Q2@0YJq|l@nrTtcR6+sXrfPzxQ zLQQfKKtta}N&o?cP!|M2T&hP21PIA;JA1P`b8~b1&%N)wd7oG2JD>OczIpdzA@96) zhzG>ul7$a;yg*^pktkz0+SmXAY>Q+b1d!aeknIEHNdQv6VF1w1ll%ie7$riy>3Wa` z1sOm_5~2Y}$U`LMA>xtk5(pb2L4;iEoK{tn6AUODyAK9+wcCabTU%$f zlx2BFV=G-|%W5wNN4u7-h=C6%9xIdoRScZ@_;hAVVWqx1SlBPzcM-&YoT|sHcNH|o z#EIh1ZoVO!iwDK?y9?=fDP_n``ZX!VZ+=;HS=RsY;M4p_8l#EKt788k#g~%S-X^Y1 zkJ*L$9o6l+Bmb`$aGvHF;_L56`z=qB`TCzxHfi?*$RUY4+ZEf70Lc8V>f7!q0Fdxb zmHWqkZQS|)*EoA)?*&O2jKjabi+)i-^Uv-N3K3dvXsnyNAqEaZVX+7p8g5_!GlG** zFfs;3MquHF23RDPuAvCkayK$`H!wtDVQvUF3=D}vyTjb5NF)qPMNlY66cT})q@PVc|l2R&A-#HhDyGxat+I-mWv1O^1>)v@UB2mXDS%O(eZ;;6M-ynRf zTC*v*>3;j-KsGN~X-Ei?gUf$naifj#!wWS-{J`s&`r7V|(Bk{qFS)#ir#U6+bITWc z%%$PX($bRB(!zw)JF&5`Ni;_5WSJ7XMcM#F%X3X_mbC1j;h@=}3lAfEsH;0kVaFp& zaVHL>52@ZRLpfZsoh>gZgWF~d4RQ+uIbLw5n}MMfY2}=n=p*NqN((HroS}@6WzU?G znzlMS&b4_LFjCCa7iY!YeVyf3t4jA+78e40yS@@$(JZok*}@luwD9IDyu~<-up1;~ zipJiK$7p>D;EeS5ZTWCe={Hm#yfNIfgIOR)2nd(9+n=wyI^N{x+o8XQH&osP#up6t z+RFK+_ofwI2@Ta~g)!o0)^l_7#K6gE!NmDGO`)lZgD3*z?$c%HJM@SF-452qjvQ~* zBHQZ6pT4$CJ}-RA@`U#)^zcy>P~{c$LtWX8@z_W9Pe#nY&GM2n|J%JYor?~wez*n( zlqb64+LTZg;<5D0ycp1P{j9c_>>GZ6RYbX*J8@ipVnI_ml}o`NkHWMrK$FJshM!iO zV7AA?gTm@GULVbDoL8ei9-bd9GtIWp6^xc7__LEfcN(~?Boc!Wu98hWaFWOxoI2ap znOix1NRnl1+|4>sUAA?*4&&effutsT*}GOXK|fkJ>H3xG znrjxSC%eCY8*=H!k=NEDFlHcnq=G=-x%7ExSW_V-e@2%2S-`zJXQDUS+}nFvLA5*6 zjEAkNan#819%|K^yP++8%*5U(7pGCuBDWo4h$Vd|%0u-*E)pNX?=;wY{~1v&}BFSEXg<{HR<7etwi zMuCnVGy9ngN?W_)`9~nm)!REr+4uID<-Mfz;w+k1Fp^Zm z_<~ZS^tZ->pQJ5-6gZxyYPt)rk{UpG*OwNlz!mPM-6Vau5I;QSkRq>P)th$5BISsA zN$Os~rjZHJ@+_oZ{GqRn?brWRx7M)r?fYJG(B^W8Tjd@#RZ9gTDW$`o(PUm8h*s>* z)7Ry54tLmV+_%QA>N0aC8fdE_PA#hb6T-KItOBsP^ZA@ziRjW}!lYi?w9TmLs@?8M zTDvT>{|dhnuYRpE26rH1_~7+tiUUs;MxL)@IM7#V9?}oEO~+5LUS!C2+8CU+x9>EU zEANyQDQn(6!r3${5P&~bJU1IGjuWcxv#J*myOuKB7fh0UbElH4Y11xKVf7(YFKD$G zm;jkp1u(;~GMCY?sNAT@K%Po!^@%5?#aa0UI`+DmUPm0$i*4#T^ZZ&4=~7dDfa2Jw zt6rg7Vz%?*ddNjFko9ocS&)275N9SV2Gh^rRBach4%^^#DmHyQ8|nE2$^GU0Mx8ei z?WMs^{%g2FmG9YXB64A*Un}*3wY7mac#IT!J@TCmzoPu5YY|7(bN5InAD_0Q>S(E` zA9t|Dx#^r3Xc!`FBou4}isp$8W08nyf~&ZvpH23Mt?^9LD>g5(ptJ^mQO(G%>M;CR z?oR8B>zpc2{B*!(LatKQvy$faM-2|WGHp9^l<{3hWtFt^FP*z}SvwTkSbiV8XQtj8 zIQ`8E$7x2#$H&g+WGZ(OshrsCfv`_$r!HsK)RG4ecZTC0wVIwei^_gSE~XT1FVa-2 zgsk|hmdcs&YxVyOWzt(bu62A23JcTfRW@nPo1Z};veuT?W4gGX>sMJ|WD$RMgs_$5 zFD&ALM8RJDiCbIA4QtHW-lJK@P+sW^kF$=admFxWb{e_6uV|&X*(rGi*cZLY;EM-; E1**qKBLDyZ diff --git a/src/models/account.rs b/src/account.rs similarity index 100% rename from src/models/account.rs rename to src/account.rs diff --git a/src/apps.rs b/src/apps.rs new file mode 100644 index 0000000..6c7a30f --- /dev/null +++ b/src/apps.rs @@ -0,0 +1,38 @@ +use tdn::types::{ + group::GroupId, + primitive::{HandleResult, PeerAddr, Result}, + rpc::RpcHandler, +}; + +use crate::rpc::RpcState; + +pub(crate) mod assistant; +pub(crate) mod chat; +pub(crate) mod device; +pub(crate) mod domain; +pub(crate) mod file; + +pub(crate) fn app_rpc_inject(handler: &mut RpcHandler) { + device::new_rpc_handler(handler); + chat::new_rpc_handler(handler); + assistant::new_rpc_handler(handler); + domain::new_rpc_handler(handler); + file::new_rpc_handler(handler); +} + +pub(crate) fn _app_layer_handle( + _gid: GroupId, + _fgid: GroupId, + _addr: PeerAddr, + _data: Vec, +) -> Result { + todo!() +} + +pub(crate) fn _app_group_handle() -> Result { + todo!() +} + +pub(crate) fn _app_migrate() -> Result<()> { + todo!() +} diff --git a/src/models/service.rs b/src/apps/assistant/layer.rs similarity index 100% rename from src/models/service.rs rename to src/apps/assistant/layer.rs diff --git a/src/apps/assistant/migrate.rs b/src/apps/assistant/migrate.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/assistant/mod.rs b/src/apps/assistant/mod.rs new file mode 100644 index 0000000..eeb5c48 --- /dev/null +++ b/src/apps/assistant/mod.rs @@ -0,0 +1,12 @@ +use tdn::types::{ + primitive::HandleResult, + rpc::{json, RpcHandler}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("assistant-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); +} diff --git a/src/apps/assistant/models.rs b/src/apps/assistant/models.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/assistant/rpc.rs b/src/apps/assistant/rpc.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/apps/chat/mod.rs b/src/apps/chat/mod.rs new file mode 100644 index 0000000..e646058 --- /dev/null +++ b/src/apps/chat/mod.rs @@ -0,0 +1,5 @@ +mod models; + +pub(crate) mod rpc; +pub(crate) use models::{Friend, Message, MessageType, NetworkMessage, Request}; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/session.rs b/src/apps/chat/models.rs similarity index 100% rename from src/models/session.rs rename to src/apps/chat/models.rs diff --git a/src/apps/chat/rpc.rs b/src/apps/chat/rpc.rs new file mode 100644 index 0000000..ef46aaf --- /dev/null +++ b/src/apps/chat/rpc.rs @@ -0,0 +1,493 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + message::SendType, + primitive::{HandleResult, PeerAddr}, + rpc::{json, rpc_response, RpcHandler, RpcParam}, +}; +use tdn_did::user::User; + +use crate::event::InnerEvent; +use crate::layer::LayerEvent; +use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; +use crate::rpc::{sleep_waiting_close_stable, RpcState}; +use crate::storage::{delete_avatar, session_db}; + +use super::{Friend, Message, MessageType, Request}; + +#[inline] +pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam { + rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid) +} + +#[inline] +pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-offline", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam { + rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam { + rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid) +} + +#[inline] +pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-close", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam { + rpc_response(0, "friend-delete", json!([fid]), mgid) +} + +#[inline] +pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam { + rpc_response(0, "request-create", json!(req.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { + rpc_response(0, "request-delivery", json!([id, is_d]), mgid) +} + +#[inline] +pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam { + rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid) +} + +#[inline] +pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "request-reject", json!([id]), mgid) +} + +#[inline] +pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "request-delete", json!([id]), mgid) +} + +#[inline] +pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { + rpc_response(0, "message-create", json!(msg.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { + rpc_response(0, "message-delivery", json!([id, is_d]), mgid) +} + +#[inline] +pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "message-delete", json!([id]), mgid) +} + +#[inline] +fn friend_list(friends: Vec) -> RpcParam { + let mut results = vec![]; + for friend in friends { + results.push(friend.to_rpc()); + } + + json!(results) +} + +#[inline] +fn request_list(requests: Vec) -> RpcParam { + let mut results = vec![]; + for request in requests { + results.push(request.to_rpc()); + } + json!(results) +} + +#[inline] +fn message_list(messages: Vec) -> RpcParam { + let mut results = vec![]; + for msg in messages { + results.push(msg.to_rpc()); + } + json!(results) +} + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("chat-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "friend-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let friends = state.layer.read().await.all_friends_with_online(&gid)?; + Ok(HandleResult::rpc(friend_list(friends))) + }, + ); + + handler.add_method( + "friend-update", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + let remark = params[1].as_str()?; + let is_top = params[2].as_bool()?; + + let mut results = HandleResult::new(); + let db = session_db(state.layer.read().await.base(), &gid)?; + let f = if let Some(mut f) = Friend::get_id(&db, id)? { + f.is_top = is_top; + f.remark = remark.to_owned(); + f.me_update(&db)?; + f + } else { + return Ok(results); + }; + drop(db); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark), + FRIEND_TABLE_PATH, + f.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "friend-readed", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + + let db = session_db(state.layer.read().await.base(), &gid)?; + Friend::readed(&db, fid)?; + drop(db); + + Ok(HandleResult::new()) + }, + ); + + handler.add_method( + "friend-close", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut results = HandleResult::new(); + let mut layer_lock = state.layer.write().await; + + let db = session_db(layer_lock.base(), &gid)?; + let friend = Friend::get_id(&db, id)??; + friend.close(&db)?; + drop(db); + + let online = layer_lock.remove_friend(&gid, &friend.gid); + drop(layer_lock); + + if let Some(faddr) = online { + let mut addrs: HashMap = HashMap::new(); + addrs.insert(faddr, friend.gid); + let sender = state.group.read().await.sender(); + tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) + .detach(); + } + + let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); + let msg = SendType::Event(0, friend.addr, data); + results.layers.push((gid, friend.gid, msg)); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendClose(friend.gid), + FRIEND_TABLE_PATH, + friend.id, + &mut results, + )?; + + Ok(results) + }, + ); + + handler.add_method( + "friend-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut results = HandleResult::new(); + let mut layer_lock = state.layer.write().await; + + let db = session_db(layer_lock.base(), &gid)?; + let friend = Friend::get_id(&db, id)??; + friend.delete(&db)?; + drop(db); + + let online = layer_lock.remove_friend(&gid, &friend.gid); + delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; + drop(layer_lock); + + if let Some(faddr) = online { + let mut addrs: HashMap = HashMap::new(); + addrs.insert(faddr, friend.gid); + let sender = state.group.read().await.sender(); + tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) + .detach(); + } + + let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); + let msg = SendType::Event(0, friend.addr, data); + results.layers.push((gid, friend.gid, msg)); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionFriendDelete(friend.gid), + FRIEND_TABLE_PATH, + friend.id, + &mut results, + )?; + + Ok(results) + }, + ); + + handler.add_method( + "request-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + drop(layer_lock); + let requests = Request::all(&db)?; + drop(db); + Ok(HandleResult::rpc(request_list(requests))) + }, + ); + + handler.add_method( + "request-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let remote_gid = GroupId::from_hex(params[0].as_str()?)?; + let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?; + let remote_name = params[2].as_str()?.to_string(); + let remark = params[3].as_str()?.to_string(); + + let mut request = Request::new( + remote_gid, + remote_addr, + remote_name.clone(), + remark.clone(), + true, + false, + ); + + let mut results = HandleResult::rpc(Default::default()); + let me = state.group.read().await.clone_user(&gid)?; + + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + if Friend::is_friend(&db, &request.gid)? { + debug!("had friend."); + drop(layer_lock); + return Ok(results); + } + + if let Some(req) = Request::get(&db, &request.gid)? { + println!("Had this request."); + req.delete(&db)?; + } + request.insert(&db)?; + drop(db); + + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestCreate( + true, + User::new(remote_gid, remote_addr, remote_name, vec![])?, + remark, + ), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + + results + .layers + .push((gid, remote_gid, layer_lock.req_message(me, request))); + + drop(layer_lock); + + Ok(results) + }, + ); + + handler.add_method( + "request-agree", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut group_lock = state.group.write().await; + let me = group_lock.clone_user(&gid)?; + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + let mut results = HandleResult::new(); + + if let Some(mut request) = Request::get_id(&db, id)? { + group_lock.broadcast( + &gid, + InnerEvent::SessionRequestHandle(request.gid, true, vec![]), + REQUEST_TABLE_PATH, + request.id, + &mut results, + )?; + request.is_ok = true; + request.is_over = true; + request.update(&db)?; + + let f = Friend::from_request(&db, request)?; + layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id); + results.rpcs.push(json!([id, f.to_rpc()])); + + let proof = group_lock.prove_addr(&gid, &f.addr)?; + let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?; + results.layers.push((gid, f.gid, msg)); + } + db.close()?; + drop(group_lock); + drop(layer_lock); + Ok(results) + }, + ); + + handler.add_method( + "request-reject", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let mut layer_lock = state.layer.write().await; + let db = session_db(layer_lock.base(), &gid)?; + let mut req = Request::get_id(&db, id)??; + req.is_ok = false; + req.is_over = true; + req.update(&db)?; + drop(db); + let msg = layer_lock.reject_message(id, req.addr, gid); + drop(layer_lock); + + let mut results = HandleResult::layer(gid, req.gid, msg); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestHandle(req.gid, false, vec![]), + REQUEST_TABLE_PATH, + req.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "request-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + let base = layer_lock.base().clone(); + drop(layer_lock); + let req = Request::get_id(&db, id)??; + req.delete(&db)?; + + // delete avatar. check had friend. + if Friend::get(&db, &req.gid)?.is_none() { + delete_avatar(&base, &gid, &req.gid).await?; + } + drop(db); + + let mut results = HandleResult::new(); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionRequestDelete(req.gid), + REQUEST_TABLE_PATH, + req.id, + &mut results, + )?; + Ok(results) + }, + ); + + handler.add_method( + "message-list", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(layer_lock.base(), &gid)?; + drop(layer_lock); + + Friend::readed(&db, fid)?; + let messages = Message::get(&db, &fid)?; + drop(db); + Ok(HandleResult::rpc(message_list(messages))) + }, + ); + + handler.add_method( + "message-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let fid = params[0].as_i64()?; + let fgid = GroupId::from_hex(params[1].as_str()?)?; + let m_type = MessageType::from_int(params[2].as_i64()?); + let content = params[3].as_str()?.to_string(); + + let mut layer_lock = state.layer.write().await; + let base = layer_lock.base(); + let faddr = layer_lock.running(&gid)?.online(&fgid)?; + + let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?; + let event = LayerEvent::Message(msg.hash, nw); + let s = layer_lock.event_message(msg.id, gid, faddr, &event); + drop(layer_lock); + + let mut results = HandleResult::rpc(json!(msg.to_rpc())); + results.layers.push((gid, fgid, s)); + + match event { + LayerEvent::Message(hash, nw) => { + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionMessageCreate(fgid, true, hash, nw), + MESSAGE_TABLE_PATH, + msg.id, + &mut results, + )?; + } + _ => {} + } + + Ok(results) + }, + ); + + handler.add_method( + "message-delete", + |gid: GroupId, params: Vec, state: Arc| async move { + let id = params[0].as_i64()?; + + let layer_lock = state.layer.read().await; + let db = session_db(&layer_lock.base(), &gid)?; + drop(layer_lock); + + let msg = Message::get_id(&db, id)??; + msg.delete(&db)?; + drop(db); + let mut results = HandleResult::new(); + state.group.write().await.broadcast( + &gid, + InnerEvent::SessionMessageDelete(msg.hash), + MESSAGE_TABLE_PATH, + msg.id, + &mut results, + )?; + Ok(results) + }, + ); +} diff --git a/src/apps/device/mod.rs b/src/apps/device/mod.rs new file mode 100644 index 0000000..32f135b --- /dev/null +++ b/src/apps/device/mod.rs @@ -0,0 +1,5 @@ +mod models; + +pub(crate) mod rpc; +pub(crate) use models::Device; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/device.rs b/src/apps/device/models.rs similarity index 100% rename from src/models/device.rs rename to src/apps/device/models.rs diff --git a/src/apps/device/rpc.rs b/src/apps/device/rpc.rs new file mode 100644 index 0000000..957669a --- /dev/null +++ b/src/apps/device/rpc.rs @@ -0,0 +1,138 @@ +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + primitive::{new_io_error, HandleResult, PeerAddr}, + rpc::{json, rpc_response, RpcHandler, RpcParam}, +}; + +use crate::group::GroupEvent; +use crate::rpc::RpcState; +use crate::storage::consensus_db; +use crate::utils::device_status::device_status as local_device_status; + +use super::Device; + +#[inline] +pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam { + rpc_response(0, "device-create", json!(device.to_rpc()), mgid) +} + +#[inline] +pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-remove", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-online", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam { + rpc_response(0, "device-offline", json!([id]), mgid) +} + +#[inline] +pub(crate) fn device_status( + mgid: GroupId, + cpu: u32, + memory: u32, + swap: u32, + disk: u32, + cpu_p: u16, + memory_p: u16, + swap_p: u16, + disk_p: u16, + uptime: u32, +) -> RpcParam { + rpc_response( + 0, + "device-status", + json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]), + mgid, + ) +} + +#[inline] +fn device_list(devices: Vec) -> RpcParam { + let mut results = vec![]; + for device in devices { + results.push(device.to_rpc()); + } + json!(results) +} + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("device-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "device-list", + |gid: GroupId, _params: Vec, state: Arc| async move { + let db = consensus_db(state.layer.read().await.base(), &gid)?; + let devices = Device::all(&db)?; + drop(db); + let online_devices = state.group.read().await.online_devices(&gid, devices); + Ok(HandleResult::rpc(device_list(online_devices))) + }, + ); + + handler.add_method( + "device-status", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let group_lock = state.group.read().await; + if &addr == group_lock.addr() { + let uptime = group_lock.uptime(&gid)?; + let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = + local_device_status(); + return Ok(HandleResult::rpc(json!([ + cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime + ]))); + } + drop(group_lock); + + let msg = state + .group + .write() + .await + .event_message(addr, &GroupEvent::StatusRequest)?; + + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-create", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let msg = state.group.read().await.create_message(&gid, addr)?; + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-connect", + |gid: GroupId, params: Vec, state: Arc| async move { + let addr = PeerAddr::from_hex(params[0].as_str()?) + .map_err(|_e| new_io_error("PeerAddr invalid!"))?; + + let msg = state.group.read().await.connect_message(&gid, addr)?; + Ok(HandleResult::group(gid, msg)) + }, + ); + + handler.add_method( + "device-delete", + |_gid: GroupId, params: Vec, _state: Arc| async move { + let _id = params[0].as_i64()?; + // TODO delete a device. + Ok(HandleResult::new()) + }, + ); +} diff --git a/src/apps/domain/mod.rs b/src/apps/domain/mod.rs new file mode 100644 index 0000000..6968310 --- /dev/null +++ b/src/apps/domain/mod.rs @@ -0,0 +1,12 @@ +use tdn::types::{ + primitive::HandleResult, + rpc::{json, RpcHandler}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("domain-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); +} diff --git a/src/apps/file/mod.rs b/src/apps/file/mod.rs new file mode 100644 index 0000000..f9b898c --- /dev/null +++ b/src/apps/file/mod.rs @@ -0,0 +1,5 @@ +mod models; +mod rpc; + +pub(crate) use models::{FileId, FileType}; +pub(crate) use rpc::new_rpc_handler; diff --git a/src/models/file.rs b/src/apps/file/models.rs similarity index 100% rename from src/models/file.rs rename to src/apps/file/models.rs diff --git a/src/apps/file/rpc.rs b/src/apps/file/rpc.rs new file mode 100644 index 0000000..eeb8818 --- /dev/null +++ b/src/apps/file/rpc.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; +use tdn::types::{ + group::GroupId, + primitive::HandleResult, + rpc::{json, RpcHandler, RpcParam}, +}; + +use crate::rpc::RpcState; + +pub(crate) fn new_rpc_handler(handler: &mut RpcHandler) { + handler.add_method("files-echo", |_, params, _| async move { + Ok(HandleResult::rpc(json!(params))) + }); + + handler.add_method( + "files-folder", + |_gid: GroupId, params: Vec, _state: Arc| async move { + let _path = params[0].as_str()?; + Ok(HandleResult::new()) + }, + ); +} diff --git a/src/models/consensus.rs b/src/consensus.rs similarity index 100% rename from src/models/consensus.rs rename to src/consensus.rs diff --git a/src/daemon.rs b/src/daemon.rs index 69db1af..2d2a35f 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -4,11 +4,13 @@ extern crate log; use std::env::args; use tdn::smol::{self, io::Result}; +mod account; +mod apps; +mod consensus; mod event; mod group; mod layer; mod migrate; -mod models; mod primitives; mod rpc; mod server; diff --git a/src/event.rs b/src/event.rs index 972ee7f..b432b94 100644 --- a/src/event.rs +++ b/src/event.rs @@ -12,16 +12,18 @@ use tdn::types::{ use tdn_did::user::User; use tdn_storage::local::DStorage; +use crate::account::Account; +use crate::consensus::Event; use crate::group::{Group, GroupEvent}; use crate::layer::running::Online; use crate::layer::{Layer, LayerEvent}; use crate::migrate::consensus::{ ACCOUNT_TABLE_PATH, FILE_TABLE_PATH, FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH, }; -use crate::models::account::Account; -use crate::models::consensus::Event; -use crate::models::file::{FileId, FileType}; -use crate::models::session::{Friend, Message, NetworkMessage, Request}; + +use crate::apps::chat::rpc as chat_rpc; +use crate::apps::chat::{Friend, Message, NetworkMessage, Request}; +use crate::apps::file::{FileId, FileType}; use crate::rpc; use crate::storage::{ account_db, consensus_db, delete_avatar_sync, read_avatar_sync, session_db, write_avatar_sync, @@ -255,7 +257,7 @@ impl InnerEvent { } if let Some(req) = Request::get(&db, &remote.id)? { req.delete(&db)?; // delete the old request. - results.rpcs.push(rpc::request_delete(gid, req.id)); + results.rpcs.push(chat_rpc::request_delete(gid, req.id)); } let mut request = Request::new(remote.id, remote.addr, remote.name, remark, is_me, true); @@ -264,7 +266,7 @@ impl InnerEvent { drop(db); // save the avatar. write_avatar_sync(group.base(), &gid, &remote.id, remote.avatar)?; - results.rpcs.push(rpc::request_create(gid, &request)); + results.rpcs.push(chat_rpc::request_create(gid, &request)); (REQUEST_TABLE_PATH, request.id) } InnerEvent::SessionRequestHandle(rgid, is_ok, avatar) => { @@ -292,9 +294,11 @@ impl InnerEvent { } }) .detach(); - results.rpcs.push(rpc::request_agree(gid, rid, &friend)); + results + .rpcs + .push(chat_rpc::request_agree(gid, rid, &friend)); } else { - results.rpcs.push(rpc::request_reject(gid, rid)); + results.rpcs.push(chat_rpc::request_reject(gid, rid)); } (REQUEST_TABLE_PATH, rid) } else { @@ -310,7 +314,7 @@ impl InnerEvent { if Friend::get(&db, &request.gid)?.is_none() { delete_avatar_sync(group.base(), &gid, &request.gid)?; } - results.rpcs.push(rpc::request_delete(gid, rid)); + results.rpcs.push(chat_rpc::request_delete(gid, rid)); (REQUEST_TABLE_PATH, rid) } else { return Ok(()); @@ -340,7 +344,7 @@ impl InnerEvent { } let msg = m.handle(is_me, gid, group.base(), &db, f.id, hash)?; - results.rpcs.push(rpc::message_create(gid, &msg)); + results.rpcs.push(chat_rpc::message_create(gid, &msg)); (MESSAGE_TABLE_PATH, msg.id) } else { return Ok(()); @@ -350,7 +354,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(m) = Message::get_it(&db, &hash)? { m.delete(&db)?; - results.rpcs.push(rpc::message_delete(gid, m.id)); + results.rpcs.push(chat_rpc::message_delete(gid, m.id)); (MESSAGE_TABLE_PATH, m.id) } else { return Ok(()); @@ -365,7 +369,7 @@ impl InnerEvent { if ravatar.len() > 0 { write_avatar_sync(group.base(), &gid, &rgid, ravatar)?; } - results.rpcs.push(rpc::friend_info(gid, &f)); + results.rpcs.push(chat_rpc::friend_info(gid, &f)); (FRIEND_TABLE_PATH, f.id) } else { return Ok(()); @@ -379,7 +383,7 @@ impl InnerEvent { f.me_update(&db)?; results .rpcs - .push(rpc::friend_update(gid, f.id, is_top, &f.remark)); + .push(chat_rpc::friend_update(gid, f.id, is_top, &f.remark)); (FRIEND_TABLE_PATH, f.id) } else { return Ok(()); @@ -389,7 +393,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { f.close(&db)?; - results.rpcs.push(rpc::friend_close(gid, f.id)); + results.rpcs.push(chat_rpc::friend_close(gid, f.id)); let rfid = f.id; let layer_lock = layer.clone(); @@ -418,7 +422,7 @@ impl InnerEvent { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { f.delete(&db)?; - results.rpcs.push(rpc::friend_delete(gid, f.id)); + results.rpcs.push(chat_rpc::friend_delete(gid, f.id)); delete_avatar_sync(group.base(), &gid, &f.gid)?; let rfid = f.id; @@ -494,7 +498,9 @@ impl StatusEvent { StatusEvent::SessionFriendOnline(rgid) => { let db = session_db(group.base(), &gid)?; if let Some(f) = Friend::get_it(&db, &rgid)? { - results.rpcs.push(rpc::friend_online(gid, f.id, f.addr)); + results + .rpcs + .push(chat_rpc::friend_online(gid, f.id, f.addr)); let layer_lock = layer.clone(); let rgid = f.gid; let ggid = gid.clone(); @@ -517,8 +523,11 @@ impl StatusEvent { tdn::smol::spawn(async move { if let Ok(running) = layer_lock.write().await.running_mut(&ggid) { if running.check_offline(&rgid, &addr) { - let msg = - SendMessage::Rpc(uid, rpc::friend_offline(ggid, rid), true); + let msg = SendMessage::Rpc( + uid, + chat_rpc::friend_offline(ggid, rid), + true, + ); let _ = sender.send(msg).await; } } @@ -752,7 +761,7 @@ impl SyncEvent { if Friend::get(&session_db, &rgid)?.is_none() { delete_avatar_sync(&base, &gid, &rgid)?; } - results.rpcs.push(rpc::request_delete(gid, req.id)); + results.rpcs.push(chat_rpc::request_delete(gid, req.id)); } req.is_ok = is_ok; @@ -768,13 +777,13 @@ impl SyncEvent { // save to db. request.insert(&session_db)?; let rid = request.id; - results.rpcs.push(rpc::request_create(gid, &request)); + results.rpcs.push(chat_rpc::request_create(gid, &request)); if is_delete { if Friend::get(&session_db, &rgid)?.is_none() { delete_avatar_sync(&base, &gid, &rgid)?; } - results.rpcs.push(rpc::request_delete(gid, rid)); + results.rpcs.push(chat_rpc::request_delete(gid, rid)); } request @@ -795,9 +804,11 @@ impl SyncEvent { } }) .detach(); - results.rpcs.push(rpc::request_agree(gid, rid, &friend)); + results + .rpcs + .push(chat_rpc::request_agree(gid, rid, &friend)); } else { - results.rpcs.push(rpc::request_reject(gid, rid)); + results.rpcs.push(chat_rpc::request_reject(gid, rid)); } session_db.close()?; @@ -859,9 +870,9 @@ impl SyncEvent { } if friend.is_deleted { - results.rpcs.push(rpc::friend_delete(gid, friend.id)); + results.rpcs.push(chat_rpc::friend_delete(gid, friend.id)); } else { - results.rpcs.push(rpc::friend_info(gid, &friend)); + results.rpcs.push(chat_rpc::friend_info(gid, &friend)); } friend.id @@ -887,7 +898,7 @@ impl SyncEvent { let id = if let Some(f) = Friend::get_it(&session_db, &fgid)? { let msg = m.handle(is_me, gid, &base, &session_db, f.id, eid)?; - results.rpcs.push(rpc::message_create(gid, &msg)); + results.rpcs.push(chat_rpc::message_create(gid, &msg)); msg.id } else { -1 diff --git a/src/group.rs b/src/group.rs index ef7b7cb..73d3db5 100644 --- a/src/group.rs +++ b/src/group.rs @@ -12,9 +12,12 @@ use tdn::{ }; use tdn_did::{user::User, Proof}; +use crate::account::Account; +use crate::apps::device::rpc as device_rpc; +use crate::apps::device::Device; +use crate::consensus::Event; use crate::event::{InnerEvent, StatusEvent, SyncEvent}; use crate::layer::Layer; -use crate::models::{account::Account, consensus::Event, device::Device}; use crate::rpc; use crate::storage::{account_db, account_init, consensus_db}; use crate::utils::device_status::device_status as local_device_status; @@ -97,7 +100,7 @@ impl Group { for (_, account) in &mut self.runnings { if let Some(device) = account.distributes.get_mut(&addr) { device.1 = false; - results.rpcs.push(rpc::device_offline(gid, device.0)); + results.rpcs.push(device_rpc::device_offline(gid, device.0)); } } } @@ -182,7 +185,7 @@ impl Group { if let Some(v) = running.distributes.get_mut(&addr) { v.1 = true; - results.rpcs.push(rpc::device_online(*gid, v.0)); + results.rpcs.push(device_rpc::device_online(*gid, v.0)); (remote_height, remote_event, new_addrs) } else { let mut device = Device::new(device_name, device_info, addr); @@ -190,8 +193,10 @@ impl Group { device.insert(&db)?; db.close()?; running.distributes.insert(addr, (device.id, true)); - results.rpcs.push(rpc::device_create(*gid, &device)); - results.rpcs.push(rpc::device_online(*gid, device.id)); + results.rpcs.push(device_rpc::device_create(*gid, &device)); + results + .rpcs + .push(device_rpc::device_online(*gid, device.id)); (remote_height, remote_event, new_addrs) } } @@ -215,7 +220,7 @@ impl Group { let v = self.running_mut(gid)?; let did = v.add_online(&addr)?; - results.rpcs.push(rpc::device_online(*gid, did)); + results.rpcs.push(device_rpc::device_online(*gid, did)); (remote_height, remote_event, vec![]) } }; @@ -575,7 +580,7 @@ impl Group { let (ancestors, hashes, is_min) = if to >= from { let (ancestors, is_min) = Self::ancestor(from, to); let db = consensus_db(&self.base, gid)?; - let hashes = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?; + let hashes = crate::consensus::Event::get_assign_hash(&db, &ancestors)?; db.close()?; (ancestors, hashes, is_min) } else { @@ -665,7 +670,7 @@ impl GroupEvent { GroupEvent::DeviceOffline => { let v = group.running_mut(&gid)?; let did = v.offline(&addr)?; - results.rpcs.push(rpc::device_offline(gid, did)); + results.rpcs.push(device_rpc::device_offline(gid, did)); } GroupEvent::StatusRequest => { let (cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p) = @@ -700,7 +705,7 @@ impl GroupEvent { swap_p, disk_p, uptime, - ) => results.rpcs.push(rpc::device_status( + ) => results.rpcs.push(device_rpc::device_status( gid, cpu_n, mem_s, swap_s, disk_s, cpu_p, mem_p, swap_p, disk_p, uptime, )), GroupEvent::Event(eheight, eid, pre, inner_event) => { @@ -726,7 +731,7 @@ impl GroupEvent { if account.height != remote_height || account.event != remote_event { // check ancestor and merge. let db = consensus_db(&group.base, &gid)?; - let ours = crate::models::consensus::Event::get_assign_hash(&db, &ancestors)?; + let ours = crate::consensus::Event::get_assign_hash(&db, &ancestors)?; drop(db); if ours.len() == 0 { diff --git a/src/group/running.rs b/src/group/running.rs index a3ce8bb..200d695 100644 --- a/src/group/running.rs +++ b/src/group/running.rs @@ -8,7 +8,7 @@ use tdn::types::{ use tdn_did::Keypair; -use crate::models::device::Device; +use crate::apps::device::Device; use crate::storage::consensus_db; pub(crate) struct RunningAccount { diff --git a/src/layer.rs b/src/layer.rs index 3e3d9e1..31ff2f8 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -12,11 +12,12 @@ use tdn::{ }; use tdn_did::{user::User, Proof}; +use crate::apps::chat::{Friend, Message, MessageType, NetworkMessage, Request}; use crate::event::{InnerEvent, StatusEvent}; use crate::group::Group; use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; -use crate::models::session::{Friend, Message, MessageType, NetworkMessage, Request}; -use crate::rpc; + +use crate::apps::chat::rpc; use crate::storage::{ read_avatar, read_file, read_record, session_db, write_avatar_sync, write_file, write_image, }; diff --git a/src/layer/running.rs b/src/layer/running.rs index 80610ff..227cb4a 100644 --- a/src/layer/running.rs +++ b/src/layer/running.rs @@ -5,7 +5,7 @@ use tdn::types::{ primitive::{new_io_error, PeerAddr, Result}, }; -use crate::models::session::Friend; +use crate::apps::chat::Friend; use crate::storage::session_db; /// online info. diff --git a/src/lib.rs b/src/lib.rs index f742181..6c66541 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,11 +5,13 @@ use std::ffi::CStr; use std::os::raw::c_char; use tdn::smol; +mod account; +mod apps; +mod consensus; mod event; mod group; mod layer; mod migrate; -mod models; mod primitives; mod rpc; mod server; diff --git a/src/models.rs b/src/models.rs deleted file mode 100644 index 773f21e..0000000 --- a/src/models.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub(crate) mod account; -pub(crate) mod consensus; -pub(crate) mod device; -pub(crate) mod file; -pub(crate) mod service; -pub(crate) mod session; diff --git a/src/rpc.rs b/src/rpc.rs index 21d8034..e4b6378 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,22 +7,30 @@ use tdn::{ types::{ group::GroupId, message::{NetworkType, SendMessage, SendType, StateRequest, StateResponse}, - primitive::{new_io_error, HandleResult, PeerAddr}, + primitive::{new_io_error, HandleResult, PeerAddr, Result}, rpc::{json, rpc_response, RpcError, RpcHandler, RpcParam}, }, }; -use tdn_did::user::User; +use crate::apps::app_rpc_inject; use crate::event::InnerEvent; -use crate::group::{Group, GroupEvent}; +use crate::group::Group; use crate::layer::{Layer, LayerEvent}; -use crate::migrate::consensus::{FRIEND_TABLE_PATH, MESSAGE_TABLE_PATH, REQUEST_TABLE_PATH}; -use crate::models::{ - device::Device, - session::{Friend, Message, MessageType, Request}, -}; -use crate::storage::{consensus_db, delete_avatar, session_db}; -use crate::utils::device_status::device_status as local_device_status; + +pub(crate) fn init_rpc( + addr: PeerAddr, + group: Arc>, + layer: Arc>, +) -> RpcHandler { + let mut handler = new_rpc_handler(addr, group, layer); + app_rpc_inject(&mut handler); + handler +} + +pub(crate) struct RpcState { + pub group: Arc>, + pub layer: Arc>, +} #[inline] pub(crate) fn network_stable(peers: Vec<(PeerAddr, bool)>) -> RpcParam { @@ -52,159 +60,11 @@ pub(crate) fn network_seed(peers: Vec) -> RpcParam { rpc_response(0, "network-seed", json!(s_peers), GroupId::default()) } -#[inline] -pub(crate) fn friend_online(mgid: GroupId, fid: i64, addr: PeerAddr) -> RpcParam { - rpc_response(0, "friend-online", json!([fid, addr.to_hex()]), mgid) -} - -#[inline] -pub(crate) fn friend_offline(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-offline", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn friend_info(mgid: GroupId, friend: &Friend) -> RpcParam { - rpc_response(0, "friend-info", json!(friend.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn friend_update(mgid: GroupId, fid: i64, is_top: bool, remark: &str) -> RpcParam { - rpc_response(0, "friend-update", json!([fid, is_top, remark]), mgid) -} - -#[inline] -pub(crate) fn friend_close(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-close", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn friend_delete(mgid: GroupId, fid: i64) -> RpcParam { - rpc_response(0, "friend-delete", json!([fid]), mgid) -} - -#[inline] -pub(crate) fn request_create(mgid: GroupId, req: &Request) -> RpcParam { - rpc_response(0, "request-create", json!(req.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn request_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { - rpc_response(0, "request-delivery", json!([id, is_d]), mgid) -} - -#[inline] -pub(crate) fn request_agree(mgid: GroupId, id: i64, friend: &Friend) -> RpcParam { - rpc_response(0, "request-agree", json!([id, friend.to_rpc()]), mgid) -} - -#[inline] -pub(crate) fn request_reject(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "request-reject", json!([id]), mgid) -} - -#[inline] -pub(crate) fn request_delete(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "request-delete", json!([id]), mgid) -} - -#[inline] -pub(crate) fn message_create(mgid: GroupId, msg: &Message) -> RpcParam { - rpc_response(0, "message-create", json!(msg.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn message_delivery(mgid: GroupId, id: i64, is_d: bool) -> RpcParam { - rpc_response(0, "message-delivery", json!([id, is_d]), mgid) -} - -#[inline] -pub(crate) fn message_delete(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "message-delete", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_create(mgid: GroupId, device: &Device) -> RpcParam { - rpc_response(0, "device-create", json!(device.to_rpc()), mgid) -} - -#[inline] -pub(crate) fn _device_remove(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-remove", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_online(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-online", json!([id]), mgid) -} - -#[inline] -pub(crate) fn device_offline(mgid: GroupId, id: i64) -> RpcParam { - rpc_response(0, "device-offline", json!([id]), mgid) -} - #[inline] pub(crate) fn account_update(mgid: GroupId, name: &str, avatar: String) -> RpcParam { rpc_response(0, "account-update", json!([name, avatar]), mgid) } -#[inline] -pub(crate) fn device_status( - mgid: GroupId, - cpu: u32, - memory: u32, - swap: u32, - disk: u32, - cpu_p: u16, - memory_p: u16, - swap_p: u16, - disk_p: u16, - uptime: u32, -) -> RpcParam { - rpc_response( - 0, - "device-status", - json!([cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime]), - mgid, - ) -} - -#[inline] -fn friend_list(friends: Vec) -> RpcParam { - let mut results = vec![]; - for friend in friends { - results.push(friend.to_rpc()); - } - - json!(results) -} - -#[inline] -fn request_list(requests: Vec) -> RpcParam { - let mut results = vec![]; - for request in requests { - results.push(request.to_rpc()); - } - json!(results) -} - -#[inline] -fn message_list(messages: Vec) -> RpcParam { - let mut results = vec![]; - for msg in messages { - results.push(msg.to_rpc()); - } - json!(results) -} - -#[inline] -fn device_list(devices: Vec) -> RpcParam { - let mut results = vec![]; - for device in devices { - results.push(device.to_rpc()); - } - json!(results) -} - #[inline] pub(crate) async fn sleep_waiting_close_stable( sender: Sender, @@ -239,7 +99,7 @@ pub(crate) async fn inner_rpc( uid: u64, method: &str, sender: &async_channel::Sender, -) -> Result<(), std::io::Error> { +) -> Result<()> { // Inner network default rpc method. only use in http-rpc. if method == "network-stable" || method == "network-dht" || method == "network-seed" { let req = match method { @@ -275,28 +135,22 @@ pub(crate) async fn inner_rpc( Err(new_io_error("not found")) } -pub(crate) struct RpcState { - group: Arc>, - layer: Arc>, -} - -#[inline] -pub(crate) fn new_rpc_handler( +fn new_rpc_handler( addr: PeerAddr, group: Arc>, layer: Arc>, ) -> RpcHandler { - let mut rpc_handler = RpcHandler::new(RpcState { group, layer }); + let mut handler = RpcHandler::new(RpcState { group, layer }); - rpc_handler.add_method("echo", |_, params, _| async move { + handler.add_method("echo", |_, params, _| async move { Ok(HandleResult::rpc(json!(params))) }); - rpc_handler.add_method("system-info", move |_, _, _| async move { + handler.add_method("system-info", move |_, _, _| async move { Ok(HandleResult::rpc(json!(vec![addr.to_hex()]))) }); - rpc_handler.add_method( + handler.add_method( "add-bootstrap", |_gid, params: Vec, _| async move { let socket = params[0].as_str()?; @@ -308,7 +162,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-list", |_gid, _params: Vec, state: Arc| async move { let mut users: Vec> = vec![]; @@ -327,7 +181,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-create", |_gid, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -353,7 +207,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-restore", |_gid, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -390,76 +244,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( - "device-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let db = consensus_db(state.layer.read().await.base(), &gid)?; - let devices = Device::all(&db)?; - drop(db); - let online_devices = state.group.read().await.online_devices(&gid, devices); - Ok(HandleResult::rpc(device_list(online_devices))) - }, - ); - - rpc_handler.add_method( - "device-status", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let group_lock = state.group.read().await; - if &addr == group_lock.addr() { - let uptime = group_lock.uptime(&gid)?; - let (cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p) = - local_device_status(); - return Ok(HandleResult::rpc(json!([ - cpu, memory, swap, disk, cpu_p, memory_p, swap_p, disk_p, uptime - ]))); - } - drop(group_lock); - - let msg = state - .group - .write() - .await - .event_message(addr, &GroupEvent::StatusRequest)?; - - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let msg = state.group.read().await.create_message(&gid, addr)?; - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-connect", - |gid: GroupId, params: Vec, state: Arc| async move { - let addr = PeerAddr::from_hex(params[0].as_str()?) - .map_err(|_e| new_io_error("PeerAddr invalid!"))?; - - let msg = state.group.read().await.connect_message(&gid, addr)?; - Ok(HandleResult::group(gid, msg)) - }, - ); - - rpc_handler.add_method( - "device-delete", - |_gid: GroupId, params: Vec, _state: Arc| async move { - let _id = params[0].as_i64()?; - // TODO delete a device. - Ok(HandleResult::new()) - }, - ); - - rpc_handler.add_method( + handler.add_method( "account-update", |gid: GroupId, params: Vec, state: Arc| async move { let name = params[0].as_str()?; @@ -484,7 +269,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-pin", |gid: GroupId, params: Vec, state: Arc| async move { let old = params[0].as_str()?; @@ -495,7 +280,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-mnemonic", |gid: GroupId, params: Vec, state: Arc| async move { let lock = params[0].as_str()?; @@ -505,7 +290,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-login", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -524,7 +309,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-logout", |_gid: GroupId, _params: Vec, state: Arc| async move { let mut results = HandleResult::new(); @@ -562,7 +347,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-online", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -588,7 +373,7 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( + handler.add_method( "account-offline", |_gid: GroupId, params: Vec, state: Arc| async move { let gid = GroupId::from_hex(params[0].as_str()?)?; @@ -618,386 +403,5 @@ pub(crate) fn new_rpc_handler( }, ); - rpc_handler.add_method( - "friend-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let friends = state.layer.read().await.all_friends_with_online(&gid)?; - Ok(HandleResult::rpc(friend_list(friends))) - }, - ); - - rpc_handler.add_method( - "friend-update", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - let remark = params[1].as_str()?; - let is_top = params[2].as_bool()?; - - let mut results = HandleResult::new(); - let db = session_db(state.layer.read().await.base(), &gid)?; - let f = if let Some(mut f) = Friend::get_id(&db, id)? { - f.is_top = is_top; - f.remark = remark.to_owned(); - f.me_update(&db)?; - f - } else { - return Ok(results); - }; - drop(db); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendUpdate(f.gid, f.is_top, f.remark), - FRIEND_TABLE_PATH, - f.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "friend-readed", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - - let db = session_db(state.layer.read().await.base(), &gid)?; - Friend::readed(&db, fid)?; - drop(db); - - Ok(HandleResult::new()) - }, - ); - - rpc_handler.add_method( - "friend-close", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut results = HandleResult::new(); - let mut layer_lock = state.layer.write().await; - - let db = session_db(layer_lock.base(), &gid)?; - let friend = Friend::get_id(&db, id)??; - friend.close(&db)?; - drop(db); - - let online = layer_lock.remove_friend(&gid, &friend.gid); - drop(layer_lock); - - if let Some(faddr) = online { - let mut addrs: HashMap = HashMap::new(); - addrs.insert(faddr, friend.gid); - let sender = state.group.read().await.sender(); - tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) - .detach(); - } - - let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); - let msg = SendType::Event(0, friend.addr, data); - results.layers.push((gid, friend.gid, msg)); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendClose(friend.gid), - FRIEND_TABLE_PATH, - friend.id, - &mut results, - )?; - - Ok(results) - }, - ); - - rpc_handler.add_method( - "friend-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut results = HandleResult::new(); - let mut layer_lock = state.layer.write().await; - - let db = session_db(layer_lock.base(), &gid)?; - let friend = Friend::get_id(&db, id)??; - friend.delete(&db)?; - drop(db); - - let online = layer_lock.remove_friend(&gid, &friend.gid); - delete_avatar(layer_lock.base(), &gid, &friend.gid).await?; - drop(layer_lock); - - if let Some(faddr) = online { - let mut addrs: HashMap = HashMap::new(); - addrs.insert(faddr, friend.gid); - let sender = state.group.read().await.sender(); - tdn::smol::spawn(sleep_waiting_close_stable(sender, HashMap::new(), addrs)) - .detach(); - } - - let data = postcard::to_allocvec(&LayerEvent::Close).unwrap_or(vec![]); - let msg = SendType::Event(0, friend.addr, data); - results.layers.push((gid, friend.gid, msg)); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionFriendDelete(friend.gid), - FRIEND_TABLE_PATH, - friend.id, - &mut results, - )?; - - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-list", - |gid: GroupId, _params: Vec, state: Arc| async move { - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - drop(layer_lock); - let requests = Request::all(&db)?; - drop(db); - Ok(HandleResult::rpc(request_list(requests))) - }, - ); - - rpc_handler.add_method( - "request-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let remote_gid = GroupId::from_hex(params[0].as_str()?)?; - let remote_addr = PeerAddr::from_hex(params[1].as_str()?)?; - let remote_name = params[2].as_str()?.to_string(); - let remark = params[3].as_str()?.to_string(); - - let mut request = Request::new( - remote_gid, - remote_addr, - remote_name.clone(), - remark.clone(), - true, - false, - ); - - let mut results = HandleResult::rpc(Default::default()); - let me = state.group.read().await.clone_user(&gid)?; - - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - if Friend::is_friend(&db, &request.gid)? { - debug!("had friend."); - drop(layer_lock); - return Ok(results); - } - - if let Some(req) = Request::get(&db, &request.gid)? { - println!("Had this request."); - req.delete(&db)?; - } - request.insert(&db)?; - drop(db); - - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestCreate( - true, - User::new(remote_gid, remote_addr, remote_name, vec![])?, - remark, - ), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - - results - .layers - .push((gid, remote_gid, layer_lock.req_message(me, request))); - - drop(layer_lock); - - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-agree", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut group_lock = state.group.write().await; - let me = group_lock.clone_user(&gid)?; - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - let mut results = HandleResult::new(); - - if let Some(mut request) = Request::get_id(&db, id)? { - group_lock.broadcast( - &gid, - InnerEvent::SessionRequestHandle(request.gid, true, vec![]), - REQUEST_TABLE_PATH, - request.id, - &mut results, - )?; - request.is_ok = true; - request.is_over = true; - request.update(&db)?; - - let f = Friend::from_request(&db, request)?; - layer_lock.running_mut(&gid)?.add_permissioned(f.gid, f.id); - results.rpcs.push(json!([id, f.to_rpc()])); - - let proof = group_lock.prove_addr(&gid, &f.addr)?; - let msg = layer_lock.rpc_agree_message(id, proof, me, &gid, f.addr)?; - results.layers.push((gid, f.gid, msg)); - } - db.close()?; - drop(group_lock); - drop(layer_lock); - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-reject", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let mut layer_lock = state.layer.write().await; - let db = session_db(layer_lock.base(), &gid)?; - let mut req = Request::get_id(&db, id)??; - req.is_ok = false; - req.is_over = true; - req.update(&db)?; - drop(db); - let msg = layer_lock.reject_message(id, req.addr, gid); - drop(layer_lock); - - let mut results = HandleResult::layer(gid, req.gid, msg); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestHandle(req.gid, false, vec![]), - REQUEST_TABLE_PATH, - req.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "request-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - let base = layer_lock.base().clone(); - drop(layer_lock); - let req = Request::get_id(&db, id)??; - req.delete(&db)?; - - // delete avatar. check had friend. - if Friend::get(&db, &req.gid)?.is_none() { - delete_avatar(&base, &gid, &req.gid).await?; - } - drop(db); - - let mut results = HandleResult::new(); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionRequestDelete(req.gid), - REQUEST_TABLE_PATH, - req.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "message-list", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(layer_lock.base(), &gid)?; - drop(layer_lock); - - Friend::readed(&db, fid)?; - let messages = Message::get(&db, &fid)?; - drop(db); - Ok(HandleResult::rpc(message_list(messages))) - }, - ); - - rpc_handler.add_method( - "message-create", - |gid: GroupId, params: Vec, state: Arc| async move { - let fid = params[0].as_i64()?; - let fgid = GroupId::from_hex(params[1].as_str()?)?; - let m_type = MessageType::from_int(params[2].as_i64()?); - let content = params[3].as_str()?.to_string(); - - let mut layer_lock = state.layer.write().await; - let base = layer_lock.base(); - let faddr = layer_lock.running(&gid)?.online(&fgid)?; - - let (msg, nw) = LayerEvent::from_message(base, gid, fid, m_type, content).await?; - let event = LayerEvent::Message(msg.hash, nw); - let s = layer_lock.event_message(msg.id, gid, faddr, &event); - drop(layer_lock); - - let mut results = HandleResult::rpc(json!(msg.to_rpc())); - results.layers.push((gid, fgid, s)); - - match event { - LayerEvent::Message(hash, nw) => { - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionMessageCreate(fgid, true, hash, nw), - MESSAGE_TABLE_PATH, - msg.id, - &mut results, - )?; - } - _ => {} - } - - Ok(results) - }, - ); - - rpc_handler.add_method( - "message-delete", - |gid: GroupId, params: Vec, state: Arc| async move { - let id = params[0].as_i64()?; - - let layer_lock = state.layer.read().await; - let db = session_db(&layer_lock.base(), &gid)?; - drop(layer_lock); - - let msg = Message::get_id(&db, id)??; - msg.delete(&db)?; - drop(db); - let mut results = HandleResult::new(); - state.group.write().await.broadcast( - &gid, - InnerEvent::SessionMessageDelete(msg.hash), - MESSAGE_TABLE_PATH, - msg.id, - &mut results, - )?; - Ok(results) - }, - ); - - rpc_handler.add_method( - "files-folder", - |_gid: GroupId, params: Vec, _state: Arc| async move { - let _path = params[0].as_str()?; - - Ok(HandleResult::new()) - }, - ); - - rpc_handler + handler } diff --git a/src/server.rs b/src/server.rs index ff61122..d4fd600 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,12 +12,12 @@ use tdn::{ types::primitive::HandleResult, }; +use crate::account::Account; use crate::group::Group; use crate::layer::Layer; use crate::migrate::main_migrate; -use crate::models::account::Account; use crate::primitives::network_seeds; -use crate::rpc::{inner_rpc, new_rpc_handler}; +use crate::rpc::{init_rpc, inner_rpc}; use crate::storage::account_db; pub const DEFAULT_WS_ADDR: &'static str = "127.0.0.1:8080"; @@ -67,7 +67,7 @@ pub async fn start(db_path: String) -> Result<()> { Layer::init(db_path, peer_id, group.clone()).await?, )); - let rpc = new_rpc_handler(peer_id, group.clone(), layer.clone()); + let rpc = init_rpc(peer_id, group.clone(), layer.clone()); //let mut group_rpcs: HashMap = HashMap::new(); let mut now_rpc_uid = 0;