package rtmp import ( "errors" "flag" "fmt" "log" "net" "net/url" "reflect" "strings" "time" "livego/av" "livego/configure" "livego/container/flv" "livego/protocol/rtmp/core" "livego/utils/uid" ) const ( maxQueueNum = 1024 SAVE_STATICS_INTERVAL = 5000 ) var ( readTimeout = flag.Int("readTimeout", 10, "read time out") writeTimeout = flag.Int("writeTimeout", 10, "write time out") ) type Client struct { handler av.Handler getter av.GetWriter } func NewRtmpClient(h av.Handler, getter av.GetWriter) *Client { return &Client{ handler: h, getter: getter, } } func (c *Client) Dial(url string, method string) error { connClient := core.NewConnClient() if err := connClient.Start(url, method); err != nil { return err } if method == av.PUBLISH { writer := NewVirWriter(connClient) log.Printf("client Dial call NewVirWriter url=%s, method=%s", url, method) c.handler.HandleWriter(writer) } else if method == av.PLAY { reader := NewVirReader(connClient) log.Printf("client Dial call NewVirReader url=%s, method=%s", url, method) c.handler.HandleReader(reader) if c.getter != nil { writer := c.getter.GetWriter(reader.Info()) c.handler.HandleWriter(writer) } } return nil } func (c *Client) GetHandle() av.Handler { return c.handler } type Server struct { handler av.Handler getter av.GetWriter } func NewRtmpServer(h av.Handler, getter av.GetWriter) *Server { return &Server{ handler: h, getter: getter, } } func (s *Server) Serve(listener net.Listener) (err error) { defer func() { if r := recover(); r != nil { log.Println("rtmp serve panic: ", r) } }() for { var netconn net.Conn netconn, err = listener.Accept() if err != nil { return } conn := core.NewConn(netconn, 4*1024) log.Println("new client, connect remote:", conn.RemoteAddr().String(), "local:", conn.LocalAddr().String()) go s.handleConn(conn) } } func (s *Server) handleConn(conn *core.Conn) error { if err := conn.HandshakeServer(); err != nil { conn.Close() log.Println("handleConn HandshakeServer err:", err) return err } connServer := core.NewConnServer(conn) if err := connServer.ReadMsg(); err != nil { conn.Close() log.Println("handleConn read msg err:", err) return err } appname, _, _ := connServer.GetInfo() if ret := configure.CheckAppName(appname); !ret { err := errors.New("application name=%s is not configured") conn.Close() log.Println("CheckAppName err:", err) return err } log.Printf("handleConn: IsPublisher=%v", connServer.IsPublisher()) if connServer.IsPublisher() { if pushlist, ret := configure.GetStaticPushUrlList(appname); ret && (pushlist != nil) { log.Printf("GetStaticPushUrlList: %v", pushlist) } reader := NewVirReader(connServer) s.handler.HandleReader(reader) log.Printf("new publisher: %+v", reader.Info()) if s.getter != nil { writeType := reflect.TypeOf(s.getter) log.Printf("handleConn:writeType=%v", writeType) writer := s.getter.GetWriter(reader.Info()) s.handler.HandleWriter(writer) } } else { writer := NewVirWriter(connServer) log.Printf("new player: %+v", writer.Info()) s.handler.HandleWriter(writer) } return nil } type GetInFo interface { GetInfo() (string, string, string) } type StreamReadWriteCloser interface { GetInFo Close(error) Write(core.ChunkStream) error Read(c *core.ChunkStream) error } type StaticsBW struct { StreamId uint32 VideoDatainBytes uint64 LastVideoDatainBytes uint64 VideoSpeedInBytesperMS uint64 AudioDatainBytes uint64 LastAudioDatainBytes uint64 AudioSpeedInBytesperMS uint64 LastTimestamp int64 } type VirWriter struct { Uid string closed bool av.RWBaser conn StreamReadWriteCloser packetQueue chan *av.Packet WriteBWInfo StaticsBW } func NewVirWriter(conn StreamReadWriteCloser) *VirWriter { ret := &VirWriter{ Uid: uid.NewId(), conn: conn, RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)), packetQueue: make(chan *av.Packet, maxQueueNum), WriteBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, } go ret.Check() go func() { err := ret.SendPacket() if err != nil { log.Println(err) } }() return ret } func (v *VirWriter) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { nowInMS := int64(time.Now().UnixNano() / 1e6) v.WriteBWInfo.StreamId = streamid if isVideoFlag { v.WriteBWInfo.VideoDatainBytes = v.WriteBWInfo.VideoDatainBytes + length } else { v.WriteBWInfo.AudioDatainBytes = v.WriteBWInfo.AudioDatainBytes + length } if v.WriteBWInfo.LastTimestamp == 0 { v.WriteBWInfo.LastTimestamp = nowInMS } else if (nowInMS - v.WriteBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { diffTimestamp := (nowInMS - v.WriteBWInfo.LastTimestamp) / 1000 v.WriteBWInfo.VideoSpeedInBytesperMS = (v.WriteBWInfo.VideoDatainBytes - v.WriteBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.WriteBWInfo.AudioSpeedInBytesperMS = (v.WriteBWInfo.AudioDatainBytes - v.WriteBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.WriteBWInfo.LastVideoDatainBytes = v.WriteBWInfo.VideoDatainBytes v.WriteBWInfo.LastAudioDatainBytes = v.WriteBWInfo.AudioDatainBytes v.WriteBWInfo.LastTimestamp = nowInMS } } func (v *VirWriter) Check() { var c core.ChunkStream for { if err := v.conn.Read(&c); err != nil { v.Close(err) return } } } func (v *VirWriter) DropPacket(pktQue chan *av.Packet, info av.Info) { log.Printf("[%v] packet queue max!!!", info) for i := 0; i < maxQueueNum-84; i++ { tmpPkt, ok := <-pktQue // try to don't drop audio if ok && tmpPkt.IsAudio { if len(pktQue) > maxQueueNum-2 { log.Println("drop audio pkt") <-pktQue } else { pktQue <- tmpPkt } } if ok && tmpPkt.IsVideo { videoPkt, ok := tmpPkt.Header.(av.VideoPacketHeader) // dont't drop sps config and dont't drop key frame if ok && (videoPkt.IsSeq() || videoPkt.IsKeyFrame()) { pktQue <- tmpPkt } if len(pktQue) > maxQueueNum-10 { log.Println("drop video pkt") <-pktQue } } } log.Println("packet queue len: ", len(pktQue)) } // func (v *VirWriter) Write(p *av.Packet) (err error) { err = nil if v.closed { err = errors.New("VirWriter closed") return } defer func() { if e := recover(); e != nil { errString := fmt.Sprintf("VirWriter has already been closed:%v", e) err = errors.New(errString) } }() if len(v.packetQueue) >= maxQueueNum-24 { v.DropPacket(v.packetQueue, v.Info()) } else { v.packetQueue <- p } return } func (v *VirWriter) SendPacket() error { Flush := reflect.ValueOf(v.conn).MethodByName("Flush") var cs core.ChunkStream for { p, ok := <-v.packetQueue if ok { cs.Data = p.Data cs.Length = uint32(len(p.Data)) cs.StreamID = p.StreamID cs.Timestamp = p.TimeStamp cs.Timestamp += v.BaseTimeStamp() if p.IsVideo { cs.TypeID = av.TAG_VIDEO } else { if p.IsMetadata { cs.TypeID = av.TAG_SCRIPTDATAAMF0 } else { cs.TypeID = av.TAG_AUDIO } } v.SaveStatics(p.StreamID, uint64(cs.Length), p.IsVideo) v.SetPreTime() v.RecTimeStamp(cs.Timestamp, cs.TypeID) err := v.conn.Write(cs) if err != nil { v.closed = true return err } Flush.Call(nil) } else { return errors.New("closed") } } return nil } func (v *VirWriter) Info() (ret av.Info) { ret.UID = v.Uid _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { log.Println(err) } ret.Key = strings.TrimLeft(_url.Path, "/") ret.Inter = true return } func (v *VirWriter) Close(err error) { log.Println("player ", v.Info(), "closed: "+err.Error()) if !v.closed { close(v.packetQueue) } v.closed = true v.conn.Close(err) } type VirReader struct { Uid string av.RWBaser demuxer *flv.Demuxer conn StreamReadWriteCloser ReadBWInfo StaticsBW } func NewVirReader(conn StreamReadWriteCloser) *VirReader { return &VirReader{ Uid: uid.NewId(), conn: conn, RWBaser: av.NewRWBaser(time.Second * time.Duration(*writeTimeout)), demuxer: flv.NewDemuxer(), ReadBWInfo: StaticsBW{0, 0, 0, 0, 0, 0, 0, 0}, } } func (v *VirReader) SaveStatics(streamid uint32, length uint64, isVideoFlag bool) { nowInMS := int64(time.Now().UnixNano() / 1e6) v.ReadBWInfo.StreamId = streamid if isVideoFlag { v.ReadBWInfo.VideoDatainBytes = v.ReadBWInfo.VideoDatainBytes + length } else { v.ReadBWInfo.AudioDatainBytes = v.ReadBWInfo.AudioDatainBytes + length } if v.ReadBWInfo.LastTimestamp == 0 { v.ReadBWInfo.LastTimestamp = nowInMS } else if (nowInMS - v.ReadBWInfo.LastTimestamp) >= SAVE_STATICS_INTERVAL { diffTimestamp := (nowInMS - v.ReadBWInfo.LastTimestamp) / 1000 //log.Printf("now=%d, last=%d, diff=%d", nowInMS, v.ReadBWInfo.LastTimestamp, diffTimestamp) v.ReadBWInfo.VideoSpeedInBytesperMS = (v.ReadBWInfo.VideoDatainBytes - v.ReadBWInfo.LastVideoDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.ReadBWInfo.AudioSpeedInBytesperMS = (v.ReadBWInfo.AudioDatainBytes - v.ReadBWInfo.LastAudioDatainBytes) * 8 / uint64(diffTimestamp) / 1000 v.ReadBWInfo.LastVideoDatainBytes = v.ReadBWInfo.VideoDatainBytes v.ReadBWInfo.LastAudioDatainBytes = v.ReadBWInfo.AudioDatainBytes v.ReadBWInfo.LastTimestamp = nowInMS } } func (v *VirReader) Read(p *av.Packet) (err error) { defer func() { if r := recover(); r != nil { log.Println("rtmp read packet panic: ", r) } }() v.SetPreTime() var cs core.ChunkStream for { err = v.conn.Read(&cs) if err != nil { return err } if cs.TypeID == av.TAG_AUDIO || cs.TypeID == av.TAG_VIDEO || cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 { break } } p.IsAudio = cs.TypeID == av.TAG_AUDIO p.IsVideo = cs.TypeID == av.TAG_VIDEO p.IsMetadata = cs.TypeID == av.TAG_SCRIPTDATAAMF0 || cs.TypeID == av.TAG_SCRIPTDATAAMF3 p.StreamID = cs.StreamID p.Data = cs.Data p.TimeStamp = cs.Timestamp v.SaveStatics(p.StreamID, uint64(len(p.Data)), p.IsVideo) v.demuxer.DemuxH(p) return err } func (v *VirReader) Info() (ret av.Info) { ret.UID = v.Uid _, _, URL := v.conn.GetInfo() ret.URL = URL _url, err := url.Parse(URL) if err != nil { log.Println(err) } ret.Key = strings.TrimLeft(_url.Path, "/") return } func (v *VirReader) Close(err error) { log.Println("publisher ", v.Info(), "closed: "+err.Error()) v.conn.Close(err) }