Browse Source

split sourcertmp and rtmpinfo

pull/235/head
aler9 5 years ago
parent
commit
17c22577c9
  1. 133
      internal/rtmpinfo/rtmpinfo.go
  2. 202
      internal/sourcertmp/source.go

133
internal/rtmpinfo/rtmpinfo.go

@ -0,0 +1,133 @@ @@ -0,0 +1,133 @@
package rtmpinfo
import (
"fmt"
"net"
"time"
"github.com/aler9/gortsplib"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/notedit/rtmp/format/rtmp"
)
const (
codecH264 = 7
codecAAC = 10
)
func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) {
pkt, err := rconn.ReadPacket()
if err != nil {
return nil, err
}
if pkt.Type != av.Metadata {
return nil, fmt.Errorf("first packet must be metadata")
}
arr, err := flvio.ParseAMFVals(pkt.Data, false)
if err != nil {
return nil, err
}
if len(arr) != 1 {
return nil, fmt.Errorf("invalid metadata")
}
ma, ok := arr[0].(flvio.AMFMap)
if !ok {
return nil, fmt.Errorf("invalid metadata")
}
return ma, nil
}
// Info extracts track informations from a RTMP connection.
func Info(rconn *rtmp.Conn, nconn net.Conn, readTimeout time.Duration) (
*gortsplib.Track, *gortsplib.Track, error) {
var videoTrack *gortsplib.Track
var audioTrack *gortsplib.Track
// configuration must be completed within readTimeout
nconn.SetReadDeadline(time.Now().Add(readTimeout))
md, err := readMetadata(rconn)
if err != nil {
return nil, nil, err
}
hasVideo := false
if v, ok := md.GetFloat64("videocodecid"); ok {
switch v {
case codecH264:
hasVideo = true
case 0:
default:
return nil, nil, fmt.Errorf("unsupported video codec %v", v)
}
}
hasAudio := false
if v, ok := md.GetFloat64("audiocodecid"); ok {
switch v {
case codecAAC:
hasAudio = true
case 0:
default:
return nil, nil, fmt.Errorf("unsupported audio codec %v", v)
}
}
if !hasVideo && !hasAudio {
return nil, nil, fmt.Errorf("stream has no tracks")
}
for {
var pkt av.Packet
pkt, err = rconn.ReadPacket()
if err != nil {
return nil, nil, err
}
switch pkt.Type {
case av.H264DecoderConfig:
if !hasVideo {
return nil, nil, fmt.Errorf("unexpected video packet")
}
if videoTrack != nil {
return nil, nil, fmt.Errorf("video track setupped twice")
}
codec, err := h264.FromDecoderConfig(pkt.Data)
if err != nil {
return nil, nil, err
}
videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0])
if err != nil {
return nil, nil, err
}
case av.AACDecoderConfig:
if !hasAudio {
return nil, nil, fmt.Errorf("unexpected audio packet")
}
if audioTrack != nil {
return nil, nil, fmt.Errorf("audio track setupped twice")
}
audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data)
if err != nil {
return nil, nil, err
}
}
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
return videoTrack, audioTrack, nil
}
}
}

202
internal/sourcertmp/source.go

@ -13,18 +13,15 @@ import ( @@ -13,18 +13,15 @@ import (
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmpinfo"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
const (
retryPause = 5 * time.Second
codecH264 = 7
codecAAC = 10
)
// Parent is implemeneted by path.Path.
@ -113,49 +110,22 @@ func (s *Source) run() { @@ -113,49 +110,22 @@ func (s *Source) run() {
}
}
func readMetadata(conn *rtmp.Conn) (flvio.AMFMap, error) {
pkt, err := conn.ReadPacket()
if err != nil {
return nil, err
}
if pkt.Type != av.Metadata {
return nil, fmt.Errorf("first packet must be metadata")
}
arr, err := flvio.ParseAMFVals(pkt.Data, false)
if err != nil {
return nil, err
}
if len(arr) != 1 {
return nil, fmt.Errorf("invalid metadata")
}
ma, ok := arr[0].(flvio.AMFMap)
if !ok {
return nil, fmt.Errorf("invalid metadata")
}
return ma, nil
}
func (s *Source) runInner() bool {
s.log(logger.Info, "connecting")
var conn *rtmp.Conn
var rconn *rtmp.Conn
var nconn net.Conn
var err error
dialDone := make(chan struct{}, 1)
go func() {
defer close(dialDone)
conn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading)
rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading)
}()
select {
case <-dialDone:
case <-s.terminate:
return false
case <-dialDone:
}
if err != nil {
@ -163,133 +133,57 @@ func (s *Source) runInner() bool { @@ -163,133 +133,57 @@ func (s *Source) runInner() bool {
return true
}
var tracks gortsplib.Tracks
var videoTrack *gortsplib.Track
var videoRTCPSender *rtcpsender.RTCPSender
var h264Encoder *rtph264.Encoder
var audioTrack *gortsplib.Track
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
// configuration must be completed within readTimeout
nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
confDone := make(chan error)
confDone := make(chan struct{})
go func() {
confDone <- func() error {
md, err := readMetadata(conn)
if err != nil {
return err
}
hasVideo := false
if v, ok := md.GetFloat64("videocodecid"); ok {
switch v {
case codecH264:
hasVideo = true
case 0:
default:
return fmt.Errorf("unsupported video codec %v", v)
}
}
hasAudio := false
if v, ok := md.GetFloat64("audiocodecid"); ok {
switch v {
case codecAAC:
hasAudio = true
case 0:
default:
return fmt.Errorf("unsupported audio codec %v", v)
}
}
if !hasVideo && !hasAudio {
return fmt.Errorf("stream has no tracks")
}
for {
var pkt av.Packet
pkt, err = conn.ReadPacket()
if err != nil {
return err
}
switch pkt.Type {
case av.H264DecoderConfig:
if !hasVideo {
return fmt.Errorf("unexpected video packet")
}
if videoTrack != nil {
return fmt.Errorf("video track setupped twice")
}
codec, err := h264.FromDecoderConfig(pkt.Data)
if err != nil {
return err
}
videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0])
if err != nil {
return err
}
clockRate, _ := videoTrack.ClockRate()
videoRTCPSender = rtcpsender.New(clockRate)
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
return err
}
tracks = append(tracks, videoTrack)
case av.AACDecoderConfig:
if !hasAudio {
return fmt.Errorf("unexpected audio packet")
}
if audioTrack != nil {
return fmt.Errorf("audio track setupped twice")
}
defer close(confDone)
videoTrack, audioTrack, err = rtmpinfo.Info(rconn, nconn, s.readTimeout)
}()
audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data)
if err != nil {
return err
}
select {
case <-confDone:
case <-s.terminate:
nconn.Close()
<-confDone
return false
}
clockRate, _ := audioTrack.ClockRate()
audioRTCPSender = rtcpsender.New(clockRate)
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
return err
}
var tracks gortsplib.Tracks
var videoRTCPSender *rtcpsender.RTCPSender
var h264Encoder *rtph264.Encoder
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
tracks = append(tracks, audioTrack)
}
if videoTrack != nil {
clockRate, _ := videoTrack.ClockRate()
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
nconn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
return nil
}
}
}()
}()
videoRTCPSender = rtcpsender.New(clockRate)
tracks = append(tracks, videoTrack)
}
select {
case err := <-confDone:
if audioTrack != nil {
clockRate, _ := audioTrack.ClockRate()
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
nconn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
case <-s.terminate:
nconn.Close()
<-confDone
return false
audioRTCPSender = rtcpsender.New(clockRate)
tracks = append(tracks, audioTrack)
}
for i, t := range tracks {
@ -338,7 +232,7 @@ func (s *Source) runInner() bool { @@ -338,7 +232,7 @@ func (s *Source) runInner() bool {
readerDone <- func() error {
for {
nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := conn.ReadPacket()
pkt, err := rconn.ReadPacket()
if err != nil {
return err
}
@ -390,21 +284,21 @@ func (s *Source) runInner() bool { @@ -390,21 +284,21 @@ func (s *Source) runInner() bool {
for {
select {
case <-s.terminate:
case err := <-readerDone:
nconn.Close()
<-readerDone
s.log(logger.Info, "ERR: %s", err)
close(rtcpTerminate)
<-rtcpDone
return false
return true
case err := <-readerDone:
case <-s.terminate:
nconn.Close()
s.log(logger.Info, "ERR: %s", err)
<-readerDone
close(rtcpTerminate)
<-rtcpDone
return true
return false
}
}
}

Loading…
Cancel
Save