diff --git a/protocol/api/api.go b/protocol/api/api.go index c424580..778f3fc 100755 --- a/protocol/api/api.go +++ b/protocol/api/api.go @@ -160,36 +160,40 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { } msgs := new(streams) - for item := range rtmpStream.GetStreams().IterBuffered() { - if s, ok := item.Val.(*rtmp.Stream); ok { + + rtmpStream.GetStreams().Range(func(key, val interface{}) bool { + if s, ok := val.(*rtmp.Stream); ok { if s.GetReader() != nil { switch s.GetReader().(type) { case *rtmp.VirReader: v := s.GetReader().(*rtmp.VirReader) - msg := stream{item.Key, v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS, + msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS, v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS} msgs.Publishers = append(msgs.Publishers, msg) } } } - } + return true + }) - for item := range rtmpStream.GetStreams().IterBuffered() { - ws := item.Val.(*rtmp.Stream).GetWs() - for s := range ws.IterBuffered() { - if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok { + rtmpStream.GetStreams().Range(func(key, val interface{}) bool { + ws := val.(*rtmp.Stream).GetWs() + ws.Range(func(k, v interface{}) bool { + if pw, ok := v.(*rtmp.PackWriterCloser); ok { if pw.GetWriter() != nil { switch pw.GetWriter().(type) { case *rtmp.VirWriter: v := pw.GetWriter().(*rtmp.VirWriter) - msg := stream{item.Key, v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS, + msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS, v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS} msgs.Players = append(msgs.Players, msg) } } } - } - } + return true + }) + return true + }) resp, _ := json.Marshal(msgs) res.Data = resp diff --git a/protocol/hls/hls.go b/protocol/hls/hls.go index 57e2ba2..82159cb 100755 --- a/protocol/hls/hls.go +++ b/protocol/hls/hls.go @@ -2,17 +2,18 @@ package hls import ( "fmt" - "github.com/gwuhaolin/livego/configure" "net" "net/http" "path" "strconv" "strings" + "sync" "time" + "github.com/gwuhaolin/livego/configure" + "github.com/gwuhaolin/livego/av" - cmap "github.com/orcaman/concurrent-map" log "github.com/sirupsen/logrus" ) @@ -35,12 +36,12 @@ var crossdomainxml = []byte(` type Server struct { listener net.Listener - conns cmap.ConcurrentMap + conns *sync.Map } func NewServer() *Server { ret := &Server{ - conns: cmap.New(), + conns: &sync.Map{}, } go ret.checkStop() return ret @@ -58,20 +59,19 @@ func (server *Server) Serve(listener net.Listener) error { func (server *Server) GetWriter(info av.Info) av.WriteCloser { var s *Source - ok := server.conns.Has(info.Key) + v, ok := server.conns.Load(info.Key) if !ok { log.Debug("new hls source") s = NewSource(info) - server.conns.Set(info.Key, s) + server.conns.Store(info.Key, s) } else { - v, _ := server.conns.Get(info.Key) s = v.(*Source) } return s } func (server *Server) getConn(key string) *Source { - v, ok := server.conns.Get(key) + v, ok := server.conns.Load(key) if !ok { return nil } @@ -81,13 +81,15 @@ func (server *Server) getConn(key string) *Source { func (server *Server) checkStop() { for { <-time.After(5 * time.Second) - for item := range server.conns.IterBuffered() { - v := item.Val.(*Source) + + server.conns.Range(func(key, val interface{}) bool { + v := val.(*Source) if !v.Alive() && !configure.Config.GetBool("hls_keep_after_end") { log.Debug("check stop and remove: ", v.Info()) - server.conns.Remove(item.Key) + server.conns.Delete(key) } - } + return true + }) } } diff --git a/protocol/httpflv/server.go b/protocol/httpflv/server.go index 3d7f665..11cc46c 100644 --- a/protocol/httpflv/server.go +++ b/protocol/httpflv/server.go @@ -51,26 +51,31 @@ func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *stream return nil } msgs := new(streams) - for item := range rtmpStream.GetStreams().IterBuffered() { - if s, ok := item.Val.(*rtmp.Stream); ok { + + rtmpStream.GetStreams().Range(func(key, val interface{}) bool { + if s, ok := val.(*rtmp.Stream); ok { if s.GetReader() != nil { - msg := stream{item.Key, s.GetReader().Info().UID} + msg := stream{key.(string), s.GetReader().Info().UID} msgs.Publishers = append(msgs.Publishers, msg) } } - } + return true + }) + + rtmpStream.GetStreams().Range(func(key, val interface{}) bool { + ws := val.(*rtmp.Stream).GetWs() - for item := range rtmpStream.GetStreams().IterBuffered() { - ws := item.Val.(*rtmp.Stream).GetWs() - for s := range ws.IterBuffered() { - if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok { + ws.Range(func(k, v interface{}) bool { + if pw, ok := v.(*rtmp.PackWriterCloser); ok { if pw.GetWriter() != nil { - msg := stream{item.Key, pw.GetWriter().Info().UID} + msg := stream{key.(string), pw.GetWriter().Info().UID} msgs.Players = append(msgs.Players, msg) } } - } - } + return true + }) + return true + }) return msgs } diff --git a/protocol/rtmp/rtmp.go b/protocol/rtmp/rtmp.go index 81bd165..132ac50 100755 --- a/protocol/rtmp/rtmp.go +++ b/protocol/rtmp/rtmp.go @@ -124,7 +124,7 @@ func (s *Server) handleConn(conn *core.Conn) error { if connServer.IsPublisher() { channel, err := configure.RoomKeys.GetChannel(name) if err != nil { - err := fmt.Errorf("invalid key") + err := fmt.Errorf("invalid key err=%s", err.Error()) conn.Close() log.Error("CheckKey err: ", err) return err diff --git a/protocol/rtmp/rtmprelay/staticrelay.go b/protocol/rtmp/rtmprelay/staticrelay.go index 2783c8f..0bf67c9 100644 --- a/protocol/rtmp/rtmprelay/staticrelay.go +++ b/protocol/rtmp/rtmprelay/staticrelay.go @@ -21,19 +21,28 @@ type StaticPush struct { var G_StaticPushMap = make(map[string](*StaticPush)) var g_MapLock = new(sync.RWMutex) +var G_PushUrlList []string = nil var ( STATIC_RELAY_STOP_CTRL = "STATIC_RTMPRELAY_STOP" ) func GetStaticPushList(appname string) ([]string, error) { - pushurlList, ok := configure.GetStaticPushUrlList(appname) + if G_PushUrlList == nil { + // Do not unmarshel the config every time, lots of reflect works -gs + pushurlList, ok := configure.GetStaticPushUrlList(appname) + if !ok { + G_PushUrlList = []string{} + } else { + G_PushUrlList = pushurlList + } + } - if !ok { + if len(G_PushUrlList) == 0 { return nil, fmt.Errorf("no static push url") } - return pushurlList, nil + return G_PushUrlList, nil } func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush { diff --git a/protocol/rtmp/stream.go b/protocol/rtmp/stream.go index 3ebd276..4d50367 100755 --- a/protocol/rtmp/stream.go +++ b/protocol/rtmp/stream.go @@ -3,13 +3,13 @@ package rtmp import ( "fmt" "strings" + "sync" "time" "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/protocol/rtmp/cache" "github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay" - cmap "github.com/orcaman/concurrent-map" log "github.com/sirupsen/logrus" ) @@ -18,12 +18,12 @@ var ( ) type RtmpStream struct { - streams cmap.ConcurrentMap //key + streams *sync.Map //key } func NewRtmpStream() *RtmpStream { ret := &RtmpStream{ - streams: cmap.New(), + streams: &sync.Map{}, } go ret.CheckAlive() return ret @@ -34,7 +34,7 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { log.Debugf("HandleReader: info[%v]", info) var stream *Stream - i, ok := rs.streams.Get(info.Key) + i, ok := rs.streams.Load(info.Key) if stream, ok = i.(*Stream); ok { stream.TransStop() id := stream.ID() @@ -42,11 +42,11 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { ns := NewStream() stream.Copy(ns) stream = ns - rs.streams.Set(info.Key, ns) + rs.streams.Store(info.Key, ns) } } else { stream = NewStream() - rs.streams.Set(info.Key, stream) + rs.streams.Store(info.Key, stream) stream.info = info } @@ -58,33 +58,32 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { log.Debugf("HandleWriter: info[%v]", info) var s *Stream - ok := rs.streams.Has(info.Key) + item, ok := rs.streams.Load(info.Key) if !ok { + log.Debugf("HandleWriter: not found create new info[%v]", info) s = NewStream() - rs.streams.Set(info.Key, s) + rs.streams.Store(info.Key, s) s.info = info } else { - item, ok := rs.streams.Get(info.Key) - if ok { - s = item.(*Stream) - s.AddWriter(w) - } + s = item.(*Stream) + s.AddWriter(w) } } -func (rs *RtmpStream) GetStreams() cmap.ConcurrentMap { +func (rs *RtmpStream) GetStreams() *sync.Map { return rs.streams } func (rs *RtmpStream) CheckAlive() { for { <-time.After(5 * time.Second) - for item := range rs.streams.IterBuffered() { - v := item.Val.(*Stream) + rs.streams.Range(func(key, val interface{}) bool { + v := val.(*Stream) if v.CheckAlive() == 0 { - rs.streams.Remove(item.Key) + rs.streams.Delete(key) } - } + return true + }) } } @@ -92,7 +91,7 @@ type Stream struct { isStart bool cache *cache.Cache r av.ReadCloser - ws cmap.ConcurrentMap + ws *sync.Map info av.Info } @@ -108,7 +107,7 @@ func (p *PackWriterCloser) GetWriter() av.WriteCloser { func NewStream() *Stream { return &Stream{ cache: cache.NewCache(), - ws: cmap.New(), + ws: &sync.Map{}, } } @@ -123,17 +122,19 @@ func (s *Stream) GetReader() av.ReadCloser { return s.r } -func (s *Stream) GetWs() cmap.ConcurrentMap { +func (s *Stream) GetWs() *sync.Map { return s.ws } func (s *Stream) Copy(dst *Stream) { - for item := range s.ws.IterBuffered() { - v := item.Val.(*PackWriterCloser) - s.ws.Remove(item.Key) + dst.info = s.info + s.ws.Range(func(key, val interface{}) bool { + v := val.(*PackWriterCloser) + s.ws.Delete(key) v.w.CalcBaseTimestamp() dst.AddWriter(v.w) - } + return true + }) } func (s *Stream) AddReader(r av.ReadCloser) { @@ -144,7 +145,7 @@ func (s *Stream) AddReader(r av.ReadCloser) { func (s *Stream) AddWriter(w av.WriteCloser) { info := w.Info() pw := &PackWriterCloser{w: w} - s.ws.Set(info.UID, pw) + s.ws.Store(info.UID, pw) } /*检测本application下是否配置static_push, @@ -331,26 +332,27 @@ func (s *Stream) TransStart() { s.cache.Write(p) - for item := range s.ws.IterBuffered() { - v := item.Val.(*PackWriterCloser) + s.ws.Range(func(key, val interface{}) bool { + v := val.(*PackWriterCloser) if !v.init { //log.Debugf("cache.send: %v", v.w.Info()) if err = s.cache.Send(v.w); err != nil { log.Debugf("[%s] send cache packet error: %v, remove", v.w.Info(), err) - s.ws.Remove(item.Key) - continue + s.ws.Delete(key) + return true } v.init = true } else { - new_packet := p + newPacket := p //writeType := reflect.TypeOf(v.w) //log.Debugf("w.Write: type=%v, %v", writeType, v.w.Info()) - if err = v.w.Write(&new_packet); err != nil { + if err = v.w.Write(&newPacket); err != nil { log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err) - s.ws.Remove(item.Key) + s.ws.Delete(key) } } - } + return true + }) } } @@ -372,18 +374,22 @@ func (s *Stream) CheckAlive() (n int) { s.r.Close(fmt.Errorf("read timeout")) } } - for item := range s.ws.IterBuffered() { - v := item.Val.(*PackWriterCloser) + + s.ws.Range(func(key, val interface{}) bool { + v := val.(*PackWriterCloser) if v.w != nil { + //Alive from RWBaser, check last frame now - timestamp, if > timeout then Remove it if !v.w.Alive() && s.isStart { - s.ws.Remove(item.Key) + log.Infof("write timeout remove") + s.ws.Delete(key) v.w.Close(fmt.Errorf("write timeout")) - continue + return true } n++ } + return true + }) - } return } @@ -393,14 +399,15 @@ func (s *Stream) closeInter() { log.Debugf("[%v] publisher closed", s.r.Info()) } - for item := range s.ws.IterBuffered() { - v := item.Val.(*PackWriterCloser) + s.ws.Range(func(key, val interface{}) bool { + v := val.(*PackWriterCloser) if v.w != nil { if v.w.Info().IsInterval() { v.w.Close(fmt.Errorf("closed")) - s.ws.Remove(item.Key) + s.ws.Delete(key) log.Debugf("[%v] player closed and remove\n", v.w.Info()) } } - } + return true + }) }