Browse Source

fix: split to server and writer

pull/6/head
halwu(吴浩麟) 8 years ago
parent
commit
74eb057ce7
  1. 271
      protocol/httpflv/http_flv.go
  2. 104
      protocol/httpflv/server.go
  3. 173
      protocol/httpflv/writer.go

271
protocol/httpflv/http_flv.go

@ -1,271 +0,0 @@ @@ -1,271 +0,0 @@
package httpflv
import (
"encoding/json"
"net"
"net/http"
"strings"
"time"
"errors"
"github.com/gwuhaolin/livego/utils/uid"
"github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/utils/pio"
"log"
"github.com/gwuhaolin/livego/protocol/rtmp"
)
type Server struct {
handler av.Handler
}
type stream struct {
Key string `json:"key"`
Id string `json:"id"`
}
type streams struct {
Publishers []stream `json:"publishers"`
Players []stream `json:"players"`
}
func NewServer(h av.Handler) *Server {
return &Server{
handler: h,
}
}
func (self *Server) Serve(l net.Listener) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
self.handleConn(w, r)
})
mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {
self.getStream(w, r)
})
http.Serve(l, mux)
return nil
}
func (s *Server) getStream(w http.ResponseWriter, r *http.Request) {
rtmpStream := s.handler.(*rtmp.RtmpStream)
if rtmpStream == nil {
return
}
msgs := new(streams)
for item := range rtmpStream.GetStreams().IterBuffered() {
if s, ok := item.Val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
msg := stream{item.Key, s.GetReader().Info().UID}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
}
for item := range rtmpStream.GetStreams().IterBuffered() {
ws := item.Val.(*rtmp.Stream).GetWs()
for s := range ws.IterBuffered() {
if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
msg := stream{item.Key, pw.GetWriter().Info().UID}
msgs.Players = append(msgs.Players, msg)
}
}
}
}
resp, _ := json.Marshal(msgs)
w.Header().Set("Content-Type", "application/json")
w.Write(resp)
}
func (self *Server) handleConn(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
log.Println("http flv handleConn panic: ", r)
}
}()
url := r.URL.String()
u := r.URL.Path
if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
paths := strings.SplitN(path, "/", 2)
log.Println("url:", u, "path:", path, "paths:", paths)
if len(paths) != 2 {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
writer := NewFLVWriter(paths[0], paths[1], url, w)
self.handler.HandleWriter(writer)
writer.Wait()
}
const (
headerLen = 11
maxQueueNum = 1024
)
type FLVWriter struct {
Uid string
av.RWBaser
app, title, url string
buf []byte
closed bool
closedChan chan struct{}
ctx http.ResponseWriter
packetQueue chan av.Packet
}
func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
ret := &FLVWriter{
Uid: uid.NewId(),
app: app,
title: title,
url: url,
ctx: ctx,
RWBaser: av.NewRWBaser(time.Second * 10),
closedChan: make(chan struct{}),
buf: make([]byte, headerLen),
packetQueue: make(chan av.Packet, maxQueueNum),
}
ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09})
pio.PutI32BE(ret.buf[:4], 0)
ret.ctx.Write(ret.buf[:4])
go func() {
err := ret.SendPacket()
if err != nil {
log.Println("SendPacket error:", err)
ret.closed = true
}
}()
return ret
}
func (self *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
if ok && tmpPkt.IsVideo {
videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)
// dont't drop sps config and dont't drop key frame
if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {
log.Println("insert keyframe to queue")
pktQue <- tmpPkt
}
if len(pktQue) > maxQueueNum-10 {
<-pktQue
}
// drop other packet
<-pktQue
}
// try to don't drop audio
if ok && tmpPkt.IsAudio {
log.Println("insert audio to queue")
pktQue <- tmpPkt
}
}
log.Println("packet queue len: ", len(pktQue))
}
func (self *FLVWriter) Write(p av.Packet) error {
if !self.closed {
if len(self.packetQueue) >= maxQueueNum-24 {
self.DropPacket(self.packetQueue, self.Info())
} else {
self.packetQueue <- p
}
return nil
} else {
return errors.New("closed")
}
}
// func (self *FLVWriter) Write(p av.Packet) error {
func (self *FLVWriter) SendPacket() error {
for {
p, ok := <-self.packetQueue
if ok {
self.RWBaser.SetPreTime()
h := self.buf[:headerLen]
typeID := av.TAG_VIDEO
if !p.IsVideo {
if p.IsMetadata {
var err error
typeID = av.TAG_SCRIPTDATAAMF0
p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
if err != nil {
return err
}
} else {
typeID = av.TAG_AUDIO
}
}
dataLen := len(p.Data)
timestamp := p.TimeStamp
timestamp += self.BaseTimeStamp()
self.RWBaser.RecTimeStamp(timestamp, uint32(typeID))
preDataLen := dataLen + headerLen
timestampbase := timestamp & 0xffffff
timestampExt := timestamp >> 24 & 0xff
pio.PutU8(h[0:1], uint8(typeID))
pio.PutI24BE(h[1:4], int32(dataLen))
pio.PutI24BE(h[4:7], int32(timestampbase))
pio.PutU8(h[7:8], uint8(timestampExt))
if _, err := self.ctx.Write(h); err != nil {
return err
}
if _, err := self.ctx.Write(p.Data); err != nil {
return err
}
pio.PutI32BE(h[:4], int32(preDataLen))
if _, err := self.ctx.Write(h[:4]); err != nil {
return err
}
} else {
return errors.New("closed")
}
}
return nil
}
func (self *FLVWriter) Wait() {
select {
case <-self.closedChan:
return
}
}
func (self *FLVWriter) Close(error) {
log.Println("http flv closed")
if !self.closed {
close(self.packetQueue)
close(self.closedChan)
}
self.closed = true
}
func (self *FLVWriter) Info() (ret av.Info) {
ret.UID = self.Uid
ret.URL = self.url
ret.Key = self.app + "/" + self.title
ret.Inter = true
return
}

104
protocol/httpflv/server.go

@ -0,0 +1,104 @@ @@ -0,0 +1,104 @@
package httpflv
import (
"encoding/json"
"strings"
"net"
"net/http"
"log"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/protocol/rtmp"
)
type Server struct {
handler av.Handler
}
type stream struct {
Key string `json:"key"`
Id string `json:"id"`
}
type streams struct {
Publishers []stream `json:"publishers"`
Players []stream `json:"players"`
}
func NewServer(h av.Handler) *Server {
return &Server{
handler: h,
}
}
func (server *Server) Serve(l net.Listener) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
server.handleConn(w, r)
})
mux.HandleFunc("/streams", func(w http.ResponseWriter, r *http.Request) {
server.getStream(w, r)
})
http.Serve(l, mux)
return nil
}
func (server *Server) getStream(w http.ResponseWriter, r *http.Request) {
rtmpStream := server.handler.(*rtmp.RtmpStream)
if rtmpStream == nil {
return
}
msgs := new(streams)
for item := range rtmpStream.GetStreams().IterBuffered() {
if s, ok := item.Val.(*rtmp.Stream); ok {
if s.GetReader() != nil {
msg := stream{item.Key, s.GetReader().Info().UID}
msgs.Publishers = append(msgs.Publishers, msg)
}
}
}
for item := range rtmpStream.GetStreams().IterBuffered() {
ws := item.Val.(*rtmp.Stream).GetWs()
for s := range ws.IterBuffered() {
if pw, ok := s.Val.(*rtmp.PackWriterCloser); ok {
if pw.GetWriter() != nil {
msg := stream{item.Key, pw.GetWriter().Info().UID}
msgs.Players = append(msgs.Players, msg)
}
}
}
}
resp, _ := json.Marshal(msgs)
w.Header().Set("Content-Type", "application/json")
w.Write(resp)
}
func (server *Server) handleConn(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
log.Println("http flv handleConn panic: ", r)
}
}()
url := r.URL.String()
u := r.URL.Path
if pos := strings.LastIndex(u, "."); pos < 0 || u[pos:] != ".flv" {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
path := strings.TrimSuffix(strings.TrimLeft(u, "/"), ".flv")
paths := strings.SplitN(path, "/", 2)
log.Println("url:", u, "path:", path, "paths:", paths)
if len(paths) != 2 {
http.Error(w, "invalid path", http.StatusBadRequest)
return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
writer := NewFLVWriter(paths[0], paths[1], url, w)
server.handler.HandleWriter(writer)
writer.Wait()
}

173
protocol/httpflv/writer.go

@ -0,0 +1,173 @@ @@ -0,0 +1,173 @@
package httpflv
import (
"net/http"
"time"
"errors"
"log"
"github.com/gwuhaolin/livego/utils/uid"
"github.com/gwuhaolin/livego/protocol/amf"
"github.com/gwuhaolin/livego/av"
"github.com/gwuhaolin/livego/utils/pio"
)
const (
headerLen = 11
maxQueueNum = 1024
)
type FLVWriter struct {
Uid string
av.RWBaser
app, title, url string
buf []byte
closed bool
closedChan chan struct{}
ctx http.ResponseWriter
packetQueue chan av.Packet
}
func NewFLVWriter(app, title, url string, ctx http.ResponseWriter) *FLVWriter {
ret := &FLVWriter{
Uid: uid.NewId(),
app: app,
title: title,
url: url,
ctx: ctx,
RWBaser: av.NewRWBaser(time.Second * 10),
closedChan: make(chan struct{}),
buf: make([]byte, headerLen),
packetQueue: make(chan av.Packet, maxQueueNum),
}
ret.ctx.Write([]byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0x00, 0x00, 0x00, 0x09})
pio.PutI32BE(ret.buf[:4], 0)
ret.ctx.Write(ret.buf[:4])
go func() {
err := ret.SendPacket()
if err != nil {
log.Println("SendPacket error:", err)
ret.closed = true
}
}()
return ret
}
func (flvWriter *FLVWriter) DropPacket(pktQue chan av.Packet, info av.Info) {
log.Printf("[%v] packet queue max!!!", info)
for i := 0; i < maxQueueNum-84; i++ {
tmpPkt, ok := <-pktQue
if ok && tmpPkt.IsVideo {
videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader)
// dont't drop sps config and dont't drop key frame
if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) {
log.Println("insert keyframe to queue")
pktQue <- tmpPkt
}
if len(pktQue) > maxQueueNum-10 {
<-pktQue
}
// drop other packet
<-pktQue
}
// try to don't drop audio
if ok && tmpPkt.IsAudio {
log.Println("insert audio to queue")
pktQue <- tmpPkt
}
}
log.Println("packet queue len: ", len(pktQue))
}
func (flvWriter *FLVWriter) Write(p av.Packet) error {
if !flvWriter.closed {
if len(flvWriter.packetQueue) >= maxQueueNum-24 {
flvWriter.DropPacket(flvWriter.packetQueue, flvWriter.Info())
} else {
flvWriter.packetQueue <- p
}
return nil
} else {
return errors.New("closed")
}
}
func (flvWriter *FLVWriter) SendPacket() error {
for {
p, ok := <-flvWriter.packetQueue
if ok {
flvWriter.RWBaser.SetPreTime()
h := flvWriter.buf[:headerLen]
typeID := av.TAG_VIDEO
if !p.IsVideo {
if p.IsMetadata {
var err error
typeID = av.TAG_SCRIPTDATAAMF0
p.Data, err = amf.MetaDataReform(p.Data, amf.DEL)
if err != nil {
return err
}
} else {
typeID = av.TAG_AUDIO
}
}
dataLen := len(p.Data)
timestamp := p.TimeStamp
timestamp += flvWriter.BaseTimeStamp()
flvWriter.RWBaser.RecTimeStamp(timestamp, uint32(typeID))
preDataLen := dataLen + headerLen
timestampbase := timestamp & 0xffffff
timestampExt := timestamp >> 24 & 0xff
pio.PutU8(h[0:1], uint8(typeID))
pio.PutI24BE(h[1:4], int32(dataLen))
pio.PutI24BE(h[4:7], int32(timestampbase))
pio.PutU8(h[7:8], uint8(timestampExt))
if _, err := flvWriter.ctx.Write(h); err != nil {
return err
}
if _, err := flvWriter.ctx.Write(p.Data); err != nil {
return err
}
pio.PutI32BE(h[:4], int32(preDataLen))
if _, err := flvWriter.ctx.Write(h[:4]); err != nil {
return err
}
} else {
return errors.New("closed")
}
}
return nil
}
func (flvWriter *FLVWriter) Wait() {
select {
case <-flvWriter.closedChan:
return
}
}
func (flvWriter *FLVWriter) Close(error) {
log.Println("http flv closed")
if !flvWriter.closed {
close(flvWriter.packetQueue)
close(flvWriter.closedChan)
}
flvWriter.closed = true
}
func (flvWriter *FLVWriter) Info() (ret av.Info) {
ret.UID = flvWriter.Uid
ret.URL = flvWriter.url
ret.Key = flvWriter.app + "/" + flvWriter.title
ret.Inter = true
return
}
Loading…
Cancel
Save