Browse Source

huge performance improvement, almost 100% increase, very close to nginx-rtmp-module

pull/137/head
gaoshen.gs 5 years ago
parent
commit
e008907cde
  1. 26
      protocol/api/api.go
  2. 26
      protocol/hls/hls.go
  3. 27
      protocol/httpflv/server.go
  4. 2
      protocol/rtmp/rtmp.go
  5. 15
      protocol/rtmp/rtmprelay/staticrelay.go
  6. 93
      protocol/rtmp/stream.go

26
protocol/api/api.go

@ -160,36 +160,40 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) { @@ -160,36 +160,40 @@ func (server *Server) GetLiveStatics(w http.ResponseWriter, req *http.Request) {
}
msgs := new(streams)
for item := range rtmpStream.GetStreams().IterBuffered() {
if s, ok := item.Val.(*rtmp.Stream); ok {
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
if s, ok := val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
switch s.GetReader().(type) {
case *rtmp.VirReader:
v := s.GetReader().(*rtmp.VirReader)
msg := stream{item.Key, v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,
msg := stream{key.(string), v.Info().URL, v.ReadBWInfo.StreamId, v.ReadBWInfo.VideoDatainBytes, v.ReadBWInfo.VideoSpeedInBytesperMS,
v.ReadBWInfo.AudioDatainBytes, v.ReadBWInfo.AudioSpeedInBytesperMS}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
}
}
return true
})
for item := range rtmpStream.GetStreams().IterBuffered() {
ws := item.Val.(*rtmp.Stream).GetWs()
for s := range ws.IterBuffered() {
if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok {
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
ws := val.(*rtmp.Stream).GetWs()
ws.Range(func(k, v interface{}) bool {
if pw, ok := v.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
switch pw.GetWriter().(type) {
case *rtmp.VirWriter:
v := pw.GetWriter().(*rtmp.VirWriter)
msg := stream{item.Key, v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,
msg := stream{key.(string), v.Info().URL, v.WriteBWInfo.StreamId, v.WriteBWInfo.VideoDatainBytes, v.WriteBWInfo.VideoSpeedInBytesperMS,
v.WriteBWInfo.AudioDatainBytes, v.WriteBWInfo.AudioSpeedInBytesperMS}
msgs.Players = append(msgs.Players, msg)
}
}
}
}
}
return true
})
return true
})
resp, _ := json.Marshal(msgs)
res.Data = resp

26
protocol/hls/hls.go

@ -2,17 +2,18 @@ package hls @@ -2,17 +2,18 @@ package hls
import (
"fmt"
"github.com/gwuhaolin/livego/configure"
"net"
"net/http"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/gwuhaolin/livego/configure"
"github.com/gwuhaolin/livego/av"
cmap "github.com/orcaman/concurrent-map"
log "github.com/sirupsen/logrus"
)
@ -35,12 +36,12 @@ var crossdomainxml = []byte(`<?xml version="1.0" ?> @@ -35,12 +36,12 @@ var crossdomainxml = []byte(`<?xml version="1.0" ?>
type Server struct {
listener net.Listener
conns cmap.ConcurrentMap
conns *sync.Map
}
func NewServer() *Server {
ret := &Server{
conns: cmap.New(),
conns: &sync.Map{},
}
go ret.checkStop()
return ret
@ -58,20 +59,19 @@ func (server *Server) Serve(listener net.Listener) error { @@ -58,20 +59,19 @@ func (server *Server) Serve(listener net.Listener) error {
func (server *Server) GetWriter(info av.Info) av.WriteCloser {
var s *Source
ok := server.conns.Has(info.Key)
v, ok := server.conns.Load(info.Key)
if !ok {
log.Debug("new hls source")
s = NewSource(info)
server.conns.Set(info.Key, s)
server.conns.Store(info.Key, s)
} else {
v, _ := server.conns.Get(info.Key)
s = v.(*Source)
}
return s
}
func (server *Server) getConn(key string) *Source {
v, ok := server.conns.Get(key)
v, ok := server.conns.Load(key)
if !ok {
return nil
}
@ -81,13 +81,15 @@ func (server *Server) getConn(key string) *Source { @@ -81,13 +81,15 @@ func (server *Server) getConn(key string) *Source {
func (server *Server) checkStop() {
for {
<-time.After(5 * time.Second)
for item := range server.conns.IterBuffered() {
v := item.Val.(*Source)
server.conns.Range(func(key, val interface{}) bool {
v := val.(*Source)
if !v.Alive() && !configure.Config.GetBool("hls_keep_after_end") {
log.Debug("check stop and remove: ", v.Info())
server.conns.Remove(item.Key)
server.conns.Delete(key)
}
}
return true
})
}
}

27
protocol/httpflv/server.go

@ -51,26 +51,31 @@ func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *stream @@ -51,26 +51,31 @@ func (server *Server) getStreams(w http.ResponseWriter, r *http.Request) *stream
return nil
}
msgs := new(streams)
for item := range rtmpStream.GetStreams().IterBuffered() {
if s, ok := item.Val.(*rtmp.Stream); ok {
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
if s, ok := val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
msg := stream{item.Key, s.GetReader().Info().UID}
msg := stream{key.(string), s.GetReader().Info().UID}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
}
return true
})
rtmpStream.GetStreams().Range(func(key, val interface{}) bool {
ws := val.(*rtmp.Stream).GetWs()
for item := range rtmpStream.GetStreams().IterBuffered() {
ws := item.Val.(*rtmp.Stream).GetWs()
for s := range ws.IterBuffered() {
if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok {
ws.Range(func(k, v interface{}) bool {
if pw, ok := v.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
msg := stream{item.Key, pw.GetWriter().Info().UID}
msg := stream{key.(string), pw.GetWriter().Info().UID}
msgs.Players = append(msgs.Players, msg)
}
}
}
}
return true
})
return true
})
return msgs
}

2
protocol/rtmp/rtmp.go

@ -124,7 +124,7 @@ func (s *Server) handleConn(conn *core.Conn) error { @@ -124,7 +124,7 @@ func (s *Server) handleConn(conn *core.Conn) error {
if connServer.IsPublisher() {
channel, err := configure.RoomKeys.GetChannel(name)
if err != nil {
err := fmt.Errorf("invalid key")
err := fmt.Errorf("invalid key err=%s", err.Error())
conn.Close()
log.Error("CheckKey err: ", err)
return err

15
protocol/rtmp/rtmprelay/staticrelay.go

@ -21,19 +21,28 @@ type StaticPush struct { @@ -21,19 +21,28 @@ type StaticPush struct {
var G_StaticPushMap = make(map[string](*StaticPush))
var g_MapLock = new(sync.RWMutex)
var G_PushUrlList []string = nil
var (
STATIC_RELAY_STOP_CTRL = "STATIC_RTMPRELAY_STOP"
)
func GetStaticPushList(appname string) ([]string, error) {
pushurlList, ok := configure.GetStaticPushUrlList(appname)
if G_PushUrlList == nil {
// Do not unmarshel the config every time, lots of reflect works -gs
pushurlList, ok := configure.GetStaticPushUrlList(appname)
if !ok {
G_PushUrlList = []string{}
} else {
G_PushUrlList = pushurlList
}
}
if !ok {
if len(G_PushUrlList) == 0 {
return nil, fmt.Errorf("no static push url")
}
return pushurlList, nil
return G_PushUrlList, nil
}
func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush {

93
protocol/rtmp/stream.go

@ -3,13 +3,13 @@ package rtmp @@ -3,13 +3,13 @@ package rtmp
import (
"fmt"
"strings"
"sync"
"time"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/rtmp/cache"
"github.com/gwuhaolin/livego/protocol/rtmp/rtmprelay"
cmap "github.com/orcaman/concurrent-map"
log "github.com/sirupsen/logrus"
)
@ -18,12 +18,12 @@ var ( @@ -18,12 +18,12 @@ var (
)
type RtmpStream struct {
streams cmap.ConcurrentMap //key
streams *sync.Map //key
}
func NewRtmpStream() *RtmpStream {
ret := &RtmpStream{
streams: cmap.New(),
streams: &sync.Map{},
}
go ret.CheckAlive()
return ret
@ -34,7 +34,7 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { @@ -34,7 +34,7 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
log.Debugf("HandleReader: info[%v]", info)
var stream *Stream
i, ok := rs.streams.Get(info.Key)
i, ok := rs.streams.Load(info.Key)
if stream, ok = i.(*Stream); ok {
stream.TransStop()
id := stream.ID()
@ -42,11 +42,11 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { @@ -42,11 +42,11 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
ns := NewStream()
stream.Copy(ns)
stream = ns
rs.streams.Set(info.Key, ns)
rs.streams.Store(info.Key, ns)
}
} else {
stream = NewStream()
rs.streams.Set(info.Key, stream)
rs.streams.Store(info.Key, stream)
stream.info = info
}
@ -58,33 +58,32 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) { @@ -58,33 +58,32 @@ func (rs *RtmpStream) HandleWriter(w av.WriteCloser) {
log.Debugf("HandleWriter: info[%v]", info)
var s *Stream
ok := rs.streams.Has(info.Key)
item, ok := rs.streams.Load(info.Key)
if !ok {
log.Debugf("HandleWriter: not found create new info[%v]", info)
s = NewStream()
rs.streams.Set(info.Key, s)
rs.streams.Store(info.Key, s)
s.info = info
} else {
item, ok := rs.streams.Get(info.Key)
if ok {
s = item.(*Stream)
s.AddWriter(w)
}
s = item.(*Stream)
s.AddWriter(w)
}
}
func (rs *RtmpStream) GetStreams() cmap.ConcurrentMap {
func (rs *RtmpStream) GetStreams() *sync.Map {
return rs.streams
}
func (rs *RtmpStream) CheckAlive() {
for {
<-time.After(5 * time.Second)
for item := range rs.streams.IterBuffered() {
v := item.Val.(*Stream)
rs.streams.Range(func(key, val interface{}) bool {
v := val.(*Stream)
if v.CheckAlive() == 0 {
rs.streams.Remove(item.Key)
rs.streams.Delete(key)
}
}
return true
})
}
}
@ -92,7 +91,7 @@ type Stream struct { @@ -92,7 +91,7 @@ type Stream struct {
isStart bool
cache *cache.Cache
r av.ReadCloser
ws cmap.ConcurrentMap
ws *sync.Map
info av.Info
}
@ -108,7 +107,7 @@ func (p *PackWriterCloser) GetWriter() av.WriteCloser { @@ -108,7 +107,7 @@ func (p *PackWriterCloser) GetWriter() av.WriteCloser {
func NewStream() *Stream {
return &Stream{
cache: cache.NewCache(),
ws: cmap.New(),
ws: &sync.Map{},
}
}
@ -123,17 +122,19 @@ func (s *Stream) GetReader() av.ReadCloser { @@ -123,17 +122,19 @@ func (s *Stream) GetReader() av.ReadCloser {
return s.r
}
func (s *Stream) GetWs() cmap.ConcurrentMap {
func (s *Stream) GetWs() *sync.Map {
return s.ws
}
func (s *Stream) Copy(dst *Stream) {
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
s.ws.Remove(item.Key)
dst.info = s.info
s.ws.Range(func(key, val interface{}) bool {
v := val.(*PackWriterCloser)
s.ws.Delete(key)
v.w.CalcBaseTimestamp()
dst.AddWriter(v.w)
}
return true
})
}
func (s *Stream) AddReader(r av.ReadCloser) {
@ -144,7 +145,7 @@ func (s *Stream) AddReader(r av.ReadCloser) { @@ -144,7 +145,7 @@ func (s *Stream) AddReader(r av.ReadCloser) {
func (s *Stream) AddWriter(w av.WriteCloser) {
info := w.Info()
pw := &PackWriterCloser{w: w}
s.ws.Set(info.UID, pw)
s.ws.Store(info.UID, pw)
}
/*检测本application下是否配置static_push,
@ -331,26 +332,27 @@ func (s *Stream) TransStart() { @@ -331,26 +332,27 @@ func (s *Stream) TransStart() {
s.cache.Write(p)
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
s.ws.Range(func(key, val interface{}) bool {
v := val.(*PackWriterCloser)
if !v.init {
//log.Debugf("cache.send: %v", v.w.Info())
if err = s.cache.Send(v.w); err != nil {
log.Debugf("[%s] send cache packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
continue
s.ws.Delete(key)
return true
}
v.init = true
} else {
new_packet := p
newPacket := p
//writeType := reflect.TypeOf(v.w)
//log.Debugf("w.Write: type=%v, %v", writeType, v.w.Info())
if err = v.w.Write(&new_packet); err != nil {
if err = v.w.Write(&newPacket); err != nil {
log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
s.ws.Delete(key)
}
}
}
return true
})
}
}
@ -372,18 +374,22 @@ func (s *Stream) CheckAlive() (n int) { @@ -372,18 +374,22 @@ func (s *Stream) CheckAlive() (n int) {
s.r.Close(fmt.Errorf("read timeout"))
}
}
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
s.ws.Range(func(key, val interface{}) bool {
v := val.(*PackWriterCloser)
if v.w != nil {
//Alive from RWBaser, check last frame now - timestamp, if > timeout then Remove it
if !v.w.Alive() && s.isStart {
s.ws.Remove(item.Key)
log.Infof("write timeout remove")
s.ws.Delete(key)
v.w.Close(fmt.Errorf("write timeout"))
continue
return true
}
n++
}
return true
})
}
return
}
@ -393,14 +399,15 @@ func (s *Stream) closeInter() { @@ -393,14 +399,15 @@ func (s *Stream) closeInter() {
log.Debugf("[%v] publisher closed", s.r.Info())
}
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
s.ws.Range(func(key, val interface{}) bool {
v := val.(*PackWriterCloser)
if v.w != nil {
if v.w.Info().IsInterval() {
v.w.Close(fmt.Errorf("closed"))
s.ws.Remove(item.Key)
s.ws.Delete(key)
log.Debugf("[%v] player closed and remove\n", v.w.Info())
}
}
}
return true
})
}

Loading…
Cancel
Save