Browse Source

Use cache

pull/88/head
Ruben Cid 5 years ago
parent
commit
d875076f97
  1. 3
      .gitignore
  2. 1
      Dockerfile
  3. 128
      configure/channel.go
  4. 26
      configure/liveconfig.go
  5. 1
      go.mod
  6. 2
      go.sum
  7. 12
      main.go
  8. 16
      protocol/api/api.go

3
.gitignore vendored

@ -1,7 +1,6 @@
# Created by .ignore support plugin (hsz.mobi) # Created by .ignore support plugin (hsz.mobi)
.idea .idea
dist dist
room_keys.json
.vscode .vscode
.tmp .tmp
vendor vendor

1
Dockerfile

@ -6,7 +6,6 @@ COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o livego . RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o livego .
FROM alpine:latest FROM alpine:latest
LABEL maintainer="Ruben Cid Lara <rubencidlara@gmail.com>"
RUN mkdir -p /app/config RUN mkdir -p /app/config
WORKDIR /app WORKDIR /app
ENV RTMP_PORT 1935 ENV RTMP_PORT 1935

128
configure/channel.go

@ -1,49 +1,43 @@
package configure package configure
import ( import (
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"log" "log"
"math/rand" "math/rand"
"sync"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/patrickmn/go-cache"
) )
var RoomKeys = LoadRoomKey(*GetKeyFile()) var RoomKeys *RoomKeysType
var roomUpdated = false var roomUpdated = false
var saveInFile = true var saveInLocal = true
var redisCli *redis.Client
type RoomKeysType struct {
redisCli *redis.Client
localCache *cache.Cache
}
func Init() { func Init() {
saveInFile = GetRedisAddr() == nil saveInLocal = GetRedisAddr() == nil
rand.Seed(time.Now().UnixNano()) RoomKeys = &RoomKeysType{
if saveInFile { localCache: cache.New(cache.NoExpiration, 0),
go func() { }
for {
time.Sleep(15 * time.Second)
if roomUpdated {
RoomKeys.Save(*roomKeySaveFile)
roomUpdated = false
}
}
}()
if saveInLocal {
return return
} }
redisCli = redis.NewClient(&redis.Options{ RoomKeys.redisCli = redis.NewClient(&redis.Options{
Addr: *GetRedisAddr(), Addr: *GetRedisAddr(),
Password: *GetRedisPwd(), Password: *GetRedisPwd(),
DB: 0, DB: 0,
}) })
_, err := redisCli.Ping().Result() _, err := RoomKeys.redisCli.Ping().Result()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -51,62 +45,18 @@ func Init() {
log.Printf("Redis connected") log.Printf("Redis connected")
} }
type RoomKeysType struct {
mapChanKey sync.Map
mapKeyChan sync.Map
}
func LoadRoomKey(f string) *RoomKeysType {
result := &RoomKeysType{
mapChanKey: sync.Map{},
mapKeyChan: sync.Map{},
}
raw := map[string]string{}
content, err := ioutil.ReadFile(f)
if err != nil {
log.Printf("Failed to read file %s for room keys", f)
return result
}
if json.Unmarshal(content, &raw) != nil {
log.Printf("Failed to unmarshal file %s for room keys", f)
return result
}
for room, key := range raw {
result.mapChanKey.Store(room, key)
result.mapKeyChan.Store(key, room)
}
return result
}
func (r *RoomKeysType) Save(f string) {
raw := map[string]string{}
r.mapChanKey.Range(func(channel, key interface{}) bool {
raw[channel.(string)] = key.(string)
return true
})
content, err := json.Marshal(raw)
if err != nil {
log.Println("Failed to marshal room keys")
return
}
if ioutil.WriteFile(f, content, 0644) != nil {
log.Println("Failed to save room keys")
return
}
}
// set/reset a random key for channel // set/reset a random key for channel
func (r *RoomKeysType) SetKey(channel string) (key string, err error) { func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
if !saveInFile { if !saveInLocal {
for { for {
key = randStringRunes(48) key = randStringRunes(48)
if _, err = redisCli.Get(key).Result(); err == redis.Nil { if _, err = r.redisCli.Get(key).Result(); err == redis.Nil {
err = redisCli.Set(channel, key, 0).Err() err = r.redisCli.Set(channel, key, 0).Err()
if err != nil { if err != nil {
return return
} }
err = redisCli.Set(key, channel, 0).Err() err = r.redisCli.Set(key, channel, 0).Err()
return return
} else if err != nil { } else if err != nil {
return return
@ -116,9 +66,9 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
for { for {
key = randStringRunes(48) key = randStringRunes(48)
if _, found := r.mapKeyChan.Load(key); !found { if _, found := r.localCache.Get(key); !found {
r.mapChanKey.Store(channel, key) r.localCache.SetDefault(channel, key)
r.mapKeyChan.Store(key, channel) r.localCache.SetDefault(key, channel)
break break
} }
} }
@ -127,8 +77,8 @@ func (r *RoomKeysType) SetKey(channel string) (key string, err error) {
} }
func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
if !saveInFile { if !saveInLocal {
if newKey, err = redisCli.Get(channel).Result(); err == redis.Nil { if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil {
newKey, err = r.SetKey(channel) newKey, err = r.SetKey(channel)
log.Printf("[KEY] new channel [%s]: %s", channel, newKey) log.Printf("[KEY] new channel [%s]: %s", channel, newKey)
return return
@ -139,7 +89,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
var key interface{} var key interface{}
var found bool var found bool
if key, found = r.mapChanKey.Load(channel); found { if key, found = r.localCache.Get(channel); found {
return key.(string), nil return key.(string), nil
} }
newKey, err = r.SetKey(channel) newKey, err = r.SetKey(channel)
@ -148,11 +98,11 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
} }
func (r *RoomKeysType) GetChannel(key string) (channel string, err error) { func (r *RoomKeysType) GetChannel(key string) (channel string, err error) {
if !saveInFile { if !saveInLocal {
return redisCli.Get(key).Result() return r.redisCli.Get(key).Result()
} }
chann, found := r.mapKeyChan.Load(key) chann, found := r.localCache.Get(key)
if found { if found {
return chann.(string), nil return chann.(string), nil
} else { } else {
@ -161,28 +111,28 @@ func (r *RoomKeysType) GetChannel(key string) (channel string, err error) {
} }
func (r *RoomKeysType) DeleteChannel(channel string) bool { func (r *RoomKeysType) DeleteChannel(channel string) bool {
if !saveInFile { if !saveInLocal {
return redisCli.Del(channel).Err() != nil return r.redisCli.Del(channel).Err() != nil
} }
key, ok := r.mapChanKey.Load(channel) key, ok := r.localCache.Get(channel)
if ok { if ok {
r.mapChanKey.Delete(channel) r.localCache.Delete(channel)
r.mapKeyChan.Delete(key) r.localCache.Delete(key.(string))
return true return true
} }
return false return false
} }
func (r *RoomKeysType) DeleteKey(key string) bool { func (r *RoomKeysType) DeleteKey(key string) bool {
if !saveInFile { if !saveInLocal {
return redisCli.Del(key).Err() != nil return r.redisCli.Del(key).Err() != nil
} }
channel, ok := r.mapKeyChan.Load(key) channel, ok := r.localCache.Get(key)
if ok { if ok {
r.mapChanKey.Delete(channel) r.localCache.Delete(channel.(string))
r.mapKeyChan.Delete(key) r.localCache.Delete(key)
return true return true
} }
return false return false

26
configure/liveconfig.go

@ -20,9 +20,8 @@ import (
} }
*/ */
var ( var (
roomKeySaveFile = flag.String("KeyFile", "room_keys.json", "path to save room keys") redisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379")
RedisAddr = flag.String("redis_addr", "", "redis addr to save room keys ex. localhost:6379") redisPwd = flag.String("redis_pwd", "", "redis password")
RedisPwd = flag.String("redis_pwd", "", "redis password")
) )
type Application struct { type Application struct {
@ -31,14 +30,11 @@ type Application struct {
Hlson string `json:"hlson"` Hlson string `json:"hlson"`
StaticPush []string `json:"static_push"` StaticPush []string `json:"static_push"`
} }
type JWTCfg struct { type JWTCfg struct {
Secret string `json:"secret"` Secret string `json:"secret"`
Algorithm string `json:"algorithm"` Algorithm string `json:"algorithm"`
} }
type ServerCfg struct { type ServerCfg struct {
KeyFile string `json:"key_file"`
RedisAddr string `json:"redis_addr"` RedisAddr string `json:"redis_addr"`
RedisPwd string `json:"redis_pwd"` RedisPwd string `json:"redis_pwd"`
JWTCfg `json:"jwt"` JWTCfg `json:"jwt"`
@ -69,32 +65,24 @@ func LoadConfig(configfilename string) error {
return nil return nil
} }
func GetKeyFile() *string {
if len(RtmpServercfg.KeyFile) > 0 {
*roomKeySaveFile = RtmpServercfg.KeyFile
}
return roomKeySaveFile
}
func GetRedisAddr() *string { func GetRedisAddr() *string {
if len(RtmpServercfg.RedisAddr) > 0 { if len(RtmpServercfg.RedisAddr) > 0 {
*RedisAddr = RtmpServercfg.RedisAddr *redisAddr = RtmpServercfg.RedisAddr
} }
if len(*RedisAddr) == 0 { if len(*redisAddr) == 0 {
return nil return nil
} }
return RedisAddr return redisAddr
} }
func GetRedisPwd() *string { func GetRedisPwd() *string {
if len(RtmpServercfg.RedisPwd) > 0 { if len(RtmpServercfg.RedisPwd) > 0 {
*RedisPwd = RtmpServercfg.RedisPwd *redisPwd = RtmpServercfg.RedisPwd
} }
return RedisPwd return redisPwd
} }
func CheckAppName(appname string) bool { func CheckAppName(appname string) bool {

1
go.mod

@ -8,6 +8,7 @@ require (
github.com/go-redis/redis/v7 v7.2.0 github.com/go-redis/redis/v7 v7.2.0
github.com/gorilla/mux v1.7.4 // indirect github.com/gorilla/mux v1.7.4 // indirect
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/satori/go.uuid v1.2.0 github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v1.6.4 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.4.0

2
go.sum

@ -31,6 +31,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 h1:lNCW6THrCKBiJBpz8kbVGjC7MgdCGKwuvBgc7LoD6sw=
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=

12
main.go

@ -7,9 +7,9 @@ import (
"time" "time"
"livego/configure" "livego/configure"
"livego/protocol/api"
"livego/protocol/hls" "livego/protocol/hls"
"livego/protocol/httpflv" "livego/protocol/httpflv"
"livego/protocol/httpopera"
"livego/protocol/rtmp" "livego/protocol/rtmp"
) )
@ -89,20 +89,20 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {
}() }()
} }
func startHTTPOpera(stream *rtmp.RtmpStream) { func startAPI(stream *rtmp.RtmpStream) {
if *operaAddr != "" { if *operaAddr != "" {
opListen, err := net.Listen("tcp", *operaAddr) opListen, err := net.Listen("tcp", *operaAddr)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
opServer := httpopera.NewServer(stream, *rtmpAddr) opServer := api.NewServer(stream, *rtmpAddr)
go func() { go func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Println("HTTP-Operation server panic: ", r) log.Println("HTTP-API server panic: ", r)
} }
}() }()
log.Println("HTTP-Operation listen On", *operaAddr) log.Println("HTTP-API listen On", *operaAddr)
opServer.Serve(opListen) opServer.Serve(opListen)
}() }()
} }
@ -124,7 +124,7 @@ func main() {
stream := rtmp.NewRtmpStream() stream := rtmp.NewRtmpStream()
hlsServer := startHls() hlsServer := startHls()
startHTTPFlv(stream) startHTTPFlv(stream)
startHTTPOpera(stream) startAPI(stream)
startRtmp(stream, hlsServer) startRtmp(stream, hlsServer)
//startRtmp(stream, nil) //startRtmp(stream, nil)

16
protocol/httpopera/http_opera.go → protocol/api/api.go

@ -1,4 +1,4 @@
package httpopera package api
import ( import (
"encoding/json" "encoding/json"
@ -327,14 +327,14 @@ func (s *Server) handleReset(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
res.Status = 400 res.Status = 400
res.Data = "url: /control/reset?room=ROOM_NAME" res.Data = "url: /control/reset?room=<ROOM_NAME>"
return return
} }
room := r.Form.Get("room") room := r.Form.Get("room")
if len(room) == 0 { if len(room) == 0 {
res.Status = 400 res.Status = 400
res.Data = "url: /control/get?room=ROOM_NAME" res.Data = "url: /control/get?room=<ROOM_NAME>"
return return
} }
@ -359,7 +359,7 @@ func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
res.Status = 400 res.Status = 400
res.Data = "url: /control/get?room=ROOM_NAME" res.Data = "url: /control/get?room=<ROOM_NAME>"
return return
} }
@ -367,7 +367,7 @@ func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) {
if len(room) == 0 { if len(room) == 0 {
res.Status = 400 res.Status = 400
res.Data = "url: /control/get?room=ROOM_NAME" res.Data = "url: /control/get?room=<ROOM_NAME>"
return return
} }
@ -390,7 +390,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil { if err := r.ParseForm(); err != nil {
res.Status = 400 res.Status = 400
res.Data = "url: /control/delete?room=ROOM_NAME" res.Data = "url: /control/delete?room=<ROOM_NAME>"
return return
} }
@ -398,7 +398,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
if len(room) == 0 { if len(room) == 0 {
res.Status = 400 res.Status = 400
res.Data = "url: /control/get?room=ROOM_NAME" res.Data = "url: /control/get?room=<ROOM_NAME>"
return return
} }
@ -407,5 +407,5 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
return return
} }
res.Status = 404 res.Status = 404
res.Data = "Room not found" res.Data = "room not found"
} }
Loading…
Cancel
Save