Browse Source

Add RTMP support

pull/198/head
WANG CHAO 4 years ago
parent
commit
637cf74cc3
  1. 3
      configure/liveconfig.go
  2. 14
      main.go
  3. 7
      protocol/api/api.go
  4. 5
      protocol/rtmp/cache/special.go
  5. 52
      protocol/rtmp/core/conn_client.go
  6. 3
      protocol/rtmp/rtmprelay/rtmprelay.go

3
configure/liveconfig.go

@ -54,6 +54,7 @@ type ServerCfg struct {
RedisPwd string `mapstructure:"redis_pwd"` RedisPwd string `mapstructure:"redis_pwd"`
ReadTimeout int `mapstructure:"read_timeout"` ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"` WriteTimeout int `mapstructure:"write_timeout"`
EnableTLSVerify bool `mapstructure:"enable_tls_verify"`
GopNum int `mapstructure:"gop_num"` GopNum int `mapstructure:"gop_num"`
JWT JWT `mapstructure:"jwt"` JWT JWT `mapstructure:"jwt"`
Server Applications `mapstructure:"server"` Server Applications `mapstructure:"server"`
@ -71,6 +72,7 @@ var defaultConf = ServerCfg{
APIAddr: ":8090", APIAddr: ":8090",
WriteTimeout: 10, WriteTimeout: 10,
ReadTimeout: 10, ReadTimeout: 10,
EnableTLSVerify: true,
GopNum: 1, GopNum: 1,
Server: Applications{{ Server: Applications{{
Appname: "live", Appname: "live",
@ -127,6 +129,7 @@ func initDefault() {
pflag.Int("read_timeout", 10, "read time out") pflag.Int("read_timeout", 10, "read time out")
pflag.Int("write_timeout", 10, "write time out") pflag.Int("write_timeout", 10, "write time out")
pflag.Int("gop_num", 1, "gop num") pflag.Int("gop_num", 1, "gop num")
pflag.Bool("enable_tls_verify", true, "Use system root CA to verify RTMPS connection, set this flag to false on Windows")
pflag.Parse() pflag.Parse()
Config.BindPFlags(pflag.CommandLine) Config.BindPFlags(pflag.CommandLine)

14
main.go

@ -2,15 +2,16 @@ package main
import ( import (
"fmt" "fmt"
"net"
"path"
"runtime"
"time"
"github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/api" "github.com/gwuhaolin/livego/protocol/api"
"github.com/gwuhaolin/livego/protocol/hls" "github.com/gwuhaolin/livego/protocol/hls"
"github.com/gwuhaolin/livego/protocol/httpflv" "github.com/gwuhaolin/livego/protocol/httpflv"
"github.com/gwuhaolin/livego/protocol/rtmp" "github.com/gwuhaolin/livego/protocol/rtmp"
"net"
"path"
"runtime"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -37,10 +38,8 @@ func startHls() *hls.Server {
return hlsServer return hlsServer
} }
var rtmpAddr string
func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
rtmpAddr = configure.Config.GetString("rtmp_addr") rtmpAddr := configure.Config.GetString("rtmp_addr")
rtmpListen, err := net.Listen("tcp", rtmpAddr) rtmpListen, err := net.Listen("tcp", rtmpAddr)
if err != nil { if err != nil {
@ -88,6 +87,7 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {
func startAPI(stream *rtmp.RtmpStream) { func startAPI(stream *rtmp.RtmpStream) {
apiAddr := configure.Config.GetString("api_addr") apiAddr := configure.Config.GetString("api_addr")
rtmpAddr := configure.Config.GetString("rtmp_addr")
if apiAddr != "" { if apiAddr != "" {
opListen, err := net.Listen("tcp", apiAddr) opListen, err := net.Listen("tcp", apiAddr)

7
protocol/api/api.go

@ -202,7 +202,7 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
return true return true
}) })
} else { } else {
// Warning: The room should be in the "live/stream" format! // Warning: The room should be in the "live/stream" format!
roomInfo, exists := (rtmpStream.GetStreams()).Load(room) roomInfo, exists := (rtmpStream.GetStreams()).Load(room)
if exists == false { if exists == false {
res.Status = 404 res.Status = 404
@ -299,12 +299,13 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl) log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pullRtmprelay.Start() err = pullRtmprelay.Start()
if err != nil { if err != nil {
res.Status = 400
retString = fmt.Sprintf("push error=%v", err) retString = fmt.Sprintf("push error=%v", err)
} else { } else {
s.session[keyString] = pullRtmprelay s.session[keyString] = pullRtmprelay
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url) retString = fmt.Sprintf("<h1>pull url start %s ok</h1></br>", url)
} }
res.Status = 400
res.Data = retString res.Data = retString
log.Debugf("pull start return %s", retString) log.Debugf("pull start return %s", retString)
} }

5
protocol/rtmp/cache/special.go vendored

@ -43,5 +43,8 @@ func (specialCache *SpecialCache) Send(w av.WriteCloser) error {
if !specialCache.full { if !specialCache.full {
return nil return nil
} }
return w.Write(specialCache.p)
// demux in hls will change p.Data, only send a copy here
newPacket := *specialCache.p
return w.Write(&newPacket)
} }

52
protocol/rtmp/core/conn_client.go

@ -2,6 +2,8 @@ package core
import ( import (
"bytes" "bytes"
"crypto/tls"
"crypto/x509"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
@ -10,6 +12,7 @@ import (
"strings" "strings"
"github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/amf" "github.com/gwuhaolin/livego/protocol/amf"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -36,9 +39,9 @@ type ConnClient struct {
tcurl string tcurl string
app string app string
title string title string
query string
curcmdName string curcmdName string
streamid uint32 streamid uint32
isRTMPS bool
conn *Conn conn *Conn
encoder *amf.Encoder encoder *amf.Encoder
decoder *amf.Decoder decoder *amf.Decoder
@ -221,9 +224,20 @@ func (connClient *ConnClient) Start(url string, method string) error {
} }
connClient.app = ps[0] connClient.app = ps[0]
connClient.title = ps[1] connClient.title = ps[1]
connClient.query = u.RawQuery if u.RawQuery != "" {
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app connClient.title += "?" + u.RawQuery
port := ":1935" }
connClient.isRTMPS = strings.HasPrefix(url, "rtmps://")
var port string
if connClient.isRTMPS {
connClient.tcurl = "rtmps://" + u.Host + "/" + connClient.app
port = ":443"
} else {
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app
port = ":1935"
}
host := u.Host host := u.Host
localIP := ":0" localIP := ":0"
var remoteIP string var remoteIP string
@ -256,10 +270,32 @@ func (connClient *ConnClient) Start(url string, method string) error {
log.Warning(err) log.Warning(err)
return err return err
} }
conn, err := net.DialTCP("tcp", local, remote)
if err != nil { var conn net.Conn
log.Warning(err) if connClient.isRTMPS {
return err var config tls.Config
if configure.Config.GetBool("enable_tls_verify") {
roots, err := x509.SystemCertPool()
if err != nil {
log.Warning(err)
return err
}
config.RootCAs = roots
} else {
config.InsecureSkipVerify = true
}
conn, err = tls.Dial("tcp", remoteIP, &config)
if err != nil {
log.Warning(err)
return err
}
} else {
conn, err = net.DialTCP("tcp", local, remote)
if err != nil {
log.Warning(err)
return err
}
} }
log.Debug("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr()) log.Debug("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())

3
protocol/rtmp/rtmprelay/rtmprelay.go

@ -3,9 +3,9 @@ package rtmprelay
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/gwuhaolin/livego/av"
"io" "io"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/amf" "github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/protocol/rtmp/core" "github.com/gwuhaolin/livego/protocol/rtmp/core"
@ -62,6 +62,7 @@ func (self *RtmpRelay) rcvPlayChunkStream() {
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err) log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18: case 18:
log.Debug("rcvPlayRtmpMediaPacket: metadata....") log.Debug("rcvPlayRtmpMediaPacket: metadata....")
self.cs_chan <- rc
case 8, 9: case 8, 9:
self.cs_chan <- rc self.cs_chan <- rc
} }

Loading…
Cancel
Save