Browse Source

connrtmp: use contexts

pull/442/head
aler9 4 years ago
parent
commit
baf2100ad6
  1. 55
      internal/connrtmp/conn.go

55
internal/connrtmp/conn.go

@ -1,6 +1,7 @@
package connrtmp package connrtmp
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -159,36 +160,22 @@ func (c *Conn) run() {
defer onConnectCmd.Close() defer onConnectCmd.Close()
} }
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount)) ctx, cancel := context.WithCancel(context.Background())
connErr := make(chan error) connErr := make(chan error)
go func() { go func() {
connErr <- func() error { connErr <- c.runInner(ctx)
c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.ServerHandshake()
if err != nil {
return err
}
if c.conn.IsPublishing() {
return c.runPublish()
}
return c.runRead()
}()
}() }()
select { select {
case err := <-connErr: case err := <-connErr:
cancel()
if err != io.EOF { if err != io.EOF {
c.log(logger.Info, "ERR: %s", err) c.log(logger.Info, "ERR: %s", err)
} }
c.conn.NetConn().Close()
case <-c.terminate: case <-c.terminate:
c.ringBuffer.Close() cancel()
c.conn.NetConn().Close()
<-connErr <-connErr
} }
@ -202,7 +189,26 @@ func (c *Conn) run() {
<-c.parentTerminate <-c.parentTerminate
} }
func (c *Conn) runRead() error { func (c *Conn) runInner(ctx context.Context) error {
go func() {
<-ctx.Done()
c.conn.NetConn().Close()
}()
c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.ServerHandshake()
if err != nil {
return err
}
if c.conn.IsPublishing() {
return c.runPublish(ctx)
}
return c.runRead(ctx)
}
func (c *Conn) runRead(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL()) pathName, query := pathNameAndQuery(c.conn.URL())
sres := make(chan readpublisher.SetupPlayRes) sres := make(chan readpublisher.SetupPlayRes)
@ -259,6 +265,13 @@ func (c *Conn) runRead() error {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WriteMetadata(videoTrack, audioTrack) c.conn.WriteMetadata(videoTrack, audioTrack)
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
go func() {
<-ctx.Done()
c.ringBuffer.Close()
}()
pres := make(chan readpublisher.PlayRes) pres := make(chan readpublisher.PlayRes)
c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, pres}) //nolint:govet c.path.OnReadPublisherPlay(readpublisher.PlayReq{c, pres}) //nolint:govet
<-pres <-pres
@ -348,7 +361,7 @@ func (c *Conn) runRead() error {
} }
} }
func (c *Conn) runPublish() error { func (c *Conn) runPublish(ctx context.Context) error {
c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout)) c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
videoTrack, audioTrack, err := c.conn.ReadMetadata() videoTrack, audioTrack, err := c.conn.ReadMetadata()
if err != nil { if err != nil {

Loading…
Cancel
Save