diff --git a/Makefile b/Makefile index 4d39d08..8b739a6 100644 --- a/Makefile +++ b/Makefile @@ -58,4 +58,4 @@ clean: # protoc build .PHONY: protoc protoc: - protoc --gogo_out=. protocol/*.proto + protoc --gogo_out=. pkg/protocol/*.proto diff --git a/README.md b/README.md index 03045cc..0855134 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ * 视频消息 * 屏幕共享(基于图片) * 视频通话(基于WebRTC的p2p视频通话) +* 分布式部署(通过kafka全局消息队列,统一消息传递,可以水平扩展系统) ## 后端 [代码仓库](https://github.com/kone-net/go-chat) @@ -186,43 +187,61 @@ npm start http://127.0.0.1:3000/login ``` +### 分布式部署 +* 拉取代码 +将代码拉取到服务器,运行make build构建后端代码。 +* 构建后端服务镜像 +进入目录deployments/docker +通过目录下的Dockerfile构建镜像 +``` +docker build -t konenet/gochat:1.0 . +``` +* 部署服务 +需要部署nginx进行反向代理,mysql保存数据,1个或者多个后端服务。 +* 在config.toml中配置分布式消息队列 +将msgChannelType中的channelType修改为kafka,就为分布式消息队列。需要填写消息队列对应的地址和topic +* 启动服务 +通过deployments/docker下的docker-compose.yml进行启动。 +``` +docker-compose up -d +``` + ## 代码结构 ``` -├── Makefile 代码编译,打包,结构化等操作 +├── Makefile 代码编译,打包,结构化等操作 ├── README.md -├── api -│   └── v1 controller类,对外的接口,如添加好友,查找好友等。所有http请求的入口 -├── bin -│   └── chat 打包的二进制文件 -├── chat.sql 整个项目的SQL -├── cmd main函数入口,程序启动 -├── common -│   ├── constant 常量 -│   └── util 工具类 -├── config 配置初始化类 -├── config.toml 配置文件 -├── dao -│   └── pool 数据库连接池 -├── errors 封装的异常类 -├── global -│   └── log 封装的日志类,使用时不会出现第三方的包依赖 +├── api controller类,对外的接口,如添加好友,查找好友等。所有http请求的入口 +│   └── v1 +├── assets +│   └── screenshot 系统使用到的资源,markdown用到的截图文件 +├── bin 打包的二进制文件 +├── chat.sql 整个项目的SQL +├── cmd +│   └── main.go main函数入口,程序启动 +├── config +│   └── toml_config.go 系统全局的配置文件配置类 +├── config.toml 配置文件 +├── deployments +│   └── docker docker构建镜像,docker-compose.yml等文件 ├── go.mod ├── go.sum -├── logs 日志文件 -├── model 数据库模型,和表一一对应 -│   ├── request 请求的实体类 -│   ├── response 响应的实体类 -├── protocol 消息协议 -│   ├── message.pb.go protoc buffer自动生成的文件 -│   └── message.proto 定义的protoc buffer字段 -├── response 全局响应,通过http请求的,都包含code,msg,data三个字段 -├── router gin和controller类进行绑定 -├── server WebSocket中消息的接受和转发的主要逻辑 -├── service controller调用的服务类 -├── static 静态文件,图片等 -│   ├── img -│   └── screenshot markdown用到的截图文件 -└── test 测试文件 +├── internal +│   ├── dao 数据库 +│   ├── kafka kafka消费者和生产者 +│   ├── model 数据库模型,和表一一对应 +│   ├── router gin和controller类进行绑定 +│   ├── server WebSocket中消息的接受和转发的主要逻辑 +│   └── service 调用的服务类 +├── logs +├── pkg +│   ├── common 常量,工具类 +│   ├── errors 封装的异常类 +│   ├── global 封装的日志类,使用时不会出现第三方的包依赖 +│   └── protocol protoc buffer自动生成的文件,定义的protoc buffer字段 +├── test +│   └── kafka_test.go +└── web + └── static 上传的文件等 ``` ## Makefile diff --git a/cmd/main.go b/cmd/main.go index 2318111..9520c78 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,18 +2,27 @@ package main import ( "chat-room/config" - "chat-room/pkg/global/log" + "chat-room/internal/kafka" "chat-room/internal/router" "chat-room/internal/server" - "go.uber.org/zap" + "chat-room/pkg/common/constant" + "chat-room/pkg/global/log" "net/http" "time" + + "go.uber.org/zap" ) func main() { log.InitLogger(config.GetConfig().Log.Path, config.GetConfig().Log.Level) log.Info("config", zap.Any("config", config.GetConfig())) + if config.GetConfig().MsgChannelType.ChannelType == constant.KAFKA { + kafka.InitProducer(config.GetConfig().MsgChannelType.KafkaTopic, config.GetConfig().MsgChannelType.KafkaHosts) + kafka.InitConsumer(config.GetConfig().MsgChannelType.KafkaHosts) + go kafka.ConsumerMsg(server.ConsumerKafkaMsg) + } + log.Info("start server", zap.String("start", "start web sever...")) newRouter := router.NewRouter() @@ -21,7 +30,7 @@ func main() { go server.MyServer.Start() s := &http.Server{ - Addr: "127.0.0.1:8888", + Addr: ":8888", Handler: newRouter, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, diff --git a/config.toml b/config.toml index 0a8fa07..a588516 100644 --- a/config.toml +++ b/config.toml @@ -14,3 +14,9 @@ path = "logs/chat.log" [staticPath] filePath = "web/static/file/" + +[msgChannelType] +channelType = "gochannel" + +kafkaHosts = "myAddress:9092" +kafkaTopic = "go-chat-message" \ No newline at end of file diff --git a/config/toml_config.go b/config/toml_config.go index ecf9edf..676847b 100644 --- a/config/toml_config.go +++ b/config/toml_config.go @@ -7,12 +7,14 @@ import ( ) type TomlConfig struct { - AppName string - MySQL MySQLConfig - Log LogConfig - StaticPath PathConfig + AppName string + MySQL MySQLConfig + Log LogConfig + StaticPath PathConfig + MsgChannelType MsgChannelType } +// MySQL相关配置 type MySQLConfig struct { Host string Name string @@ -22,15 +24,26 @@ type MySQLConfig struct { User string } +// 日志保存地址 type LogConfig struct { Path string Level string } +// 相关地址信息,例如静态文件地址 type PathConfig struct { FilePath string } +// 消息队列类型及其消息队列相关信息 +// gochannel为单机使用go默认的channel进行消息传递 +// kafka是使用kafka作为消息队列,可以分布式扩展消息聊天程序 +type MsgChannelType struct { + ChannelType string + KafkaHosts string + KafkaTopic string +} + var c TomlConfig func init() { diff --git a/deployments/docker/Dockerfile b/deployments/docker/Dockerfile new file mode 100644 index 0000000..827dedc --- /dev/null +++ b/deployments/docker/Dockerfile @@ -0,0 +1,6 @@ +FROM hub.c.163.com/public/centos:7.0 +RUN [ "mkdir", "/usr/local/gochat" ] +WORKDIR /usr/local/gochat +COPY ../../bin/chat /usr/local/gochat +COPY ../../config.toml /usr/local/gochat +CMD [ "/usr/local/gochat/chat" ] \ No newline at end of file diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml new file mode 100644 index 0000000..ff87639 --- /dev/null +++ b/deployments/docker/docker-compose.yml @@ -0,0 +1,56 @@ +version: '3' + +services: + mysql8: + image: mysql:8.0 + container_name: mysql8 + restart: always + environment: + TZ: Asia/Shanghai + MYSQL_ROOT_PASSWORD: thepswdforroot + MYSQL_DATABASE: go-chat-message + MYSQL_USER: gochat + MYSQL_PASSWORD: thepswdforgochat + ports: + - 3306:3306 + command: + # 将mysql8.0默认密码策略 修改为 原先 策略 (mysql8.0对其默认策略做了更改 会导致密码无法匹配) + --default-authentication-plugin=mysql_native_password + --character-set-server=utf8mb4 + --collation-server=utf8mb4_general_ci + --explicit_defaults_for_timestamp=true + --lower_case_table_names=1 + + nginx: + image: nginx:latest + container_name: nginx + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf + ports: + - 80:80 + links: + - gochat + - gochat1 + depends_on: + - gochat + - gochat1 + + gochat: + image: konenet/gochat:1.0 + container_name: gochat + restart: always + ports: + - 8888:8888 + links: + - mysql8 + depends_on: + - mysql8 + + gochat1: + image: konenet/gochat:1.0 + restart: always + container_name: gochat1 + links: + - mysql8 + depends_on: + - mysql8 diff --git a/deployments/docker/nginx.conf b/deployments/docker/nginx.conf new file mode 100644 index 0000000..b90c128 --- /dev/null +++ b/deployments/docker/nginx.conf @@ -0,0 +1,48 @@ +user nginx; +worker_processes auto; + +error_log /var/log/nginx/error.log notice; +pid /var/run/nginx.pid; + + +events { + worker_connections 1024; +} + + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + access_log /var/log/nginx/access.log main; + + sendfile on; + #tcp_nopush on; + + keepalive_timeout 65; + + #gzip on; + + #include /etc/nginx/conf.d/*.conf; + + upstream chat { + ip_hash; + server gochat:8888; + server gochat1:8888; + } + + server { + listen 80; + server_name chat; + location / { + proxy_pass http://chat; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + } +} \ No newline at end of file diff --git a/internal/kafka/consumer.go b/internal/kafka/consumer.go new file mode 100644 index 0000000..e7446b3 --- /dev/null +++ b/internal/kafka/consumer.go @@ -0,0 +1,48 @@ +package kafka + +import ( + "chat-room/pkg/global/log" + "github.com/Shopify/sarama" + "strings" +) + +var consumer sarama.Consumer + +type ConsumerCallback func(data []byte) + +// 初始化消费者 +func InitConsumer(hosts string) { + config := sarama.NewConfig() + client, err := sarama.NewClient(strings.Split(hosts, ","), config) + if nil != err { + log.Error("init kafka consumer client error", log.Any("init kafka consumer client error", err.Error())) + } + + consumer, err = sarama.NewConsumerFromClient(client) + if nil != err { + log.Error("init kafka consumer error", log.Any("init kafka consumer error", err.Error())) + } +} + +// 消费消息,通过回调函数进行 +func ConsumerMsg(callBack ConsumerCallback) { + partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest) + if nil != err { + log.Error("iConsumePartition error", log.Any("ConsumePartition error", err.Error())) + return + } + + defer partitionConsumer.Close() + for { + msg := <-partitionConsumer.Messages() + if nil != callBack { + callBack(msg.Value) + } + } +} + +func CloseConsumer() { + if nil != consumer { + consumer.Close() + } +} diff --git a/internal/kafka/producer.go b/internal/kafka/producer.go new file mode 100644 index 0000000..2bb5c56 --- /dev/null +++ b/internal/kafka/producer.go @@ -0,0 +1,37 @@ +package kafka + +import ( + "strings" + + "chat-room/pkg/global/log" + "github.com/Shopify/sarama" +) + +var producer sarama.AsyncProducer +var topic string = "default_message" + +func InitProducer(topicInput, hosts string) { + topic = topicInput + config := sarama.NewConfig() + config.Producer.Compression = sarama.CompressionGZIP + client, err := sarama.NewClient(strings.Split(hosts, ","), config) + if nil != err { + log.Error("init kafka client error", log.Any("init kafka client error", err.Error())) + } + + producer, err = sarama.NewAsyncProducerFromClient(client) + if nil != err { + log.Error("init kafka async client error", log.Any("init kafka async client error", err.Error())) + } +} + +func Send(data []byte) { + be := sarama.ByteEncoder(data) + producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: be} +} + +func Close() { + if producer != nil { + producer.Close() + } +} diff --git a/internal/server/client.go b/internal/server/client.go index f4e8267..a4360ac 100644 --- a/internal/server/client.go +++ b/internal/server/client.go @@ -1,6 +1,8 @@ package server import ( + "chat-room/config" + "chat-room/internal/kafka" "chat-room/pkg/common/constant" "chat-room/pkg/global/log" "chat-room/pkg/protocol" @@ -46,7 +48,11 @@ func (c *Client) Read() { } c.Conn.WriteMessage(websocket.BinaryMessage, pongByte) } else { - MyServer.Broadcast <- message + if config.GetConfig().MsgChannelType.ChannelType == constant.KAFKA { + kafka.Send(message) + } else { + MyServer.Broadcast <- message + } } } } diff --git a/internal/server/server.go b/internal/server/server.go index 0977cbe..ee81c7c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -2,11 +2,11 @@ package server import ( "chat-room/config" + "chat-room/internal/service" "chat-room/pkg/common/constant" "chat-room/pkg/common/util" "chat-room/pkg/global/log" "chat-room/pkg/protocol" - "chat-room/internal/service" "encoding/base64" "io/ioutil" "strings" @@ -36,6 +36,11 @@ func NewServer() *Server { } } +// 消费kafka里面的消息, 然后直接放入go channel中统一进行消费 +func ConsumerKafkaMsg(data []byte) { + MyServer.Broadcast <- data +} + func (s *Server) Start() { log.Info("start server", log.Any("start server", "start server...")) for { @@ -154,7 +159,7 @@ func saveMessage(message *protocol.Message) { log.Error("transfer base64 to file error", log.String("transfer base64 to file error", dataErr.Error())) return } - err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath + url, dataBuffer, 0666) + err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, dataBuffer, 0666) if err != nil { log.Error("write file error", log.String("write file error", err.Error())) return @@ -170,7 +175,7 @@ func saveMessage(message *protocol.Message) { } contentType := util.GetContentTypeBySuffix(fileSuffix) url := uuid.New().String() + "." + fileSuffix - err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath + url, message.File, 0666) + err := ioutil.WriteFile(config.GetConfig().StaticPath.FilePath+url, message.File, 0666) if err != nil { log.Error("write file error", log.String("write file error", err.Error())) return diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 3f99551..e367fe4 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -4,9 +4,11 @@ const ( HEAT_BEAT = "heatbeat" PONG = "pong" + // 消息类型,单聊或者群聊 MESSAGE_TYPE_USER = 1 MESSAGE_TYPE_GROUP = 2 + // 消息内容类型 TEXT = 1 FILE = 2 IMAGE = 3 @@ -14,4 +16,8 @@ const ( VIDEO = 5 AUDIO_ONLINE = 6 VIDEO_ONLINE = 7 + + // 消息队列类型 + GO_CHANNEL = "gochannel" + KAFKA = "kafka" ) diff --git a/test/proto_test.go b/test/kafka_test.go similarity index 94% rename from test/proto_test.go rename to test/kafka_test.go index 410daf1..8953827 100644 --- a/test/proto_test.go +++ b/test/kafka_test.go @@ -9,7 +9,7 @@ import ( ) func TestKafka(t *testing.T) { - hosts := "127.0.0.1:9092" + hosts := "myAddress:9092" config := sarama.NewConfig() client, _ := sarama.NewClient(strings.Split(hosts, ","), config) @@ -32,9 +32,11 @@ func TestKafka(t *testing.T) { } var consumer sarama.Consumer + type ConsumerCallBack func(data []byte) + func TestKafkaConsumer(t *testing.T) { - hosts := "127.0.0.1:9092" + hosts := "myAddress:9092" config := sarama.NewConfig() client, _ := sarama.NewClient(strings.Split(hosts, ","), config) @@ -47,4 +49,4 @@ func TestKafkaConsumer(t *testing.T) { fmt.Println(msg.Value) fmt.Println(string(msg.Value)) } -} \ No newline at end of file +}