package rtmp import ( "errors" "time" "log" "github.com/gwuhaolin/livego/av" "github.com/gwuhaolin/livego/protocol/rtmp/cache" "github.com/orcaman/concurrent-map" ) var ( EmptyID = "" ) type RtmpStream struct { streams cmap.ConcurrentMap } func NewRtmpStream() *RtmpStream { ret := &RtmpStream{ streams: cmap.New(), } go ret.CheckAlive() return ret } func (rs *RtmpStream) HandleReader(r av.ReadCloser) { info := r.Info() var stream *Stream i, ok := rs.streams.Get(info.Key) if stream, ok = i.(*Stream); ok { stream.TransStop() id := stream.ID() if id != EmptyID && id != info.UID { ns := NewStream() stream.Copy(ns) stream = ns rs.streams.Set(info.Key, ns) } }else { stream = NewStream() rs.streams.Set(info.Key, stream) } stream.AddReader(r) } func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { info := w.Info() var s *Stream ok := rs.streams.Has(info.Key) if !ok { s = NewStream() rs.streams.Set(info.Key, s) } else { item, ok := rs.streams.Get(info.Key) if ok { s = item.(*Stream) s.AddWriter(w) } } } func (rs *RtmpStream) GetStreams() cmap.ConcurrentMap { return rs.streams } func (rs *RtmpStream) CheckAlive() { for { <-time.After(5 * time.Second) for item := range rs.streams.IterBuffered() { v := item.Val.(*Stream) if v.CheckAlive() == 0 { rs.streams.Remove(item.Key) } } } } type Stream struct { isStart bool cache *cache.Cache r av.ReadCloser ws cmap.ConcurrentMap } type PackWriterCloser struct { init bool w av.WriteCloser } func (p *PackWriterCloser) GetWriter() av.WriteCloser { return p.w } func NewStream() *Stream { return &Stream{ cache: cache.NewCache(), ws: cmap.New(), } } func (s *Stream) ID() string { if s.r != nil { return s.r.Info().UID } return EmptyID } func (s *Stream) GetReader() av.ReadCloser { return s.r } func (s *Stream) GetWs() cmap.ConcurrentMap { return s.ws } func (s *Stream) Copy(dst *Stream) { for item := range s.ws.IterBuffered() { v := item.Val.(*PackWriterCloser) s.ws.Remove(item.Key) v.w.CalcBaseTimestamp() dst.AddWriter(v.w) } } func (s *Stream) AddReader(r av.ReadCloser) { s.r = r go s.TransStart() } func (s *Stream) AddWriter(w av.WriteCloser) { info := w.Info() pw := &PackWriterCloser{w: w} s.ws.Set(info.UID, pw) } func (s *Stream) TransStart() { s.isStart = true var p av.Packet for { if !s.isStart { s.closeInter() return } err := s.r.Read(&p) if err != nil { s.closeInter() s.isStart = false return } s.cache.Write(p) for item := range s.ws.IterBuffered() { v := item.Val.(*PackWriterCloser) if !v.init { if err = s.cache.Send(v.w); err != nil { log.Printf("[%s] send cache packet error: %v, remove", v.w.Info(), err) s.ws.Remove(item.Key) continue } v.init = true } else { if err = v.w.Write(p); err != nil { log.Printf("[%s] write packet error: %v, remove", v.w.Info(), err) s.ws.Remove(item.Key) } } } } } func (s *Stream) TransStop() { if s.isStart && s.r != nil { s.r.Close(errors.New("stop old")) } s.isStart = false } func (s *Stream) CheckAlive() (n int) { if s.r != nil && s.isStart { if s.r.Alive() { n++ } else { s.r.Close(errors.New("read timeout")) } } for item := range s.ws.IterBuffered() { v := item.Val.(*PackWriterCloser) if v.w != nil { if !v.w.Alive() && s.isStart { s.ws.Remove(item.Key) v.w.Close(errors.New("write timeout")) continue } n++ } } return } func (s *Stream) closeInter() { if s.r != nil { log.Printf("[%v] publisher closed", s.r.Info()) } for item := range s.ws.IterBuffered() { v := item.Val.(*PackWriterCloser) if v.w != nil { if v.w.Info().IsInterval() { v.w.Close(errors.New("closed")) s.ws.Remove(item.Key) log.Printf("[%v] player closed and remove\n", v.w.Info()) } } } }