Browse Source

clientrtmp: perform rtsp->rtmp conversion entirely inside the writer routine

pull/340/head
aler9 5 years ago
parent
commit
201fcd4b34
  1. 117
      internal/clientrtmp/client.go

117
internal/clientrtmp/client.go

@ -56,6 +56,11 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values) {
return pathName, ur.Query() return pathName, ur.Query()
} }
type trackIDBufPair struct {
trackID int
buf []byte
}
// Parent is implemented by clientman.ClientMan. // Parent is implemented by clientman.ClientMan.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
@ -313,48 +318,70 @@ func (c *Client) runRead() {
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
pair := data.(trackIDBufPair)
now := time.Now() now := time.Now()
switch tdata := data.(type) { if c.videoTrack != nil && pair.trackID == c.videoTrack.ID {
case *rtph264.NALUAndTimestamp: nts, err := c.h264Decoder.Decode(pair.buf)
if !videoInitialized { if err != nil {
videoInitialized = true if err != rtph264.ErrMorePacketsNeeded {
videoStartDTS = now c.log(logger.Debug, "ERR while decoding video track: %v", err)
videoPTS = tdata.Timestamp }
continue
} }
// aggregate NALUs by PTS for _, nt := range nts {
if tdata.Timestamp != videoPTS { if !videoInitialized {
pkt := av.Packet{ videoInitialized = true
Type: av.H264, videoStartDTS = now
Data: h264.FillNALUsAVCC(videoBuf), videoPTS = nt.Timestamp
Time: now.Sub(videoStartDTS),
} }
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) // aggregate NALUs by PTS
err := c.conn.WritePacket(pkt) if nt.Timestamp != videoPTS {
if err != nil { pkt := av.Packet{
return err Type: av.H264,
Data: h264.FillNALUsAVCC(videoBuf),
Time: now.Sub(videoStartDTS),
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WritePacket(pkt)
if err != nil {
return err
}
videoBuf = nil
} }
videoBuf = nil videoPTS = nt.Timestamp
videoBuf = append(videoBuf, nt.NALU)
} }
continue
}
videoPTS = tdata.Timestamp if c.audioTrack != nil && pair.trackID == c.audioTrack.ID {
videoBuf = append(videoBuf, tdata.NALU) ats, err := c.aacDecoder.Decode(pair.buf)
if err != nil {
case *rtpaac.AUAndTimestamp: c.log(logger.Debug, "ERR while decoding audio track: %v", err)
pkt := av.Packet{ continue
Type: av.AAC,
Data: tdata.AU,
Time: tdata.Timestamp,
} }
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) for _, at := range ats {
err := c.conn.WritePacket(pkt) pkt := av.Packet{
if err != nil { Type: av.AAC,
return err Data: at.AU,
Time: at.Timestamp,
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WritePacket(pkt)
if err != nil {
return err
}
} }
continue
} }
} }
}() }()
@ -616,36 +643,6 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
// OnIncomingFrame implements path.Reader. // OnIncomingFrame implements path.Reader.
func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
if streamType == gortsplib.StreamTypeRTP { if streamType == gortsplib.StreamTypeRTP {
if c.videoTrack != nil { c.ringBuffer.Push(trackIDBufPair{trackID, buf})
if trackID == c.videoTrack.ID {
nts, err := c.h264Decoder.Decode(buf)
if err != nil {
if err != rtph264.ErrMorePacketsNeeded {
c.log(logger.Debug, "ERR while decoding video track: %v", err)
}
return
}
for _, nt := range nts {
c.ringBuffer.Push(nt)
}
return
}
}
if c.audioTrack != nil {
if trackID == c.audioTrack.ID {
ats, err := c.aacDecoder.Decode(buf)
if err != nil {
c.log(logger.Debug, "ERR while decoding audio track: %v", err)
return
}
for _, at := range ats {
c.ringBuffer.Push(at)
}
return
}
}
} }
} }

Loading…
Cancel
Save