diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 120ff966..ec7587f5 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -127,15 +127,15 @@ func (c *rtmpConn) ID() string { // RemoteAddr returns the remote address of the Conn. func (c *rtmpConn) RemoteAddr() net.Addr { - return c.conn.NetConn().RemoteAddr() + return c.conn.RemoteAddr() } func (c *rtmpConn) log(level logger.Level, format string, args ...interface{}) { - c.parent.log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...) + c.parent.log(level, "[conn %v] "+format, append([]interface{}{c.conn.RemoteAddr()}, args...)...) } func (c *rtmpConn) ip() net.IP { - return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP + return c.conn.RemoteAddr().(*net.TCPAddr).IP } func (c *rtmpConn) safeState() gortsplib.ServerSessionState { @@ -197,11 +197,11 @@ func (c *rtmpConn) run() { func (c *rtmpConn) runInner(ctx context.Context) error { go func() { <-ctx.Done() - c.conn.NetConn().Close() + c.conn.Close() }() - c.conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) - c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + c.conn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) + c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) err := c.conn.ServerHandshake() if err != nil { return err @@ -277,7 +277,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { return fmt.Errorf("the stream doesn't contain an H264 track or an AAC track") } - c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) c.conn.WriteMetadata(videoTrack, audioTrack) c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) @@ -308,7 +308,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { } // disable read deadline - c.conn.NetConn().SetReadDeadline(time.Time{}) + c.conn.SetReadDeadline(time.Time{}) var videoStartPTS time.Duration var videoDTSEst *h264.DTSEstimator @@ -379,7 +379,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { pts -= videoStartPTS dts := videoDTSEst.Feed(pts) - c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) err = c.conn.WritePacket(av.Packet{ Type: av.H264, Data: data, @@ -415,7 +415,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { } for _, au := range aus { - c.conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + c.conn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) err := c.conn.WritePacket(av.Packet{ Type: av.AAC, Data: au, @@ -432,7 +432,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { } func (c *rtmpConn) runPublish(ctx context.Context) error { - c.conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) + c.conn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) videoTrack, audioTrack, err := c.conn.ReadMetadata() if err != nil { return err @@ -488,7 +488,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { c.stateMutex.Unlock() // disable write deadline - c.conn.NetConn().SetWriteDeadline(time.Time{}) + c.conn.SetWriteDeadline(time.Time{}) rres := c.path.onPublisherRecord(pathPublisherRecordReq{ Author: c, @@ -507,7 +507,7 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { } for { - c.conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) + c.conn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) pkt, err := c.conn.ReadPacket() if err != nil { return err diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index c2698771..8aebaff3 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -115,16 +115,16 @@ func (s *rtmpSource) runInner() bool { readDone := make(chan error) go func() { readDone <- func() error { - conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) - conn.NetConn().SetWriteDeadline(time.Now().Add(time.Duration(s.writeTimeout))) + conn.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + conn.SetWriteDeadline(time.Now().Add(time.Duration(s.writeTimeout))) err = conn.ClientHandshake() if err != nil { return err } - conn.NetConn().SetWriteDeadline(time.Time{}) + conn.SetWriteDeadline(time.Time{}) - conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + conn.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) videoTrack, audioTrack, err := conn.ReadMetadata() if err != nil { return err @@ -172,7 +172,7 @@ func (s *rtmpSource) runInner() bool { } for { - conn.NetConn().SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + conn.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) pkt, err := conn.ReadPacket() if err != nil { return err @@ -248,11 +248,11 @@ func (s *rtmpSource) runInner() bool { select { case err := <-readDone: - conn.NetConn().Close() + conn.Close() return err case <-innerCtx.Done(): - conn.NetConn().Close() + conn.Close() <-readDone return nil } diff --git a/internal/rtmp/conn.go b/internal/rtmp/conn.go index 83bbf461..8dbce2af 100644 --- a/internal/rtmp/conn.go +++ b/internal/rtmp/conn.go @@ -3,6 +3,7 @@ package rtmp import ( "net" "net/url" + "time" "github.com/notedit/rtmp/av" "github.com/notedit/rtmp/format/rtmp" @@ -14,9 +15,24 @@ type Conn struct { nconn net.Conn } -// NetConn returns the underlying net.Conn. -func (c *Conn) NetConn() net.Conn { - return c.nconn +// Close closes the connection. +func (c *Conn) Close() error { + return c.nconn.Close() +} + +// SetReadDeadline sets the read deadline. +func (c *Conn) SetReadDeadline(t time.Time) error { + return c.nconn.SetReadDeadline(t) +} + +// SetWriteDeadline sets the write deadline. +func (c *Conn) SetWriteDeadline(t time.Time) error { + return c.nconn.SetWriteDeadline(t) +} + +// RemoteAddr returns the remote network address. +func (c *Conn) RemoteAddr() net.Addr { + return c.nconn.RemoteAddr() } // IsPublishing returns whether the connection is publishing. diff --git a/internal/rtmp/metadata.go b/internal/rtmp/metadata.go index 09127677..92235f71 100644 --- a/internal/rtmp/metadata.go +++ b/internal/rtmp/metadata.go @@ -15,7 +15,7 @@ const ( codecAAC = 10 ) -// ReadMetadata extracts track informations from a connection that is publishing. +// ReadMetadata reads track informations. func (c *Conn) ReadMetadata() (*gortsplib.Track, *gortsplib.Track, error) { var videoTrack *gortsplib.Track var audioTrack *gortsplib.Track @@ -170,7 +170,7 @@ func (c *Conn) ReadMetadata() (*gortsplib.Track, *gortsplib.Track, error) { } } -// WriteMetadata writes track informations to a connection that is reading. +// WriteMetadata writes track informations. func (c *Conn) WriteMetadata(videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) error { err := c.WritePacket(av.Packet{ Type: av.Metadata,