Browse Source

Merge pull request #198 from fzdy1914/rtmps-support

Add RTMPS support
pull/202/head
Matthew 3 years ago committed by GitHub
parent
commit
0fd0e41c42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      configure/liveconfig.go
  2. 14
      main.go
  3. 7
      protocol/api/api.go
  4. 5
      protocol/rtmp/cache/special.go
  5. 52
      protocol/rtmp/core/conn_client.go
  6. 3
      protocol/rtmp/rtmprelay/rtmprelay.go

3
configure/liveconfig.go

@ -54,6 +54,7 @@ type ServerCfg struct { @@ -54,6 +54,7 @@ type ServerCfg struct {
RedisPwd string `mapstructure:"redis_pwd"`
ReadTimeout int `mapstructure:"read_timeout"`
WriteTimeout int `mapstructure:"write_timeout"`
EnableTLSVerify bool `mapstructure:"enable_tls_verify"`
GopNum int `mapstructure:"gop_num"`
JWT JWT `mapstructure:"jwt"`
Server Applications `mapstructure:"server"`
@ -71,6 +72,7 @@ var defaultConf = ServerCfg{ @@ -71,6 +72,7 @@ var defaultConf = ServerCfg{
APIAddr: ":8090",
WriteTimeout: 10,
ReadTimeout: 10,
EnableTLSVerify: true,
GopNum: 1,
Server: Applications{{
Appname: "live",
@ -127,6 +129,7 @@ func initDefault() { @@ -127,6 +129,7 @@ func initDefault() {
pflag.Int("read_timeout", 10, "read time out")
pflag.Int("write_timeout", 10, "write time out")
pflag.Int("gop_num", 1, "gop num")
pflag.Bool("enable_tls_verify", true, "Use system root CA to verify RTMPS connection, set this flag to false on Windows")
pflag.Parse()
Config.BindPFlags(pflag.CommandLine)

14
main.go

@ -2,15 +2,16 @@ package main @@ -2,15 +2,16 @@ package main
import (
"fmt"
"net"
"path"
"runtime"
"time"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/api"
"github.com/gwuhaolin/livego/protocol/hls"
"github.com/gwuhaolin/livego/protocol/httpflv"
"github.com/gwuhaolin/livego/protocol/rtmp"
"net"
"path"
"runtime"
"time"
log "github.com/sirupsen/logrus"
)
@ -37,10 +38,8 @@ func startHls() *hls.Server { @@ -37,10 +38,8 @@ func startHls() *hls.Server {
return hlsServer
}
var rtmpAddr string
func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
rtmpAddr = configure.Config.GetString("rtmp_addr")
rtmpAddr := configure.Config.GetString("rtmp_addr")
rtmpListen, err := net.Listen("tcp", rtmpAddr)
if err != nil {
@ -88,6 +87,7 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { @@ -88,6 +87,7 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {
func startAPI(stream *rtmp.RtmpStream) {
apiAddr := configure.Config.GetString("api_addr")
rtmpAddr := configure.Config.GetString("rtmp_addr")
if apiAddr != "" {
opListen, err := net.Listen("tcp", apiAddr)

7
protocol/api/api.go

@ -202,7 +202,7 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { @@ -202,7 +202,7 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
return true
})
} else {
// Warning: The room should be in the "live/stream" format!
// Warning: The room should be in the "live/stream" format!
roomInfo, exists := (rtmpStream.GetStreams()).Load(room)
if exists == false {
res.Status = 404
@ -299,12 +299,13 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { @@ -299,12 +299,13 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pullRtmprelay.Start()
if err != nil {
res.Status = 400
retString = fmt.Sprintf("push error=%v", err)
} else {
s.session[keyString] = pullRtmprelay
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url)
retString = fmt.Sprintf("<h1>pull url start %s ok</h1></br>", url)
}
res.Status = 400
res.Data = retString
log.Debugf("pull start return %s", retString)
}

5
protocol/rtmp/cache/special.go vendored

@ -43,5 +43,8 @@ func (specialCache *SpecialCache) Send(w av.WriteCloser) error { @@ -43,5 +43,8 @@ func (specialCache *SpecialCache) Send(w av.WriteCloser) error {
if !specialCache.full {
return nil
}
return w.Write(specialCache.p)
// demux in hls will change p.Data, only send a copy here
newPacket := *specialCache.p
return w.Write(&newPacket)
}

52
protocol/rtmp/core/conn_client.go

@ -2,6 +2,8 @@ package core @@ -2,6 +2,8 @@ package core
import (
"bytes"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"math/rand"
@ -10,6 +12,7 @@ import ( @@ -10,6 +12,7 @@ import (
"strings"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/amf"
log "github.com/sirupsen/logrus"
@ -36,9 +39,9 @@ type ConnClient struct { @@ -36,9 +39,9 @@ type ConnClient struct {
tcurl string
app string
title string
query string
curcmdName string
streamid uint32
isRTMPS bool
conn *Conn
encoder *amf.Encoder
decoder *amf.Decoder
@ -221,9 +224,20 @@ func (connClient *ConnClient) Start(url string, method string) error { @@ -221,9 +224,20 @@ func (connClient *ConnClient) Start(url string, method string) error {
}
connClient.app = ps[0]
connClient.title = ps[1]
connClient.query = u.RawQuery
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app
port := ":1935"
if u.RawQuery != "" {
connClient.title += "?" + u.RawQuery
}
connClient.isRTMPS = strings.HasPrefix(url, "rtmps://")
var port string
if connClient.isRTMPS {
connClient.tcurl = "rtmps://" + u.Host + "/" + connClient.app
port = ":443"
} else {
connClient.tcurl = "rtmp://" + u.Host + "/" + connClient.app
port = ":1935"
}
host := u.Host
localIP := ":0"
var remoteIP string
@ -256,10 +270,32 @@ func (connClient *ConnClient) Start(url string, method string) error { @@ -256,10 +270,32 @@ func (connClient *ConnClient) Start(url string, method string) error {
log.Warning(err)
return err
}
conn, err := net.DialTCP("tcp", local, remote)
if err != nil {
log.Warning(err)
return err
var conn net.Conn
if connClient.isRTMPS {
var config tls.Config
if configure.Config.GetBool("enable_tls_verify") {
roots, err := x509.SystemCertPool()
if err != nil {
log.Warning(err)
return err
}
config.RootCAs = roots
} else {
config.InsecureSkipVerify = true
}
conn, err = tls.Dial("tcp", remoteIP, &config)
if err != nil {
log.Warning(err)
return err
}
} else {
conn, err = net.DialTCP("tcp", local, remote)
if err != nil {
log.Warning(err)
return err
}
}
log.Debug("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())

3
protocol/rtmp/rtmprelay/rtmprelay.go

@ -3,9 +3,9 @@ package rtmprelay @@ -3,9 +3,9 @@ package rtmprelay
import (
"bytes"
"fmt"
"github.com/gwuhaolin/livego/av"
"io"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/protocol/rtmp/core"
@ -62,6 +62,7 @@ func (self *RtmpRelay) rcvPlayChunkStream() { @@ -62,6 +62,7 @@ func (self *RtmpRelay) rcvPlayChunkStream() {
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18:
log.Debug("rcvPlayRtmpMediaPacket: metadata....")
self.cs_chan <- rc
case 8, 9:
self.cs_chan <- rc
}

Loading…
Cancel
Save