
feat: add distributed message feature for go chat

pull/7/head
konenet 3 years ago
parent commit baf67719e6
  1. Makefile (2 changed lines)
  2. README.md (83 changed lines)
  3. cmd/main.go (15 changed lines)
  4. config.toml (6 changed lines)
  5. config/toml_config.go (21 changed lines)
  6. deployments/docker/Dockerfile (6 changed lines)
  7. deployments/docker/docker-compose.yml (56 changed lines)
  8. deployments/docker/nginx.conf (48 changed lines)
  9. internal/kafka/consumer.go (48 changed lines)
  10. internal/kafka/producer.go (37 changed lines)
  11. internal/server/client.go (8 changed lines)
  12. internal/server/server.go (11 changed lines)
  13. pkg/common/constant/constant.go (6 changed lines)
  14. test/kafka_test.go (8 changed lines)

Makefile (2 changed lines)

@@ -58,4 +58,4 @@ clean:
# protoc build
.PHONY: protoc
protoc:
protoc --gogo_out=. protocol/*.proto
protoc --gogo_out=. pkg/protocol/*.proto

README.md (83 changed lines)

@@ -20,6 +20,7 @@
* Video messages
* Screen sharing (image-based)
* Video calls (p2p video calls based on WebRTC)
* Distributed deployment (a global Kafka message queue unifies message delivery, so the system can be scaled out horizontally)
## Backend
[Code repository](https://github.com/kone-net/go-chat)
@@ -186,43 +187,61 @@ npm start
http://127.0.0.1:3000/login
```
### Distributed deployment
* Pull the code
Pull the code onto the server and run make build to build the backend.
* Build the backend service image
Go into the deployments/docker directory
and build the image from the Dockerfile there:
```
docker build -t konenet/gochat:1.0 .
```
* Deploy the services
You need nginx as a reverse proxy, MySQL to store data, and one or more backend instances.
* Configure the distributed message queue in config.toml
Set channelType under msgChannelType to kafka to switch to the distributed message queue, and fill in the queue's address and topic (see the config sketch just after these steps).
* Start the services
Start everything via the docker-compose.yml under deployments/docker:
```
docker-compose up -d
```
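
For reference, the relevant block of config.toml looks roughly like this once Kafka mode is enabled (the host and topic values below are the placeholders from the sample config, not real endpoints):
```
[msgChannelType]
channelType = "kafka"
kafkaHosts = "myAddress:9092"
kafkaTopic = "go-chat-message"
```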
## Code structure
```
├── Makefile  build, packaging, and related tasks
├── README.md
├── api
   └── v1  controllers, the external API (add friend, search friends, etc.); entry point for all HTTP requests
├── bin
   └── chat  the packaged binary
├── chat.sql  SQL for the whole project
├── cmd  main function entry point, program startup
├── common
   ├── constant  constants
   └── util  utilities
├── config  configuration initialization
├── config.toml  configuration file
├── dao
   └── pool  database connection pool
├── errors  wrapped error types
├── global
   └── log  wrapped logging package, so callers carry no third-party dependency
├── api  controllers, the external API (add friend, search friends, etc.); entry point for all HTTP requests
   └── v1
├── assets
   └── screenshot  assets used by the system, screenshots referenced from the markdown docs
├── bin  the packaged binary
├── chat.sql  SQL for the whole project
├── cmd
   └── main.go  main function entry point, program startup
├── config
   └── toml_config.go  global configuration struct for the config file
├── config.toml  configuration file
├── deployments
   └── docker  Docker image build files, docker-compose.yml, etc.
├── go.mod
├── go.sum
├── logs  log files
├── model  database models, one per table
   ├── request  request entities
   ├── response  response entities
├── protocol  message protocol
   ├── message.pb.go  file generated by protocol buffers
   └── message.proto  protocol buffer field definitions
├── response  global response wrapper; every HTTP response carries code, msg, and data
├── router  binds gin routes to controllers
├── server  main logic for receiving and forwarding WebSocket messages
├── service  service classes called by the controllers
├── static  static files, images, etc.
   ├── img
   └── screenshot  screenshots used by the markdown docs
└── test  test files
├── internal
   ├── dao  database access
   ├── kafka  kafka consumer and producer
   ├── model  database models, one per table
   ├── router  binds gin routes to controllers
   ├── server  main logic for receiving and forwarding WebSocket messages
   └── service  service classes called by the controllers
├── logs
├── pkg
   ├── common  constants and utilities
   ├── errors  wrapped error types
   ├── global  wrapped logging package, so callers carry no third-party dependency
   └── protocol  generated protocol buffer code and the .proto field definitions
├── test
   └── kafka_test.go
└── web
   └── static  uploaded files, etc.
```
## Makefile

cmd/main.go (15 changed lines)

@@ -2,18 +2,27 @@ package main
import (
"chat-room/config"
"chat-room/pkg/global/log"
"chat-room/internal/kafka"
"chat-room/internal/router"
"chat-room/internal/server"
"go.uber.org/zap"
"chat-room/pkg/common/constant"
"chat-room/pkg/global/log"
"net/http"
"time"
"go.uber.org/zap"
)
func main() {
log.InitLogger(config.GetConfig().Log.Path, config.GetConfig().Log.Level)
log.Info("config", zap.Any("config", config.GetConfig()))
if config.GetConfig().MsgChannelType.ChannelType == constant.KAFKA {
kafka.InitProducer(config.GetConfig().MsgChannelType.KafkaTopic, config.GetConfig().MsgChannelType.KafkaHosts)
kafka.InitConsumer(config.GetConfig().MsgChannelType.KafkaHosts)
go kafka.ConsumerMsg(server.ConsumerKafkaMsg)
}
log.Info("start server", zap.String("start", "start web sever..."))
newRouter := router.NewRouter()
@@ -21,7 +30,7 @@ func main() {
go server.MyServer.Start()
s := &http.Server{
Addr: "127.0.0.1:8888",
Addr: ":8888",
Handler: newRouter,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,

config.toml (6 changed lines)

@@ -14,3 +14,9 @@ path = "logs/chat.log"
[staticPath]
filePath = "web/static/file/"
[msgChannelType]
channelType = "gochannel"
kafkaHosts = "myAddress:9092"
kafkaTopic = "go-chat-message"

config/toml_config.go (21 changed lines)

@@ -7,12 +7,14 @@ import (
)
type TomlConfig struct {
AppName string
MySQL MySQLConfig
Log LogConfig
StaticPath PathConfig
AppName string
MySQL MySQLConfig
Log LogConfig
StaticPath PathConfig
MsgChannelType MsgChannelType
}
// MySQL-related configuration
type MySQLConfig struct {
Host string
Name string
@@ -22,15 +24,26 @@ type MySQLConfig struct {
User string
}
// Where log files are saved
type LogConfig struct {
Path string
Level string
}
// Path settings, e.g. the static file path
type PathConfig struct {
FilePath string
}
// Message queue type and its related settings
// gochannel: single-node mode, messages are passed through Go's built-in channel
// kafka: use Kafka as the message queue so the chat service can be scaled out across nodes
type MsgChannelType struct {
ChannelType string
KafkaHosts string
KafkaTopic string
}
var c TomlConfig
func init() {

deployments/docker/Dockerfile (6 changed lines)

@@ -0,0 +1,6 @@
FROM hub.c.163.com/public/centos:7.0
RUN [ "mkdir", "/usr/local/gochat" ]
WORKDIR /usr/local/gochat
COPY ../../bin/chat /usr/local/gochat
COPY ../../config.toml /usr/local/gochat
CMD [ "/usr/local/gochat/chat" ]

deployments/docker/docker-compose.yml (56 changed lines)

@@ -0,0 +1,56 @@
version: '3'
services:
mysql8:
image: mysql:8.0
container_name: mysql8
restart: always
environment:
TZ: Asia/Shanghai
MYSQL_ROOT_PASSWORD: thepswdforroot
MYSQL_DATABASE: go-chat-message
MYSQL_USER: gochat
MYSQL_PASSWORD: thepswdforgochat
ports:
- 3306:3306
command:
# switch MySQL 8.0's default authentication plugin back to the previous one (MySQL 8.0 changed the default, which can make passwords fail to match)
--default-authentication-plugin=mysql_native_password
--character-set-server=utf8mb4
--collation-server=utf8mb4_general_ci
--explicit_defaults_for_timestamp=true
--lower_case_table_names=1
nginx:
image: nginx:latest
container_name: nginx
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
ports:
- 80:80
links:
- gochat
- gochat1
depends_on:
- gochat
- gochat1
gochat:
image: konenet/gochat:1.0
container_name: gochat
restart: always
ports:
- 8888:8888
links:
- mysql8
depends_on:
- mysql8
gochat1:
image: konenet/gochat:1.0
restart: always
container_name: gochat1
links:
- mysql8
depends_on:
- mysql8

deployments/docker/nginx.conf (48 changed lines)

@@ -0,0 +1,48 @@
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log notice;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
#gzip on;
#include /etc/nginx/conf.d/*.conf;
upstream chat {
ip_hash;
server gochat:8888;
server gochat1:8888;
}
server {
listen 80;
server_name chat;
location / {
proxy_pass http://chat;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
}

internal/kafka/consumer.go (48 changed lines)

@@ -0,0 +1,48 @@
package kafka
import (
"chat-room/pkg/global/log"
"github.com/Shopify/sarama"
"strings"
)
var consumer sarama.Consumer
type ConsumerCallback func(data []byte)
// Initialize the consumer
func InitConsumer(hosts string) {
config := sarama.NewConfig()
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if nil != err {
log.Error("init kafka consumer client error", log.Any("init kafka consumer client error", err.Error()))
}
consumer, err = sarama.NewConsumerFromClient(client)
if nil != err {
log.Error("init kafka consumer error", log.Any("init kafka consumer error", err.Error()))
}
}
// Consume messages, delivering each one through the callback
func ConsumerMsg(callBack ConsumerCallback) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if nil != err {
log.Error("iConsumePartition error", log.Any("ConsumePartition error", err.Error()))
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
if nil != callBack {
callBack(msg.Value)
}
}
}
func CloseConsumer() {
if nil != consumer {
consumer.Close()
}
}

internal/kafka/producer.go (37 changed lines)

@@ -0,0 +1,37 @@
package kafka
import (
"strings"
"chat-room/pkg/global/log"
"github.com/Shopify/sarama"
)
var producer sarama.AsyncProducer
var topic string = "default_message"
func InitProducer(topicInput, hosts string) {
topic = topicInput
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionGZIP
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if nil != err {
log.Error("init kafka client error", log.Any("init kafka client error", err.Error()))
}
producer, err = sarama.NewAsyncProducerFromClient(client)
if nil != err {
log.Error("init kafka async client error", log.Any("init kafka async client error", err.Error()))
}
}
func Send(data []byte) {
be := sarama.ByteEncoder(data)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: be}
}
func Close() {
if producer != nil {
producer.Close()
}
}
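
For orientation, here is a minimal sketch of how these two new files are meant to be used together, mirroring the wiring in cmd/main.go and internal/server when channelType is "kafka". The host and topic strings are placeholders, and the callback here just prints instead of pushing into MyServer.Broadcast as server.ConsumerKafkaMsg does:
```
package main

import (
	"fmt"

	"chat-room/internal/kafka"
)

func main() {
	// Initialize the async producer and the consumer against the same topic,
	// as cmd/main.go does when msgChannelType.channelType is "kafka".
	kafka.InitProducer("go-chat-message", "myAddress:9092")
	kafka.InitConsumer("myAddress:9092")
	defer kafka.Close()
	defer kafka.CloseConsumer()

	// Consume in the background. In the real server the callback is
	// server.ConsumerKafkaMsg, which pushes each message into MyServer.Broadcast.
	go kafka.ConsumerMsg(func(data []byte) {
		fmt.Println("received:", string(data))
	})

	// Any instance may publish; every instance consumes the topic, so a message
	// sent to one backend also reaches clients connected to the others.
	kafka.Send([]byte("hello from one gochat instance"))

	// Block so the background consumer keeps running (the real service blocks
	// on its HTTP server instead).
	select {}
}
```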

internal/server/client.go (8 changed lines)

@@ -1,6 +1,8 @@
package server
import (
"chat-room/config"
"chat-room/internal/kafka"
"chat-room/pkg/common/constant"
"chat-room/pkg/global/log"
"chat-room/pkg/protocol"
@@ -46,7 +48,11 @@ func (c *Client) Read() {
}
c.Conn.WriteMessage(websocket.BinaryMessage, pongByte)
} else {
MyServer.Broadcast <- message
if config.GetConfig().MsgChannelType.ChannelType == constant.KAFKA {
kafka.Send(message)
} else {
MyServer.Broadcast <- message
}
}
}
}

internal/server/server.go (11 changed lines)

@@ -2,11 +2,11 @@ package server
import (
"chat-room/config"
"chat-room/internal/service"
"chat-room/pkg/common/constant"
"chat-room/pkg/common/util"
"chat-room/pkg/global/log"
"chat-room/pkg/protocol"
"chat-room/internal/service"
"encoding/base64"
"io/ioutil"
"strings"
@@ -36,6 +36,11 @@ func NewServer() *Server {
}
}
// Consume messages from kafka and push them straight into the go channel so they are handled in one place
func ConsumerKafkaMsg(data []byte) {
MyServer.Broadcast <- data
}
func (s *Server) Start() {
log.Info("start server", log.Any("start server", "start server..."))
for {
@@ -154,7 +159,7 @@ func saveMessage(message *protocol.Message) {
log.Error("transfer base64 to file error", log.String("transfer base64 to file error", dataErr.Error()))
return
}
err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath + url, dataBuffer, 0666)
err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, dataBuffer, 0666)
if err != nil {
log.Error("write file error", log.String("write file error", err.Error()))
return
@@ -170,7 +175,7 @@ func saveMessage(message *protocol.Message) {
}
contentType := util.GetContentTypeBySuffix(fileSuffix)
url := uuid.New().String() + "." + fileSuffix
err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath + url, message.File, 0666)
err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, message.File, 0666)
if err != nil {
log.Error("write file error", log.String("write file error", err.Error()))
return

pkg/common/constant/constant.go (6 changed lines)

@@ -4,9 +4,11 @@ const (
HEAT_BEAT = "heatbeat"
PONG = "pong"
// Message type: one-to-one chat or group chat
MESSAGE_TYPE_USER = 1
MESSAGE_TYPE_GROUP = 2
// Message content type
TEXT = 1
FILE = 2
IMAGE = 3
@@ -14,4 +16,8 @@ const (
VIDEO = 5
AUDIO_ONLINE = 6
VIDEO_ONLINE = 7
// Message queue type
GO_CHANNEL = "gochannel"
KAFKA = "kafka"
)

test/proto_test.go → test/kafka_test.go (8 changed lines)

@@ -9,7 +9,7 @@ import (
)
func TestKafka(t *testing.T) {
hosts := "127.0.0.1:9092"
hosts := "myAddress:9092"
config := sarama.NewConfig()
client, _ := sarama.NewClient(strings.Split(hosts, ","), config)
@@ -32,9 +32,11 @@ func TestKafka(t *testing.T) {
}
var consumer sarama.Consumer
type ConsumerCallBack func(data []byte)
func TestKafkaConsumer(t *testing.T) {
hosts := "127.0.0.1:9092"
hosts := "myAddress:9092"
config := sarama.NewConfig()
client, _ := sarama.NewClient(strings.Split(hosts, ","), config)
@@ -47,4 +49,4 @@ func TestKafkaConsumer(t *testing.T) {
fmt.Println(msg.Value)
fmt.Println(string(msg.Value))
}
}
}