diff --git a/av/av.go b/av/av.go index 8462db6..90df2e4 100755 --- a/av/av.go +++ b/av/av.go @@ -107,11 +107,13 @@ type GetWriter interface { GetWriter(Info) WriteCloser } +// 流读写接口 type Handler interface { HandleReader(ReadCloser) HandleWriter(WriteCloser) } +// 流状态 type Alive interface { Alive() bool } @@ -125,6 +127,7 @@ type CalcTime interface { CalcBaseTimestamp() } +// 流信息 type Info struct { Key string URL string @@ -141,12 +144,14 @@ func (info Info) String() string { info.Key, info.URL, info.UID, info.Inter) } +// 读源流数据接口 type ReadCloser interface { Closer Alive Read(*Packet) error } +// 写目标流数据接口 type WriteCloser interface { Closer Alive diff --git a/main.go b/main.go index 2cf6eca..629645c 100755 --- a/main.go +++ b/main.go @@ -2,15 +2,16 @@ package main import ( "fmt" + "net" + "path" + "runtime" + "time" + "github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/protocol/api" "github.com/gwuhaolin/livego/protocol/hls" "github.com/gwuhaolin/livego/protocol/httpflv" "github.com/gwuhaolin/livego/protocol/rtmp" - "net" - "path" - "runtime" - "time" log "github.com/sirupsen/logrus" ) @@ -39,7 +40,7 @@ func startHls() *hls.Server { var rtmpAddr string -func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { +func startRtmp(stream *rtmp.StreamServer, hlsServer *hls.Server) { rtmpAddr = configure.Config.GetString("rtmp_addr") rtmpListen, err := net.Listen("tcp", rtmpAddr) @@ -66,7 +67,7 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { rtmpServer.Serve(rtmpListen) } -func startHTTPFlv(stream *rtmp.RtmpStream) { +func startHTTPFlv(stream *rtmp.StreamServer) { httpflvAddr := configure.Config.GetString("httpflv_addr") flvListen, err := net.Listen("tcp", httpflvAddr) @@ -86,7 +87,7 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { }() } -func startAPI(stream *rtmp.RtmpStream) { +func startAPI(stream *rtmp.StreamServer) { apiAddr := configure.Config.GetString("api_addr") if apiAddr != "" { @@ -134,7 +135,7 @@ func main() { version: %s `, VERSION) - stream := rtmp.NewRtmpStream() + stream := rtmp.NewStreamServers() hlsServer := startHls() startHTTPFlv(stream) startAPI(stream) diff --git a/protocol/api/api.go b/protocol/api/api.go index 778f3fc..da5cb18 100755 --- a/protocol/api/api.go +++ b/protocol/api/api.go @@ -152,7 +152,7 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { defer res.SendJson() - rtmpStream := server.handler.(*rtmp.RtmpStream) + rtmpStream := server.handler.(*rtmp.StreamServer) if rtmpStream == nil { res.Status = 500 res.Data = "Get rtmp stream information error" @@ -161,8 +161,8 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { msgs := new(streams) - rtmpStream.GetStreams().Range(func(key, val interface{}) bool { - if s, ok := val.(*rtmp.Stream); ok { + rtmpStream.GetServices().Range(func(key, val interface{}) bool { + if s, ok := val.(*rtmp.StreamService); ok { if s.GetReader() != nil { switch s.GetReader().(type) { case *rtmp.VirReader: @@ -176,8 +176,8 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { return true }) - rtmpStream.GetStreams().Range(func(key, val interface{}) bool { - ws := val.(*rtmp.Stream).GetWs() + rtmpStream.GetServices().Range(func(key, val interface{}) bool { + ws := val.(*rtmp.StreamService).GetWs() ws.Range(func(k, v interface{}) bool { if pw, ok := v.(*rtmp.PackWriterCloser); ok { if pw.GetWriter() != nil { diff --git a/protocol/httpflv/server.go b/protocol/httpflv/server.go index 11cc46c..9eed5f7 100644 --- a/protocol/httpflv/server.go +++ b/protocol/httpflv/server.go @@ -46,14 +46,14 @@ func (server *Server) Serve(l net.Listener) error { // 获取发布和播放器的信息 func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *streams { - rtmpStream := server.handler.(*rtmp.RtmpStream) + rtmpStream := server.handler.(*rtmp.StreamServer) if rtmpStream == nil { return nil } msgs := new(streams) - rtmpStream.GetStreams().Range(func(key, val interface{}) bool { - if s, ok := val.(*rtmp.Stream); ok { + rtmpStream.GetServices().Range(func(key, val interface{}) bool { + if s, ok := val.(*rtmp.StreamService); ok { if s.GetReader() != nil { msg := stream{key.(string), s.GetReader().Info().UID} msgs.Publishers = append(msgs.Publishers, msg) @@ -62,8 +62,8 @@ func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *stream return true }) - rtmpStream.GetStreams().Range(func(key, val interface{}) bool { - ws := val.(*rtmp.Stream).GetWs() + rtmpStream.GetServices().Range(func(key, val interface{}) bool { + ws := val.(*rtmp.StreamService).GetWs() ws.Range(func(k, v interface{}) bool { if pw, ok := v.(*rtmp.PackWriterCloser); ok { diff --git a/protocol/rtmp/stream.go b/protocol/rtmp/stream.go index 4d50367..fac9e28 100755 --- a/protocol/rtmp/stream.go +++ b/protocol/rtmp/stream.go @@ -17,116 +17,134 @@ var ( EmptyID = "" ) -type RtmpStream struct { - streams *sync.Map //key +// 流媒体服务器。包含N个流媒体服务。 +// 每个单独的流媒体服务都可以有多种输入流,多个输出流。 +type StreamServer struct { + services *sync.Map //key } -func NewRtmpStream() *RtmpStream { - ret := &RtmpStream{ - streams: &sync.Map{}, +// 创建流媒体服务器。并启动状态监测协程。 +func NewStreamServers() *StreamServer { + ss := &StreamServer{ + services: &sync.Map{}, } - go ret.CheckAlive() - return ret + go ss.CheckAlive(5) + return ss } -func (rs *RtmpStream) HandleReader(r av.ReadCloser) { +// 注册源流处理逻辑。并启动源流读取。 +// 若已有源流,则重新启动;否则创建流媒体服务。 +func (ss *StreamServer) HandleReader(r av.ReadCloser) { info := r.Info() log.Debugf("HandleReader: info[%v]", info) - var stream *Stream - i, ok := rs.streams.Load(info.Key) - if stream, ok = i.(*Stream); ok { - stream.TransStop() - id := stream.ID() + var service *StreamService + i, ok := ss.services.Load(info.Key) + if service, ok = i.(*StreamService); ok { + service.TransStop() + id := service.ID() if id != EmptyID && id != info.UID { - ns := NewStream() - stream.Copy(ns) - stream = ns - rs.streams.Store(info.Key, ns) + ns := NewStreamService() + service.Copy(ns) + service = ns + ss.services.Store(info.Key, ns) } } else { - stream = NewStream() - rs.streams.Store(info.Key, stream) - stream.info = info + service = NewStreamService() + ss.services.Store(info.Key, service) + service.info = info } - stream.AddReader(r) + service.AddReader(r) } -func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { +// 注册目标流处理逻辑。 +// 若没有则新创建流媒体服务;若有则新增流媒体服务writer。 +func (ss *StreamServer) HandleWriter(w av.WriteCloser) { info := w.Info() log.Debugf("HandleWriter: info[%v]", info) - var s *Stream - item, ok := rs.streams.Load(info.Key) + var service *StreamService + item, ok := ss.services.Load(info.Key) if !ok { log.Debugf("HandleWriter: not found create new info[%v]", info) - s = NewStream() - rs.streams.Store(info.Key, s) - s.info = info + service = NewStreamService() + ss.services.Store(info.Key, service) + service.info = info } else { - s = item.(*Stream) - s.AddWriter(w) + service = item.(*StreamService) + service.AddWriter(w) } } -func (rs *RtmpStream) GetStreams() *sync.Map { - return rs.streams +// 获取所有流媒体服务 +func (ss *StreamServer) GetServices() *sync.Map { + return ss.services } -func (rs *RtmpStream) CheckAlive() { +// 定时遍历检测所有媒体服务器 +func (ss *StreamServer) CheckAlive(ttl uint) { + + if ttl <= 1 { + ttl = 1 + } + for { - <-time.After(5 * time.Second) - rs.streams.Range(func(key, val interface{}) bool { - v := val.(*Stream) + <-time.After(time.Duration(ttl) * time.Second) + ss.services.Range(func(key, val interface{}) bool { + v := val.(*StreamService) if v.CheckAlive() == 0 { - rs.streams.Delete(key) + ss.services.Delete(key) } return true }) } } -type Stream struct { +// 流媒体服务结构信息 +type StreamService struct { isStart bool - cache *cache.Cache - r av.ReadCloser - ws *sync.Map - info av.Info + info av.Info // 源流信息 + cache *cache.Cache // 源流视频数据 + r av.ReadCloser // 读源流handler + ws *sync.Map // 推流目标地址集合 } +// 写流数据 type PackWriterCloser struct { init bool - w av.WriteCloser + w av.WriteCloser // 写目标流handler } func (p *PackWriterCloser) GetWriter() av.WriteCloser { return p.w } -func NewStream() *Stream { - return &Stream{ +// 实例化创建媒体服务。 +func NewStreamService() *StreamService { + return &StreamService{ cache: cache.NewCache(), ws: &sync.Map{}, } } -func (s *Stream) ID() string { +func (s *StreamService) ID() string { if s.r != nil { return s.r.Info().UID } return EmptyID } -func (s *Stream) GetReader() av.ReadCloser { +func (s *StreamService) GetReader() av.ReadCloser { return s.r } -func (s *Stream) GetWs() *sync.Map { +func (s *StreamService) GetWs() *sync.Map { return s.ws } -func (s *Stream) Copy(dst *Stream) { +// 复制流媒体服务。 +func (s *StreamService) Copy(dst *StreamService) { dst.info = s.info s.ws.Range(func(key, val interface{}) bool { v := val.(*PackWriterCloser) @@ -137,12 +155,13 @@ func (s *Stream) Copy(dst *Stream) { }) } -func (s *Stream) AddReader(r av.ReadCloser) { +// 新增源流处理。并启动读取流数据协程。 +func (s *StreamService) AddReader(r av.ReadCloser) { s.r = r go s.TransStart() } -func (s *Stream) AddWriter(w av.WriteCloser) { +func (s *StreamService) AddWriter(w av.WriteCloser) { info := w.Info() pw := &PackWriterCloser{w: w} s.ws.Store(info.UID, pw) @@ -150,7 +169,7 @@ func (s *Stream) AddWriter(w av.WriteCloser) { /*检测本application下是否配置static_push, 如果配置, 启动push远端的连接*/ -func (s *Stream) StartStaticPush() { +func (s *StreamService) StartStaticPush() { key := s.info.Key dscr := strings.Split(key, "/") @@ -190,7 +209,7 @@ func (s *Stream) StartStaticPush() { } } -func (s *Stream) StopStaticPush() { +func (s *StreamService) StopStaticPush() { key := s.info.Key log.Debugf("StopStaticPush......%s", key) @@ -229,7 +248,7 @@ func (s *Stream) StopStaticPush() { } } -func (s *Stream) IsSendStaticPush() bool { +func (s *StreamService) IsSendStaticPush() bool { key := s.info.Key dscr := strings.Split(key, "/") @@ -269,7 +288,7 @@ func (s *Stream) IsSendStaticPush() bool { return false } -func (s *Stream) SendStaticPush(packet av.Packet) { +func (s *StreamService) SendStaticPush(packet av.Packet) { key := s.info.Key dscr := strings.Split(key, "/") @@ -306,7 +325,8 @@ func (s *Stream) SendStaticPush(packet av.Packet) { } } -func (s *Stream) TransStart() { +// 流媒体服务读取源流数据协程 +func (s *StreamService) TransStart() { s.isStart = true var p av.Packet @@ -356,7 +376,8 @@ func (s *Stream) TransStart() { } } -func (s *Stream) TransStop() { +// 停止读取源流,并重置状态。 +func (s *StreamService) TransStop() { log.Debugf("TransStop: %s", s.info.Key) if s.isStart && s.r != nil { @@ -366,7 +387,8 @@ func (s *Stream) TransStop() { s.isStart = false } -func (s *Stream) CheckAlive() (n int) { +// 检测某个媒体服务器状态 +func (s *StreamService) CheckAlive() (n int) { if s.r != nil && s.isStart { if s.r.Alive() { n++ @@ -393,7 +415,7 @@ func (s *Stream) CheckAlive() (n int) { return } -func (s *Stream) closeInter() { +func (s *StreamService) closeInter() { if s.r != nil { s.StopStaticPush() log.Debugf("[%v] publisher closed", s.r.Info()) diff --git a/protocol/rtsp/rtsp.go b/protocol/rtsp/rtsp.go new file mode 100644 index 0000000..0f6288f --- /dev/null +++ b/protocol/rtsp/rtsp.go @@ -0,0 +1,40 @@ +package rtsp + +import ( + "net" + "net/http" + + "github.com/gwuhaolin/livego/av" +) + +type Server struct { + handler av.Handler +} + +type stream struct { + Key string `json:"key"` + Id string `json:"id"` +} + +type streams struct { + Publishers []stream `json:"publishers"` + Players []stream `json:"players"` +} + +func NewServer(h av.Handler) *Server { + return &Server{ + handler: h, + } +} + +func (server *Server) Serve(l net.Listener) error { + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + server.handleConn(w, r) + }) + mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) { + server.getStream(w, r) + }) + http.Serve(l, mux) + return nil +}