Browse Source

Makefile, log system, errors to fmt

pull/88/head
Ruben Cid 5 years ago
parent
commit
b464789a48
  1. 1
      .gitignore
  2. 0
      AG
  3. 15
      CHANGELOG.md
  4. 36
      Makefile
  5. 2
      README.md
  6. 24
      configure/channel.go
  7. 36
      configure/liveconfig.go
  8. 5
      container/flv/demuxer.go
  9. 19
      container/flv/muxer.go
  10. 1
      go.mod
  11. 9
      go.sum
  12. 14
      livego.json
  13. 73
      main.go
  14. 6
      parser/aac/parser.go
  15. 20
      parser/h264/parser.go
  16. 4
      parser/h264/parser_test.go
  17. 8
      parser/mp3/parser.go
  18. 4
      parser/parser.go
  19. 5
      protocol/amf/amf.go
  20. 9
      protocol/amf/amf_test.go
  21. 43
      protocol/amf/decoder_amf0.go
  22. 63
      protocol/amf/decoder_amf3.go
  23. 20
      protocol/amf/decoder_amf3_external.go
  24. 6
      protocol/amf/decoder_amf3_test.go
  25. 31
      protocol/amf/encoder_amf0.go
  26. 41
      protocol/amf/encoder_amf3.go
  27. 3
      protocol/amf/metadata.go
  28. 9
      protocol/amf/util.go
  29. 66
      protocol/api/api.go
  30. 3
      protocol/hls/cache.go
  31. 19
      protocol/hls/hls.go
  32. 33
      protocol/hls/source.go
  33. 7
      protocol/httpflv/server.go
  34. 25
      protocol/httpflv/writer.go
  35. 4
      protocol/rtmp/cache/gop.go
  36. 3
      protocol/rtmp/cache/special.go
  37. 43
      protocol/rtmp/core/conn_client.go
  38. 16
      protocol/rtmp/core/conn_server.go
  39. 61
      protocol/rtmp/rtmp.go
  40. 32
      protocol/rtmp/rtmprelay/rtmprelay.go
  41. 28
      protocol/rtmp/rtmprelay/staticrelay.go
  42. 77
      protocol/rtmp/stream.go

1
.gitignore vendored

@ -4,3 +4,4 @@ dist @@ -4,3 +4,4 @@ dist
.vscode
.tmp
vendor
livego

15
CHANGELOG.md

@ -13,13 +13,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 @@ -13,13 +13,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
{
"jwt": {
"secret": "testing",
"algorithm": "HS256s"
"algorithm": "HS256"
},
"server": [
{
"appname": "live",
"liveon": "on",
"hlson": "on"
"live": true,
"hls": true
}
]
}
@ -32,14 +32,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 @@ -32,14 +32,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
"server": [
{
"appname": "live",
"liveon": "on",
"hlson": "on"
"live": true,
"hls": true
}
]
}
```
- Makefile
### Changed
- Show `players`.
- Show `stream_id`.
- Deleted keys saved in physical file, now the keys are in cached using `go-cache` by default.
- Using `logrus` like log system.
- Using method `.Get(queryParamName)` to get an url query param.
- Replaced `errors.New(...)` to `fmt.Errorf(...)`.
- Replaced types string on config params `liveon` and `hlson` to booleans `live: true/false` and `hls: true/false`

36
Makefile

@ -0,0 +1,36 @@ @@ -0,0 +1,36 @@
GOCMD ?= go
GOBUILD = $(GOCMD) build
GOCLEAN = $(GOCMD) clean
GOTEST = $(GOCMD) test
GOGET = $(GOCMD) get
BINARY_NAME = livego
BINARY_UNIX = $(BINARY_NAME)_unix
DOCKER_ACC ?= gwuhaolin
DOCKER_REPO ?= livego
TAG := $(shell git describe --tags --abbrev=0 2>/dev/null)
default: all
all: test build dockerize
build:
$(GOBUILD) -o $(BINARY_NAME) -v -ldflags="-X main.VERSION=$(TAG)"
test:
$(GOTEST) -v ./...
clean:
$(GOCLEAN)
rm -f $(BINARY_NAME)
rm -f $(BINARY_UNIX)
run: build
./$(BINARY_NAME)
build-linux:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -o $(BINARY_UNIX) -v
dockerize:
docker build -t $(DOCKER_ACC)/$(DOCKER_REPO):$(TAG) .
docker push $(DOCKER_ACC)/$(DOCKER_REPO):$(TAG)

2
README.md

@ -30,7 +30,7 @@ Run `docker run -p 1935:1935 -p 7001:7001 -p 7002:7002 -d --name livego gwuhaoli @@ -30,7 +30,7 @@ Run `docker run -p 1935:1935 -p 7001:7001 -p 7002:7002 -d --name livego gwuhaoli
#### Compile from source
1. Download the source code `git clone https://github.com/gwuhaolin/livego.git`
2. Go to the livego directory and execute `go build`
2. Go to the livego directory and execute `go build` or `make run`
## Use
2. Start the service: execute the livego binary file to start the livego service;

24
configure/channel.go

@ -2,29 +2,27 @@ package configure @@ -2,29 +2,27 @@ package configure
import (
"fmt"
"log"
"livego/utils/uid"
"github.com/go-redis/redis/v7"
"github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
)
var RoomKeys *RoomKeysType
var saveInLocal = true
type RoomKeysType struct {
redisCli *redis.Client
localCache *cache.Cache
}
func Init() {
saveInLocal = GetRedisAddr() == nil
var RoomKeys = &RoomKeysType{
localCache: cache.New(cache.NoExpiration, 0),
}
RoomKeys = &RoomKeysType{
localCache: cache.New(cache.NoExpiration, 0),
}
var saveInLocal = true
func Init() {
saveInLocal = GetRedisAddr() == nil
if saveInLocal {
return
}
@ -37,10 +35,10 @@ func Init() { @@ -37,10 +35,10 @@ func Init() {
_, err := RoomKeys.redisCli.Ping().Result()
if err != nil {
panic(err)
log.Panic("Redis: ", err)
}
log.Printf("Redis connected")
log.Debug("Redis connected")
}
// set/reset a random key for channel
@ -77,7 +75,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { @@ -77,7 +75,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
if !saveInLocal {
if newKey, err = r.redisCli.Get(channel).Result(); err == redis.Nil {
newKey, err = r.SetKey(channel)
log.Printf("[KEY] new channel [%s]: %s", channel, newKey)
log.Debugf("[KEY] new channel [%s]: %s", channel, newKey)
return
}
@ -90,7 +88,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) { @@ -90,7 +88,7 @@ func (r *RoomKeysType) GetKey(channel string) (newKey string, err error) {
return key.(string), nil
}
newKey, err = r.SetKey(channel)
log.Printf("[KEY] new channel [%s]: %s", channel, newKey)
log.Debugf("[KEY] new channel [%s]: %s", channel, newKey)
return
}

36
configure/liveconfig.go

@ -4,7 +4,8 @@ import ( @@ -4,7 +4,8 @@ import (
"encoding/json"
"flag"
"io/ioutil"
"log"
log "github.com/sirupsen/logrus"
)
/*
@ -12,8 +13,8 @@ import ( @@ -12,8 +13,8 @@ import (
"server": [
{
"appname": "live",
"liveon": "on",
"hlson": "on",
"live": true,
"hls": true,
"static_push": []
}
]
@ -26,8 +27,8 @@ var ( @@ -26,8 +27,8 @@ var (
type Application struct {
Appname string `json:"appname"`
Liveon string `json:"liveon"`
Hlson string `json:"hlson"`
Live bool `json:"liveon"`
Hls bool `json:"hls"`
StaticPush []string `json:"static_push"`
}
type JWTCfg struct {
@ -45,26 +46,29 @@ type ServerCfg struct { @@ -45,26 +46,29 @@ type ServerCfg struct {
var RtmpServercfg = ServerCfg{
Server: []Application{{
Appname: "livego",
Liveon: "on",
Hlson: "on",
Live: true,
Hls: true,
StaticPush: nil,
}},
}
func LoadConfig(configfilename string) {
log.Printf("starting load configure file %s", configfilename)
defer Init()
log.Infof("starting load configure file %s", configfilename)
data, err := ioutil.ReadFile(configfilename)
if err != nil {
log.Printf("ReadFile %s error:%v", configfilename, err)
log.Warningf("ReadFile %s error:%v", configfilename, err)
log.Info("Using default config")
return
}
err = json.Unmarshal(data, &RtmpServercfg)
if err != nil {
log.Printf("json.Unmarshal error:%v", err)
log.Errorf("json.Unmarshal error:%v", err)
log.Info("Using default config")
}
log.Printf("get config json data:%v", RtmpServercfg)
Init()
log.Debugf("get config json data:%v", RtmpServercfg)
}
func GetRedisAddr() *string {
@ -89,8 +93,8 @@ func GetRedisPwd() *string { @@ -89,8 +93,8 @@ func GetRedisPwd() *string {
func CheckAppName(appname string) bool {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
return true
if app.Appname == appname {
return app.Live
}
}
return false
@ -98,7 +102,7 @@ func CheckAppName(appname string) bool { @@ -98,7 +102,7 @@ func CheckAppName(appname string) bool {
func GetStaticPushUrlList(appname string) ([]string, bool) {
for _, app := range RtmpServercfg.Server {
if (app.Appname == appname) && (app.Liveon == "on") {
if (app.Appname == appname) && app.Live {
if len(app.StaticPush) > 0 {
return app.StaticPush, true
} else {

5
container/flv/demuxer.go

@ -1,13 +1,12 @@ @@ -1,13 +1,12 @@
package flv
import (
"errors"
"fmt"
"livego/av"
)
var (
ErrAvcEndSEQ = errors.New("avc end sequence")
ErrAvcEndSEQ = fmt.Errorf("avc end sequence")
)
type Demuxer struct {

19
container/flv/muxer.go

@ -3,7 +3,6 @@ package flv @@ -3,7 +3,6 @@ package flv
import (
"flag"
"fmt"
"log"
"os"
"path"
"strings"
@ -13,6 +12,8 @@ import ( @@ -13,6 +12,8 @@ import (
"livego/protocol/amf"
"livego/utils/pio"
"livego/utils/uid"
log "github.com/sirupsen/logrus"
)
var (
@ -25,13 +26,13 @@ func NewFlv(handler av.Handler, info av.Info) { @@ -25,13 +26,13 @@ func NewFlv(handler av.Handler, info av.Info) {
patths := strings.SplitN(info.Key, "/", 2)
if len(patths) != 2 {
log.Println("invalid info")
log.Warning("invalid info")
return
}
w, err := os.OpenFile(*flvFile, os.O_CREATE|os.O_RDWR, 0755)
if err != nil {
log.Println("open file error: ", err)
log.Error("open file error: ", err)
}
writer := NewFLVWriter(patths[0], patths[1], info.URL, w)
@ -40,7 +41,7 @@ func NewFlv(handler av.Handler, info av.Info) { @@ -40,7 +41,7 @@ func NewFlv(handler av.Handler, info av.Info) {
writer.Wait()
// close flv file
log.Println("close flv file")
log.Debug("close flv file")
writer.ctx.Close()
}
*/
@ -147,25 +148,25 @@ type FlvDvr struct{} @@ -147,25 +148,25 @@ type FlvDvr struct{}
func (f *FlvDvr) GetWriter(info av.Info) av.WriteCloser {
paths := strings.SplitN(info.Key, "/", 2)
if len(paths) != 2 {
log.Println("invalid info")
log.Warning("invalid info")
return nil
}
err := os.MkdirAll(path.Join(*flvDir, paths[0]), 0755)
if err != nil {
log.Println("mkdir error:", err)
log.Error("mkdir error: ", err)
return nil
}
fileName := fmt.Sprintf("%s_%d.%s", path.Join(*flvDir, info.Key), time.Now().Unix(), "flv")
log.Println("flv dvr save stream to: ", fileName)
log.Debug("flv dvr save stream to: ", fileName)
w, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0755)
if err != nil {
log.Println("open file error: ", err)
log.Error("open file error: ", err)
return nil
}
writer := NewFLVWriter(paths[0], paths[1], info.URL, w)
log.Println("new flv dvr: ", writer.Info())
log.Debug("new flv dvr: ", writer.Info())
return writer
}

1
go.mod

@ -10,6 +10,7 @@ require ( @@ -10,6 +10,7 @@ require (
github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.5.0
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.4.0
github.com/urfave/negroni v1.0.0 // indirect

9
go.sum

@ -1,7 +1,8 @@ @@ -1,7 +1,8 @@
github.com/auth0/go-jwt-middleware v0.0.0-20190805220309-36081240882b h1:CvoEHGmxWl5kONC5icxwqV899dkf4VjOScbxLpllEnw=
github.com/auth0/go-jwt-middleware v0.0.0-20190805220309-36081240882b/go.mod h1:LWMyo4iOLWXHGdBki7NIht1kHru/0wM179h+d3g8ATM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
@ -19,6 +20,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -19,6 +20,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -37,11 +40,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb @@ -37,11 +40,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q=
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
@ -54,6 +60,7 @@ golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLL @@ -54,6 +60,7 @@ golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

14
livego.json

@ -1,9 +1,9 @@ @@ -1,9 +1,9 @@
{
"server": [
{
"appname": "livego",
"liveon": "on",
"hlson": "on"
}
]
"server": [
{
"appname": "livego",
"live": true,
"hls": true
}
]
}

73
main.go

@ -2,31 +2,31 @@ package main @@ -2,31 +2,31 @@ package main
import (
"flag"
"log"
"net"
"time"
"fmt"
"livego/configure"
"livego/protocol/api"
"livego/protocol/hls"
"livego/protocol/httpflv"
"livego/protocol/rtmp"
"net"
"path"
"runtime"
"time"
log "github.com/sirupsen/logrus"
)
var VERSION = "master"
var (
version = "master"
rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address")
httpFlvAddr = flag.String("httpflv-addr", ":7001", "HTTP-FLV server listen address")
hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address")
operaAddr = flag.String("manage-addr", ":8090", "HTTP manage interface server listen address")
configfilename = flag.String("config-file", "livego.json", "configure filename")
levelLog = flag.String("level", "info", "Log level")
)
func init() {
log.SetFlags(log.Lshortfile | log.Ltime | log.Ldate)
flag.Parse()
}
func startHls() *hls.Server {
hlsListen, err := net.Listen("tcp", *hlsAddr)
if err != nil {
@ -37,10 +37,10 @@ func startHls() *hls.Server { @@ -37,10 +37,10 @@ func startHls() *hls.Server {
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("HLS server panic: ", r)
log.Error("HLS server panic: ", r)
}
}()
log.Println("HLS listen On", *hlsAddr)
log.Info("HLS listen On ", *hlsAddr)
hlsServer.Serve(hlsListen)
}()
return hlsServer
@ -56,18 +56,18 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { @@ -56,18 +56,18 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) {
if hlsServer == nil {
rtmpServer = rtmp.NewRtmpServer(stream, nil)
log.Printf("hls server disable....")
log.Info("hls server disable....")
} else {
rtmpServer = rtmp.NewRtmpServer(stream, hlsServer)
log.Printf("hls server enable....")
log.Info("hls server enable....")
}
defer func() {
if r := recover(); r != nil {
log.Println("RTMP server panic: ", r)
log.Error("RTMP server panic: ", r)
}
}()
log.Println("RTMP Listen On", *rtmpAddr)
log.Info("RTMP Listen On ", *rtmpAddr)
rtmpServer.Serve(rtmpListen)
}
@ -81,10 +81,10 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { @@ -81,10 +81,10 @@ func startHTTPFlv(stream *rtmp.RtmpStream) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("HTTP-FLV server panic: ", r)
log.Error("HTTP-FLV server panic: ", r)
}
}()
log.Println("HTTP-FLV listen On", *httpFlvAddr)
log.Info("HTTP-FLV listen On ", *httpFlvAddr)
hdlServer.Serve(flvListen)
}()
}
@ -99,23 +99,51 @@ func startAPI(stream *rtmp.RtmpStream) { @@ -99,23 +99,51 @@ func startAPI(stream *rtmp.RtmpStream) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("HTTP-API server panic: ", r)
log.Error("HTTP-API server panic: ", r)
}
}()
log.Println("HTTP-API listen On", *operaAddr)
log.Info("HTTP-API listen On ", *operaAddr)
opServer.Serve(opListen)
}()
}
}
func InitLog() {
flag.Parse()
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
return fmt.Sprintf("%s()", f.Function), fmt.Sprintf(" %s:%d", filename, f.Line)
},
})
if l, err := log.ParseLevel(*levelLog); err == nil {
log.SetLevel(l)
log.SetReportCaller(l == log.DebugLevel)
}
}
func main() {
defer func() {
if r := recover(); r != nil {
log.Println("livego panic: ", r)
log.Error("livego panic: ", r)
time.Sleep(1 * time.Second)
}
}()
log.Println("start livego, version", version)
// Log options
InitLog()
log.Infof(`
_ _ ____
| | (_)_ _____ / ___| ___
| | | \ \ / / _ \ | _ / _ \
| |___| |\ V / __/ |_| | (_) |
|_____|_| \_/ \___|\____|\___/
version: %s
`, VERSION)
configure.LoadConfig(*configfilename)
stream := rtmp.NewRtmpStream()
@ -124,5 +152,4 @@ func main() { @@ -124,5 +152,4 @@ func main() {
startAPI(stream)
startRtmp(stream, hlsServer)
//startRtmp(stream, nil)
}

6
parser/aac/parser.go

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
package aac
import (
"errors"
"fmt"
"io"
"livego/av"
@ -26,8 +26,8 @@ type mpegCfgInfo struct { @@ -26,8 +26,8 @@ type mpegCfgInfo struct {
var aacRates = []int{96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350}
var (
specificBufInvalid = errors.New("audio mpegspecific error")
audioBufInvalid = errors.New("audiodata invalid")
specificBufInvalid = fmt.Errorf("audio mpegspecific error")
audioBufInvalid = fmt.Errorf("audiodata invalid")
)
const (

20
parser/h264/parser.go

@ -2,7 +2,7 @@ package h264 @@ -2,7 +2,7 @@ package h264
import (
"bytes"
"errors"
"fmt"
"io"
)
@ -34,14 +34,14 @@ const ( @@ -34,14 +34,14 @@ const (
)
var (
decDataNil = errors.New("dec buf is nil")
spsDataError = errors.New("sps data error")
ppsHeaderError = errors.New("pps header error")
ppsDataError = errors.New("pps data error")
naluHeaderInvalid = errors.New("nalu header invalid")
videoDataInvalid = errors.New("video data not match")
dataSizeNotMatch = errors.New("data size not match")
naluBodyLenError = errors.New("nalu body len error")
decDataNil = fmt.Errorf("dec buf is nil")
spsDataError = fmt.Errorf("sps data error")
ppsHeaderError = fmt.Errorf("pps header error")
ppsDataError = fmt.Errorf("pps data error")
naluHeaderInvalid = fmt.Errorf("nalu header invalid")
videoDataInvalid = fmt.Errorf("video data not match")
dataSizeNotMatch = fmt.Errorf("data size not match")
naluBodyLenError = fmt.Errorf("nalu body len error")
)
var startCode = []byte{0x00, 0x00, 0x00, 0x01}
@ -132,7 +132,7 @@ func (parser *Parser) isNaluHeader(src []byte) bool { @@ -132,7 +132,7 @@ func (parser *Parser) isNaluHeader(src []byte) bool {
func (parser *Parser) naluSize(src []byte) (int, error) {
if len(src) < naluBytesLen {
return 0, errors.New("nalusizedata invalid")
return 0, fmt.Errorf("nalusizedata invalid")
}
buf := src[:naluBytesLen]
size := int(0)

4
parser/h264/parser_test.go

@ -2,7 +2,7 @@ package h264 @@ -2,7 +2,7 @@ package h264
import (
"bytes"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -47,7 +47,7 @@ func TestH264NalueSizeException(t *testing.T) { @@ -47,7 +47,7 @@ func TestH264NalueSizeException(t *testing.T) {
d := NewParser()
w := bytes.NewBuffer(nil)
err := d.Parse(nalu, false, w)
at.Equal(err, errors.New("video data not match"))
at.Equal(err, fmt.Errorf("video data not match"))
}
func TestH264Mp4Demux(t *testing.T) {

8
parser/mp3/parser.go

@ -1,6 +1,8 @@ @@ -1,6 +1,8 @@
package mp3
import "errors"
import (
"fmt"
)
type Parser struct {
samplingFrequency int
@ -17,8 +19,8 @@ func NewParser() *Parser { @@ -17,8 +19,8 @@ func NewParser() *Parser {
// '11' reserved
var mp3Rates = []int{44100, 48000, 32000}
var (
errMp3DataInvalid = errors.New("mp3data invalid")
errIndexInvalid = errors.New("invalid rate index")
errMp3DataInvalid = fmt.Errorf("mp3data invalid")
errIndexInvalid = fmt.Errorf("invalid rate index")
)
func (parser *Parser) Parse(src []byte) error {

4
parser/parser.go

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
package parser
import (
"errors"
"fmt"
"io"
"livego/av"
@ -11,7 +11,7 @@ import ( @@ -11,7 +11,7 @@ import (
)
var (
errNoAudio = errors.New("demuxer no audio")
errNoAudio = fmt.Errorf("demuxer no audio")
)
type CodecParser struct {

5
protocol/amf/amf.go

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package amf
import (
"errors"
"fmt"
"io"
)
@ -26,7 +25,7 @@ func (d *Decoder) Decode(r io.Reader, ver Version) (interface{}, error) { @@ -26,7 +25,7 @@ func (d *Decoder) Decode(r io.Reader, ver Version) (interface{}, error) {
return d.DecodeAmf3(r)
}
return nil, errors.New(fmt.Sprintf("decode amf: unsupported version %d", ver))
return nil, fmt.Errorf("decode amf: unsupported version %d", ver)
}
func (e *Encoder) EncodeBatch(w io.Writer, ver Version, val ...interface{}) (int, error) {
@ -46,5 +45,5 @@ func (e *Encoder) Encode(w io.Writer, val interface{}, ver Version) (int, error) @@ -46,5 +45,5 @@ func (e *Encoder) Encode(w io.Writer, val interface{}, ver Version) (int, error)
return e.EncodeAmf3(w, val)
}
return 0, Error("encode amf: unsupported version %d", ver)
return 0, fmt.Errorf("encode amf: unsupported version %d", ver)
}

9
protocol/amf/amf_test.go

@ -2,7 +2,6 @@ package amf @@ -2,7 +2,6 @@ package amf
import (
"bytes"
"errors"
"fmt"
"reflect"
"testing"
@ -17,12 +16,12 @@ func EncodeAndDecode(val interface{}, ver Version) (result interface{}, err erro @@ -17,12 +16,12 @@ func EncodeAndDecode(val interface{}, ver Version) (result interface{}, err erro
_, err = enc.Encode(buf, val, ver)
if err != nil {
return nil, errors.New(fmt.Sprintf("error in encode: %s", err))
return nil, fmt.Errorf("error in encode: %s", err)
}
result, err = dec.Decode(buf, ver)
if err != nil {
return nil, errors.New(fmt.Sprintf("error in decode: %s", err))
return nil, fmt.Errorf("error in decode: %s", err)
}
return
@ -108,7 +107,7 @@ func TestAmf0Array(t *testing.T) { @@ -108,7 +107,7 @@ func TestAmf0Array(t *testing.T) {
res, err := EncodeAndDecode(arr, 0)
if err != nil {
t.Error("amf0 object: %s", err)
t.Errorf("amf0 object: %s", err)
}
result, ok := res.(Array)
@ -170,7 +169,7 @@ func TestAmf3Array(t *testing.T) { @@ -170,7 +169,7 @@ func TestAmf3Array(t *testing.T) {
res, err := EncodeAndDecode(arr, 3)
if err != nil {
t.Error("amf3 object: %s", err)
t.Errorf("amf3 object: %s", err)
}
result, ok := res.(Array)

43
protocol/amf/decoder_amf0.go

@ -2,6 +2,7 @@ package amf @@ -2,6 +2,7 @@ package amf
import (
"encoding/binary"
"fmt"
"io"
)
@ -22,13 +23,13 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { @@ -22,13 +23,13 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) {
case AMF0_OBJECT_MARKER:
return d.DecodeAmf0Object(r, false)
case AMF0_MOVIECLIP_MARKER:
return nil, Error("decode amf0: unsupported type movieclip")
return nil, fmt.Errorf("decode amf0: unsupported type movieclip")
case AMF0_NULL_MARKER:
return d.DecodeAmf0Null(r, false)
case AMF0_UNDEFINED_MARKER:
return d.DecodeAmf0Undefined(r, false)
case AMF0_REFERENCE_MARKER:
return nil, Error("decode amf0: unsupported type reference")
return nil, fmt.Errorf("decode amf0: unsupported type reference")
case AMF0_ECMA_ARRAY_MARKER:
return d.DecodeAmf0EcmaArray(r, false)
case AMF0_STRICT_ARRAY_MARKER:
@ -40,7 +41,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { @@ -40,7 +41,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) {
case AMF0_UNSUPPORTED_MARKER:
return d.DecodeAmf0Unsupported(r, false)
case AMF0_RECORDSET_MARKER:
return nil, Error("decode amf0: unsupported type recordset")
return nil, fmt.Errorf("decode amf0: unsupported type recordset")
case AMF0_XML_DOCUMENT_MARKER:
return d.DecodeAmf0XmlDocument(r, false)
case AMF0_TYPED_OBJECT_MARKER:
@ -49,7 +50,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) { @@ -49,7 +50,7 @@ func (d *Decoder) DecodeAmf0(r io.Reader) (interface{}, error) {
return d.DecodeAmf3(r)
}
return nil, Error("decode amf0: unsupported type %d", marker)
return nil, fmt.Errorf("decode amf0: unsupported type %d", marker)
}
// marker: 1 byte 0x00
@ -61,7 +62,7 @@ func (d *Decoder) DecodeAmf0Number(r io.Reader, decodeMarker bool) (result float @@ -61,7 +62,7 @@ func (d *Decoder) DecodeAmf0Number(r io.Reader, decodeMarker bool) (result float
err = binary.Read(r, binary.BigEndian, &result)
if err != nil {
return float64(0), Error("amf0 decode: unable to read number: %s", err)
return float64(0), fmt.Errorf("amf0 decode: unable to read number: %s", err)
}
return
@ -85,7 +86,7 @@ func (d *Decoder) DecodeAmf0Boolean(r io.Reader, decodeMarker bool) (result bool @@ -85,7 +86,7 @@ func (d *Decoder) DecodeAmf0Boolean(r io.Reader, decodeMarker bool) (result bool
return true, nil
}
return false, Error("decode amf0: unexpected value %v for boolean", b)
return false, fmt.Errorf("decode amf0: unexpected value %v for boolean", b)
}
// marker: 1 byte 0x02
@ -100,12 +101,12 @@ func (d *Decoder) DecodeAmf0String(r io.Reader, decodeMarker bool) (result strin @@ -100,12 +101,12 @@ func (d *Decoder) DecodeAmf0String(r io.Reader, decodeMarker bool) (result strin
var length uint16
err = binary.Read(r, binary.BigEndian, &length)
if err != nil {
return "", Error("decode amf0: unable to decode string length: %s", err)
return "", fmt.Errorf("decode amf0: unable to decode string length: %s", err)
}
var bytes = make([]byte, length)
if bytes, err = ReadBytes(r, int(length)); err != nil {
return "", Error("decode amf0: unable to decode string value: %s", err)
return "", fmt.Errorf("decode amf0: unable to decode string value: %s", err)
}
return string(bytes), nil
@ -131,7 +132,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro @@ -131,7 +132,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro
if key == "" {
if err = AssertMarker(r, true, AMF0_OBJECT_END_MARKER); err != nil {
return nil, Error("decode amf0: expected object end marker: %s", err)
return nil, fmt.Errorf("decode amf0: expected object end marker: %s", err)
}
break
@ -139,7 +140,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro @@ -139,7 +140,7 @@ func (d *Decoder) DecodeAmf0Object(r io.Reader, decodeMarker bool) (Object, erro
value, err := d.DecodeAmf0(r)
if err != nil {
return nil, Error("decode amf0: unable to decode object value: %s", err)
return nil, fmt.Errorf("decode amf0: unable to decode object value: %s", err)
}
result[key] = value
@ -176,11 +177,11 @@ func (d *Decoder) DecodeAmf0Reference(r io.Reader, decodeMarker bool) (interface @@ -176,11 +177,11 @@ func (d *Decoder) DecodeAmf0Reference(r io.Reader, decodeMarker bool) (interface
err = binary.Read(r, binary.BigEndian, &ref)
if err != nil {
return nil, Error("decode amf0: unable to decode reference id: %s", err)
return nil, fmt.Errorf("decode amf0: unable to decode reference id: %s", err)
}
if int(ref) > len(d.refCache) {
return nil, Error("decode amf0: bad reference %d (current length %d)", ref, len(d.refCache))
return nil, fmt.Errorf("decode amf0: bad reference %d (current length %d)", ref, len(d.refCache))
}
result := d.refCache[ref]
@ -205,7 +206,7 @@ func (d *Decoder) DecodeAmf0EcmaArray(r io.Reader, decodeMarker bool) (Object, e @@ -205,7 +206,7 @@ func (d *Decoder) DecodeAmf0EcmaArray(r io.Reader, decodeMarker bool) (Object, e
result, err := d.DecodeAmf0Object(r, false)
if err != nil {
return nil, Error("decode amf0: unable to decode ecma array object: %s", err)
return nil, fmt.Errorf("decode amf0: unable to decode ecma array object: %s", err)
}
return result, nil
@ -223,7 +224,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result @@ -223,7 +224,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result
var length uint32
err = binary.Read(r, binary.BigEndian, &length)
if err != nil {
return nil, Error("decode amf0: unable to decode strict array length: %s", err)
return nil, fmt.Errorf("decode amf0: unable to decode strict array length: %s", err)
}
d.refCache = append(d.refCache, result)
@ -231,7 +232,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result @@ -231,7 +232,7 @@ func (d *Decoder) DecodeAmf0StrictArray(r io.Reader, decodeMarker bool) (result
for i := uint32(0); i < length; i++ {
tmp, err := d.DecodeAmf0(r)
if err != nil {
return nil, Error("decode amf0: unable to decode strict array object: %s", err)
return nil, fmt.Errorf("decode amf0: unable to decode strict array object: %s", err)
}
result = append(result, tmp)
}
@ -250,11 +251,11 @@ func (d *Decoder) DecodeAmf0Date(r io.Reader, decodeMarker bool) (result float64 @@ -250,11 +251,11 @@ func (d *Decoder) DecodeAmf0Date(r io.Reader, decodeMarker bool) (result float64
}
if result, err = d.DecodeAmf0Number(r, false); err != nil {
return float64(0), Error("decode amf0: unable to decode float in date: %s", err)
return float64(0), fmt.Errorf("decode amf0: unable to decode float in date: %s", err)
}
if _, err = ReadBytes(r, 2); err != nil {
return float64(0), Error("decode amf0: unable to read 2 trail bytes in date: %s", err)
return float64(0), fmt.Errorf("decode amf0: unable to read 2 trail bytes in date: %s", err)
}
return
@ -272,12 +273,12 @@ func (d *Decoder) DecodeAmf0LongString(r io.Reader, decodeMarker bool) (result s @@ -272,12 +273,12 @@ func (d *Decoder) DecodeAmf0LongString(r io.Reader, decodeMarker bool) (result s
var length uint32
err = binary.Read(r, binary.BigEndian, &length)
if err != nil {
return "", Error("decode amf0: unable to decode long string length: %s", err)
return "", fmt.Errorf("decode amf0: unable to decode long string length: %s", err)
}
var bytes = make([]byte, length)
if bytes, err = ReadBytes(r, int(length)); err != nil {
return "", Error("decode amf0: unable to decode long string value: %s", err)
return "", fmt.Errorf("decode amf0: unable to decode long string value: %s", err)
}
return string(bytes), nil
@ -323,12 +324,12 @@ func (d *Decoder) DecodeAmf0TypedObject(r io.Reader, decodeMarker bool) (TypedOb @@ -323,12 +324,12 @@ func (d *Decoder) DecodeAmf0TypedObject(r io.Reader, decodeMarker bool) (TypedOb
result.Type, err = d.DecodeAmf0String(r, false)
if err != nil {
return result, Error("decode amf0: typed object unable to determine type: %s", err)
return result, fmt.Errorf("decode amf0: typed object unable to determine type: %s", err)
}
result.Object, err = d.DecodeAmf0Object(r, false)
if err != nil {
return result, Error("decode amf0: typed object unable to determine object: %s", err)
return result, fmt.Errorf("decode amf0: typed object unable to determine object: %s", err)
}
return result, nil

63
protocol/amf/decoder_amf3.go

@ -2,6 +2,7 @@ package amf @@ -2,6 +2,7 @@ package amf
import (
"encoding/binary"
"fmt"
"io"
"time"
)
@ -42,7 +43,7 @@ func (d *Decoder) DecodeAmf3(r io.Reader) (interface{}, error) { @@ -42,7 +43,7 @@ func (d *Decoder) DecodeAmf3(r io.Reader) (interface{}, error) {
return d.DecodeAmf3ByteArray(r, false)
}
return nil, Error("decode amf3: unsupported type %d", marker)
return nil, fmt.Errorf("decode amf3: unsupported type %d", marker)
}
// marker: 1 byte 0x00
@ -103,7 +104,7 @@ func (d *Decoder) DecodeAmf3Double(r io.Reader, decodeMarker bool) (result float @@ -103,7 +104,7 @@ func (d *Decoder) DecodeAmf3Double(r io.Reader, decodeMarker bool) (result float
err = binary.Read(r, binary.BigEndian, &result)
if err != nil {
return float64(0), Error("amf3 decode: unable to read double: %s", err)
return float64(0), fmt.Errorf("amf3 decode: unable to read double: %s", err)
}
return
@ -122,7 +123,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin @@ -122,7 +123,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin
var refVal uint32
isRef, refVal, err = d.decodeReferenceInt(r)
if err != nil {
return "", Error("amf3 decode: unable to decode string reference and length: %s", err)
return "", fmt.Errorf("amf3 decode: unable to decode string reference and length: %s", err)
}
if isRef {
@ -133,7 +134,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin @@ -133,7 +134,7 @@ func (d *Decoder) DecodeAmf3String(r io.Reader, decodeMarker bool) (result strin
buf := make([]byte, refVal)
_, err = r.Read(buf)
if err != nil {
return "", Error("amf3 decode: unable to read string: %s", err)
return "", fmt.Errorf("amf3 decode: unable to read string: %s", err)
}
result = string(buf)
@ -157,13 +158,13 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti @@ -157,13 +158,13 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti
var refVal uint32
isRef, refVal, err = d.decodeReferenceInt(r)
if err != nil {
return result, Error("amf3 decode: unable to decode date reference and length: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode date reference and length: %s", err)
}
if isRef {
res, ok := d.objectRefs[refVal].(time.Time)
if ok != true {
return result, Error("amf3 decode: unable to extract time from date object references")
return result, fmt.Errorf("amf3 decode: unable to extract time from date object references")
}
return res, err
@ -172,7 +173,7 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti @@ -172,7 +173,7 @@ func (d *Decoder) DecodeAmf3Date(r io.Reader, decodeMarker bool) (result time.Ti
var u64 float64
err = binary.Read(r, binary.BigEndian, &u64)
if err != nil {
return result, Error("amf3 decode: unable to read double: %s", err)
return result, fmt.Errorf("amf3 decode: unable to read double: %s", err)
}
result = time.Unix(int64(u64/1000), 0).UTC()
@ -196,7 +197,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, @@ -196,7 +197,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array,
var refVal uint32
isRef, refVal, err = d.decodeReferenceInt(r)
if err != nil {
return result, Error("amf3 decode: unable to decode array reference and length: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode array reference and length: %s", err)
}
if isRef {
@ -204,7 +205,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, @@ -204,7 +205,7 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array,
res, ok := d.objectRefs[objRefId].(Array)
if ok != true {
return result, Error("amf3 decode: unable to extract array from object references")
return result, fmt.Errorf("amf3 decode: unable to extract array from object references")
}
return res, err
@ -213,17 +214,17 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array, @@ -213,17 +214,17 @@ func (d *Decoder) DecodeAmf3Array(r io.Reader, decodeMarker bool) (result Array,
var key string
key, err = d.DecodeAmf3String(r, false)
if err != nil {
return result, Error("amf3 decode: unable to read key for array: %s", err)
return result, fmt.Errorf("amf3 decode: unable to read key for array: %s", err)
}
if key != "" {
return result, Error("amf3 decode: array key is not empty, can't handle associative array")
return result, fmt.Errorf("amf3 decode: array key is not empty, can't handle associative array")
}
for i := uint32(0); i < refVal; i++ {
tmp, err := d.DecodeAmf3(r)
if err != nil {
return result, Error("amf3 decode: array element could not be decoded: %s", err)
return result, fmt.Errorf("amf3 decode: array element could not be decoded: %s", err)
}
result = append(result, tmp)
}
@ -243,7 +244,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -243,7 +244,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
// decode the initial u29
isRef, refVal, err := d.decodeReferenceInt(r)
if err != nil {
return nil, Error("amf3 decode: unable to decode object reference and length: %s", err)
return nil, fmt.Errorf("amf3 decode: unable to decode object reference and length: %s", err)
}
// if this is a object reference only, grab it and return it
@ -272,7 +273,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -272,7 +273,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
var cls string
cls, err = d.DecodeAmf3String(r, false)
if err != nil {
return result, Error("amf3 decode: unable to read trait type for object: %s", err)
return result, fmt.Errorf("amf3 decode: unable to read trait type for object: %s", err)
}
trait.Type = cls
@ -281,7 +282,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -281,7 +282,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
for i := uint32(0); i < propLength; i++ {
tmp, err := d.DecodeAmf3String(r, false)
if err != nil {
return result, Error("amf3 decode: unable to read trait property for object: %s", err)
return result, fmt.Errorf("amf3 decode: unable to read trait property for object: %s", err)
}
trait.Properties = append(trait.Properties, tmp)
}
@ -299,17 +300,17 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -299,17 +300,17 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
case "DSA": // AsyncMessageExt
result, err = d.decodeAsyncMessageExt(r)
if err != nil {
return result, Error("amf3 decode: unable to decode dsa: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode dsa: %s", err)
}
case "DSK": // AcknowledgeMessageExt
result, err = d.decodeAcknowledgeMessageExt(r)
if err != nil {
return result, Error("amf3 decode: unable to decode dsk: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode dsk: %s", err)
}
case "flex.messaging.io.ArrayCollection":
result, err = d.decodeArrayCollection(r)
if err != nil {
return result, Error("amf3 decode: unable to decode ac: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode ac: %s", err)
}
// store an extra reference to array collection container
@ -320,10 +321,10 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -320,10 +321,10 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
if ok {
result, err = fn(d, r)
if err != nil {
return result, Error("amf3 decode: unable to call external decoder for type %s: %s", trait.Type, err)
return result, fmt.Errorf("amf3 decode: unable to call external decoder for type %s: %s", trait.Type, err)
}
} else {
return result, Error("amf3 decode: unable to decode external type %s, no handler", trait.Type)
return result, fmt.Errorf("amf3 decode: unable to decode external type %s, no handler", trait.Type)
}
}
@ -341,7 +342,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -341,7 +342,7 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
for _, key = range trait.Properties {
val, err = d.DecodeAmf3(r)
if err != nil {
return result, Error("amf3 decode: unable to decode object property: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode object property: %s", err)
}
obj[key] = val
@ -353,14 +354,14 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter @@ -353,14 +354,14 @@ func (d *Decoder) DecodeAmf3Object(r io.Reader, decodeMarker bool) (result inter
for {
key, err = d.DecodeAmf3String(r, false)
if err != nil {
return result, Error("amf3 decode: unable to decode dynamic key: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode dynamic key: %s", err)
}
if key == "" {
break
}
val, err = d.DecodeAmf3(r)
if err != nil {
return result, Error("amf3 decode: unable to decode dynamic value: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode dynamic value: %s", err)
}
obj[key] = val
@ -385,7 +386,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, @@ -385,7 +386,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string,
}
if (marker != AMF3_XMLDOC_MARKER) && (marker != AMF3_XMLSTRING_MARKER) {
return "", Error("decode assert marker failed: expected %v or %v, got %v", AMF3_XMLDOC_MARKER, AMF3_XMLSTRING_MARKER, marker)
return "", fmt.Errorf("decode assert marker failed: expected %v or %v, got %v", AMF3_XMLDOC_MARKER, AMF3_XMLSTRING_MARKER, marker)
}
}
@ -393,7 +394,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, @@ -393,7 +394,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string,
var refVal uint32
isRef, refVal, err = d.decodeReferenceInt(r)
if err != nil {
return "", Error("amf3 decode: unable to decode xml reference and length: %s", err)
return "", fmt.Errorf("amf3 decode: unable to decode xml reference and length: %s", err)
}
if isRef {
@ -401,7 +402,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, @@ -401,7 +402,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string,
buf := d.objectRefs[refVal]
result, ok = buf.(string)
if ok != true {
return "", Error("amf3 decode: cannot coerce object reference into xml string")
return "", fmt.Errorf("amf3 decode: cannot coerce object reference into xml string")
}
return
@ -410,7 +411,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string, @@ -410,7 +411,7 @@ func (d *Decoder) DecodeAmf3Xml(r io.Reader, decodeMarker bool) (result string,
buf := make([]byte, refVal)
_, err = r.Read(buf)
if err != nil {
return "", Error("amf3 decode: unable to read xml string: %s", err)
return "", fmt.Errorf("amf3 decode: unable to read xml string: %s", err)
}
result = string(buf)
@ -435,14 +436,14 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result [] @@ -435,14 +436,14 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result []
var refVal uint32
isRef, refVal, err = d.decodeReferenceInt(r)
if err != nil {
return result, Error("amf3 decode: unable to decode byte array reference and length: %s", err)
return result, fmt.Errorf("amf3 decode: unable to decode byte array reference and length: %s", err)
}
if isRef {
var ok bool
result, ok = d.objectRefs[refVal].([]byte)
if ok != true {
return result, Error("amf3 decode: unable to convert object ref to bytes")
return result, fmt.Errorf("amf3 decode: unable to convert object ref to bytes")
}
return
@ -451,7 +452,7 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result [] @@ -451,7 +452,7 @@ func (d *Decoder) DecodeAmf3ByteArray(r io.Reader, decodeMarker bool) (result []
result = make([]byte, refVal)
_, err = r.Read(result)
if err != nil {
return result, Error("amf3 decode: unable to read bytearray: %s", err)
return result, fmt.Errorf("amf3 decode: unable to read bytearray: %s", err)
}
d.objectRefs = append(d.objectRefs, result)
@ -486,7 +487,7 @@ func (d *Decoder) decodeU29(r io.Reader) (result uint32, err error) { @@ -486,7 +487,7 @@ func (d *Decoder) decodeU29(r io.Reader) (result uint32, err error) {
func (d *Decoder) decodeReferenceInt(r io.Reader) (isRef bool, refVal uint32, err error) {
u29, err := d.decodeU29(r)
if err != nil {
return false, 0, Error("amf3 decode: unable to decode reference int: %s", err)
return false, 0, fmt.Errorf("amf3 decode: unable to decode reference int: %s", err)
}
isRef = u29&0x01 == 0

20
protocol/amf/decoder_amf3_external.go

@ -13,7 +13,7 @@ func (d *Decoder) decodeAbstractMessage(r io.Reader) (result Object, err error) @@ -13,7 +13,7 @@ func (d *Decoder) decodeAbstractMessage(r io.Reader) (result Object, err error)
if err = d.decodeExternal(r, &result,
[]string{"body", "clientId", "destination", "headers", "messageId", "timeStamp", "timeToLive"},
[]string{"clientIdBytes", "messageIdBytes"}); err != nil {
return result, Error("unable to decode abstract external: %s", err)
return result, fmt.Errorf("unable to decode abstract external: %s", err)
}
return
@ -26,11 +26,11 @@ func (d *Decoder) decodeAsyncMessageExt(r io.Reader) (result Object, err error) @@ -26,11 +26,11 @@ func (d *Decoder) decodeAsyncMessageExt(r io.Reader) (result Object, err error)
func (d *Decoder) decodeAsyncMessage(r io.Reader) (result Object, err error) {
result, err = d.decodeAbstractMessage(r)
if err != nil {
return result, Error("unable to decode abstract for async: %s", err)
return result, fmt.Errorf("unable to decode abstract for async: %s", err)
}
if err = d.decodeExternal(r, &result, []string{"correlationId", "correlationIdBytes"}); err != nil {
return result, Error("unable to decode async external: %s", err)
return result, fmt.Errorf("unable to decode async external: %s", err)
}
return
@ -43,11 +43,11 @@ func (d *Decoder) decodeAcknowledgeMessageExt(r io.Reader) (result Object, err e @@ -43,11 +43,11 @@ func (d *Decoder) decodeAcknowledgeMessageExt(r io.Reader) (result Object, err e
func (d *Decoder) decodeAcknowledgeMessage(r io.Reader) (result Object, err error) {
result, err = d.decodeAsyncMessage(r)
if err != nil {
return result, Error("unable to decode async for ack: %s", err)
return result, fmt.Errorf("unable to decode async for ack: %s", err)
}
if err = d.decodeExternal(r, &result); err != nil {
return result, Error("unable to decode ack external: %s", err)
return result, fmt.Errorf("unable to decode ack external: %s", err)
}
return
@ -57,7 +57,7 @@ func (d *Decoder) decodeAcknowledgeMessage(r io.Reader) (result Object, err erro @@ -57,7 +57,7 @@ func (d *Decoder) decodeAcknowledgeMessage(r io.Reader) (result Object, err erro
func (d *Decoder) decodeArrayCollection(r io.Reader) (interface{}, error) {
result, err := d.DecodeAmf3(r)
if err != nil {
return result, Error("cannot decode child of array collection: %s", err)
return result, fmt.Errorf("cannot decode child of array collection: %s", err)
}
return result, nil
@ -70,7 +70,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string @@ -70,7 +70,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string
flagSet, err = readFlags(r)
if err != nil {
return Error("unable to read flags: %s", err)
return fmt.Errorf("unable to read flags: %s", err)
}
for i, flags := range flagSet {
@ -87,7 +87,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string @@ -87,7 +87,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string
if (flags & flagBit) != 0 {
tmp, err := d.DecodeAmf3(r)
if err != nil {
return Error("unable to decode external field %s %d %d (%#v): %s", field, i, p, flagSet, err)
return fmt.Errorf("unable to decode external field %s %d %d (%#v): %s", field, i, p, flagSet, err)
}
(*obj)[field] = tmp
}
@ -99,7 +99,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string @@ -99,7 +99,7 @@ func (d *Decoder) decodeExternal(r io.Reader, obj *Object, fieldSets ...[]string
field := fmt.Sprintf("extra_%d_%d", i, j)
tmp, err := d.DecodeAmf3(r)
if err != nil {
return Error("unable to decode post-external field %d %d (%#v): %s", i, j, flagSet, err)
return fmt.Errorf("unable to decode post-external field %d %d (%#v): %s", i, j, flagSet, err)
}
(*obj)[field] = tmp
}
@ -114,7 +114,7 @@ func readFlags(r io.Reader) (result []uint8, err error) { @@ -114,7 +114,7 @@ func readFlags(r io.Reader) (result []uint8, err error) {
for {
flag, err := ReadByte(r)
if err != nil {
return result, Error("unable to read flags: %s", err)
return result, fmt.Errorf("unable to read flags: %s", err)
}
result = append(result, flag)

6
protocol/amf/decoder_amf3_test.go

@ -186,7 +186,7 @@ func TestDecodeAmf3Array(t *testing.T) { @@ -186,7 +186,7 @@ func TestDecodeAmf3Array(t *testing.T) {
for i, v := range expect {
if got[i] != v {
t.Error("expected array element %d to be %v, got %v", i, v, got[i])
t.Errorf("expected array element %d to be %v, got %v", i, v, got[i])
}
}
}
@ -211,10 +211,10 @@ func TestDecodeAmf3Object(t *testing.T) { @@ -211,10 +211,10 @@ func TestDecodeAmf3Object(t *testing.T) {
}
if to["foo"] != "bar" {
t.Error("expected foo to be bar, got: %+v", to["foo"])
t.Errorf("expected foo to be bar, got: %+v", to["foo"])
}
if to["baz"] != nil {
t.Error("expected baz to be nil, got: %+v", to["baz"])
t.Errorf("expected baz to be nil, got: %+v", to["baz"])
}
}

31
protocol/amf/encoder_amf0.go

@ -2,6 +2,7 @@ package amf @@ -2,6 +2,7 @@ package amf
import (
"encoding/binary"
"fmt"
"io"
"reflect"
)
@ -43,16 +44,16 @@ func (e *Encoder) EncodeAmf0(w io.Writer, val interface{}) (int, error) { @@ -43,16 +44,16 @@ func (e *Encoder) EncodeAmf0(w io.Writer, val interface{}) (int, error) {
case reflect.Map:
obj, ok := val.(Object)
if ok != true {
return 0, Error("encode amf0: unable to create object from map")
return 0, fmt.Errorf("encode amf0: unable to create object from map")
}
return e.EncodeAmf0Object(w, obj, true)
}
if _, ok := val.(TypedObject); ok {
return 0, Error("encode amf0: unsupported type typed object")
return 0, fmt.Errorf("encode amf0: unsupported type typed object")
}
return 0, Error("encode amf0: unsupported type %s", v.Type())
return 0, fmt.Errorf("encode amf0: unsupported type %s", v.Type())
}
// marker: 1 byte 0x00
@ -117,13 +118,13 @@ func (e *Encoder) EncodeAmf0String(w io.Writer, val string, encodeMarker bool) ( @@ -117,13 +118,13 @@ func (e *Encoder) EncodeAmf0String(w io.Writer, val string, encodeMarker bool) (
length := uint16(len(val))
err = binary.Write(w, binary.BigEndian, length)
if err != nil {
return n, Error("encode amf0: unable to encode string length: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode string length: %s", err)
}
n += 2
m, err = w.Write([]byte(val))
if err != nil {
return n, Error("encode amf0: unable to encode string value: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode string value: %s", err)
}
n += m
@ -146,26 +147,26 @@ func (e *Encoder) EncodeAmf0Object(w io.Writer, val Object, encodeMarker bool) ( @@ -146,26 +147,26 @@ func (e *Encoder) EncodeAmf0Object(w io.Writer, val Object, encodeMarker bool) (
for k, v := range val {
m, err = e.EncodeAmf0String(w, k, false)
if err != nil {
return n, Error("encode amf0: unable to encode object key: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode object key: %s", err)
}
n += m
m, err = e.EncodeAmf0(w, v)
if err != nil {
return n, Error("encode amf0: unable to encode object value: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode object value: %s", err)
}
n += m
}
m, err = e.EncodeAmf0String(w, "", false)
if err != nil {
return n, Error("encode amf0: unable to encode object empty string: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode object empty string: %s", err)
}
n += m
err = WriteMarker(w, AMF0_OBJECT_END_MARKER)
if err != nil {
return n, Error("encode amf0: unable to object end marker: %s", err)
return n, fmt.Errorf("encode amf0: unable to object end marker: %s", err)
}
n += 1
@ -216,13 +217,13 @@ func (e *Encoder) EncodeAmf0EcmaArray(w io.Writer, val Object, encodeMarker bool @@ -216,13 +217,13 @@ func (e *Encoder) EncodeAmf0EcmaArray(w io.Writer, val Object, encodeMarker bool
length := uint32(len(val))
err = binary.Write(w, binary.BigEndian, length)
if err != nil {
return n, Error("encode amf0: unable to encode ecma array length: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode ecma array length: %s", err)
}
n += 4
m, err = e.EncodeAmf0Object(w, val, false)
if err != nil {
return n, Error("encode amf0: unable to encode ecma array object: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode ecma array object: %s", err)
}
n += m
@ -245,14 +246,14 @@ func (e *Encoder) EncodeAmf0StrictArray(w io.Writer, val Array, encodeMarker boo @@ -245,14 +246,14 @@ func (e *Encoder) EncodeAmf0StrictArray(w io.Writer, val Array, encodeMarker boo
length := uint32(len(val))
err = binary.Write(w, binary.BigEndian, length)
if err != nil {
return n, Error("encode amf0: unable to encode strict array length: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode strict array length: %s", err)
}
n += 4
for _, v := range val {
m, err = e.EncodeAmf0(w, v)
if err != nil {
return n, Error("encode amf0: unable to encode strict array element: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode strict array element: %s", err)
}
n += m
}
@ -276,13 +277,13 @@ func (e *Encoder) EncodeAmf0LongString(w io.Writer, val string, encodeMarker boo @@ -276,13 +277,13 @@ func (e *Encoder) EncodeAmf0LongString(w io.Writer, val string, encodeMarker boo
length := uint32(len(val))
err = binary.Write(w, binary.BigEndian, length)
if err != nil {
return n, Error("encode amf0: unable to encode long string length: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode long string length: %s", err)
}
n += 4
m, err = w.Write([]byte(val))
if err != nil {
return n, Error("encode amf0: unable to encode long string value: %s", err)
return n, fmt.Errorf("encode amf0: unable to encode long string value: %s", err)
}
n += m

41
protocol/amf/encoder_amf3.go

@ -2,6 +2,7 @@ package amf @@ -2,6 +2,7 @@ package amf
import (
"encoding/binary"
"fmt"
"io"
"reflect"
"sort"
@ -59,7 +60,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) { @@ -59,7 +60,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) {
case reflect.Map:
obj, ok := val.(Object)
if ok != true {
return 0, Error("encode amf3: unable to create object from map")
return 0, fmt.Errorf("encode amf3: unable to create object from map")
}
to := *new(TypedObject)
@ -76,7 +77,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) { @@ -76,7 +77,7 @@ func (e *Encoder) EncodeAmf3(w io.Writer, val interface{}) (int, error) {
return e.EncodeAmf3Object(w, to, true)
}
return 0, Error("encode amf3: unsupported type %s", v.Type())
return 0, fmt.Errorf("encode amf3: unsupported type %s", v.Type())
}
// marker: 1 byte 0x00
@ -204,14 +205,14 @@ func (e *Encoder) EncodeAmf3Date(w io.Writer, val time.Time, encodeMarker bool) @@ -204,14 +205,14 @@ func (e *Encoder) EncodeAmf3Date(w io.Writer, val time.Time, encodeMarker bool)
}
if err = WriteMarker(w, 0x01); err != nil {
return n, Error("amf3 encode: cannot encode u29 for array: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode u29 for array: %s", err)
}
n += 1
u64 := float64(val.Unix()) * 1000.0
err = binary.Write(w, binary.BigEndian, &u64)
if err != nil {
return n, Error("amf3 encode: unable to write date double: %s", err)
return n, fmt.Errorf("amf3 encode: unable to write date double: %s", err)
}
n += 8
@ -237,20 +238,20 @@ func (e *Encoder) EncodeAmf3Array(w io.Writer, val Array, encodeMarker bool) (n @@ -237,20 +238,20 @@ func (e *Encoder) EncodeAmf3Array(w io.Writer, val Array, encodeMarker bool) (n
m, err = e.encodeAmf3Uint29(w, u29)
if err != nil {
return n, Error("amf3 encode: cannot encode u29 for array: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode u29 for array: %s", err)
}
n += m
m, err = e.encodeAmf3Utf8(w, "")
if err != nil {
return n, Error("amf3 encode: cannot encode empty string for array: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode empty string for array: %s", err)
}
n += m
for _, v := range val {
m, err := e.EncodeAmf3(w, v)
if err != nil {
return n, Error("amf3 encode: cannot encode array element: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode array element: %s", err)
}
n += m
}
@ -294,32 +295,32 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo @@ -294,32 +295,32 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo
m, err = e.encodeAmf3Uint29(w, u29)
if err != nil {
return n, Error("amf3 encode: cannot encode trait header for object: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode trait header for object: %s", err)
}
n += m
m, err = e.encodeAmf3Utf8(w, trait.Type)
if err != nil {
return n, Error("amf3 encode: cannot encode trait type for object: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode trait type for object: %s", err)
}
n += m
for _, prop := range trait.Properties {
m, err = e.encodeAmf3Utf8(w, prop)
if err != nil {
return n, Error("amf3 encode: cannot encode trait property for object: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode trait property for object: %s", err)
}
n += m
}
if trait.Externalizable {
return n, Error("amf3 encode: cannot encode externalizable object")
return n, fmt.Errorf("amf3 encode: cannot encode externalizable object")
}
for _, prop := range trait.Properties {
m, err = e.EncodeAmf3(w, val.Object[prop])
if err != nil {
return n, Error("amf3 encode: cannot encode sealed object value: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode sealed object value: %s", err)
}
n += m
}
@ -337,20 +338,20 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo @@ -337,20 +338,20 @@ func (e *Encoder) EncodeAmf3Object(w io.Writer, val TypedObject, encodeMarker bo
if foundProp != true {
m, err = e.encodeAmf3Utf8(w, k)
if err != nil {
return n, Error("amf3 encode: cannot encode dynamic object property key: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode dynamic object property key: %s", err)
}
n += m
m, err = e.EncodeAmf3(w, v)
if err != nil {
return n, Error("amf3 encode: cannot encode dynamic object value: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode dynamic object value: %s", err)
}
n += m
}
m, err = e.encodeAmf3Utf8(w, "")
if err != nil {
return n, Error("amf3 encode: cannot encode dynamic object ending marker string: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode dynamic object ending marker string: %s", err)
}
n += m
}
@ -378,13 +379,13 @@ func (e *Encoder) EncodeAmf3ByteArray(w io.Writer, val []byte, encodeMarker bool @@ -378,13 +379,13 @@ func (e *Encoder) EncodeAmf3ByteArray(w io.Writer, val []byte, encodeMarker bool
m, err = e.encodeAmf3Uint29(w, u29)
if err != nil {
return n, Error("amf3 encode: cannot encode u29 for bytearray: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode u29 for bytearray: %s", err)
}
n += m
m, err = w.Write(val)
if err != nil {
return n, Error("encode amf3: unable to encode bytearray value: %s", err)
return n, fmt.Errorf("encode amf3: unable to encode bytearray value: %s", err)
}
n += m
@ -398,13 +399,13 @@ func (e *Encoder) encodeAmf3Utf8(w io.Writer, val string) (n int, err error) { @@ -398,13 +399,13 @@ func (e *Encoder) encodeAmf3Utf8(w io.Writer, val string) (n int, err error) {
var m int
m, err = e.encodeAmf3Uint29(w, u29)
if err != nil {
return n, Error("amf3 encode: cannot encode u29 for string: %s", err)
return n, fmt.Errorf("amf3 encode: cannot encode u29 for string: %s", err)
}
n += m
m, err = w.Write([]byte(val))
if err != nil {
return n, Error("encode amf3: unable to encode string value: %s", err)
return n, fmt.Errorf("encode amf3: unable to encode string value: %s", err)
}
n += m
@ -424,7 +425,7 @@ func (e *Encoder) encodeAmf3Uint29(w io.Writer, val uint32) (n int, err error) { @@ -424,7 +425,7 @@ func (e *Encoder) encodeAmf3Uint29(w io.Writer, val uint32) (n int, err error) {
} else if val <= 0x1FFFFFFF {
n, err = w.Write([]byte{byte(val>>22 | 0x80), byte(val>>15&0x7F | 0x80), byte(val>>8&0x7F | 0x80), byte(val)})
} else {
return n, Error("amf3 encode: cannot encode u29 with value %d (out of range)", val)
return n, fmt.Errorf("amf3 encode: cannot encode u29 with value %d (out of range)", val)
}
return

3
protocol/amf/metadata.go

@ -3,7 +3,8 @@ package amf @@ -3,7 +3,8 @@ package amf
import (
"bytes"
"fmt"
"log"
log "github.com/sirupsen/logrus"
)
const (

9
protocol/amf/util.go

@ -2,7 +2,6 @@ package amf @@ -2,7 +2,6 @@ package amf
import (
"encoding/json"
"errors"
"fmt"
"io"
)
@ -18,17 +17,13 @@ func DumpBytes(label string, buf []byte, size int) { @@ -18,17 +17,13 @@ func DumpBytes(label string, buf []byte, size int) {
func Dump(label string, val interface{}) error {
json, err := json.MarshalIndent(val, "", " ")
if err != nil {
return Error("Error dumping %s: %s", label, err)
return fmt.Errorf("Error dumping %s: %s", label, err)
}
fmt.Printf("Dumping %s:\n%s\n", label, json)
return nil
}
func Error(f string, v ...interface{}) error {
return errors.New(fmt.Sprintf(f, v...))
}
func WriteByte(w io.Writer, b byte) (err error) {
bytes := make([]byte, 1)
bytes[0] = b
@ -85,7 +80,7 @@ func AssertMarker(r io.Reader, checkMarker bool, m byte) error { @@ -85,7 +80,7 @@ func AssertMarker(r io.Reader, checkMarker bool, m byte) error {
}
if marker != m {
return Error("decode assert marker failed: expected %v got %v", m, marker)
return fmt.Errorf("decode assert marker failed: expected %v got %v", m, marker)
}
return nil

66
protocol/api/api.go

@ -3,7 +3,6 @@ package api @@ -3,7 +3,6 @@ package api
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
@ -14,6 +13,7 @@ import ( @@ -14,6 +13,7 @@ import (
jwtmiddleware "github.com/auth0/go-jwt-middleware"
"github.com/dgrijalva/jwt-go"
log "github.com/sirupsen/logrus"
)
type Response struct {
@ -204,23 +204,23 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { @@ -204,23 +204,23 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
return
}
oper := req.Form["oper"]
app := req.Form["app"]
name := req.Form["name"]
url := req.Form["url"]
oper := req.Form.Get("oper")
app := req.Form.Get("app")
name := req.Form.Get("name")
url := req.Form.Get("url")
log.Printf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
log.Debugf("control pull: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) {
res.Status = 400
res.Data = "control push parameter error, please check them."
return
}
remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0]
localurl := url[0]
remoteurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app + "/" + name
localurl := url
keyString := "pull:" + app[0] + "/" + name[0]
if oper[0] == "stop" {
keyString := "pull:" + app + "/" + name
if oper == "stop" {
pullRtmprelay, found := s.session[keyString]
if !found {
@ -229,27 +229,27 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) { @@ -229,27 +229,27 @@ func (s *Server) handlePull(w http.ResponseWriter, req *http.Request) {
res.Data = retString
return
}
log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl)
log.Debugf("rtmprelay stop push %s from %s", remoteurl, localurl)
pullRtmprelay.Stop()
delete(s.session, keyString)
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url[0])
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url)
res.Status = 400
res.Data = retString
log.Printf("pull stop return %s", retString)
log.Debugf("pull stop return %s", retString)
} else {
pullRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
log.Printf("rtmprelay start push %s from %s", remoteurl, localurl)
log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pullRtmprelay.Start()
if err != nil {
retString = fmt.Sprintf("push error=%v", err)
} else {
s.session[keyString] = pullRtmprelay
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url[0])
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url)
}
res.Status = 400
res.Data = retString
log.Printf("pull start return %s", retString)
log.Debugf("pull start return %s", retString)
}
}
@ -271,48 +271,48 @@ func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) { @@ -271,48 +271,48 @@ func (s *Server) handlePush(w http.ResponseWriter, req *http.Request) {
return
}
oper := req.Form["oper"]
app := req.Form["app"]
name := req.Form["name"]
url := req.Form["url"]
oper := req.Form.Get("oper")
app := req.Form.Get("app")
name := req.Form.Get("name")
url := req.Form.Get("url")
log.Printf("control push: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
log.Debugf("control push: oper=%v, app=%v, name=%v, url=%v", oper, app, name, url)
if (len(app) <= 0) || (len(name) <= 0) || (len(url) <= 0) {
res.Data = "control push parameter error, please check them."
return
}
localurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app[0] + "/" + name[0]
remoteurl := url[0]
localurl := "rtmp://127.0.0.1" + s.rtmpAddr + "/" + app + "/" + name
remoteurl := url
keyString := "push:" + app[0] + "/" + name[0]
if oper[0] == "stop" {
keyString := "push:" + app + "/" + name
if oper == "stop" {
pushRtmprelay, found := s.session[keyString]
if !found {
retString = fmt.Sprintf("<h1>session key[%s] not exist, please check it again.</h1>", keyString)
res.Data = retString
return
}
log.Printf("rtmprelay stop push %s from %s", remoteurl, localurl)
log.Debugf("rtmprelay stop push %s from %s", remoteurl, localurl)
pushRtmprelay.Stop()
delete(s.session, keyString)
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url[0])
retString = fmt.Sprintf("<h1>push url stop %s ok</h1></br>", url)
res.Data = retString
log.Printf("push stop return %s", retString)
log.Debugf("push stop return %s", retString)
} else {
pushRtmprelay := rtmprelay.NewRtmpRelay(&localurl, &remoteurl)
log.Printf("rtmprelay start push %s from %s", remoteurl, localurl)
log.Debugf("rtmprelay start push %s from %s", remoteurl, localurl)
err = pushRtmprelay.Start()
if err != nil {
retString = fmt.Sprintf("push error=%v", err)
} else {
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url[0])
retString = fmt.Sprintf("<h1>push url start %s ok</h1></br>", url)
s.session[keyString] = pushRtmprelay
}
res.Data = retString
log.Printf("push start return %s", retString)
log.Debugf("push start return %s", retString)
}
}
@ -334,7 +334,7 @@ func (s *Server) handleReset(w http.ResponseWriter, r *http.Request) { @@ -334,7 +334,7 @@ func (s *Server) handleReset(w http.ResponseWriter, r *http.Request) {
if len(room) == 0 {
res.Status = 400
res.Data = "url: /control/get?room=<ROOM_NAME>"
res.Data = "url: /control/reset?room=<ROOM_NAME>"
return
}
@ -398,7 +398,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) { @@ -398,7 +398,7 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
if len(room) == 0 {
res.Status = 400
res.Data = "url: /control/get?room=<ROOM_NAME>"
res.Data = "url: /control/delete?room=<ROOM_NAME>"
return
}

3
protocol/hls/cache.go

@ -3,7 +3,6 @@ package hls @@ -3,7 +3,6 @@ package hls
import (
"bytes"
"container/list"
"errors"
"fmt"
"sync"
)
@ -13,7 +12,7 @@ const ( @@ -13,7 +12,7 @@ const (
)
var (
ErrNoKey = errors.New("No key for cache")
ErrNoKey = fmt.Errorf("No key for cache")
)
type TSCacheItem struct {

19
protocol/hls/hls.go

@ -1,9 +1,7 @@ @@ -1,9 +1,7 @@
package hls
import (
"errors"
"fmt"
"log"
"net"
"net/http"
"path"
@ -14,6 +12,7 @@ import ( @@ -14,6 +12,7 @@ import (
"livego/av"
cmap "github.com/orcaman/concurrent-map"
log "github.com/sirupsen/logrus"
)
const (
@ -21,10 +20,10 @@ const ( @@ -21,10 +20,10 @@ const (
)
var (
ErrNoPublisher = errors.New("No publisher")
ErrInvalidReq = errors.New("invalid req url path")
ErrNoSupportVideoCodec = errors.New("no support video codec")
ErrNoSupportAudioCodec = errors.New("no support audio codec")
ErrNoPublisher = fmt.Errorf("no publisher")
ErrInvalidReq = fmt.Errorf("invalid req url path")
ErrNoSupportVideoCodec = fmt.Errorf("no support video codec")
ErrNoSupportAudioCodec = fmt.Errorf("no support audio codec")
)
var crossdomainxml = []byte(`<?xml version="1.0" ?>
@ -60,7 +59,7 @@ func (server *Server) GetWriter(info av.Info) av.WriteCloser { @@ -60,7 +59,7 @@ func (server *Server) GetWriter(info av.Info) av.WriteCloser {
var s *Source
ok := server.conns.Has(info.Key)
if !ok {
log.Println("new hls source")
log.Debug("new hls source")
s = NewSource(info)
server.conns.Set(info.Key, s)
} else {
@ -84,7 +83,7 @@ func (server *Server) checkStop() { @@ -84,7 +83,7 @@ func (server *Server) checkStop() {
for item := range server.conns.IterBuffered() {
v := item.Val.(*Source)
if !v.Alive() {
log.Println("check stop and remove: ", v.Info())
log.Debug("check stop and remove: ", v.Info())
server.conns.Remove(item.Key)
}
}
@ -112,7 +111,7 @@ func (server *Server) handle(w http.ResponseWriter, r *http.Request) { @@ -112,7 +111,7 @@ func (server *Server) handle(w http.ResponseWriter, r *http.Request) {
}
body, err := tsCache.GenM3U8PlayList()
if err != nil {
log.Println("GenM3U8PlayList error: ", err)
log.Debug("GenM3U8PlayList error: ", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@ -132,7 +131,7 @@ func (server *Server) handle(w http.ResponseWriter, r *http.Request) { @@ -132,7 +131,7 @@ func (server *Server) handle(w http.ResponseWriter, r *http.Request) {
tsCache := conn.GetCacheInc()
item, err := tsCache.GetItem(r.URL.Path)
if err != nil {
log.Println("GetItem error: ", err)
log.Debug("GetItem error: ", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

33
protocol/hls/source.go

@ -2,15 +2,15 @@ package hls @@ -2,15 +2,15 @@ package hls
import (
"bytes"
"errors"
"fmt"
"log"
"time"
"livego/av"
"livego/container/flv"
"livego/container/ts"
"livego/parser"
log "github.com/sirupsen/logrus"
)
const (
@ -57,7 +57,7 @@ func NewSource(info av.Info) *Source { @@ -57,7 +57,7 @@ func NewSource(info av.Info) *Source {
go func() {
err := s.SendPacket()
if err != nil {
log.Println("send packet error: ", err)
log.Warning("send packet error: ", err)
s.closed = true
}
}()
@ -69,7 +69,7 @@ func (source *Source) GetCacheInc() *TSCacheItem { @@ -69,7 +69,7 @@ func (source *Source) GetCacheInc() *TSCacheItem {
}
func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
log.Warningf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
// try to don't drop audio
@ -93,20 +93,19 @@ func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) { @@ -93,20 +93,19 @@ func (source *Source) DropPacket(pktQue chan *av.Packet, info av.Info) {
}
}
log.Println("packet queue len: ", len(pktQue))
log.Debug("packet queue len: ", len(pktQue))
}
func (source *Source) Write(p *av.Packet) (err error) {
err = nil
if source.closed {
err = errors.New("hls source closed")
err = fmt.Errorf("hls source closed")
return
}
source.SetPreTime()
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("hls source has already been closed:%v", e)
err = errors.New(errString)
err = fmt.Errorf("hls source has already been closed:%v", e)
}
}()
if len(source.packetQueue) >= maxQueueNum-24 {
@ -121,16 +120,16 @@ func (source *Source) Write(p *av.Packet) (err error) { @@ -121,16 +120,16 @@ func (source *Source) Write(p *av.Packet) (err error) {
func (source *Source) SendPacket() error {
defer func() {
log.Printf("[%v] hls sender stop", source.info)
log.Debugf("[%v] hls sender stop", source.info)
if r := recover(); r != nil {
log.Println("hls SendPacket panic: ", r)
log.Warning("hls SendPacket panic: ", r)
}
}()
log.Printf("[%v] hls sender start", source.info)
log.Debugf("[%v] hls sender start", source.info)
for {
if source.closed {
return errors.New("closed")
return fmt.Errorf("closed")
}
p, ok := <-source.packetQueue
@ -141,17 +140,17 @@ func (source *Source) SendPacket() error { @@ -141,17 +140,17 @@ func (source *Source) SendPacket() error {
err := source.demuxer.Demux(p)
if err == flv.ErrAvcEndSEQ {
log.Println(err)
log.Warning(err)
continue
} else {
if err != nil {
log.Println(err)
log.Warning(err)
return err
}
}
compositionTime, isSeq, err := source.parse(p)
if err != nil {
log.Println(err)
log.Warning(err)
}
if err != nil || isSeq {
continue
@ -162,7 +161,7 @@ func (source *Source) SendPacket() error { @@ -162,7 +161,7 @@ func (source *Source) SendPacket() error {
source.tsMux(p)
}
} else {
return errors.New("closed")
return fmt.Errorf("closed")
}
}
}
@ -180,7 +179,7 @@ func (source *Source) cleanup() { @@ -180,7 +179,7 @@ func (source *Source) cleanup() {
}
func (source *Source) Close(err error) {
log.Println("hls source closed: ", source.info)
log.Debug("hls source closed: ", source.info)
if !source.closed {
source.cleanup()
}

7
protocol/httpflv/server.go

@ -2,13 +2,14 @@ package httpflv @@ -2,13 +2,14 @@ package httpflv
import (
"encoding/json"
"log"
"net"
"net/http"
"strings"
"livego/av"
"livego/protocol/rtmp"
log "github.com/sirupsen/logrus"
)
type Server struct {
@ -87,7 +88,7 @@ func (server *Server) getStream(w http.ResponseWriter, r *http.Request) { @@ -87,7 +88,7 @@ func (server *Server) getStream(w http.ResponseWriter, r *http.Request) {
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
log.Println("http flv handleConn panic: ", r)
log.Error("http flv handleConn panic: ", r)
}
}()
@ -99,7 +100,7 @@ func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) { @@ -99,7 +100,7 @@ func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {
}
path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
paths := strings.SplitN(path, "/", 2)
log.Println("url:", u, "path:", path, "paths:", paths)
log.Debug("url:", u, "path:", path, "paths:", paths)
if len(paths) != 2 {
http.Error(w, "invalid path", http.StatusBadRequest)

25
protocol/httpflv/writer.go

@ -1,9 +1,7 @@ @@ -1,9 +1,7 @@
package httpflv
import (
"errors"
"fmt"
"log"
"net/http"
"time"
@ -11,6 +9,8 @@ import ( @@ -11,6 +9,8 @@ import (
"livego/protocol/amf"
"livego/utils/pio"
"livego/utils/uid"
log "github.com/sirupsen/logrus"
)
const (
@ -48,7 +48,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { @@ -48,7 +48,7 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
go func() {
err := ret.SendPacket()
if err != nil {
log.Println("SendPacket error:", err)
log.Error("SendPacket error: ", err)
ret.closed = true
}
}()
@ -56,14 +56,14 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { @@ -56,14 +56,14 @@ func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
}
func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
log.Warningf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
if ok && tmpPkt.IsVideo {
videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)
// dont't drop sps config and dont't drop key frame
if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {
log.Println("insert keyframe to queue")
log.Debug("insert keyframe to queue")
pktQue <- tmpPkt
}
@ -75,25 +75,26 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { @@ -75,25 +75,26 @@ func (flvWriter *FLVWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
}
// try to don't drop audio
if ok && tmpPkt.IsAudio {
log.Println("insert audio to queue")
log.Debug("insert audio to queue")
pktQue <- tmpPkt
}
}
log.Println("packet queue len: ", len(pktQue))
log.Debug("packet queue len: ", len(pktQue))
}
func (flvWriter *FLVWriter) Write(p *av.Packet) (err error) {
err = nil
if flvWriter.closed {
err = errors.New("flvwrite source closed")
err = fmt.Errorf("flvwrite source closed")
return
}
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("FLVWriter has already been closed:%v", e)
err = errors.New(errString)
err = fmt.Errorf("FLVWriter has already been closed:%v", e)
}
}()
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
@ -149,7 +150,7 @@ func (flvWriter *FLVWriter) SendPacket() error { @@ -149,7 +150,7 @@ func (flvWriter *FLVWriter) SendPacket() error {
return err
}
} else {
return errors.New("closed")
return fmt.Errorf("closed")
}
}
@ -165,7 +166,7 @@ func (flvWriter *FLVWriter) Wait() { @@ -165,7 +166,7 @@ func (flvWriter *FLVWriter) Wait() {
}
func (flvWriter *FLVWriter) Close(error) {
log.Println("http flv closed")
log.Debug("http flv closed")
if !flvWriter.closed {
close(flvWriter.packetQueue)
close(flvWriter.closedChan)

4
protocol/rtmp/cache/gop.go vendored

@ -1,14 +1,14 @@ @@ -1,14 +1,14 @@
package cache
import (
"errors"
"fmt"
"livego/av"
)
var (
maxGOPCap int = 1024
ErrGopTooBig = errors.New("gop to big")
ErrGopTooBig = fmt.Errorf("gop to big")
)
type array struct {

3
protocol/rtmp/cache/special.go vendored

@ -2,10 +2,11 @@ package cache @@ -2,10 +2,11 @@ package cache
import (
"bytes"
"log"
"livego/av"
"livego/protocol/amf"
log "github.com/sirupsen/logrus"
)
const (

43
protocol/rtmp/core/conn_client.go

@ -2,7 +2,6 @@ package core @@ -2,7 +2,6 @@ package core
import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
@ -10,10 +9,10 @@ import ( @@ -10,10 +9,10 @@ import (
neturl "net/url"
"strings"
"log"
"livego/av"
"livego/protocol/amf"
log "github.com/sirupsen/logrus"
)
var (
@ -27,7 +26,7 @@ var ( @@ -27,7 +26,7 @@ var (
)
var (
ErrFail = errors.New("respone err")
ErrFail = fmt.Errorf("respone err")
)
type ConnClient struct {
@ -75,14 +74,14 @@ func (connClient *ConnClient) readRespMsg() error { @@ -75,14 +74,14 @@ func (connClient *ConnClient) readRespMsg() error {
r := bytes.NewReader(rc.Data)
vs, _ := connClient.decoder.DecodeBatch(r, amf.AMF0)
log.Printf("readRespMsg: vs=%v", vs)
log.Debugf("readRespMsg: vs=%v", vs)
for k, v := range vs {
switch v.(type) {
case string:
switch connClient.curcmdName {
case cmdConnect, cmdCreateStream:
if v.(string) != respResult {
return errors.New(v.(string))
return fmt.Errorf(v.(string))
}
case cmdPublish:
@ -158,7 +157,7 @@ func (connClient *ConnClient) writeConnectMsg() error { @@ -158,7 +157,7 @@ func (connClient *ConnClient) writeConnectMsg() error {
event["tcUrl"] = connClient.tcurl
connClient.curcmdName = cmdConnect
log.Printf("writeConnectMsg: connClient.transID=%d, event=%v", connClient.transID, event)
log.Debugf("writeConnectMsg: connClient.transID=%d, event=%v", connClient.transID, event)
if err := connClient.writeMsg(cmdConnect, connClient.transID, event); err != nil {
return err
}
@ -169,7 +168,7 @@ func (connClient *ConnClient) writeCreateStreamMsg() error { @@ -169,7 +168,7 @@ func (connClient *ConnClient) writeCreateStreamMsg() error {
connClient.transID++
connClient.curcmdName = cmdCreateStream
log.Printf("writeCreateStreamMsg: connClient.transID=%d", connClient.transID)
log.Debugf("writeCreateStreamMsg: connClient.transID=%d", connClient.transID)
if err := connClient.writeMsg(cmdCreateStream, connClient.transID, nil); err != nil {
return err
}
@ -181,7 +180,7 @@ func (connClient *ConnClient) writeCreateStreamMsg() error { @@ -181,7 +180,7 @@ func (connClient *ConnClient) writeCreateStreamMsg() error {
}
if err == ErrFail {
log.Println("writeCreateStreamMsg readRespMsg err=%v", err)
log.Debugf("writeCreateStreamMsg readRespMsg err=%v", err)
return err
}
}
@ -200,7 +199,7 @@ func (connClient *ConnClient) writePublishMsg() error { @@ -200,7 +199,7 @@ func (connClient *ConnClient) writePublishMsg() error {
func (connClient *ConnClient) writePlayMsg() error {
connClient.transID++
connClient.curcmdName = cmdPlay
log.Printf("writePlayMsg: connClient.transID=%d, cmdPlay=%v, connClient.title=%v",
log.Debugf("writePlayMsg: connClient.transID=%d, cmdPlay=%v, connClient.title=%v",
connClient.transID, cmdPlay, connClient.title)
if err := connClient.writeMsg(cmdPlay, 0, nil, connClient.title); err != nil {
@ -236,9 +235,9 @@ func (connClient *ConnClient) Start(url string, method string) error { @@ -236,9 +235,9 @@ func (connClient *ConnClient) Start(url string, method string) error {
port = ":" + port
}
ips, err := net.LookupIP(host)
log.Printf("ips: %v, host: %v", ips, host)
log.Debugf("ips: %v, host: %v", ips, host)
if err != nil {
log.Println(err)
log.Warning(err)
return err
}
remoteIP = ips[rand.Intn(len(ips))].String()
@ -248,41 +247,41 @@ func (connClient *ConnClient) Start(url string, method string) error { @@ -248,41 +247,41 @@ func (connClient *ConnClient) Start(url string, method string) error {
local, err := net.ResolveTCPAddr("tcp", localIP)
if err != nil {
log.Println(err)
log.Warning(err)
return err
}
log.Println("remoteIP: ", remoteIP)
log.Debug("remoteIP: ", remoteIP)
remote, err := net.ResolveTCPAddr("tcp", remoteIP)
if err != nil {
log.Println(err)
log.Warning(err)
return err
}
conn, err := net.DialTCP("tcp", local, remote)
if err != nil {
log.Println(err)
log.Warning(err)
return err
}
log.Println("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())
log.Debug("connection:", "local:", conn.LocalAddr(), "remote:", conn.RemoteAddr())
connClient.conn = NewConn(conn, 4*1024)
log.Println("HandshakeClient....")
log.Debug("HandshakeClient....")
if err := connClient.conn.HandshakeClient(); err != nil {
return err
}
log.Println("writeConnectMsg....")
log.Debug("writeConnectMsg....")
if err := connClient.writeConnectMsg(); err != nil {
return err
}
log.Println("writeCreateStreamMsg....")
log.Debug("writeCreateStreamMsg....")
if err := connClient.writeCreateStreamMsg(); err != nil {
log.Println("writeCreateStreamMsg error", err)
log.Debug("writeCreateStreamMsg error", err)
return err
}
log.Println("method control:", method, av.PUBLISH, av.PLAY)
log.Debug("method control:", method, av.PUBLISH, av.PLAY)
if method == av.PUBLISH {
if err := connClient.writePublishMsg(); err != nil {
return err

16
protocol/rtmp/core/conn_server.go

@ -2,13 +2,13 @@ package core @@ -2,13 +2,13 @@ package core
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"livego/av"
"livego/protocol/amf"
log "github.com/sirupsen/logrus"
)
var (
@ -18,7 +18,7 @@ var ( @@ -18,7 +18,7 @@ var (
)
var (
ErrReq = errors.New("req error")
ErrReq = fmt.Errorf("req error")
)
var (
@ -251,7 +251,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error { @@ -251,7 +251,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error {
if err != nil && err != io.EOF {
return err
}
// log.Printf("rtmp req: %#v", vs)
// log.Debugf("rtmp req: %#v", vs)
switch vs[0].(type) {
case string:
switch vs[0].(string) {
@ -278,7 +278,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error { @@ -278,7 +278,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error {
}
connServer.done = true
connServer.isPublisher = true
log.Println("handle publish req done")
log.Debug("handle publish req done")
case cmdPlay:
if err = connServer.publishOrPlay(vs[1:]); err != nil {
return err
@ -288,7 +288,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error { @@ -288,7 +288,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error {
}
connServer.done = true
connServer.isPublisher = false
log.Println("handle play req done")
log.Debug("handle play req done")
case cmdFcpublish:
connServer.fcPublish(vs)
case cmdReleaseStream:
@ -296,7 +296,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error { @@ -296,7 +296,7 @@ func (connServer *ConnServer) handleCmdMsg(c *ChunkStream) error {
case cmdFCUnpublish:
case cmdDeleteStream:
default:
log.Println("no support command=", vs[0].(string))
log.Debug("no support command=", vs[0].(string))
}
}

61
protocol/rtmp/rtmp.go

@ -1,10 +1,8 @@ @@ -1,10 +1,8 @@
package rtmp
import (
"errors"
"flag"
"fmt"
"log"
"net"
"net/url"
"reflect"
@ -17,6 +15,8 @@ import ( @@ -17,6 +15,8 @@ import (
"livego/configure"
"livego/container/flv"
"livego/protocol/rtmp/core"
log "github.com/sirupsen/logrus"
)
const (
@ -48,11 +48,11 @@ func (c *Client) Dial(url string, method string) error { @@ -48,11 +48,11 @@ func (c *Client) Dial(url string, method string) error {
}
if method == av.PUBLISH {
writer := NewVirWriter(connClient)
log.Printf("client Dial call NewVirWriter url=%s, method=%s", url, method)
log.Debugf("client Dial call NewVirWriter url=%s, method=%s", url, method)
c.handler.HandleWriter(writer)
} else if method == av.PLAY {
reader := NewVirReader(connClient)
log.Printf("client Dial call NewVirReader url=%s, method=%s", url, method)
log.Debugf("client Dial call NewVirReader url=%s, method=%s", url, method)
c.handler.HandleReader(reader)
if c.getter != nil {
writer := c.getter.GetWriter(reader.Info())
@ -81,7 +81,7 @@ func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server { @@ -81,7 +81,7 @@ func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server {
func (s *Server) Serve(listener net.Listener) (err error) {
defer func() {
if r := recover(); r != nil {
log.Println("rtmp serve panic: ", r)
log.Error("rtmp serve panic: ", r)
}
}()
@ -92,7 +92,7 @@ func (s *Server) Serve(listener net.Listener) (err error) { @@ -92,7 +92,7 @@ func (s *Server) Serve(listener net.Listener) (err error) {
return
}
conn := core.NewConn(netconn, 4*1024)
log.Println("new client, connect remote:", conn.RemoteAddr().String(),
log.Debug("new client, connect remote: ", conn.RemoteAddr().String(),
"local:", conn.LocalAddr().String())
go s.handleConn(conn)
}
@ -101,46 +101,46 @@ func (s *Server) Serve(listener net.Listener) (err error) { @@ -101,46 +101,46 @@ func (s *Server) Serve(listener net.Listener) (err error) {
func (s *Server) handleConn(conn *core.Conn) error {
if err := conn.HandshakeServer(); err != nil {
conn.Close()
log.Println("handleConn HandshakeServer err:", err)
log.Error("handleConn HandshakeServer err: ", err)
return err
}
connServer := core.NewConnServer(conn)
if err := connServer.ReadMsg(); err != nil {
conn.Close()
log.Println("handleConn read msg err:", err)
log.Error("handleConn read msg err: ", err)
return err
}
appname, name, _ := connServer.GetInfo()
if ret := configure.CheckAppName(appname); !ret {
err := errors.New(fmt.Sprintf("application name=%s is not configured", appname))
err := fmt.Errorf("application name=%s is not configured", appname)
conn.Close()
log.Println("CheckAppName err:", err)
log.Error("CheckAppName err: ", err)
return err
}
log.Printf("handleConn: IsPublisher=%v", connServer.IsPublisher())
log.Debugf("handleConn: IsPublisher=%v", connServer.IsPublisher())
if connServer.IsPublisher() {
channel, err := configure.RoomKeys.GetChannel(name)
if err != nil {
err := errors.New(fmt.Sprintf("invalid key"))
err := fmt.Errorf("invalid key")
conn.Close()
log.Println("CheckKey err:", err)
log.Error("CheckKey err: ", err)
return err
}
connServer.PublishInfo.Name = channel
if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) {
log.Printf("GetStaticPushUrlList: %v", pushlist)
log.Debugf("GetStaticPushUrlList: %v", pushlist)
}
reader := NewVirReader(connServer)
s.handler.HandleReader(reader)
log.Printf("new publisher: %+v", reader.Info())
log.Debugf("new publisher: %+v", reader.Info())
if s.getter != nil {
writeType := reflect.TypeOf(s.getter)
log.Printf("handleConn:writeType=%v", writeType)
log.Debugf("handleConn:writeType=%v", writeType)
writer := s.getter.GetWriter(reader.Info())
s.handler.HandleWriter(writer)
}
@ -148,7 +148,7 @@ func (s *Server) handleConn(conn *core.Conn) error { @@ -148,7 +148,7 @@ func (s *Server) handleConn(conn *core.Conn) error {
s.handler.HandleWriter(flvWriter.GetWriter(reader.Info()))
} else {
writer := NewVirWriter(connServer)
log.Printf("new player: %+v", writer.Info())
log.Debugf("new player: %+v", writer.Info())
s.handler.HandleWriter(writer)
}
@ -201,7 +201,7 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { @@ -201,7 +201,7 @@ func NewVirWriter(conn StreamReadWriteCloser) *VirWriter {
go func() {
err := ret.SendPacket()
if err != nil {
log.Println(err)
log.Warning(err)
}
}()
return ret
@ -242,13 +242,13 @@ func (v *VirWriter) Check() { @@ -242,13 +242,13 @@ func (v *VirWriter) Check() {
}
func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
log.Warningf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
// try to don't drop audio
if ok && tmpPkt.IsAudio {
if len(pktQue) > maxQueueNum-2 {
log.Println("drop audio pkt")
log.Debug("drop audio pkt")
<-pktQue
} else {
pktQue <- tmpPkt
@ -263,13 +263,13 @@ func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { @@ -263,13 +263,13 @@ func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) {
pktQue <- tmpPkt
}
if len(pktQue) > maxQueueNum-10 {
log.Println("drop video pkt")
log.Debug("drop video pkt")
<-pktQue
}
}
}
log.Println("packet queue len: ", len(pktQue))
log.Debug("packet queue len: ", len(pktQue))
}
//
@ -277,13 +277,12 @@ func (v *VirWriter) Write(p *av.Packet) (err error) { @@ -277,13 +277,12 @@ func (v *VirWriter) Write(p *av.Packet) (err error) {
err = nil
if v.closed {
err = errors.New("VirWriter closed")
err = fmt.Errorf("VirWriter closed")
return
}
defer func() {
if e := recover(); e != nil {
errString := fmt.Sprintf("VirWriter has already been closed:%v", e)
err = errors.New(errString)
err = fmt.Errorf("VirWriter has already been closed:%v", e)
}
}()
if len(v.packetQueue) >= maxQueueNum-24 {
@ -327,7 +326,7 @@ func (v *VirWriter) SendPacket() error { @@ -327,7 +326,7 @@ func (v *VirWriter) SendPacket() error {
}
Flush.Call(nil)
} else {
return errors.New("closed")
return fmt.Errorf("closed")
}
}
@ -340,7 +339,7 @@ func (v *VirWriter) Info() (ret av.Info) { @@ -340,7 +339,7 @@ func (v *VirWriter) Info() (ret av.Info) {
ret.URL = URL
_url, err := url.Parse(URL)
if err != nil {
log.Println(err)
log.Warning(err)
}
ret.Key = strings.TrimLeft(_url.Path, "/")
ret.Inter = true
@ -348,7 +347,7 @@ func (v *VirWriter) Info() (ret av.Info) { @@ -348,7 +347,7 @@ func (v *VirWriter) Info() (ret av.Info) {
}
func (v *VirWriter) Close(err error) {
log.Println("player ", v.Info(), "closed: "+err.Error())
log.Warning("player ", v.Info(), "closed: "+err.Error())
if !v.closed {
close(v.packetQueue)
}
@ -402,7 +401,7 @@ func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool @@ -402,7 +401,7 @@ func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool
func (v *VirReader) Read(p *av.Packet) (err error) {
defer func() {
if r := recover(); r != nil {
log.Println("rtmp read packet panic: ", r)
log.Warning("rtmp read packet panic: ", r)
}
}()
@ -439,13 +438,13 @@ func (v *VirReader) Info() (ret av.Info) { @@ -439,13 +438,13 @@ func (v *VirReader) Info() (ret av.Info) {
ret.URL = URL
_url, err := url.Parse(URL)
if err != nil {
log.Println(err)
log.Warning(err)
}
ret.Key = strings.TrimLeft(_url.Path, "/")
return
}
func (v *VirReader) Close(err error) {
log.Println("publisher ", v.Info(), "closed: "+err.Error())
log.Debug("publisher ", v.Info(), "closed: "+err.Error())
v.conn.Close(err)
}

32
protocol/rtmp/rtmprelay/rtmprelay.go

@ -2,13 +2,13 @@ package rtmprelay @@ -2,13 +2,13 @@ package rtmprelay
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"livego/protocol/amf"
"livego/protocol/rtmp/core"
log "github.com/sirupsen/logrus"
)
var (
@ -38,13 +38,13 @@ func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay { @@ -38,13 +38,13 @@ func NewRtmpRelay(playurl *string, publishurl *string) *RtmpRelay {
}
func (self *RtmpRelay) rcvPlayChunkStream() {
log.Println("rcvPlayRtmpMediaPacket connectClient.Read...")
log.Debug("rcvPlayRtmpMediaPacket connectClient.Read...")
for {
var rc core.ChunkStream
if self.startflag == false {
self.connectPlayClient.Close(nil)
log.Printf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
log.Debugf("rcvPlayChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
break
}
err := self.connectPlayClient.Read(&rc)
@ -52,15 +52,15 @@ func (self *RtmpRelay) rcvPlayChunkStream() { @@ -52,15 +52,15 @@ func (self *RtmpRelay) rcvPlayChunkStream() {
if err != nil && err == io.EOF {
break
}
//log.Printf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err)
//log.Debugf("connectPlayClient.Read return rc.TypeID=%v length=%d, err=%v", rc.TypeID, len(rc.Data), err)
switch rc.TypeID {
case 20, 17:
r := bytes.NewReader(rc.Data)
vs, err := self.connectPlayClient.DecodeBatch(r, amf.AMF0)
log.Printf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
log.Debugf("rcvPlayRtmpMediaPacket: vs=%v, err=%v", vs, err)
case 18:
log.Printf("rcvPlayRtmpMediaPacket: metadata....")
log.Debug("rcvPlayRtmpMediaPacket: metadata....")
case 8, 9:
self.cs_chan <- rc
}
@ -71,12 +71,12 @@ func (self *RtmpRelay) sendPublishChunkStream() { @@ -71,12 +71,12 @@ func (self *RtmpRelay) sendPublishChunkStream() {
for {
select {
case rc := <-self.cs_chan:
//log.Printf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data))
//log.Debugf("sendPublishChunkStream: rc.TypeID=%v length=%d", rc.TypeID, len(rc.Data))
self.connectPublishClient.Write(rc)
case ctrlcmd := <-self.sndctrl_chan:
if ctrlcmd == STOP_CTRL {
self.connectPublishClient.Close(nil)
log.Printf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
log.Debugf("sendPublishChunkStream close: playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
break
}
}
@ -85,24 +85,23 @@ func (self *RtmpRelay) sendPublishChunkStream() { @@ -85,24 +85,23 @@ func (self *RtmpRelay) sendPublishChunkStream() {
func (self *RtmpRelay) Start() error {
if self.startflag {
err := errors.New(fmt.Sprintf("The rtmprelay already started, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl))
return err
return fmt.Errorf("The rtmprelay already started, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
}
self.connectPlayClient = core.NewConnClient()
self.connectPublishClient = core.NewConnClient()
log.Printf("play server addr:%v starting....", self.PlayUrl)
log.Debugf("play server addr:%v starting....", self.PlayUrl)
err := self.connectPlayClient.Start(self.PlayUrl, "play")
if err != nil {
log.Printf("connectPlayClient.Start url=%v error", self.PlayUrl)
log.Debugf("connectPlayClient.Start url=%v error", self.PlayUrl)
return err
}
log.Printf("publish server addr:%v starting....", self.PublishUrl)
log.Debugf("publish server addr:%v starting....", self.PublishUrl)
err = self.connectPublishClient.Start(self.PublishUrl, "publish")
if err != nil {
log.Printf("connectPublishClient.Start url=%v error", self.PublishUrl)
log.Debugf("connectPublishClient.Start url=%v error", self.PublishUrl)
self.connectPlayClient.Close(nil)
return err
}
@ -116,11 +115,10 @@ func (self *RtmpRelay) Start() error { @@ -116,11 +115,10 @@ func (self *RtmpRelay) Start() error {
func (self *RtmpRelay) Stop() {
if !self.startflag {
log.Printf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
log.Debugf("The rtmprelay already stoped, playurl=%s, publishurl=%s", self.PlayUrl, self.PublishUrl)
return
}
self.startflag = false
self.sndctrl_chan <- STOP_CTRL
}

28
protocol/rtmp/rtmprelay/staticrelay.go

@ -1,14 +1,14 @@ @@ -1,14 +1,14 @@
package rtmprelay
import (
"errors"
"fmt"
"log"
"sync"
"livego/av"
"livego/configure"
"livego/protocol/rtmp/core"
log "github.com/sirupsen/logrus"
)
type StaticPush struct {
@ -30,7 +30,7 @@ func GetStaticPushList(appname string) ([]string, error) { @@ -30,7 +30,7 @@ func GetStaticPushList(appname string) ([]string, error) {
pushurlList, ok := configure.GetStaticPushUrlList(appname)
if !ok {
return nil, errors.New("no static push url")
return nil, fmt.Errorf("no static push url")
}
return pushurlList, nil
@ -39,7 +39,7 @@ func GetStaticPushList(appname string) ([]string, error) { @@ -39,7 +39,7 @@ func GetStaticPushList(appname string) ([]string, error) {
func GetAndCreateStaticPushObject(rtmpurl string) *StaticPush {
g_MapLock.RLock()
staticpush, ok := G_StaticPushMap[rtmpurl]
log.Printf("GetAndCreateStaticPushObject: %s, return %v", rtmpurl, ok)
log.Debugf("GetAndCreateStaticPushObject: %s, return %v", rtmpurl, ok)
if !ok {
g_MapLock.RUnlock()
newStaticpush := NewStaticPush(rtmpurl)
@ -63,7 +63,7 @@ func GetStaticPushObject(rtmpurl string) (*StaticPush, error) { @@ -63,7 +63,7 @@ func GetStaticPushObject(rtmpurl string) (*StaticPush, error) {
}
g_MapLock.RUnlock()
return nil, errors.New(fmt.Sprintf("G_StaticPushMap[%s] not exist....", rtmpurl))
return nil, fmt.Errorf("G_StaticPushMap[%s] not exist....", rtmpurl)
}
func ReleaseStaticPushObject(rtmpurl string) {
@ -71,13 +71,13 @@ func ReleaseStaticPushObject(rtmpurl string) { @@ -71,13 +71,13 @@ func ReleaseStaticPushObject(rtmpurl string) {
if _, ok := G_StaticPushMap[rtmpurl]; ok {
g_MapLock.RUnlock()
log.Printf("ReleaseStaticPushObject %s ok", rtmpurl)
log.Debugf("ReleaseStaticPushObject %s ok", rtmpurl)
g_MapLock.Lock()
delete(G_StaticPushMap, rtmpurl)
g_MapLock.Unlock()
} else {
g_MapLock.RUnlock()
log.Printf("ReleaseStaticPushObject: not find %s", rtmpurl)
log.Debugf("ReleaseStaticPushObject: not find %s", rtmpurl)
}
}
@ -93,18 +93,18 @@ func NewStaticPush(rtmpurl string) *StaticPush { @@ -93,18 +93,18 @@ func NewStaticPush(rtmpurl string) *StaticPush {
func (self *StaticPush) Start() error {
if self.startflag {
return errors.New(fmt.Sprintf("StaticPush already start %s", self.RtmpUrl))
return fmt.Errorf("StaticPush already start %s", self.RtmpUrl)
}
self.connectClient = core.NewConnClient()
log.Printf("static publish server addr:%v starting....", self.RtmpUrl)
log.Debugf("static publish server addr:%v starting....", self.RtmpUrl)
err := self.connectClient.Start(self.RtmpUrl, "publish")
if err != nil {
log.Printf("connectClient.Start url=%v error", self.RtmpUrl)
log.Debugf("connectClient.Start url=%v error", self.RtmpUrl)
return err
}
log.Printf("static publish server addr:%v started, streamid=%d", self.RtmpUrl, self.connectClient.GetStreamId())
log.Debugf("static publish server addr:%v started, streamid=%d", self.RtmpUrl, self.connectClient.GetStreamId())
go self.HandleAvPacket()
self.startflag = true
@ -116,7 +116,7 @@ func (self *StaticPush) Stop() { @@ -116,7 +116,7 @@ func (self *StaticPush) Stop() {
return
}
log.Printf("StaticPush Stop: %s", self.RtmpUrl)
log.Debugf("StaticPush Stop: %s", self.RtmpUrl)
self.sndctrl_chan <- STATIC_RELAY_STOP_CTRL
self.startflag = false
}
@ -158,7 +158,7 @@ func (self *StaticPush) sendPacket(p *av.Packet) { @@ -158,7 +158,7 @@ func (self *StaticPush) sendPacket(p *av.Packet) {
func (self *StaticPush) HandleAvPacket() {
if !self.IsStart() {
log.Printf("static push %s not started", self.RtmpUrl)
log.Debugf("static push %s not started", self.RtmpUrl)
return
}
@ -169,7 +169,7 @@ func (self *StaticPush) HandleAvPacket() { @@ -169,7 +169,7 @@ func (self *StaticPush) HandleAvPacket() {
case ctrlcmd := <-self.sndctrl_chan:
if ctrlcmd == STATIC_RELAY_STOP_CTRL {
self.connectClient.Close(nil)
log.Printf("Static HandleAvPacket close: publishurl=%s", self.RtmpUrl)
log.Debugf("Static HandleAvPacket close: publishurl=%s", self.RtmpUrl)
break
}
}

77
protocol/rtmp/stream.go

@ -1,8 +1,7 @@ @@ -1,8 +1,7 @@
package rtmp
import (
"errors"
"log"
"fmt"
"strings"
"time"
@ -11,6 +10,7 @@ import ( @@ -11,6 +10,7 @@ import (
"livego/protocol/rtmp/rtmprelay"
cmap "github.com/orcaman/concurrent-map"
log "github.com/sirupsen/logrus"
)
var (
@ -31,7 +31,7 @@ func NewRtmpStream() *RtmpStream { @@ -31,7 +31,7 @@ func NewRtmpStream() *RtmpStream {
func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
info := r.Info()
log.Printf("HandleReader: info[%v]", info)
log.Debugf("HandleReader: info[%v]", info)
var stream *Stream
i, ok := rs.streams.Get(info.Key)
@ -55,7 +55,7 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) { @@ -55,7 +55,7 @@ func (rs *RtmpStream) HandleReader(r av.ReadCloser) {
func (rs *RtmpStream) HandleWriter(w av.WriteCloser) {
info := w.Info()
log.Printf("HandleWriter: info[%v]", info)
log.Debugf("HandleWriter: info[%v]", info)
var s *Stream
ok := rs.streams.Has(info.Key)
@ -165,26 +165,26 @@ func (s *Stream) StartStaticPush() { @@ -165,26 +165,26 @@ func (s *Stream) StartStaticPush() {
streamname := key[index+1:]
appname := dscr[0]
log.Printf("StartStaticPush: current streamname=%s, appname=%s", streamname, appname)
log.Debugf("StartStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
log.Printf("StartStaticPush: GetStaticPushList error=%v", err)
log.Debugf("StartStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
log.Printf("StartStaticPush: static pushurl=%s", pushurl)
log.Debugf("StartStaticPush: static pushurl=%s", pushurl)
staticpushObj := rtmprelay.GetAndCreateStaticPushObject(pushurl)
if staticpushObj != nil {
if err := staticpushObj.Start(); err != nil {
log.Printf("StartStaticPush: staticpushObj.Start %s error=%v", pushurl, err)
log.Debugf("StartStaticPush: staticpushObj.Start %s error=%v", pushurl, err)
} else {
log.Printf("StartStaticPush: staticpushObj.Start %s ok", pushurl)
log.Debugf("StartStaticPush: staticpushObj.Start %s ok", pushurl)
}
} else {
log.Printf("StartStaticPush GetStaticPushObject %s error", pushurl)
log.Debugf("StartStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
@ -192,7 +192,7 @@ func (s *Stream) StartStaticPush() { @@ -192,7 +192,7 @@ func (s *Stream) StartStaticPush() {
func (s *Stream) StopStaticPush() {
key := s.info.Key
log.Printf("StopStaticPush......%s", key)
log.Debugf("StopStaticPush......%s", key)
dscr := strings.Split(key, "/")
if len(dscr) < 1 {
return
@ -206,24 +206,24 @@ func (s *Stream) StopStaticPush() { @@ -206,24 +206,24 @@ func (s *Stream) StopStaticPush() {
streamname := key[index+1:]
appname := dscr[0]
log.Printf("StopStaticPush: current streamname=%s, appname=%s", streamname, appname)
log.Debugf("StopStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
log.Printf("StopStaticPush: GetStaticPushList error=%v", err)
log.Debugf("StopStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
log.Printf("StopStaticPush: static pushurl=%s", pushurl)
log.Debugf("StopStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
staticpushObj.Stop()
rtmprelay.ReleaseStaticPushObject(pushurl)
log.Printf("StopStaticPush: staticpushObj.Stop %s ", pushurl)
log.Debugf("StopStaticPush: staticpushObj.Stop %s ", pushurl)
} else {
log.Printf("StopStaticPush GetStaticPushObject %s error", pushurl)
log.Debugf("StopStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
@ -238,10 +238,10 @@ func (s *Stream) IsSendStaticPush() bool { @@ -238,10 +238,10 @@ func (s *Stream) IsSendStaticPush() bool {
appname := dscr[0]
//log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
//log.Debugf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
//log.Printf("SendStaticPush: GetStaticPushList error=%v", err)
//log.Debugf("SendStaticPush: GetStaticPushList error=%v", err)
return false
}
@ -254,15 +254,15 @@ func (s *Stream) IsSendStaticPush() bool { @@ -254,15 +254,15 @@ func (s *Stream) IsSendStaticPush() bool {
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
//log.Printf("SendStaticPush: static pushurl=%s", pushurl)
//log.Debugf("SendStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
return true
//staticpushObj.WriteAvPacket(&packet)
//log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl)
//log.Debugf("SendStaticPush: WriteAvPacket %s ", pushurl)
} else {
log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl)
log.Debugf("SendStaticPush GetStaticPushObject %s error", pushurl)
}
}
return false
@ -284,23 +284,23 @@ func (s *Stream) SendStaticPush(packet av.Packet) { @@ -284,23 +284,23 @@ func (s *Stream) SendStaticPush(packet av.Packet) {
streamname := key[index+1:]
appname := dscr[0]
//log.Printf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
//log.Debugf("SendStaticPush: current streamname=%s, appname=%s", streamname, appname)
pushurllist, err := rtmprelay.GetStaticPushList(appname)
if err != nil || len(pushurllist) < 1 {
//log.Printf("SendStaticPush: GetStaticPushList error=%v", err)
//log.Debugf("SendStaticPush: GetStaticPushList error=%v", err)
return
}
for _, pushurl := range pushurllist {
pushurl := pushurl + "/" + streamname
//log.Printf("SendStaticPush: static pushurl=%s", pushurl)
//log.Debugf("SendStaticPush: static pushurl=%s", pushurl)
staticpushObj, err := rtmprelay.GetStaticPushObject(pushurl)
if (staticpushObj != nil) && (err == nil) {
staticpushObj.WriteAvPacket(&packet)
//log.Printf("SendStaticPush: WriteAvPacket %s ", pushurl)
//log.Debugf("SendStaticPush: WriteAvPacket %s ", pushurl)
} else {
log.Printf("SendStaticPush GetStaticPushObject %s error", pushurl)
log.Debugf("SendStaticPush GetStaticPushObject %s error", pushurl)
}
}
}
@ -309,7 +309,7 @@ func (s *Stream) TransStart() { @@ -309,7 +309,7 @@ func (s *Stream) TransStart() {
s.isStart = true
var p av.Packet
log.Printf("TransStart:%v", s.info)
log.Debugf("TransStart: %v", s.info)
s.StartStaticPush()
@ -334,9 +334,9 @@ func (s *Stream) TransStart() { @@ -334,9 +334,9 @@ func (s *Stream) TransStart() {
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
if !v.init {
//log.Printf("cache.send: %v", v.w.Info())
//log.Debugf("cache.send: %v", v.w.Info())
if err = s.cache.Send(v.w); err != nil {
log.Printf("[%s] send cache packet error: %v, remove", v.w.Info(), err)
log.Debugf("[%s] send cache packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
continue
}
@ -344,9 +344,9 @@ func (s *Stream) TransStart() { @@ -344,9 +344,9 @@ func (s *Stream) TransStart() {
} else {
new_packet := p
//writeType := reflect.TypeOf(v.w)
//log.Printf("w.Write: type=%v, %v", writeType, v.w.Info())
//log.Debugf("w.Write: type=%v, %v", writeType, v.w.Info())
if err = v.w.Write(&new_packet); err != nil {
log.Printf("[%s] write packet error: %v, remove", v.w.Info(), err)
log.Debugf("[%s] write packet error: %v, remove", v.w.Info(), err)
s.ws.Remove(item.Key)
}
}
@ -355,10 +355,10 @@ func (s *Stream) TransStart() { @@ -355,10 +355,10 @@ func (s *Stream) TransStart() {
}
func (s *Stream) TransStop() {
log.Printf("TransStop: %s", s.info.Key)
log.Debugf("TransStop: %s", s.info.Key)
if s.isStart && s.r != nil {
s.r.Close(errors.New("stop old"))
s.r.Close(fmt.Errorf("stop old"))
}
s.isStart = false
@ -369,7 +369,7 @@ func (s *Stream) CheckAlive() (n int) { @@ -369,7 +369,7 @@ func (s *Stream) CheckAlive() (n int) {
if s.r.Alive() {
n++
} else {
s.r.Close(errors.New("read timeout"))
s.r.Close(fmt.Errorf("read timeout"))
}
}
for item := range s.ws.IterBuffered() {
@ -377,7 +377,7 @@ func (s *Stream) CheckAlive() (n int) { @@ -377,7 +377,7 @@ func (s *Stream) CheckAlive() (n int) {
if v.w != nil {
if !v.w.Alive() && s.isStart {
s.ws.Remove(item.Key)
v.w.Close(errors.New("write timeout"))
v.w.Close(fmt.Errorf("write timeout"))
continue
}
n++
@ -390,18 +390,17 @@ func (s *Stream) CheckAlive() (n int) { @@ -390,18 +390,17 @@ func (s *Stream) CheckAlive() (n int) {
func (s *Stream) closeInter() {
if s.r != nil {
s.StopStaticPush()
log.Printf("[%v] publisher closed", s.r.Info())
log.Debugf("[%v] publisher closed", s.r.Info())
}
for item := range s.ws.IterBuffered() {
v := item.Val.(*PackWriterCloser)
if v.w != nil {
if v.w.Info().IsInterval() {
v.w.Close(errors.New("closed"))
v.w.Close(fmt.Errorf("closed"))
s.ws.Remove(item.Key)
log.Printf("[%v] player closed and remove\n", v.w.Info())
log.Debugf("[%v] player closed and remove\n", v.w.Info())
}
}
}
}

Loading…
Cancel
Save