From 52f26b5299c277fa5eb86a5e6bd0f150c9d5ce57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?halwu=28=E5=90=B4=E6=B5=A9=E9=BA=9F=29?= Date: Mon, 29 May 2017 19:37:19 +0800 Subject: [PATCH] feat: add WebSocketFlv support --- main.go | 34 ++++++++++--- protocol/httpflv/writer.go | 6 +-- protocol/websocketflv/server.go | 84 +++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 9 deletions(-) create mode 100644 protocol/websocketflv/server.go diff --git a/main.go b/main.go index bf03611..c0fe707 100755 --- a/main.go +++ b/main.go @@ -9,16 +9,19 @@ import ( "github.com/gwuhaolin/livego/protocol/hls" "github.com/gwuhaolin/livego/protocol/httpflv" "github.com/gwuhaolin/livego/protocol/httpopera" + "github.com/gwuhaolin/livego/protocol/websocketflv" ) var ( - rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") - operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address") - flvAddr = flag.String("flv-addr", ":7001", "HTTP-FLV server listen address") - hlsAddr = flag.String("hls-addr", ":7002", "HLS server listen address") + rtmpAddr = flag.String("rtmp-addr", ":1935", "RTMP server listen address") + httpFlvAddr = flag.String("http-flv-addr", ":7001", "HTTP-FLV server listen address") + websocketFlvAddr = flag.String("websocket-flv-addr", ":7002", "HTTP-FLV server listen address") + hlsAddr = flag.String("hls-addr", ":7003", "HLS server listen address") + operaAddr = flag.String("manage-addr", ":8080", "HTTP manage interface server listen address") ) func init() { + log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) flag.Parse() } @@ -58,7 +61,7 @@ func startRtmp(stream *rtmp.RtmpStream, hlsServer *hls.Server) { } func startHTTPFlv(stream *rtmp.RtmpStream) { - flvListen, err := net.Listen("tcp", *flvAddr) + flvListen, err := net.Listen("tcp", *httpFlvAddr) if err != nil { log.Fatal(err) } @@ -70,7 +73,25 @@ func startHTTPFlv(stream *rtmp.RtmpStream) { log.Println("HTTP-FLV server panic: ", r) } }() - log.Println("HTTP-FLV Listen On", *flvAddr) + log.Println("HTTP-FLV Listen On", *httpFlvAddr) + hdlServer.Serve(flvListen) + }() +} + +func startWebSocketFlv(stream *rtmp.RtmpStream) { + flvListen, err := net.Listen("tcp", *websocketFlvAddr) + if err != nil { + log.Fatal(err) + } + + hdlServer := websocketflv.NewServer(stream) + go func() { + defer func() { + if r := recover(); r != nil { + log.Println("WebSocket-FLV server panic: ", r) + } + }() + log.Println("WebSocket-FLV Listen On", *websocketFlvAddr) hdlServer.Serve(flvListen) }() } @@ -105,6 +126,7 @@ func main() { stream := rtmp.NewRtmpStream() hlsServer := startHls() startHTTPFlv(stream) + startWebSocketFlv(stream) //startHTTPOpera(stream) startRtmp(stream, hlsServer) } diff --git a/protocol/httpflv/writer.go b/protocol/httpflv/writer.go index 355f57e..5920dfa 100755 --- a/protocol/httpflv/writer.go +++ b/protocol/httpflv/writer.go @@ -1,7 +1,7 @@ package httpflv import ( - "net/http" + "io" "time" "errors" "log" @@ -23,11 +23,11 @@ type FLVWriter struct { buf []byte closed bool closedChan chan struct{} - ctx http.ResponseWriter + ctx io.Writer packetQueue chan av.Packet } -func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter { +func NewFLVWriter(app, title, url string, ctx io.Writer) *FLVWriter { ret := &FLVWriter{ Uid: uid.NewId(), app: app, diff --git a/protocol/websocketflv/server.go b/protocol/websocketflv/server.go new file mode 100644 index 0000000..e6213a8 --- /dev/null +++ b/protocol/websocketflv/server.go @@ -0,0 +1,84 @@ +package websocketflv + +import ( + "strings" + "net" + "net/http" + "log" + "github.com/gwuhaolin/livego/av" + "github.com/gwuhaolin/livego/protocol/httpflv" + "github.com/gorilla/websocket" + "io" +) + +type Server struct { + handler av.Handler +} + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func NewServer(h av.Handler) *Server { + return &Server{ + handler: h, + } +} + +func (server *Server) Serve(listener net.Listener) error { + mux := http.NewServeMux() + mux.HandleFunc("/", server.handleConn) + http.Serve(listener, mux) + return nil +} + +func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) { + defer func() { + if r := recover(); r != nil { + log.Println("websocket flv handleConn panic: ", r) + } + }() + + url := r.URL.String() + u := r.URL.Path + if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" { + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv") + paths := strings.SplitN(path, "/", 2) + if len(paths) != 2 { + http.Error(w, "invalid path", http.StatusBadRequest) + return + } + log.Println("url:", u, "path:", path, "paths:", paths) + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + conn.Close() + return + } + + writer, err := conn.NextWriter(websocket.BinaryMessage) + if err != nil { + conn.Close() + return + } + + flvWriter := httpflv.NewFLVWriter(paths[0], paths[1], url, &Writer{writer }) + server.handler.HandleWriter(flvWriter) + flvWriter.Wait() +} + +type Writer struct { + writer io.WriteCloser +} + +func (writer *Writer) Write(p []byte) (n int, err error) { + n, err = writer.writer.Write(p) + writer.writer.Close() + return +}