diff --git a/configure/channel.go b/configure/channel.go new file mode 100755 index 0000000..25bb061 --- /dev/null +++ b/configure/channel.go @@ -0,0 +1,122 @@ +package configure + +import ( + "encoding/json" + "io/ioutil" + "log" + "math/rand" + "sync" + "time" +) + +const roomKeySaveFile = "room_keys.json" + +var RoomKeys = LoadRoomKey(roomKeySaveFile) + +var roomUpdated = false + +func init() { + rand.Seed(time.Now().UnixNano()) + go func() { + for { + time.Sleep(15 * time.Second) + if roomUpdated { + RoomKeys.Save(roomKeySaveFile) + roomUpdated = false + } + } + }() +} + + +type RoomKeysType struct { + mapChanKey sync.Map + mapKeyChan sync.Map +} + +func LoadRoomKey(f string) *RoomKeysType { + result := &RoomKeysType { + mapChanKey: sync.Map{}, + mapKeyChan: sync.Map{}, + } + raw := map[string]string{} + content, err := ioutil.ReadFile(f) + if err != nil { + log.Printf("Failed to read file %s for room keys", f) + return result + } + if json.Unmarshal(content, &raw) != nil { + log.Printf("Failed to unmarshal file %s for room keys", f) + return result + } + for room, key := range raw { + result.mapChanKey.Store(room, key) + result.mapKeyChan.Store(key, room) + } + return result +} + +func (r *RoomKeysType) Save(f string) { + raw := map[string]string{} + r.mapChanKey.Range(func(channel, key interface{}) bool { + raw[channel.(string)] = key.(string) + return true + }) + content, err := json.Marshal(raw) + if err != nil { + log.Println("Failed to marshal room keys") + return + } + if ioutil.WriteFile(f, content, 0644) != nil { + log.Println("Failed to save room keys") + return + } +} + +// set/reset a random key for channel +func (r *RoomKeysType) SetKey(channel string) string { + var key string + for { + key = randStringRunes(48) + if _, found := r.mapKeyChan.Load(key); !found { + r.mapChanKey.Store(channel, key) + r.mapKeyChan.Store(key, channel) + break + } + } + roomUpdated = true + return key +} + +func (r *RoomKeysType) GetKey(channel string) string { + var key interface{} + var found bool + if key, found = r.mapChanKey.Load(channel); found { + return key.(string) + } else { + newkey := r.SetKey(channel) + log.Printf("[KEY] new channel [%s]: %s", channel, newkey) + return newkey + } +} + +func (r *RoomKeysType) GetChannel(key string) string { + channel, found := r.mapKeyChan.Load(key) + if found { + return channel.(string) + } else { + return "" + } +} + + +// helpers +var letterRunes = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randStringRunes(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} diff --git a/configure/liveconfig.go b/configure/liveconfig.go index 6830573..302bf52 100644 --- a/configure/liveconfig.go +++ b/configure/liveconfig.go @@ -8,38 +8,39 @@ import ( /* { - [ - { - "application":"live", - "live":"on", - "hls":"on", - "static_push":["rtmp://xx/live"] - } - ] + "server": [ + { + "appname": "live", + "liveon": "on", + "hlson": "on", + "static_push": [] + } + ] } */ + type Application struct { - Appname string - Liveon string - Hlson string - Static_push []string + Appname string `json:"appname"` + Liveon string `json:"liveon"` + Hlson string `json:"hlson"` + StaticPush []string `json:"static_push"` } type ServerCfg struct { - Server []Application + Server []Application `json:"server"` } var RtmpServercfg ServerCfg func LoadConfig(configfilename string) error { - log.Printf("starting load configure file(%s)......", configfilename) + log.Printf("starting load configure file %s", configfilename) data, err := ioutil.ReadFile(configfilename) if err != nil { log.Printf("ReadFile %s error:%v", configfilename, err) return err } - log.Printf("loadconfig: \r\n%s", string(data)) + // log.Printf("loadconfig: \r\n%s", string(data)) err = json.Unmarshal(data, &RtmpServercfg) if err != nil { @@ -62,8 +63,8 @@ func CheckAppName(appname string) bool { func GetStaticPushUrlList(appname string) ([]string, bool) { for _, app := range RtmpServercfg.Server { if (app.Appname == appname) && (app.Liveon == "on") { - if len(app.Static_push) > 0 { - return app.Static_push, true + if len(app.StaticPush) > 0 { + return app.StaticPush, true } else { return nil, false } diff --git a/protocol/rtmp/rtmp.go b/protocol/rtmp/rtmp.go index b8b475a..69f0bea 100755 --- a/protocol/rtmp/rtmp.go +++ b/protocol/rtmp/rtmp.go @@ -4,6 +4,7 @@ import ( "errors" "flag" "fmt" + "github.com/gwuhaolin/livego/utils/uid" "log" "net" "net/url" @@ -15,7 +16,6 @@ import ( "github.com/gwuhaolin/livego/configure" "github.com/gwuhaolin/livego/container/flv" "github.com/gwuhaolin/livego/protocol/rtmp/core" - "github.com/gwuhaolin/livego/utils/uid" ) const ( @@ -111,7 +111,7 @@ func (s *Server) handleConn(conn *core.Conn) error { return err } - appname, _, _ := connServer.GetInfo() + appname, name, _ := connServer.GetInfo() if ret := configure.CheckAppName(appname); !ret { err := errors.New(fmt.Sprintf("application name=%s is not configured", appname)) @@ -122,6 +122,14 @@ func (s *Server) handleConn(conn *core.Conn) error { log.Printf("handleConn: IsPublisher=%v", connServer.IsPublisher()) if connServer.IsPublisher() { + var channel = configure.RoomKeys.GetChannel(name) + if channel == "" { + err := errors.New(fmt.Sprintf("invalid key")) + conn.Close() + log.Println("CheckKey err:", err) + return err + } + connServer.PublishInfo.Name = channel if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) { log.Printf("GetStaticPushUrlList: %v", pushlist) } @@ -138,6 +146,7 @@ func (s *Server) handleConn(conn *core.Conn) error { flvWriter := new(flv.FlvDvr) s.handler.HandleWriter(flvWriter.GetWriter(reader.Info())) } else { + configure.RoomKeys.GetKey(name) // set new key if this channel not exists writer := NewVirWriter(connServer) log.Printf("new player: %+v", writer.Info()) s.handler.HandleWriter(writer) diff --git a/protocol/rtmp/rtmprelay/staticrelay.go b/protocol/rtmp/rtmprelay/staticrelay.go index 6a376b8..4fb17ff 100644 --- a/protocol/rtmp/rtmprelay/staticrelay.go +++ b/protocol/rtmp/rtmprelay/staticrelay.go @@ -63,7 +63,7 @@ func GetStaticPushObject(rtmpurl string) (*StaticPush, error) { } g_MapLock.RUnlock() - return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist....")) + return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist....", rtmpurl)) } func ReleaseStaticPushObject(rtmpurl string) {