Browse Source

tidy up rtmp

pull/340/head
aler9 4 years ago
parent
commit
d850492ca0
  1. 48
      internal/clientrtmp/client.go
  2. 24
      internal/rtmputils/aac.go
  3. 15
      internal/rtmputils/client.go
  4. 45
      internal/rtmputils/h264.go
  5. 4
      internal/rtmputils/metadata.go
  6. 7
      internal/sourcertmp/source.go

48
internal/clientrtmp/client.go

@ -18,7 +18,6 @@ import ( @@ -18,7 +18,6 @@ import (
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
rh264 "github.com/notedit/rtmp/codec/h264"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
@ -255,36 +254,15 @@ func (c *Client) runRead() { @@ -255,36 +254,15 @@ func (c *Client) runRead() {
c.conn.WriteMetadata(videoTrack, audioTrack)
if videoTrack != nil {
codec := rh264.Codec{
SPS: map[int][]byte{
0: h264SPS,
},
PPS: map[int][]byte{
0: h264PPS,
},
}
b := make([]byte, 128)
var n int
codec.ToConfig(b, &n)
b = b[:n]
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WritePacket(av.Packet{
Type: av.H264DecoderConfig,
Data: b,
})
c.conn.WriteH264Config(h264SPS, h264PPS)
c.h264Decoder = rtph264.NewDecoder()
c.videoTrack = videoTrack
}
if audioTrack != nil {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WritePacket(av.Packet{
Type: av.AACDecoderConfig,
Data: aacConfig,
})
c.conn.WriteAACConfig(aacConfig)
clockRate, _ := audioTrack.ClockRate()
c.aacDecoder = rtpaac.NewDecoder(clockRate)
c.audioTrack = audioTrack
@ -350,23 +328,11 @@ func (c *Client) runRead() { @@ -350,23 +328,11 @@ func (c *Client) runRead() {
// aggregate NALUs by PTS
if nt.Timestamp != videoPTS {
data, err := h264.EncodeAVCC(videoBuf)
if err != nil {
return err
}
pkt := av.Packet{
Type: av.H264,
Data: data,
Time: now.Sub(videoStartDTS),
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err = c.conn.WritePacket(pkt)
err := c.conn.WriteH264(videoBuf, now.Sub(videoStartDTS))
if err != nil {
return err
}
videoBuf = nil
}
@ -382,14 +348,8 @@ func (c *Client) runRead() { @@ -382,14 +348,8 @@ func (c *Client) runRead() {
}
for _, at := range ats {
pkt := av.Packet{
Type: av.AAC,
Data: at.AU,
Time: at.Timestamp,
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WritePacket(pkt)
err := c.conn.WriteAAC(at.AU, at.Timestamp)
if err != nil {
return err
}

24
internal/rtmputils/aac.go

@ -0,0 +1,24 @@ @@ -0,0 +1,24 @@
package rtmputils
import (
"time"
"github.com/notedit/rtmp/av"
)
// WriteAACConfig writes an AAC config.
func (c *Conn) WriteAACConfig(config []byte) error {
return c.WritePacket(av.Packet{
Type: av.AACDecoderConfig,
Data: config,
})
}
// WriteAAC writes an AAC AU.
func (c *Conn) WriteAAC(au []byte, dts time.Duration) error {
return c.WritePacket(av.Packet{
Type: av.AAC,
Data: au,
Time: dts,
})
}

15
internal/rtmputils/client.go

@ -0,0 +1,15 @@ @@ -0,0 +1,15 @@
package rtmputils
import (
"github.com/notedit/rtmp/format/rtmp"
)
// Dial connects to a server in reading mode.
func Dial(address string) (*Conn, error) {
rconn, nconn, err := rtmp.NewClient().Dial(address, rtmp.PrepareReading)
if err != nil {
return nil, err
}
return NewConn(rconn, nconn), nil
}

45
internal/rtmputils/h264.go

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
package rtmputils
import (
"time"
"github.com/notedit/rtmp/av"
rh264 "github.com/notedit/rtmp/codec/h264"
"github.com/aler9/rtsp-simple-server/internal/h264"
)
// WriteH264Config writes a H264 config.
func (c *Conn) WriteH264Config(sps []byte, pps []byte) error {
codec := rh264.Codec{
SPS: map[int][]byte{
0: sps,
},
PPS: map[int][]byte{
0: pps,
},
}
b := make([]byte, 128)
var n int
codec.ToConfig(b, &n)
b = b[:n]
return c.WritePacket(av.Packet{
Type: av.H264DecoderConfig,
Data: b,
})
}
// WriteH264 writes H264 NALUs.
func (c *Conn) WriteH264(nalus [][]byte, dts time.Duration) error {
data, err := h264.EncodeAVCC(nalus)
if err != nil {
return err
}
return c.WritePacket(av.Packet{
Type: av.H264,
Data: data,
Time: dts,
})
}

4
internal/rtmputils/metadata.go

@ -5,7 +5,7 @@ import ( @@ -5,7 +5,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264"
rh264 "github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/flv/flvio"
)
@ -125,7 +125,7 @@ func (conn *Conn) ReadMetadata() (*gortsplib.Track, *gortsplib.Track, error) { @@ -125,7 +125,7 @@ func (conn *Conn) ReadMetadata() (*gortsplib.Track, *gortsplib.Track, error) {
return nil, nil, fmt.Errorf("video track setupped twice")
}
codec, err := h264.FromDecoderConfig(pkt.Data)
codec, err := rh264.FromDecoderConfig(pkt.Data)
if err != nil {
return nil, nil, err
}

7
internal/sourcertmp/source.go

@ -2,7 +2,6 @@ package sourcertmp @@ -2,7 +2,6 @@ package sourcertmp
import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
@ -11,7 +10,6 @@ import ( @@ -11,7 +10,6 @@ import (
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/h264"
"github.com/aler9/rtsp-simple-server/internal/logger"
@ -115,10 +113,7 @@ func (s *Source) runInner() bool { @@ -115,10 +113,7 @@ func (s *Source) runInner() bool {
dialDone := make(chan struct{}, 1)
go func() {
defer close(dialDone)
var rconn *rtmp.Conn
var nconn net.Conn
rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading)
conn = rtmputils.NewConn(rconn, nconn)
conn, err = rtmputils.Dial(s.ur)
}()
select {

Loading…
Cancel
Save