mirror of https://github.com/gwuhaolin/livego.git
16 changed files with 1016 additions and 141 deletions
@ -0,0 +1,74 @@ |
|||||||
|
package configure |
||||||
|
|
||||||
|
import ( |
||||||
|
"encoding/json" |
||||||
|
"io/ioutil" |
||||||
|
"log" |
||||||
|
) |
||||||
|
|
||||||
|
/* |
||||||
|
{ |
||||||
|
[ |
||||||
|
{ |
||||||
|
"application":"live", |
||||||
|
"live":"on", |
||||||
|
"hls":"on", |
||||||
|
"static_push":["rtmp://xx/live"] |
||||||
|
} |
||||||
|
] |
||||||
|
} |
||||||
|
*/ |
||||||
|
type Application struct { |
||||||
|
Appname string |
||||||
|
Liveon string |
||||||
|
Hlson string |
||||||
|
Static_push []string |
||||||
|
} |
||||||
|
|
||||||
|
type ServerCfg struct { |
||||||
|
Server []Application |
||||||
|
} |
||||||
|
|
||||||
|
var RtmpServercfg ServerCfg |
||||||
|
|
||||||
|
func LoadConfig(configfilename string) error { |
||||||
|
log.Printf("starting load configure file(%s)......", configfilename) |
||||||
|
data, err := ioutil.ReadFile(configfilename) |
||||||
|
if err != nil { |
||||||
|
log.Printf("ReadFile %s error:%v", configfilename, err) |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
log.Printf("loadconfig: \r\n%s", string(data)) |
||||||
|
|
||||||
|
err = json.Unmarshal(data, &RtmpServercfg) |
||||||
|
if err != nil { |
||||||
|
log.Printf("json.Unmarshal error:%v", err) |
||||||
|
return err |
||||||
|
} |
||||||
|
log.Printf("get config json data:%v", RtmpServercfg) |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func CheckAppName(appname string) bool { |
||||||
|
for _, app := range RtmpServercfg.Server { |
||||||
|
if (app.Appname == appname) && (app.Liveon == "on") { |
||||||
|
return true |
||||||
|
} |
||||||
|
} |
||||||
|
return false |
||||||
|
} |
||||||
|
|
||||||
|
func GetStaticPushUrlList(appname string) ([]string, bool) { |
||||||
|
for _, app := range RtmpServercfg.Server { |
||||||
|
if (app.Appname == appname) && (app.Liveon == "on") { |
||||||
|
if len(app.Static_push) > 0 { |
||||||
|
return app.Static_push, true |
||||||
|
} else { |
||||||
|
return nil, false |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
return nil, false |
||||||
|
} |
||||||
@ -0,0 +1,10 @@ |
|||||||
|
{ |
||||||
|
"server": [ |
||||||
|
{ |
||||||
|
"appname":"live", |
||||||
|
"liveon":"on", |
||||||
|
"hlson":"on" |
||||||
|
} |
||||||
|
] |
||||||
|
} |
||||||
|
|
||||||
@ -0,0 +1,125 @@ |
|||||||
|
package rtmprelay |
||||||
|
|
||||||
|
import ( |
||||||
|
"bytes" |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"github.com/gwuhaolin/livego/protocol/amf" |
||||||
|
"github.com/gwuhaolin/livego/protocol/rtmp/core" |
||||||
|
"io" |
||||||
|
"log" |
||||||
|
) |
||||||
|
|
||||||
|
var ( |
||||||
|
STOP_CTRL = "RTMPRELAY_STOP" |
||||||
|
) |
||||||
|
|
||||||
|
type RtmpRelay struct { |
||||||
|
PlayUrl string |
||||||
|
PublishUrl string |
||||||
|
cs_chan chan core.ChunkStream |
||||||
|
sndctrl_chan chan string |
||||||
|
connectPlayClient *core.ConnClient |
||||||
|
connectPublishClient *core.ConnClient |
||||||
|
startflag bool |
||||||
|
} |
||||||
|
|
||||||
|
func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay { |
||||||
|
return &RtmpRelay{ |
||||||
|
PlayUrl: *playurl, |
||||||
|
PublishUrl: *publishurl, |
||||||
|
cs_chan: make(chan core.ChunkStream, 500), |
||||||
|
sndctrl_chan: make(chan string), |
||||||
|
connectPlayClient: nil, |
||||||
|
connectPublishClient: nil, |
||||||
|
startflag: false, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *RtmpRelay) rcvPlayChunkStream() { |
||||||
|
log.Println("rcvPlayRtmpMediaPacket connectClient.Read...") |
||||||
|
for { |
||||||
|
var rc core.ChunkStream |
||||||
|
|
||||||
|
if self.startflag == false { |
||||||
|
self.connectPlayClient.Close(nil) |
||||||
|
log.Printf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) |
||||||
|
break |
||||||
|
} |
||||||
|
err := self.connectPlayClient.Read(&rc) |
||||||
|
|
||||||
|
if err != nil && err == io.EOF { |
||||||
|
break |
||||||
|
} |
||||||
|
//log.Printf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err)
|
||||||
|
switch rc.TypeID { |
||||||
|
case 20, 17: |
||||||
|
r := bytes.NewReader(rc.Data) |
||||||
|
vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0) |
||||||
|
|
||||||
|
log.Printf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err) |
||||||
|
case 18: |
||||||
|
log.Printf("rcvPlayRtmpMediaPacket: metadata....") |
||||||
|
case 8, 9: |
||||||
|
self.cs_chan <- rc |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *RtmpRelay) sendPublishChunkStream() { |
||||||
|
for { |
||||||
|
select { |
||||||
|
case rc := <-self.cs_chan: |
||||||
|
//log.Printf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data))
|
||||||
|
self.connectPublishClient.Write(rc) |
||||||
|
case ctrlcmd := <-self.sndctrl_chan: |
||||||
|
if ctrlcmd == STOP_CTRL { |
||||||
|
self.connectPublishClient.Close(nil) |
||||||
|
log.Printf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) |
||||||
|
break |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *RtmpRelay) Start() error { |
||||||
|
if self.startflag { |
||||||
|
err := errors.New(fmt.Sprintf("The rtmprelay already started, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)) |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
self.connectPlayClient = core.NewConnClient() |
||||||
|
self.connectPublishClient = core.NewConnClient() |
||||||
|
|
||||||
|
log.Printf("play server addr:%v starting....", self.PlayUrl) |
||||||
|
err := self.connectPlayClient.Start(self.PlayUrl, "play") |
||||||
|
if err != nil { |
||||||
|
log.Printf("connectPlayClient.Start url=%v error", self.PlayUrl) |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
log.Printf("publish server addr:%v starting....", self.PublishUrl) |
||||||
|
err = self.connectPublishClient.Start(self.PublishUrl, "publish") |
||||||
|
if err != nil { |
||||||
|
log.Printf("connectPublishClient.Start url=%v error", self.PublishUrl) |
||||||
|
self.connectPlayClient.Close(nil) |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
self.startflag = true |
||||||
|
go self.rcvPlayChunkStream() |
||||||
|
go self.sendPublishChunkStream() |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (self *RtmpRelay) Stop() { |
||||||
|
if !self.startflag { |
||||||
|
log.Printf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl) |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
self.startflag = false |
||||||
|
self.sndctrl_chan <- STOP_CTRL |
||||||
|
|
||||||
|
} |
||||||
@ -0,0 +1,180 @@ |
|||||||
|
package rtmprelay |
||||||
|
|
||||||
|
import ( |
||||||
|
"errors" |
||||||
|
"fmt" |
||||||
|
"github.com/gwuhaolin/livego/av" |
||||||
|
"github.com/gwuhaolin/livego/configure" |
||||||
|
"github.com/gwuhaolin/livego/protocol/rtmp/core" |
||||||
|
"log" |
||||||
|
"sync" |
||||||
|
) |
||||||
|
|
||||||
|
type StaticPush struct { |
||||||
|
RtmpUrl string |
||||||
|
packet_chan chan *av.Packet |
||||||
|
sndctrl_chan chan string |
||||||
|
connectClient *core.ConnClient |
||||||
|
startflag bool |
||||||
|
} |
||||||
|
|
||||||
|
var G_StaticPushMap = make(map[string](*StaticPush)) |
||||||
|
var g_MapLock = new(sync.RWMutex) |
||||||
|
|
||||||
|
var ( |
||||||
|
STATIC_RELAY_STOP_CTRL = "STATIC_RTMPRELAY_STOP" |
||||||
|
) |
||||||
|
|
||||||
|
func GetStaticPushList(appname string) ([]string, error) { |
||||||
|
pushurlList, ok := configure.GetStaticPushUrlList(appname) |
||||||
|
|
||||||
|
if !ok { |
||||||
|
return nil, errors.New("no static push url") |
||||||
|
} |
||||||
|
|
||||||
|
return pushurlList, nil |
||||||
|
} |
||||||
|
|
||||||
|
func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush { |
||||||
|
g_MapLock.RLock() |
||||||
|
staticpush, ok := G_StaticPushMap[rtmpurl] |
||||||
|
log.Printf("GetAndCreateStaticPushObject: %s, return %v", rtmpurl, ok) |
||||||
|
if !ok { |
||||||
|
g_MapLock.RUnlock() |
||||||
|
newStaticpush := NewStaticPush(rtmpurl) |
||||||
|
|
||||||
|
g_MapLock.Lock() |
||||||
|
G_StaticPushMap[rtmpurl] = newStaticpush |
||||||
|
g_MapLock.Unlock() |
||||||
|
|
||||||
|
return newStaticpush |
||||||
|
} |
||||||
|
g_MapLock.RUnlock() |
||||||
|
|
||||||
|
return staticpush |
||||||
|
} |
||||||
|
|
||||||
|
func GetStaticPushObject(rtmpurl string) (*StaticPush, error) { |
||||||
|
g_MapLock.RLock() |
||||||
|
if staticpush, ok := G_StaticPushMap[rtmpurl]; ok { |
||||||
|
g_MapLock.RUnlock() |
||||||
|
return staticpush, nil |
||||||
|
} |
||||||
|
g_MapLock.RUnlock() |
||||||
|
|
||||||
|
return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist....")) |
||||||
|
} |
||||||
|
|
||||||
|
func ReleaseStaticPushObject(rtmpurl string) { |
||||||
|
g_MapLock.RLock() |
||||||
|
if _, ok := G_StaticPushMap[rtmpurl]; ok { |
||||||
|
g_MapLock.RUnlock() |
||||||
|
|
||||||
|
log.Printf("ReleaseStaticPushObject %s ok", rtmpurl) |
||||||
|
g_MapLock.Lock() |
||||||
|
delete(G_StaticPushMap, rtmpurl) |
||||||
|
g_MapLock.Unlock() |
||||||
|
} else { |
||||||
|
g_MapLock.RUnlock() |
||||||
|
log.Printf("ReleaseStaticPushObject: not find %s", rtmpurl) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func NewStaticPush(rtmpurl string) *StaticPush { |
||||||
|
return &StaticPush{ |
||||||
|
RtmpUrl: rtmpurl, |
||||||
|
packet_chan: make(chan *av.Packet, 500), |
||||||
|
sndctrl_chan: make(chan string), |
||||||
|
connectClient: nil, |
||||||
|
startflag: false, |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) Start() error { |
||||||
|
if self.startflag { |
||||||
|
return errors.New(fmt.Sprintf("StaticPush already start %s", self.RtmpUrl)) |
||||||
|
} |
||||||
|
|
||||||
|
self.connectClient = core.NewConnClient() |
||||||
|
|
||||||
|
log.Printf("static publish server addr:%v starting....", self.RtmpUrl) |
||||||
|
err := self.connectClient.Start(self.RtmpUrl, "publish") |
||||||
|
if err != nil { |
||||||
|
log.Printf("connectClient.Start url=%v error", self.RtmpUrl) |
||||||
|
return err |
||||||
|
} |
||||||
|
log.Printf("static publish server addr:%v started, streamid=%d", self.RtmpUrl, self.connectClient.GetStreamId()) |
||||||
|
go self.HandleAvPacket() |
||||||
|
|
||||||
|
self.startflag = true |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) Stop() { |
||||||
|
if !self.startflag { |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
log.Printf("StaticPush Stop: %s", self.RtmpUrl) |
||||||
|
self.sndctrl_chan <- STATIC_RELAY_STOP_CTRL |
||||||
|
self.startflag = false |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) WriteAvPacket(packet *av.Packet) { |
||||||
|
if !self.startflag { |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
self.packet_chan <- packet |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) sendPacket(p *av.Packet) { |
||||||
|
if !self.startflag { |
||||||
|
return |
||||||
|
} |
||||||
|
var cs core.ChunkStream |
||||||
|
|
||||||
|
cs.Data = p.Data |
||||||
|
cs.Length = uint32(len(p.Data)) |
||||||
|
cs.StreamID = self.connectClient.GetStreamId() |
||||||
|
cs.Timestamp = p.TimeStamp |
||||||
|
//cs.Timestamp += v.BaseTimeStamp()
|
||||||
|
|
||||||
|
//log.Printf("Static sendPacket: rtmpurl=%s, length=%d, streamid=%d",
|
||||||
|
// self.RtmpUrl, len(p.Data), cs.StreamID)
|
||||||
|
if p.IsVideo { |
||||||
|
cs.TypeID = av.TAG_VIDEO |
||||||
|
} else { |
||||||
|
if p.IsMetadata { |
||||||
|
cs.TypeID = av.TAG_SCRIPTDATAAMF0 |
||||||
|
} else { |
||||||
|
cs.TypeID = av.TAG_AUDIO |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
self.connectClient.Write(cs) |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) HandleAvPacket() { |
||||||
|
if !self.IsStart() { |
||||||
|
log.Printf("static push %s not started", self.RtmpUrl) |
||||||
|
return |
||||||
|
} |
||||||
|
|
||||||
|
for { |
||||||
|
select { |
||||||
|
case packet := <-self.packet_chan: |
||||||
|
self.sendPacket(packet) |
||||||
|
case ctrlcmd := <-self.sndctrl_chan: |
||||||
|
if ctrlcmd == STATIC_RELAY_STOP_CTRL { |
||||||
|
self.connectClient.Close(nil) |
||||||
|
log.Printf("Static HandleAvPacket close: publishurl=%s", self.RtmpUrl) |
||||||
|
break |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
func (self *StaticPush) IsStart() bool { |
||||||
|
return self.startflag |
||||||
|
} |
||||||
Loading…
Reference in new issue