From f08565357b3a95f2f74e0a86d2fb7ff3b08e5456 Mon Sep 17 00:00:00 2001 From: jerojiang Date: Thu, 11 May 2023 14:49:03 +0800 Subject: [PATCH] =?UTF-8?q?opt:=E5=B0=9D=E8=AF=95=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BD=BF=E7=94=A8goroutine=E5=8F=91rtmp=E5=8C=85=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BF=9E=E6=8E=A5=E6=95=B0=E8=BF=87=E5=A4=9A?= =?UTF-8?q?=E6=97=B6=E5=8F=AF=E8=83=BD=E4=BC=9A=E5=B8=A6=E6=9D=A5=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- protocol/rtmp/stream.go | 43 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/protocol/rtmp/stream.go b/protocol/rtmp/stream.go index f793bb9..7bb19eb 100755 --- a/protocol/rtmp/stream.go +++ b/protocol/rtmp/stream.go @@ -98,12 +98,22 @@ type Stream struct { type PackWriterCloser struct { init bool w av.WriteCloser + pch chan *TransPacketData +} + +type TransPacketData struct { + P *av.Packet + SessKey interface{} } func (p *PackWriterCloser) GetWriter() av.WriteCloser { return p.w } +func (p *PackWriterCloser) SendPacket(tp *TransPacketData) { + p.pch <- tp +} + func NewStream() *Stream { return &Stream{ cache: cache.NewCache(), @@ -145,11 +155,26 @@ func (s *Stream) AddReader(r av.ReadCloser) { func (s *Stream) AddWriter(w av.WriteCloser) { info := w.Info() pw := &PackWriterCloser{w: w} + pw.pch = make(chan *TransPacketData, 50) + + //run a goroutine to write player packet + go func() { + for { + p := <-pw.pch + if err := pw.w.Write(p.P); err != nil { + log.Debugf("[%s] write packet error: %v, remove", p.SessKey, err) + s.ws.Delete(p.SessKey) + break + } + } + }() s.ws.Store(info.UID, pw) } -/*检测本application下是否配置static_push, -如果配置, 启动push远端的连接*/ +/* +检测本application下是否配置static_push, +如果配置, 启动push远端的连接 +*/ func (s *Stream) StartStaticPush() { key := s.info.Key @@ -346,10 +371,18 @@ func (s *Stream) TransStart() { newPacket := p //writeType := reflect.TypeOf(v.w) //log.Debugf("w.Write: type=%v, %v", writeType, v.w.Info()) - if err = v.w.Write(&newPacket); err != nil { - log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err) - s.ws.Delete(key) + + // if err = v.w.Write(&newPacket); err != nil { + // log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err) + // s.ws.Delete(key) + // } + d := TransPacketData{ + P: &newPacket, + SessKey: key, } + + go v.SendPacket(&d) + } return true })