Browse Source

Merge pull request #6 from runner365/master

新增功能和修改异常崩溃bug
pull/30/head
吴浩麟 8 years ago committed by GitHub
parent
commit
d17eee12b4
  1. 3
      av/av.go
  2. 74
      configure/liveconfig.go
  3. 2
      container/flv/muxer.go
  4. 10
      livego.cfg
  5. 42
      livego.go
  6. 28
      protocol/hls/source.go
  7. 32
      protocol/httpflv/writer.go
  8. 231
      protocol/httpopera/http_opera.go
  9. 8
      protocol/rtmp/cache/cache.go
  10. 10
      protocol/rtmp/cache/gop.go
  11. 4
      protocol/rtmp/cache/special.go
  12. 53
      protocol/rtmp/core/conn_client.go
  13. 159
      protocol/rtmp/rtmp.go
  14. 125
      protocol/rtmp/rtmprelay/rtmprelay.go
  15. 180
      protocol/rtmp/rtmprelay/staticrelay.go
  16. 196
      protocol/rtmp/stream.go

3
av/av.go

@ -62,6 +62,7 @@ type Packet struct { @@ -62,6 +62,7 @@ type Packet struct {
IsVideo bool
IsMetadata bool
TimeStamp uint32 // dts
StreamID uint32
Header PacketHeader
Data []byte
}
@ -148,5 +149,5 @@ type WriteCloser interface { @@ -148,5 +149,5 @@ type WriteCloser interface {
Closer
Alive
CalcTime
Write(Packet) error
Write(*Packet) error
}

74
configure/liveconfig.go

@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
package configure
import (
"encoding/json"
"io/ioutil"
"log"
)
/*
{
[
{
"application":"live",
"live":"on",
"hls":"on",
"static_push":["rtmp://xx/live"]
}
]
}
*/
type Application struct {
Appname string
Liveon string
Hlson string
Static_push []string
}
type ServerCfg struct {
Server []Application
}
var RtmpServercfg ServerCfg
func LoadConfig(configfilename string) error {
log.Printf("starting load configure file(%s)......", configfilename)
data, err := ioutil.ReadFile(configfilename)
if err != nil {
log.Printf("ReadFile %s error:%v", configfilename, err)
return err
}
log.Printf("loadconfig: \r\n%s", string(data))
err = json.Unmarshal(data, &RtmpServercfg)
if err != nil {
log.Printf("json.Unmarshal error:%v", err)
return err
}
log.Printf("get config json data:%v", RtmpServercfg)
return nil
}
func CheckAppName(appname string) bool {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
return true
}
}
return false
}
func GetStaticPushUrlList(appname string) ([]string, bool) {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
if len(app.Static_push) > 0 {
return app.Static_push, true
} else {
return nil, false
}
}
}
return nil, false
}

2
container/flv/muxer.go

@ -72,7 +72,7 @@ func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter { @@ -72,7 +72,7 @@ func NewFLVWriter(app, title, url string, ctx *os.File) *FLVWriter {
return ret
}
func (writer *FLVWriter) Write(p av.Packet) error {
func (writer *FLVWriter) Write(p *av.Packet) error {
writer.RWBaser.SetPreTime()
h := writer.buf[:headerLen]
typeID := av.TAG_VIDEO

10
livego.cfg

@ -0,0 +1,10 @@ @@ -0,0 +1,10 @@
{
"server": [
{
"appname":"live",
"liveon":"on",
"hlson":"on"
}
]
}

42
main.go → livego.go

@ -2,21 +2,23 @@ package main @@ -2,21 +2,23 @@ package main
import (
"flag"
"net"
"time"
"log"
"github.com/gwuhaolin/livego/protocol/rtmp"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/hls"
"github.com/gwuhaolin/livego/protocol/httpflv"
"github.com/gwuhaolin/livego/protocol/httpopera"
"github.com/gwuhaolin/livego/protocol/rtmp"
"log"
"net"
"time"
)
var (
version = "master"
rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address")
httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address")
hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address")
operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address")
version = "master"
rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address")
httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address")
hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address")
operaAddr = flag.String("manage-addr", ":8090", "HTTP manage interface server listen address")
configfilename = flag.String("cfgfile", "livego.cfg", "live configure filename")
)
func init() {
@ -49,7 +51,16 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { @@ -49,7 +51,16 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
log.Fatal(err)
}
rtmpServer := rtmp.NewRtmpServer(stream, hlsServer)
var rtmpServer *rtmp.Server
if hlsServer == nil {
rtmpServer = rtmp.NewRtmpServer(stream, nil)
log.Printf("hls server disable....")
} else {
rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)
log.Printf("hls server enable....")
}
defer func() {
if r := recover(); r != nil {
log.Println("RTMP server panic: ", r)
@ -83,7 +94,7 @@ func startHTTPOpera(stream *rtmp.RtmpStream) { @@ -83,7 +94,7 @@ func startHTTPOpera(stream *rtmp.RtmpStream) {
if err != nil {
log.Fatal(err)
}
opServer := httpopera.NewServer(stream)
opServer := httpopera.NewServer(stream, *rtmpAddr)
go func() {
defer func() {
if r := recover(); r != nil {
@ -104,9 +115,16 @@ func main() { @@ -104,9 +115,16 @@ func main() {
}
}()
log.Println("start livego, version", version)
err := configure.LoadConfig(*configfilename)
if err != nil {
return
}
stream := rtmp.NewRtmpStream()
hlsServer := startHls()
startHTTPFlv(stream)
//startHTTPOpera(stream)
startHTTPOpera(stream)
startRtmp(stream, hlsServer)
//startRtmp(stream, nil)
}

28
protocol/hls/source.go

@ -35,7 +35,7 @@ type Source struct { @@ -35,7 +35,7 @@ type Source struct {
tsCache *TSCacheItem
tsparser *parser.CodecParser
closed bool
packetQueue chan av.Packet
packetQueue chan *av.Packet
}
func NewSource(info av.Info) *Source {
@ -51,7 +51,7 @@ func NewSource(info av.Info) *Source { @@ -51,7 +51,7 @@ func NewSource(info av.Info) *Source {
tsCache: NewTSCacheItem(info.Key),
tsparser: parser.NewCodecParser(),
bwriter: bytes.NewBuffer(make([]byte, 100*1024)),
packetQueue: make(chan av.Packet, maxQueueNum),
packetQueue: make(chan *av.Packet, maxQueueNum),
}
go func() {
err := s.SendPacket()
@ -67,7 +67,7 @@ func (source *Source) GetCacheInc() *TSCacheItem { @@ -67,7 +67,7 @@ func (source *Source) GetCacheInc() *TSCacheItem {
return source.tsCache
}
func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) {
func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
@ -95,8 +95,19 @@ func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) { @@ -95,8 +95,19 @@ func (source *Source) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Println("packet queue len: ", len(pktQue))
}
func (source *Source) Write(p av.Packet) error {
func (source *Source) Write(p *av.Packet) (err error) {
err = nil
if source.closed {
err = errors.New("hls source closed")
return
}
source.SetPreTime()
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("hls source has already been closed:%v", e)
err = errors.New(errString)
}
}()
if len(source.packetQueue) >= maxQueueNum-24 {
source.DropPacket(source.packetQueue, source.info)
} else {
@ -104,7 +115,7 @@ func (source *Source) Write(p av.Packet) error { @@ -104,7 +115,7 @@ func (source *Source) Write(p av.Packet) error {
source.packetQueue <- p
}
}
return nil
return
}
func (source *Source) SendPacket() error {
@ -114,6 +125,7 @@ func (source *Source) SendPacket() error { @@ -114,6 +125,7 @@ func (source *Source) SendPacket() error {
log.Println("hls SendPacket panic: ", r)
}
}()
log.Printf("[%v] hls sender start", source.info)
for {
if source.closed {
@ -126,7 +138,7 @@ func (source *Source) SendPacket() error { @@ -126,7 +138,7 @@ func (source *Source) SendPacket() error {
continue
}
err := source.demuxer.Demux(&p)
err := source.demuxer.Demux(p)
if err == flv.ErrAvcEndSEQ {
log.Println(err)
continue
@ -136,7 +148,7 @@ func (source *Source) SendPacket() error { @@ -136,7 +148,7 @@ func (source *Source) SendPacket() error {
return err
}
}
compositionTime, isSeq, err := source.parse(&p)
compositionTime, isSeq, err := source.parse(p)
if err != nil {
log.Println(err)
}
@ -146,7 +158,7 @@ func (source *Source) SendPacket() error { @@ -146,7 +158,7 @@ func (source *Source) SendPacket() error {
if source.btswriter != nil {
source.stat.update(p.IsVideo, p.TimeStamp)
source.calcPtsDts(p.IsVideo, p.TimeStamp, uint32(compositionTime))
source.tsMux(&p)
source.tsMux(p)
}
} else {
return errors.New("closed")

32
protocol/httpflv/writer.go

@ -3,6 +3,7 @@ package httpflv @@ -3,6 +3,7 @@ package httpflv
import (
"time"
"errors"
"fmt"
"log"
"net/http"
"github.com/gwuhaolin/livego/utils/uid"
@ -17,14 +18,14 @@ const ( @@ -17,14 +18,14 @@ const (
)
type FLVWriter struct {
Uid string
Uid string
av.RWBaser
app, title, url string
buf []byte
closed bool
closedChan chan struct{}
ctx http.ResponseWriter
packetQueue chan av.Packet
packetQueue chan *av.Packet
}
func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
@ -37,7 +38,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { @@ -37,7 +38,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
RWBaser: av.NewRWBaser(time.Second * 10),
closedChan: make(chan struct{}),
buf: make([]byte, headerLen),
packetQueue: make(chan av.Packet, maxQueueNum),
packetQueue: make(chan *av.Packet, maxQueueNum),
}
ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09})
@ -53,7 +54,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { @@ -53,7 +54,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
return ret
}
func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
@ -80,18 +81,25 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) { @@ -80,18 +81,25 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Println("packet queue len: ", len(pktQue))
}
func (flvWriter *FLVWriter) Write(p av.Packet) error {
if !flvWriter.closed {
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
flvWriter.packetQueue <- p
func (flvWriter *FLVWriter) Write(p *av.Packet) (err error) {
err = nil
if flvWriter.closed {
err = errors.New("flvwrite source closed")
return
}
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("FLVWriter has already been closed:%v", e)
err = errors.New(errString)
}
return nil
}()
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
return errors.New("closed")
flvWriter.packetQueue <- p
}
return
}
func (flvWriter *FLVWriter) SendPacket() error {

231
protocol/httpopera/http_opera.go

@ -2,7 +2,9 @@ package httpopera @@ -2,7 +2,9 @@ package httpopera
import (
"encoding/json"
"io/ioutil"
"fmt"
"github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay"
"io"
"net"
"net/http"
"log"
@ -35,86 +37,203 @@ type OperationChange struct { @@ -35,86 +37,203 @@ type OperationChange struct {
Stop bool `json:"stop"`
}
type ClientInfo struct {
url string
rtmpRemoteClient *rtmp.Client
rtmpLocalClient *rtmp.Client
}
type Server struct {
handler av.Handler
handler av.Handler
session map[string]*rtmprelay.RtmpRelay
rtmpAddr string
}
func NewServer(h av.Handler) *Server {
func NewServer(h av.Handler, rtmpAddr string) *Server {
return &Server{
handler: h,
handler: h,
session: make(map[string]*rtmprelay.RtmpRelay),
rtmpAddr: rtmpAddr,
}
}
func (s *Server) Serve(l net.Listener) error {
mux := http.NewServeMux()
mux.HandleFunc("/rtmp/operation", func(w http.ResponseWriter, r *http.Request) {
s.handleOpera(w, r)
mux.Handle("/statics", http.FileServer(http.Dir("statics")))
mux.HandleFunc("/control/push", func(w http.ResponseWriter, r *http.Request) {
s.handlePush(w, r)
})
mux.HandleFunc("/control/pull", func(w http.ResponseWriter, r *http.Request) {
s.handlePull(w, r)
})
mux.HandleFunc("/stat/livestat", func(w http.ResponseWriter, r *http.Request) {
s.GetLiveStatics(w, r)
})
http.Serve(l, mux)
return nil
}
// handleOpera, 拉流和推流的http api
// @Path: /rtmp/operation
// @Method: POST
// @Param: json
// method string, "push" or "pull"
// url string
// stop bool
// @Example,
// curl -v -H "Content-Type: application/json" -X POST --data \
// '{"method":"pull","url":"rtmp://127.0.0.1:1935/live/test"}' \
// http://localhost:8087/rtmp/operation
func (s *Server) handleOpera(w http.ResponseWriter, r *http.Request) {
rep := &Response{
w: w,
type stream struct {
Key string `json:"key"`
Url string `json:"Url"`
StreamId uint32 `json:"StreamId"`
VideoTotalBytes uint64 `json:123456`
VideoSpeed uint64 `json:123456`
AudioTotalBytes uint64 `json:123456`
AudioSpeed uint64 `json:123456`
}
type streams struct {
Publishers []stream `json:"publishers"`
Players []stream `json:"players"`
}
//http://127.0.0.1:8090/stat/livestat
func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
rtmpStream := server.handler.(*rtmp.RtmpStream)
if rtmpStream == nil {
io.WriteString(w, "<h1>Get rtmp stream information error</h1>")
return
}
if r.Method != "POST" {
rep.Status = 14000
rep.Message = "bad request method"
rep.SendJson()
msgs := new(streams)
for item := range rtmpStream.GetStreams().IterBuffered() {
if s, ok := item.Val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
switch s.GetReader().(type) {
case *rtmp.VirReader:
v := s.GetReader().(*rtmp.VirReader)
msg := stream{item.Key, v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,
v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
}
}
for item := range rtmpStream.GetStreams().IterBuffered() {
ws := item.Val.(*rtmp.Stream).GetWs()
for s := range ws.IterBuffered() {
if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
switch pw.GetWriter().(type) {
case *rtmp.VirWriter:
v := pw.GetWriter().(*rtmp.VirWriter)
msg := stream{item.Key, v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,
v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}
msgs.Players = append(msgs.Players, msg)
}
}
}
}
}
resp, _ := json.Marshal(msgs)
w.Header().Set("Content-Type", "application/json")
w.Write(resp)
}
//http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
var retString string
var err error
req.ParseForm()
oper := req.Form["oper"]
app := req.Form["app"]
name := req.Form["name"]
url := req.Form["url"]
log.Printf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) {
io.WriteString(w, "control push parameter error, please check them.</br>")
return
} else {
result, err := ioutil.ReadAll(r.Body)
if err != nil {
rep.Status = 15000
rep.Message = "read request body error"
rep.SendJson()
}
remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0]
localurl := url[0]
keyString := "pull:" + app[0] + "/" + name[0]
if oper[0] == "stop" {
pullRtmprelay, found := s.session[keyString]
if !found {
retString = fmt.Sprintf("session key[%s] not exist, please check it again.", keyString)
io.WriteString(w, retString)
return
}
r.Body.Close()
log.Println("post body", result)
log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl)
pullRtmprelay.Stop()
var op Operation
err = json.Unmarshal(result, &op)
delete(s.session, keyString)
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url[0])
io.WriteString(w, retString)
log.Printf("pull stop return %s", retString)
} else {
pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
log.Printf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pullRtmprelay.Start()
if err != nil {
rep.Status = 12000
rep.Message = "parse json body failed"
rep.SendJson()
return
retString = fmt.Sprintf("push error=%v", err)
} else {
s.session[keyString] = pullRtmprelay
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url[0])
}
io.WriteString(w, retString)
log.Printf("pull start return %s", retString)
}
}
switch op.Method {
case "push":
s.Push(op.URL, op.Stop)
case "pull":
s.Pull(op.URL, op.Stop)
}
//http://127.0.0.1:8090/control/push?&oper=start&app=live&name=123456&url=rtmp://192.168.16.136/live/123456
func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) {
var retString string
var err error
req.ParseForm()
oper := req.Form["oper"]
app := req.Form["app"]
name := req.Form["name"]
url := req.Form["url"]
rep.Status = 10000
rep.Message = op.Method + " " + op.URL + " success"
rep.SendJson()
log.Printf("control push: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) {
io.WriteString(w, "control push parameter error, please check them.</br>")
return
}
}
func (s *Server) Push(uri string, stop bool) error {
rtmpClient := rtmp.NewRtmpClient(s.handler, nil)
return rtmpClient.Dial(uri, av.PUBLISH)
}
localurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0]
remoteurl := url[0]
keyString := "push:" + app[0] + "/" + name[0]
if oper[0] == "stop" {
pushRtmprelay, found := s.session[keyString]
if !found {
retString = fmt.Sprintf("<h1>session key[%s] not exist, please check it again.</h1>", keyString)
io.WriteString(w, retString)
return
}
log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl)
pushRtmprelay.Stop()
func (s *Server) Pull(uri string, stop bool) error {
rtmpClient := rtmp.NewRtmpClient(s.handler, nil)
return rtmpClient.Dial(uri, av.PLAY)
delete(s.session, keyString)
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url[0])
io.WriteString(w, retString)
log.Printf("push stop return %s", retString)
} else {
pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
log.Printf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pushRtmprelay.Start()
if err != nil {
retString = fmt.Sprintf("push error=%v", err)
} else {
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url[0])
s.session[keyString] = pushRtmprelay
}
io.WriteString(w, retString)
log.Printf("push start return %s", retString)
}
}

8
protocol/rtmp/cache/cache.go vendored

@ -27,7 +27,7 @@ func NewCache() *Cache { @@ -27,7 +27,7 @@ func NewCache() *Cache {
func (cache *Cache) Write(p av.Packet) {
if p.IsMetadata {
cache.metadata.Write(p)
cache.metadata.Write(&p)
return
} else {
if !p.IsVideo {
@ -35,7 +35,7 @@ func (cache *Cache) Write(p av.Packet) { @@ -35,7 +35,7 @@ func (cache *Cache) Write(p av.Packet) {
if ok {
if ah.SoundFormat() == av.SOUND_AAC &&
ah.AACPacketType() == av.AAC_SEQHDR {
cache.audioSeq.Write(p)
cache.audioSeq.Write(&p)
return
} else {
return
@ -46,7 +46,7 @@ func (cache *Cache) Write(p av.Packet) { @@ -46,7 +46,7 @@ func (cache *Cache) Write(p av.Packet) {
vh, ok := p.Header.(av.VideoPacketHeader)
if ok {
if vh.IsSeq() {
cache.videoSeq.Write(p)
cache.videoSeq.Write(&p)
return
}
} else {
@ -55,7 +55,7 @@ func (cache *Cache) Write(p av.Packet) { @@ -55,7 +55,7 @@ func (cache *Cache) Write(p av.Packet) {
}
}
cache.gop.Write(p)
cache.gop.Write(&p)
}
func (cache *Cache) Send(w av.WriteCloser) error {

10
protocol/rtmp/cache/gop.go vendored

@ -12,13 +12,13 @@ var ( @@ -12,13 +12,13 @@ var (
type array struct {
index int
packets []av.Packet
packets []*av.Packet
}
func newArray() *array {
ret := &array{
index: 0,
packets: make([]av.Packet, 0, maxGOPCap),
packets: make([]*av.Packet, 0, maxGOPCap),
}
return ret
}
@ -28,7 +28,7 @@ func (array *array) reset() { @@ -28,7 +28,7 @@ func (array *array) reset() {
array.packets = array.packets[:0]
}
func (array *array) write(packet av.Packet) error {
func (array *array) write(packet *av.Packet) error {
if array.index >= maxGOPCap {
return ErrGopTooBig
}
@ -63,7 +63,7 @@ func NewGopCache(num int) *GopCache { @@ -63,7 +63,7 @@ func NewGopCache(num int) *GopCache {
}
}
func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error {
func (gopCache *GopCache) writeToArray(chunk *av.Packet, startNew bool) error {
var ginc *array
if startNew {
ginc = gopCache.gops[gopCache.nextindex]
@ -83,7 +83,7 @@ func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error { @@ -83,7 +83,7 @@ func (gopCache *GopCache) writeToArray(chunk av.Packet, startNew bool) error {
return nil
}
func (gopCache *GopCache) Write(p av.Packet) {
func (gopCache *GopCache) Write(p *av.Packet) {
var ok bool
if p.IsVideo {
vh := p.Header.(av.VideoPacketHeader)

4
protocol/rtmp/cache/special.go vendored

@ -26,14 +26,14 @@ func init() { @@ -26,14 +26,14 @@ func init() {
type SpecialCache struct {
full bool
p av.Packet
p *av.Packet
}
func NewSpecialCache() *SpecialCache {
return &SpecialCache{}
}
func (specialCache *SpecialCache) Write(p av.Packet) {
func (specialCache *SpecialCache) Write(p *av.Packet) {
specialCache.p = p
specialCache.full = true
}

53
protocol/rtmp/core/conn_client.go

@ -22,6 +22,7 @@ var ( @@ -22,6 +22,7 @@ var (
publishStart = "NetStream.Publish.Start"
playStart = "NetStream.Play.Start"
connectSuccess = "NetConnection.Connect.Success"
onBWDone = "onBWDone"
)
var (
@ -53,6 +54,11 @@ func NewConnClient() *ConnClient { @@ -53,6 +54,11 @@ func NewConnClient() *ConnClient {
}
}
func (connClient *ConnClient) DecodeBatch(r io.Reader, ver amf.Version) (ret []interface{}, err error) {
vs, err := connClient.decoder.DecodeBatch(r, ver)
return vs, err
}
func (connClient *ConnClient) readRespMsg() error {
var err error
var rc ChunkStream
@ -60,21 +66,24 @@ func (connClient *ConnClient) readRespMsg() error { @@ -60,21 +66,24 @@ func (connClient *ConnClient) readRespMsg() error {
if err = connClient.conn.Read(&rc); err != nil {
return err
}
if err != nil && err != io.EOF {
return err
}
switch rc.TypeID {
case 20, 17:
r := bytes.NewReader(rc.Data)
vs, err := connClient.decoder.DecodeBatch(r, amf.AMF0)
if err != nil && err != io.EOF {
return err
}
vs, _ := connClient.decoder.DecodeBatch(r, amf.AMF0)
log.Printf("readRespMsg: vs=%v", vs)
for k, v := range vs {
switch v.(type) {
case string:
switch connClient.curcmdName {
case cmdConnect, cmdCreateStream:
if v.(string) != respResult {
return ErrFail
return errors.New(v.(string))
}
case cmdPublish:
if v.(string) != onStatus {
return ErrFail
@ -84,6 +93,7 @@ func (connClient *ConnClient) readRespMsg() error { @@ -84,6 +93,7 @@ func (connClient *ConnClient) readRespMsg() error {
switch connClient.curcmdName {
case cmdConnect, cmdCreateStream:
id := int(v.(float64))
if k == 1 {
if id != connClient.transID {
return ErrFail
@ -112,6 +122,7 @@ func (connClient *ConnClient) readRespMsg() error { @@ -112,6 +122,7 @@ func (connClient *ConnClient) readRespMsg() error {
}
}
}
return nil
}
}
@ -146,6 +157,7 @@ func (connClient *ConnClient) writeConnectMsg() error { @@ -146,6 +157,7 @@ func (connClient *ConnClient) writeConnectMsg() error {
event["tcUrl"] = connClient.tcurl
connClient.curcmdName = cmdConnect
log.Printf("writeConnectMsg: connClient.transID=%d, event=%v", connClient.transID, event)
if err := connClient.writeMsg(cmdConnect, connClient.transID, event); err != nil {
return err
}
@ -155,10 +167,24 @@ func (connClient *ConnClient) writeConnectMsg() error { @@ -155,10 +167,24 @@ func (connClient *ConnClient) writeConnectMsg() error {
func (connClient *ConnClient) writeCreateStreamMsg() error {
connClient.transID++
connClient.curcmdName = cmdCreateStream
log.Printf("writeCreateStreamMsg: connClient.transID=%d", connClient.transID)
if err := connClient.writeMsg(cmdCreateStream, connClient.transID, nil); err != nil {
return err
}
return connClient.readRespMsg()
for {
err := connClient.readRespMsg()
if err == nil {
return err
}
if err == ErrFail {
log.Println("writeCreateStreamMsg readRespMsg err=%v", err)
return err
}
}
}
func (connClient *ConnClient) writePublishMsg() error {
@ -173,6 +199,9 @@ func (connClient *ConnClient) writePublishMsg() error { @@ -173,6 +199,9 @@ func (connClient *ConnClient) writePublishMsg() error {
func (connClient *ConnClient) writePlayMsg() error {
connClient.transID++
connClient.curcmdName = cmdPlay
log.Printf("writePlayMsg: connClient.transID=%d, cmdPlay=%v, connClient.title=%v",
connClient.transID, cmdPlay, connClient.title)
if err := connClient.writeMsg(cmdPlay, 0, nil, connClient.title); err != nil {
return err
}
@ -236,15 +265,23 @@ func (connClient *ConnClient) Start(url string, method string) error { @@ -236,15 +265,23 @@ func (connClient *ConnClient) Start(url string, method string) error {
log.Println("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())
connClient.conn = NewConn(conn, 4*1024)
log.Println("HandshakeClient....")
if err := connClient.conn.HandshakeClient(); err != nil {
return err
}
log.Println("writeConnectMsg....")
if err := connClient.writeConnectMsg(); err != nil {
return err
}
log.Println("writeCreateStreamMsg....")
if err := connClient.writeCreateStreamMsg(); err != nil {
log.Println("writeCreateStreamMsg error", err)
return err
}
log.Println("method control:", method, av.PUBLISH, av.PLAY)
if method == av.PUBLISH {
if err := connClient.writePublishMsg(); err != nil {
return err
@ -281,6 +318,10 @@ func (connClient *ConnClient) GetInfo() (app string, name string, url string) { @@ -281,6 +318,10 @@ func (connClient *ConnClient) GetInfo() (app string, name string, url string) {
return
}
func (connClient *ConnClient) GetStreamId() uint32 {
return connClient.streamid
}
func (connClient *ConnClient) Close(err error) {
connClient.conn.Close()
}

159
protocol/rtmp/rtmp.go

@ -1,21 +1,25 @@ @@ -1,21 +1,25 @@
package rtmp
import (
"net"
"time"
"net/url"
"strings"
"errors"
"flag"
"log"
"fmt"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/utils/uid"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/container/flv"
"github.com/gwuhaolin/livego/protocol/rtmp/core"
"github.com/gwuhaolin/livego/utils/uid"
"log"
"net"
"net/url"
"reflect"
"strings"
"time"
)
const (
maxQueueNum = 1024
maxQueueNum = 1024
SAVE_STATICS_INTERVAL = 5000
)
var (
@ -42,9 +46,11 @@ func (c *Client) Dial(url string, method string) error { @@ -42,9 +46,11 @@ func (c *Client) Dial(url string, method string) error {
}
if method == av.PUBLISH {
writer := NewVirWriter(connClient)
log.Printf("client Dial call NewVirWriter url=%s, method=%s", url, method)
c.handler.HandleWriter(writer)
} else if method == av.PLAY {
reader := NewVirReader(connClient)
log.Printf("client Dial call NewVirReader url=%s, method=%s", url, method)
c.handler.HandleReader(reader)
if c.getter != nil {
writer := c.getter.GetWriter(reader.Info())
@ -103,12 +109,28 @@ func (s *Server) handleConn(conn *core.Conn) error { @@ -103,12 +109,28 @@ func (s *Server) handleConn(conn *core.Conn) error {
log.Println("handleConn read msg err:", err)
return err
}
appname, _, _ := connServer.GetInfo()
if ret := configure.CheckAppName(appname); !ret {
err := errors.New("application name=%s is not configured")
conn.Close()
log.Println("CheckAppName err:", err)
return err
}
log.Printf("handleConn: IsPublisher=%v", connServer.IsPublisher())
if connServer.IsPublisher() {
if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) {
log.Printf("GetStaticPushUrlList: %v", pushlist)
}
reader := NewVirReader(connServer)
s.handler.HandleReader(reader)
log.Printf("new publisher: %+v", reader.Info())
if s.getter != nil {
writeType := reflect.TypeOf(s.getter)
log.Printf("handleConn:writeType=%v", writeType)
writer := s.getter.GetWriter(reader.Info())
s.handler.HandleWriter(writer)
}
@ -132,12 +154,26 @@ type StreamReadWriteCloser interface { @@ -132,12 +154,26 @@ type StreamReadWriteCloser interface {
Read(c *core.ChunkStream) error
}
type StaticsBW struct {
StreamId uint32
VideoDatainBytes uint64
LastVideoDatainBytes uint64
VideoSpeedInBytesperMS uint64
AudioDatainBytes uint64
LastAudioDatainBytes uint64
AudioSpeedInBytesperMS uint64
LastTimestamp int64
}
type VirWriter struct {
Uid string
closed bool
Uid string
closed bool
av.RWBaser
conn StreamReadWriteCloser
packetQueue chan av.Packet
packetQueue chan *av.Packet
WriteBWInfo StaticsBW
}
func NewVirWriter(conn StreamReadWriteCloser) *VirWriter {
@ -145,8 +181,10 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { @@ -145,8 +181,10 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter {
Uid: uid.NewId(),
conn: conn,
RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)),
packetQueue: make(chan av.Packet, maxQueueNum),
packetQueue: make(chan *av.Packet, maxQueueNum),
WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},
}
go ret.Check()
go func() {
err := ret.SendPacket()
@ -157,6 +195,30 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { @@ -157,6 +195,30 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter {
return ret
}
func (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {
nowInMS := int64(time.Now().UnixNano() / 1e6)
v.WriteBWInfo.StreamId = streamid
if isVideoFlag {
v.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length
} else {
v.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length
}
if v.WriteBWInfo.LastTimestamp == 0 {
v.WriteBWInfo.LastTimestamp = nowInMS
} else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {
diffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000
v.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes
v.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes
v.WriteBWInfo.LastTimestamp = nowInMS
}
}
func (v *VirWriter) Check() {
var c core.ChunkStream
for {
@ -167,7 +229,7 @@ func (v *VirWriter) Check() { @@ -167,7 +229,7 @@ func (v *VirWriter) Check() {
}
}
func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
@ -199,17 +261,26 @@ func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) { @@ -199,17 +261,26 @@ func (v *VirWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
}
//
func (v *VirWriter) Write(p av.Packet) error {
if !v.closed {
if len(v.packetQueue) >= maxQueueNum-24 {
v.DropPacket(v.packetQueue, v.Info())
} else {
v.packetQueue <- p
func (v *VirWriter) Write(p *av.Packet) (err error) {
err = nil
if v.closed {
err = errors.New("VirWriter closed")
return
}
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("VirWriter has already been closed:%v", e)
err = errors.New(errString)
}
return nil
}()
if len(v.packetQueue) >= maxQueueNum-24 {
v.DropPacket(v.packetQueue, v.Info())
} else {
return errors.New("closed")
v.packetQueue <- p
}
return
}
func (v *VirWriter) SendPacket() error {
@ -219,7 +290,7 @@ func (v *VirWriter) SendPacket() error { @@ -219,7 +290,7 @@ func (v *VirWriter) SendPacket() error {
if ok {
cs.Data = p.Data
cs.Length = uint32(len(p.Data))
cs.StreamID = 1
cs.StreamID = p.StreamID
cs.Timestamp = p.TimeStamp
cs.Timestamp += v.BaseTimeStamp()
@ -233,6 +304,7 @@ func (v *VirWriter) SendPacket() error { @@ -233,6 +304,7 @@ func (v *VirWriter) SendPacket() error {
}
}
v.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo)
v.SetPreTime()
v.RecTimeStamp(cs.Timestamp, cs.TypeID)
err := v.conn.Write(cs)
@ -240,6 +312,7 @@ func (v *VirWriter) SendPacket() error { @@ -240,6 +312,7 @@ func (v *VirWriter) SendPacket() error {
v.closed = true
return err
}
} else {
return errors.New("closed")
}
@ -271,18 +344,45 @@ func (v *VirWriter) Close(err error) { @@ -271,18 +344,45 @@ func (v *VirWriter) Close(err error) {
}
type VirReader struct {
Uid string
Uid string
av.RWBaser
demuxer *flv.Demuxer
conn StreamReadWriteCloser
demuxer *flv.Demuxer
conn StreamReadWriteCloser
ReadBWInfo StaticsBW
}
func NewVirReader(conn StreamReadWriteCloser) *VirReader {
return &VirReader{
Uid: uid.NewId(),
conn: conn,
RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)),
demuxer: flv.NewDemuxer(),
Uid: uid.NewId(),
conn: conn,
RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)),
demuxer: flv.NewDemuxer(),
ReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0},
}
}
func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) {
nowInMS := int64(time.Now().UnixNano() / 1e6)
v.ReadBWInfo.StreamId = streamid
if isVideoFlag {
v.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length
} else {
v.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length
}
if v.ReadBWInfo.LastTimestamp == 0 {
v.ReadBWInfo.LastTimestamp = nowInMS
} else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL {
diffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000
//log.Printf("now=%d, last=%d, diff=%d", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp)
v.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000
v.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes
v.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes
v.ReadBWInfo.LastTimestamp = nowInMS
}
}
@ -311,8 +411,11 @@ func (v *VirReader) Read(p *av.Packet) (err error) { @@ -311,8 +411,11 @@ func (v *VirReader) Read(p *av.Packet) (err error) {
p.IsAudio = cs.TypeID == av.TAG_AUDIO
p.IsVideo = cs.TypeID == av.TAG_VIDEO
p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3
p.StreamID = cs.StreamID
p.Data = cs.Data
p.TimeStamp = cs.Timestamp
v.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo)
v.demuxer.DemuxH(p)
return err
}

125
protocol/rtmp/rtmprelay/rtmprelay.go

@ -0,0 +1,125 @@ @@ -0,0 +1,125 @@
package rtmprelay
import (
"bytes"
"errors"
"fmt"
"github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/protocol/rtmp/core"
"io"
"log"
)
var (
STOP_CTRL = "RTMPRELAY_STOP"
)
type RtmpRelay struct {
PlayUrl string
PublishUrl string
cs_chan chan core.ChunkStream
sndctrl_chan chan string
connectPlayClient *core.ConnClient
connectPublishClient *core.ConnClient
startflag bool
}
func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay {
return &RtmpRelay{
PlayUrl: *playurl,
PublishUrl: *publishurl,
cs_chan: make(chan core.ChunkStream, 500),
sndctrl_chan: make(chan string),
connectPlayClient: nil,
connectPublishClient: nil,
startflag: false,
}
}
func (self *RtmpRelay) rcvPlayChunkStream() {
log.Println("rcvPlayRtmpMediaPacket connectClient.Read...")
for {
var rc core.ChunkStream
if self.startflag == false {
self.connectPlayClient.Close(nil)
log.Printf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
break
}
err := self.connectPlayClient.Read(&rc)
if err != nil && err == io.EOF {
break
}
//log.Printf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err)
switch rc.TypeID {
case 20, 17:
r := bytes.NewReader(rc.Data)
vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
log.Printf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18:
log.Printf("rcvPlayRtmpMediaPacket: metadata....")
case 8, 9:
self.cs_chan <- rc
}
}
}
func (self *RtmpRelay) sendPublishChunkStream() {
for {
select {
case rc := <-self.cs_chan:
//log.Printf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data))
self.connectPublishClient.Write(rc)
case ctrlcmd := <-self.sndctrl_chan:
if ctrlcmd == STOP_CTRL {
self.connectPublishClient.Close(nil)
log.Printf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
break
}
}
}
}
func (self *RtmpRelay) Start() error {
if self.startflag {
err := errors.New(fmt.Sprintf("The rtmprelay already started, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl))
return err
}
self.connectPlayClient = core.NewConnClient()
self.connectPublishClient = core.NewConnClient()
log.Printf("play server addr:%v starting....", self.PlayUrl)
err := self.connectPlayClient.Start(self.PlayUrl, "play")
if err != nil {
log.Printf("connectPlayClient.Start url=%v error", self.PlayUrl)
return err
}
log.Printf("publish server addr:%v starting....", self.PublishUrl)
err = self.connectPublishClient.Start(self.PublishUrl, "publish")
if err != nil {
log.Printf("connectPublishClient.Start url=%v error", self.PublishUrl)
self.connectPlayClient.Close(nil)
return err
}
self.startflag = true
go self.rcvPlayChunkStream()
go self.sendPublishChunkStream()
return nil
}
func (self *RtmpRelay) Stop() {
if !self.startflag {
log.Printf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
return
}
self.startflag = false
self.sndctrl_chan <- STOP_CTRL
}

180
protocol/rtmp/rtmprelay/staticrelay.go

@ -0,0 +1,180 @@ @@ -0,0 +1,180 @@
package rtmprelay
import (
"errors"
"fmt"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/protocol/rtmp/core"
"log"
"sync"
)
type StaticPush struct {
RtmpUrl string
packet_chan chan *av.Packet
sndctrl_chan chan string
connectClient *core.ConnClient
startflag bool
}
var G_StaticPushMap = make(map[string](*StaticPush))
var g_MapLock = new(sync.RWMutex)
var (
STATIC_RELAY_STOP_CTRL = "STATIC_RTMPRELAY_STOP"
)
func GetStaticPushList(appname string) ([]string, error) {
pushurlList, ok := configure.GetStaticPushUrlList(appname)
if !ok {
return nil, errors.New("no static push url")
}
return pushurlList, nil
}
func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush {
g_MapLock.RLock()
staticpush, ok := G_StaticPushMap[rtmpurl]
log.Printf("GetAndCreateStaticPushObject: %s, return %v", rtmpurl, ok)
if !ok {
g_MapLock.RUnlock()
newStaticpush := NewStaticPush(rtmpurl)
g_MapLock.Lock()
G_StaticPushMap[rtmpurl] = newStaticpush
g_MapLock.Unlock()
return newStaticpush
}
g_MapLock.RUnlock()
return staticpush
}
func GetStaticPushObject(rtmpurl string) (*StaticPush, error) {
g_MapLock.RLock()
if staticpush, ok := G_StaticPushMap[rtmpurl]; ok {
g_MapLock.RUnlock()
return staticpush, nil
}
g_MapLock.RUnlock()
return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist...."))
}
func ReleaseStaticPushObject(rtmpurl string) {
g_MapLock.RLock()
if _, ok := G_StaticPushMap[rtmpurl]; ok {
g_MapLock.RUnlock()
log.Printf("ReleaseStaticPushObject %s ok", rtmpurl)
g_MapLock.Lock()
delete(G_StaticPushMap, rtmpurl)
g_MapLock.Unlock()
} else {
g_MapLock.RUnlock()
log.Printf("ReleaseStaticPushObject: not find %s", rtmpurl)
}
}
func NewStaticPush(rtmpurl string) *StaticPush {
return &StaticPush{
RtmpUrl: rtmpurl,
packet_chan: make(chan *av.Packet, 500),
sndctrl_chan: make(chan string),
connectClient: nil,
startflag: false,
}
}
func (self *StaticPush) Start() error {
if self.startflag {
return errors.New(fmt.Sprintf("StaticPush already start %s", self.RtmpUrl))
}
self.connectClient = core.NewConnClient()
log.Printf("static publish server addr:%v starting....", self.RtmpUrl)
err := self.connectClient.Start(self.RtmpUrl, "publish")
if err != nil {
log.Printf("connectClient.Start url=%v error", self.RtmpUrl)
return err
}
log.Printf("static publish server addr:%v started, streamid=%d", self.RtmpUrl, self.connectClient.GetStreamId())
go self.HandleAvPacket()
self.startflag = true
return nil
}
func (self *StaticPush) Stop() {
if !self.startflag {
return
}
log.Printf("StaticPush Stop: %s", self.RtmpUrl)
self.sndctrl_chan <- STATIC_RELAY_STOP_CTRL
self.startflag = false
}
func (self *StaticPush) WriteAvPacket(packet *av.Packet) {
if !self.startflag {
return
}
self.packet_chan <- packet
}
func (self *StaticPush) sendPacket(p *av.Packet) {
if !self.startflag {
return
}
var cs core.ChunkStream
cs.Data = p.Data
cs.Length = uint32(len(p.Data))
cs.StreamID = self.connectClient.GetStreamId()
cs.Timestamp = p.TimeStamp
//cs.Timestamp += v.BaseTimeStamp()
//log.Printf("Static sendPacket: rtmpurl=%s, length=%d, streamid=%d",
// self.RtmpUrl, len(p.Data), cs.StreamID)
if p.IsVideo {
cs.TypeID = av.TAG_VIDEO
} else {
if p.IsMetadata {
cs.TypeID = av.TAG_SCRIPTDATAAMF0
} else {
cs.TypeID = av.TAG_AUDIO
}
}
self.connectClient.Write(cs)
}
func (self *StaticPush) HandleAvPacket() {
if !self.IsStart() {
log.Printf("static push %s not started", self.RtmpUrl)
return
}
for {
select {
case packet := <-self.packet_chan:
self.sendPacket(packet)
case ctrlcmd := <-self.sndctrl_chan:
if ctrlcmd == STATIC_RELAY_STOP_CTRL {
self.connectClient.Close(nil)
log.Printf("Static HandleAvPacket close: publishurl=%s", self.RtmpUrl)
break
}
}
}
}
func (self *StaticPush) IsStart() bool {
return self.startflag
}

196
protocol/rtmp/stream.go

@ -2,11 +2,13 @@ package rtmp @@ -2,11 +2,13 @@ package rtmp
import (
"errors"
"time"
"log"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/rtmp/cache"
"github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay"
"github.com/orcaman/concurrent-map"
"log"
"strings"
"time"
)
var (
@ -14,7 +16,7 @@ var ( @@ -14,7 +16,7 @@ var (
)
type RtmpStream struct {
streams cmap.ConcurrentMap
streams cmap.ConcurrentMap //key
}
func NewRtmpStream() *RtmpStream {
@ -27,6 +29,8 @@ func NewRtmpStream() *RtmpStream { @@ -27,6 +29,8 @@ func NewRtmpStream() *RtmpStream {
func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
info := r.Info()
log.Printf("HandleReader: info[%v]", info)
var stream *Stream
i, ok := rs.streams.Get(info.Key)
if stream, ok = i.(*Stream); ok {
@ -38,9 +42,10 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { @@ -38,9 +42,10 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
stream = ns
rs.streams.Set(info.Key, ns)
}
}else {
} else {
stream = NewStream()
rs.streams.Set(info.Key, stream)
stream.info = info
}
stream.AddReader(r)
@ -48,11 +53,14 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { @@ -48,11 +53,14 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
func (rs *RtmpStream) HandleWriter(w av.WriteCloser) {
info := w.Info()
log.Printf("HandleWriter: info[%v]", info)
var s *Stream
ok := rs.streams.Has(info.Key)
if !ok {
s = NewStream()
rs.streams.Set(info.Key, s)
s.info = info
} else {
item, ok := rs.streams.Get(info.Key)
if ok {
@ -60,7 +68,6 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { @@ -60,7 +68,6 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) {
s.AddWriter(w)
}
}
}
func (rs *RtmpStream) GetStreams() cmap.ConcurrentMap {
@ -84,6 +91,7 @@ type Stream struct { @@ -84,6 +91,7 @@ type Stream struct {
cache *cache.Cache
r av.ReadCloser
ws cmap.ConcurrentMap
info av.Info
}
type PackWriterCloser struct {
@ -137,9 +145,172 @@ func (s *Stream) AddWriter(w av.WriteCloser) { @@ -137,9 +145,172 @@ func (s *Stream) AddWriter(w av.WriteCloser) {
s.ws.Set(info.UID, pw)
}
/*检测本application下是否配置static_push,
如果配置, 启动push远端的连接*/
func (s *Stream) StartStaticPush() {
key := s.info.Key
dscr := strings.Split(key, "/")
if len(dscr) < 1 {
return
}
index := strings.Index(key, "/")
if index < 0 {
return
}
streamname := key[index+1:]
appname := dscr[0]
log.Printf("StartStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
log.Printf("StartStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
log.Printf("StartStaticPush: static pushurl=%s", pushurl)
staticpushObj := rtmprelay.GetAndCreateStaticPushObject(pushurl)
if staticpushObj != nil {
if err := staticpushObj.Start(); err != nil {
log.Printf("StartStaticPush: staticpushObj.Start %s error=%v", pushurl, err)
} else {
log.Printf("StartStaticPush: staticpushObj.Start %s ok", pushurl)
}
} else {
log.Printf("StartStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
func (s *Stream) StopStaticPush() {
key := s.info.Key
log.Printf("StopStaticPush......%s", key)
dscr := strings.Split(key, "/")
if len(dscr) < 1 {
return
}
index := strings.Index(key, "/")
if index < 0 {
return
}
streamname := key[index+1:]
appname := dscr[0]
log.Printf("StopStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
log.Printf("StopStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
log.Printf("StopStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
staticpushObj.Stop()
rtmprelay.ReleaseStaticPushObject(pushurl)
log.Printf("StopStaticPush: staticpushObj.Stop %s ", pushurl)
} else {
log.Printf("StopStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
func (s *Stream) IsSendStaticPush() bool {
key := s.info.Key
dscr := strings.Split(key, "/")
if len(dscr) < 1 {
return false
}
appname := dscr[0]
//log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
//log.Printf("SendStaticPush: GetStaticPushList error=%v", err)
return false
}
index := strings.Index(key, "/")
if index < 0 {
return false
}
streamname := key[index+1:]
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
//log.Printf("SendStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
return true
//staticpushObj.WriteAvPacket(&packet)
//log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl)
} else {
log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl)
}
}
return false
}
func (s *Stream) SendStaticPush(packet av.Packet) {
key := s.info.Key
dscr := strings.Split(key, "/")
if len(dscr) < 1 {
return
}
index := strings.Index(key, "/")
if index < 0 {
return
}
streamname := key[index+1:]
appname := dscr[0]
//log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
//log.Printf("SendStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
//log.Printf("SendStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
staticpushObj.WriteAvPacket(&packet)
//log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl)
} else {
log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
func (s *Stream) TransStart() {
s.isStart = true
var p av.Packet
log.Printf("TransStart:%v", s.info)
s.StartStaticPush()
for {
if !s.isStart {
s.closeInter()
@ -151,11 +322,17 @@ func (s *Stream) TransStart() { @@ -151,11 +322,17 @@ func (s *Stream) TransStart() {
s.isStart = false
return
}
if s.IsSendStaticPush() {
s.SendStaticPush(p)
}
s.cache.Write(p)
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
if !v.init {
//log.Printf("cache.send: %v", v.w.Info())
if err = s.cache.Send(v.w); err != nil {
log.Printf("[%s] send cache packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
@ -163,7 +340,10 @@ func (s *Stream) TransStart() { @@ -163,7 +340,10 @@ func (s *Stream) TransStart() {
}
v.init = true
} else {
if err = v.w.Write(p); err != nil {
new_packet := p
//writeType := reflect.TypeOf(v.w)
//log.Printf("w.Write: type=%v, %v", writeType, v.w.Info())
if err = v.w.Write(&new_packet); err != nil {
log.Printf("[%s] write packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
}
@ -173,9 +353,12 @@ func (s *Stream) TransStart() { @@ -173,9 +353,12 @@ func (s *Stream) TransStart() {
}
func (s *Stream) TransStop() {
log.Printf("TransStop: %s", s.info.Key)
if s.isStart && s.r != nil {
s.r.Close(errors.New("stop old"))
}
s.isStart = false
}
@ -204,6 +387,7 @@ func (s *Stream) CheckAlive() (n int) { @@ -204,6 +387,7 @@ func (s *Stream) CheckAlive() (n int) {
func (s *Stream) closeInter() {
if s.r != nil {
s.StopStaticPush()
log.Printf("[%v] publisher closed", s.r.Info())
}

Loading…
Cancel
Save