mirror of https://github.com/kone-net/go-chat.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
48 lines
1.1 KiB
48 lines
1.1 KiB
package kafka |
|
|
|
import ( |
|
"chat-room/pkg/global/log" |
|
"github.com/Shopify/sarama" |
|
"strings" |
|
) |
|
|
|
var consumer sarama.Consumer |
|
|
|
type ConsumerCallback func(data []byte) |
|
|
|
// 初始化消费者 |
|
func InitConsumer(hosts string) { |
|
config := sarama.NewConfig() |
|
client, err := sarama.NewClient(strings.Split(hosts, ","), config) |
|
if nil != err { |
|
log.Logger.Error("init kafka consumer client error", log.Any("init kafka consumer client error", err.Error())) |
|
} |
|
|
|
consumer, err = sarama.NewConsumerFromClient(client) |
|
if nil != err { |
|
log.Logger.Error("init kafka consumer error", log.Any("init kafka consumer error", err.Error())) |
|
} |
|
} |
|
|
|
// 消费消息,通过回调函数进行 |
|
func ConsumerMsg(callBack ConsumerCallback) { |
|
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest) |
|
if nil != err { |
|
log.Logger.Error("iConsumePartition error", log.Any("ConsumePartition error", err.Error())) |
|
return |
|
} |
|
|
|
defer partitionConsumer.Close() |
|
for { |
|
msg := <-partitionConsumer.Messages() |
|
if nil != callBack { |
|
callBack(msg.Value) |
|
} |
|
} |
|
} |
|
|
|
func CloseConsumer() { |
|
if nil != consumer { |
|
consumer.Close() |
|
} |
|
}
|
|
|