Browse Source

opt:尝试优化使用goroutine发rtmp包,优化连接数过多时可能会带来性能问题

pull/225/head
jerojiang 2 years ago
parent
commit
f08565357b
  1. 43
      protocol/rtmp/stream.go

43
protocol/rtmp/stream.go

@ -98,12 +98,22 @@ type Stream struct { @@ -98,12 +98,22 @@ type Stream struct {
type PackWriterCloser struct {
init bool
w av.WriteCloser
pch chan *TransPacketData
}
type TransPacketData struct {
P *av.Packet
SessKey interface{}
}
func (p *PackWriterCloser) GetWriter() av.WriteCloser {
return p.w
}
func (p *PackWriterCloser) SendPacket(tp *TransPacketData) {
p.pch <- tp
}
func NewStream() *Stream {
return &Stream{
cache: cache.NewCache(),
@ -145,11 +155,26 @@ func (s *Stream) AddReader(r av.ReadCloser) { @@ -145,11 +155,26 @@ func (s *Stream) AddReader(r av.ReadCloser) {
func (s *Stream) AddWriter(w av.WriteCloser) {
info := w.Info()
pw := &PackWriterCloser{w: w}
pw.pch = make(chan *TransPacketData, 50)
//run a goroutine to write player packet
go func() {
for {
p := <-pw.pch
if err := pw.w.Write(p.P); err != nil {
log.Debugf("[%s] write packet error: %v, remove", p.SessKey, err)
s.ws.Delete(p.SessKey)
break
}
}
}()
s.ws.Store(info.UID, pw)
}
/*检测本application下是否配置static_push,
如果配置, 启动push远端的连接*/
/*
检测本application下是否配置static_push,
如果配置, 启动push远端的连接
*/
func (s *Stream) StartStaticPush() {
key := s.info.Key
@ -346,10 +371,18 @@ func (s *Stream) TransStart() { @@ -346,10 +371,18 @@ func (s *Stream) TransStart() {
newPacket := p
//writeType := reflect.TypeOf(v.w)
//log.Debugf("w.Write: type=%v, %v", writeType, v.w.Info())
if err = v.w.Write(&newPacket); err != nil {
log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err)
s.ws.Delete(key)
// if err = v.w.Write(&newPacket); err != nil {
// log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err)
// s.ws.Delete(key)
// }
d := TransPacketData{
P: &newPacket,
SessKey: key,
}
go v.SendPacket(&d)
}
return true
})

Loading…
Cancel
Save