@ -26,7 +26,7 @@ import (
@@ -26,7 +26,7 @@ import (
const (
checkStreamInterval = 5 * time . Second
receiverReportInterval = 10 * time . Second
sessionId = "12345678"
sessionID = "12345678"
)
type readReq struct {
@ -104,8 +104,8 @@ type Client struct {
@@ -104,8 +104,8 @@ type Client struct {
protocols map [ gortsplib . StreamProtocol ] struct { }
wg * sync . WaitGroup
stats * stats . Stats
serverUdp Rtp * serverudp . Server
serverUdp Rtcp * serverudp . Server
serverUDP Rtp * serverudp . Server
serverUDP Rtcp * serverudp . Server
conn * gortsplib . ConnServer
parent Parent
@ -120,7 +120,7 @@ type Client struct {
@@ -120,7 +120,7 @@ type Client struct {
rtcpReceivers map [ int ] * rtcpreceiver . RtcpReceiver
udpLastFrameTimes [ ] * int64
describeCSeq base . HeaderValue
describeUrl string
describeURL string
tcpWriteMutex sync . Mutex
tcpWriteOk bool
@ -139,8 +139,8 @@ func New(
@@ -139,8 +139,8 @@ func New(
protocols map [ gortsplib . StreamProtocol ] struct { } ,
wg * sync . WaitGroup ,
stats * stats . Stats ,
serverUdp Rtp * serverudp . Server ,
serverUdp Rtcp * serverudp . Server ,
serverUDP Rtp * serverudp . Server ,
serverUDP Rtcp * serverudp . Server ,
nconn net . Conn ,
parent Parent ) * Client {
@ -152,8 +152,8 @@ func New(
@@ -152,8 +152,8 @@ func New(
protocols : protocols ,
wg : wg ,
stats : stats ,
serverUdpRtp : serverUdp Rtp ,
serverUdpRtcp : serverUdp Rtcp ,
serverUDPRtp : serverUDP Rtp ,
serverUDPRtcp : serverUDP Rtcp ,
conn : gortsplib . NewConnServer ( gortsplib . ConnServerConf {
Conn : nconn ,
ReadTimeout : readTimeout ,
@ -181,7 +181,7 @@ func (c *Client) Close() {
@@ -181,7 +181,7 @@ func (c *Client) Close() {
close ( c . terminate )
}
// IsSource implemente s path.source.
// IsSource implements path.source.
func ( c * Client ) IsSource ( ) { }
func ( c * Client ) log ( format string , args ... interface { } ) {
@ -272,14 +272,14 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
@@ -272,14 +272,14 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
err := c . authHelper . ValidateHeader ( req . Header [ "Authorization" ] , req . Method , req . URL )
if err != nil {
c . authFailures += 1
c . authFailures ++
// vlc with login prompt sends 4 requests:
// 1) without credentials
// 2) with password but without username
// 3) without credentials
// 4) with password and username
// henc e we must allow up to 3 failures
// therefor e we must allow up to 3 failures
if c . authFailures > 3 {
c . log ( "ERR: unauthorized: %s" , err )
@ -290,20 +290,19 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
@@ -290,20 +290,19 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
"WWW-Authenticate" : c . authHelper . GenerateHeader ( ) ,
} ,
} }
}
} else {
if c . authFailures > 1 {
c . log ( "WARN: unauthorized: %s" , err )
}
return errAuthNotCritical { & base . Response {
StatusCode : base . StatusUnauthorized ,
Header : base . Header {
"CSeq" : req . Header [ "CSeq" ] ,
"WWW-Authenticate" : c . authHelper . GenerateHeader ( ) ,
} ,
} }
if c . authFailures > 1 {
c . log ( "WARN: unauthorized: %s" , err )
}
return errAuthNotCritical { & base . Response {
StatusCode : base . StatusUnauthorized ,
Header : base . Header {
"CSeq" : req . Header [ "CSeq" ] ,
"WWW-Authenticate" : c . authHelper . GenerateHeader ( ) ,
} ,
} }
}
}
@ -347,27 +346,27 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -347,27 +346,27 @@ func (c *Client) handleRequest(req *base.Request) error {
}
switch req . Method {
case base . OPTIONS :
case base . Options :
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Public" : base . HeaderValue { strings . Join ( [ ] string {
string ( base . GET_PARAMETER ) ,
string ( base . DESCRIBE ) ,
string ( base . ANNOUNCE ) ,
string ( base . SETUP ) ,
string ( base . PLAY ) ,
string ( base . RECORD ) ,
string ( base . PAUSE ) ,
string ( base . TEARDOWN ) ,
string ( base . GetParameter ) ,
string ( base . Describe ) ,
string ( base . Announce ) ,
string ( base . Setup ) ,
string ( base . Play ) ,
string ( base . Record ) ,
string ( base . Pause ) ,
string ( base . Teardown ) ,
} , ", " ) } ,
} ,
} )
return nil
// GET_PARAMETER is used like a ping
case base . GET_PARAMETER :
case base . GetParameter :
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
@ -378,7 +377,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -378,7 +377,7 @@ func (c *Client) handleRequest(req *base.Request) error {
} )
return nil
case base . DESCRIBE :
case base . Describe :
err := c . checkState ( map [ state ] struct { } {
stateInitial : { } ,
} )
@ -417,11 +416,11 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -417,11 +416,11 @@ func (c *Client) handleRequest(req *base.Request) error {
c . path = path
c . state = stateWaitingDescribe
c . describeCSeq = cseq
c . describeUrl = req . URL . String ( )
c . describeURL = req . URL . String ( )
return errStateWaitingDescribe
case base . ANNOUNCE :
case base . Announce :
err := c . checkState ( map [ state ] struct { } {
stateInitial : { } ,
} )
@ -458,10 +457,10 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -458,10 +457,10 @@ func (c *Client) handleRequest(req *base.Request) error {
return errStateTerminate
}
for trackId , t := range tracks {
for trackID , t := range tracks {
_ , err := t . ClockRate ( )
if err != nil {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "unable to get clock rate of track %d" , trackId ) )
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "unable to get clock rate of track %d" , trackID ) )
return errStateTerminate
}
}
@ -483,9 +482,9 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -483,9 +482,9 @@ func (c *Client) handleRequest(req *base.Request) error {
}
}
for trackId , t := range tracks {
for trackID , t := range tracks {
clockRate , _ := t . ClockRate ( )
c . rtcpReceivers [ trackId ] = rtcpreceiver . New ( nil , clockRate )
c . rtcpReceivers [ trackID ] = rtcpreceiver . New ( nil , clockRate )
}
c . path = path
@ -499,7 +498,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -499,7 +498,7 @@ func (c *Client) handleRequest(req *base.Request) error {
} )
return nil
case base . SETUP :
case base . Setup :
th , err := headers . ReadTransport ( req . Header [ "Transport" ] )
if err != nil {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "transport header: %s" , err ) )
@ -540,10 +539,10 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -540,10 +539,10 @@ func (c *Client) handleRequest(req *base.Request) error {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "invalid track id (%s)" , controlPath ) )
return errStateTerminate
}
trackId := int ( tmp )
trackID := int ( tmp )
if _ , ok := c . streamTracks [ trackId ] ; ok {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "track %d has already been setup" , trackId ) )
if _ , ok := c . streamTracks [ trackID ] ; ok {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "track %d has already been setup" , trackID ) )
return errStateTerminate
}
@ -564,7 +563,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -564,7 +563,7 @@ func (c *Client) handleRequest(req *base.Request) error {
return errStateTerminate
}
path , err := c . parent . OnClientSetupPlay ( c , basePath , trackId , req )
path , err := c . parent . OnClientSetupPlay ( c , basePath , trackID , req )
if err != nil {
switch terr := err . ( type ) {
case errAuthNotCritical :
@ -585,7 +584,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -585,7 +584,7 @@ func (c *Client) handleRequest(req *base.Request) error {
c . state = statePrePlay
c . streamProtocol = gortsplib . StreamProtocolUDP
c . streamTracks [ trackId ] = & streamTrack {
c . streamTracks [ trackID ] = & streamTrack {
rtpPort : ( * th . ClientPorts ) [ 0 ] ,
rtcpPort : ( * th . ClientPorts ) [ 1 ] ,
}
@ -597,7 +596,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -597,7 +596,7 @@ func (c *Client) handleRequest(req *base.Request) error {
return & v
} ( ) ,
ClientPorts : th . ClientPorts ,
ServerPorts : & [ 2 ] int { c . serverUdp Rtp . Port ( ) , c . serverUdp Rtcp . Port ( ) } ,
ServerPorts : & [ 2 ] int { c . serverUDP Rtp . Port ( ) , c . serverUDP Rtcp . Port ( ) } ,
}
c . conn . WriteResponse ( & base . Response {
@ -605,67 +604,66 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -605,67 +604,66 @@ func (c *Client) handleRequest(req *base.Request) error {
Header : base . Header {
"CSeq" : cseq ,
"Transport" : th . Write ( ) ,
"Session" : base . HeaderValue { sessionId } ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
return nil
}
// play with TCP
} else {
if _ , ok := c . protocols [ gortsplib . StreamProtocolTCP ] ; ! ok {
c . writeResError ( cseq , base . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
return nil
}
// play with TCP
if _ , ok := c . protocols [ gortsplib . StreamProtocolTCP ] ; ! ok {
c . writeResError ( cseq , base . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
return nil
}
if len ( c . streamTracks ) > 0 && c . streamProtocol != gortsplib . StreamProtocolTCP {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "can't receive tracks with different protocols" ) )
return errStateTerminate
}
if len ( c . streamTracks ) > 0 && c . streamProtocol != gortsplib . StreamProtocolTCP {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "can't receive tracks with different protocols" ) )
return errStateTerminate
}
path , err := c . parent . OnClientSetupPlay ( c , basePath , trackId , req )
if err != nil {
switch terr := err . ( type ) {
case errAuthNotCritical :
c . conn . WriteResponse ( terr . Response )
return nil
path , err := c . parent . OnClientSetupPlay ( c , basePath , trackID , req )
if err != nil {
switch terr := err . ( type ) {
case errAuthNotCritical :
c . conn . WriteResponse ( terr . Response )
return nil
case errAuthCritical :
c . conn . WriteResponse ( terr . Response )
return errStateTerminate
case errAuthCritical :
c . conn . WriteResponse ( terr . Response )
return errStateTerminate
default :
c . writeResError ( cseq , base . StatusBadRequest , err )
return errStateTerminate
}
default :
c . writeResError ( cseq , base . StatusBadRequest , err )
return errStateTerminate
}
}
c . path = path
c . state = statePrePlay
c . streamProtocol = gortsplib . StreamProtocolTCP
c . streamTracks [ trackId ] = & streamTrack {
rtpPort : 0 ,
rtcpPort : 0 ,
}
c . path = path
c . state = statePrePlay
interleavedIds := [ 2 ] int { trackId * 2 , ( trackId * 2 ) + 1 }
c . streamProtocol = gortsplib . StreamProtocolTCP
c . streamTracks [ trackID ] = & streamTrack {
rtpPort : 0 ,
rtcpPort : 0 ,
}
th := & headers . Transport {
Protocol : gortsplib . StreamProtocolTCP ,
InterleavedIds : & interleavedIds ,
}
interleavedIds := [ 2 ] int { trackID * 2 , ( trackID * 2 ) + 1 }
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Transport" : th . Write ( ) ,
"Session" : base . HeaderValue { sessionId } ,
} ,
} )
return nil
th := & headers . Transport {
Protocol : gortsplib . StreamProtocolTCP ,
InterleavedIds : & interleavedIds ,
}
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Transport" : th . Write ( ) ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
return nil
// record
case statePreRecord :
if th . Mode == nil || * th . Mode != headers . TransportModeRecord {
@ -702,8 +700,8 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -702,8 +700,8 @@ func (c *Client) handleRequest(req *base.Request) error {
}
c . streamProtocol = gortsplib . StreamProtocolUDP
trackId := len ( c . streamTracks )
c . streamTracks [ trackId ] = & streamTrack {
trackID := len ( c . streamTracks )
c . streamTracks [ trackID ] = & streamTrack {
rtpPort : ( * th . ClientPorts ) [ 0 ] ,
rtcpPort : ( * th . ClientPorts ) [ 1 ] ,
}
@ -715,7 +713,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -715,7 +713,7 @@ func (c *Client) handleRequest(req *base.Request) error {
return & v
} ( ) ,
ClientPorts : th . ClientPorts ,
ServerPorts : & [ 2 ] int { c . serverUdp Rtp . Port ( ) , c . serverUdp Rtcp . Port ( ) } ,
ServerPorts : & [ 2 ] int { c . serverUDP Rtp . Port ( ) , c . serverUDP Rtcp . Port ( ) } ,
}
c . conn . WriteResponse ( & base . Response {
@ -723,69 +721,68 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -723,69 +721,68 @@ func (c *Client) handleRequest(req *base.Request) error {
Header : base . Header {
"CSeq" : cseq ,
"Transport" : th . Write ( ) ,
"Session" : base . HeaderValue { sessionId } ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
return nil
}
// record with TCP
} else {
if _ , ok := c . protocols [ gortsplib . StreamProtocolTCP ] ; ! ok {
c . writeResError ( cseq , base . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
return nil
}
if len ( c . streamTracks ) > 0 && c . streamProtocol != gortsplib . StreamProtocolTCP {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "can't publish tracks with different protocols" ) )
return errStateTerminate
}
// record with TCP
if _ , ok := c . protocols [ gortsplib . StreamProtocolTCP ] ; ! ok {
c . writeResError ( cseq , base . StatusUnsupportedTransport , fmt . Errorf ( "TCP streaming is disabled" ) )
return nil
}
interleavedIds := [ 2 ] int { len ( c . streamTracks ) * 2 , 1 + len ( c . streamTracks ) * 2 }
if len ( c . streamTracks ) > 0 && c . streamProtocol != gortsplib . StreamProtocolTCP {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "can't publish tracks with different protocols" ) )
return errStateTerminate
}
if th . InterleavedIds == nil {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "transport header does not contain the interleaved field" ) )
return errStateTerminate
}
interleavedIds := [ 2 ] int { len ( c . streamTracks ) * 2 , 1 + len ( c . streamTracks ) * 2 }
if ( * th . InterleavedIds ) [ 0 ] != interleavedIds [ 0 ] || ( * th . InterleavedIds ) [ 1 ] != interleavedIds [ 1 ] {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "wrong interleaved ids, expected %v, got %v" , interleavedIds , * th . InterleavedIds ) )
return errStateTerminate
}
if th . InterleavedIds == nil {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "transport header does not contain the interleaved field" ) )
return errStateTerminate
}
if len ( c . streamTracks ) >= c . path . SourceTrackCount ( ) {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "all the tracks have already been setup" ) )
return errStateTerminate
}
if ( * th . InterleavedIds ) [ 0 ] != interleavedIds [ 0 ] || ( * th . InterleavedIds ) [ 1 ] != interleavedIds [ 1 ] {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "wrong interleaved ids, expected %v, got %v" , interleavedIds , * th . InterleavedIds ) )
return errStateTerminate
}
c . streamProtocol = gortsplib . StreamProtocolTCP
trackId := len ( c . streamTracks )
c . streamTracks [ trackId ] = & streamTrack {
rtpPort : 0 ,
rtcpPort : 0 ,
}
if len ( c . streamTracks ) >= c . path . SourceTrackCount ( ) {
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "all the tracks have already been setup" ) )
return errStateTerminate
}
ht := & headers . Transport {
Protocol : gortsplib . StreamProtocolTCP ,
InterleavedIds : & interleavedIds ,
}
c . streamProtocol = gortsplib . StreamProtocolTCP
trackID := len ( c . streamTracks )
c . streamTracks [ trackID ] = & streamTrack {
rtpPort : 0 ,
rtcpPort : 0 ,
}
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Transport" : ht . Write ( ) ,
"Session" : base . HeaderValue { sessionId } ,
} ,
} )
return nil
ht := & headers . Transport {
Protocol : gortsplib . StreamProtocolTCP ,
InterleavedIds : & interleavedIds ,
}
c . conn . WriteResponse ( & base . Response {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Transport" : ht . Write ( ) ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
return nil
default :
c . writeResError ( cseq , base . StatusBadRequest , fmt . Errorf ( "client is in state '%s'" , c . state ) )
return errStateTerminate
}
case base . PLAY :
case base . Play :
// play can be sent twice, allow calling it even if we're already playing
err := c . checkState ( map [ state ] struct { } {
statePrePlay : { } ,
@ -824,7 +821,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -824,7 +821,7 @@ func (c *Client) handleRequest(req *base.Request) error {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Session" : base . HeaderValue { sessionId } ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
@ -833,7 +830,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -833,7 +830,7 @@ func (c *Client) handleRequest(req *base.Request) error {
}
return nil
case base . RECORD :
case base . Record :
err := c . checkState ( map [ state ] struct { } {
statePreRecord : { } ,
} )
@ -865,12 +862,12 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -865,12 +862,12 @@ func (c *Client) handleRequest(req *base.Request) error {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Session" : base . HeaderValue { sessionId } ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
return errStateRecord
case base . PAUSE :
case base . Pause :
err := c . checkState ( map [ state ] struct { } {
statePrePlay : { } ,
statePlay : { } ,
@ -886,7 +883,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -886,7 +883,7 @@ func (c *Client) handleRequest(req *base.Request) error {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : cseq ,
"Session" : base . HeaderValue { sessionId } ,
"Session" : base . HeaderValue { sessionID } ,
} ,
} )
@ -895,7 +892,7 @@ func (c *Client) handleRequest(req *base.Request) error {
@@ -895,7 +892,7 @@ func (c *Client) handleRequest(req *base.Request) error {
}
return nil
case base . TEARDOWN :
case base . Teardown :
// close connection silently
return errStateTerminate
@ -983,7 +980,7 @@ func (c *Client) runWaitingDescribe() bool {
@@ -983,7 +980,7 @@ func (c *Client) runWaitingDescribe() bool {
StatusCode : base . StatusOK ,
Header : base . Header {
"CSeq" : c . describeCSeq ,
"Content-Base" : base . HeaderValue { c . describeUrl + "/" } ,
"Content-Base" : base . HeaderValue { c . describeURL + "/" } ,
"Content-Type" : base . HeaderValue { "application/sdp" } ,
} ,
Content : res . sdp ,
@ -1033,9 +1030,8 @@ func (c *Client) runPlay() bool {
@@ -1033,9 +1030,8 @@ func (c *Client) runPlay() bool {
if c . streamProtocol == gortsplib . StreamProtocolUDP {
return c . runPlayUDP ( )
} else {
return c . runPlayTCP ( )
}
return c . runPlayTCP ( )
}
func ( c * Client ) runPlayUDP ( ) bool {
@ -1209,19 +1205,19 @@ func (c *Client) runRecord() bool {
@@ -1209,19 +1205,19 @@ func (c *Client) runRecord() bool {
if c . streamProtocol == gortsplib . StreamProtocolUDP {
c . udpLastFrameTimes = make ( [ ] * int64 , len ( c . streamTracks ) )
for trackId := range c . streamTracks {
for trackID := range c . streamTracks {
v := time . Now ( ) . Unix ( )
c . udpLastFrameTimes [ trackId ] = & v
c . udpLastFrameTimes [ trackID ] = & v
}
for trackId , track := range c . streamTracks {
c . serverUdp Rtp . AddPublisher ( c . ip ( ) , track . rtpPort , c , trackId )
c . serverUdp Rtcp . AddPublisher ( c . ip ( ) , track . rtcpPort , c , trackId )
for trackID , track := range c . streamTracks {
c . serverUDP Rtp . AddPublisher ( c . ip ( ) , track . rtpPort , c , trackID )
c . serverUDP Rtcp . AddPublisher ( c . ip ( ) , track . rtcpPort , c , trackID )
}
// open the firewall by sending packets to the counterpart
for _ , track := range c . streamTracks {
c . serverUdp Rtp . Write (
c . serverUDP Rtp . Write (
[ ] byte { 0x80 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 } ,
& net . UDPAddr {
IP : c . ip ( ) ,
@ -1229,7 +1225,7 @@ func (c *Client) runRecord() bool {
@@ -1229,7 +1225,7 @@ func (c *Client) runRecord() bool {
Port : track . rtpPort ,
} )
c . serverUdp Rtcp . Write (
c . serverUDP Rtcp . Write (
[ ] byte { 0x80 , 0xc9 , 0x00 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 } ,
& net . UDPAddr {
IP : c . ip ( ) ,
@ -1249,9 +1245,8 @@ func (c *Client) runRecord() bool {
@@ -1249,9 +1245,8 @@ func (c *Client) runRecord() bool {
if c . streamProtocol == gortsplib . StreamProtocolUDP {
return c . runRecordUDP ( )
} else {
return c . runRecordTCP ( )
}
return c . runRecordTCP ( )
}
func ( c * Client ) runRecordUDP ( ) bool {
@ -1286,8 +1281,8 @@ func (c *Client) runRecordUDP() bool {
@@ -1286,8 +1281,8 @@ func (c *Client) runRecordUDP() bool {
onError := func ( err error ) bool {
if err == errStateInitial {
for _ , track := range c . streamTracks {
c . serverUdp Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUdp Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
c . serverUDP Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUDP Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
}
c . state = statePreRecord
@ -1301,8 +1296,8 @@ func (c *Client) runRecordUDP() bool {
@@ -1301,8 +1296,8 @@ func (c *Client) runRecordUDP() bool {
}
for _ , track := range c . streamTracks {
c . serverUdp Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUdp Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
c . serverUDP Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUDP Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
}
c . path . OnClientRemove ( c )
@ -1345,8 +1340,8 @@ func (c *Client) runRecordUDP() bool {
@@ -1345,8 +1340,8 @@ func (c *Client) runRecordUDP() bool {
<- readerDone
for _ , track := range c . streamTracks {
c . serverUdp Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUdp Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
c . serverUDP Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUDP Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
}
c . path . OnClientRemove ( c )
@ -1360,12 +1355,12 @@ func (c *Client) runRecordUDP() bool {
@@ -1360,12 +1355,12 @@ func (c *Client) runRecordUDP() bool {
case <- receiverReportTicker . C :
now := time . Now ( )
for trackId := range c . streamTracks {
r := c . rtcpReceivers [ trackId ] . Report ( now )
c . serverUdp Rtcp . Write ( r , & net . UDPAddr {
for trackID := range c . streamTracks {
r := c . rtcpReceivers [ trackID ] . Report ( now )
c . serverUDP Rtcp . Write ( r , & net . UDPAddr {
IP : c . ip ( ) ,
Zone : c . zone ( ) ,
Port : c . streamTracks [ trackId ] . rtcpPort ,
Port : c . streamTracks [ trackID ] . rtcpPort ,
} )
}
@ -1380,8 +1375,8 @@ func (c *Client) runRecordUDP() bool {
@@ -1380,8 +1375,8 @@ func (c *Client) runRecordUDP() bool {
<- readerDone
for _ , track := range c . streamTracks {
c . serverUdp Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUdp Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
c . serverUDP Rtp . RemovePublisher ( c . ip ( ) , track . rtpPort , c )
c . serverUDP Rtcp . RemovePublisher ( c . ip ( ) , track . rtcpPort , c )
}
c . path . OnClientRemove ( c )
@ -1406,13 +1401,13 @@ func (c *Client) runRecordTCP() bool {
@@ -1406,13 +1401,13 @@ func (c *Client) runRecordTCP() bool {
switch recvt := recv . ( type ) {
case * base . InterleavedFrame :
if recvt . TrackId >= len ( c . streamTracks ) {
readerDone <- fmt . Errorf ( "invalid track id '%d'" , recvt . TrackId )
if recvt . TrackID >= len ( c . streamTracks ) {
readerDone <- fmt . Errorf ( "invalid track id '%d'" , recvt . TrackID )
return
}
c . rtcpReceivers [ recvt . TrackId ] . ProcessFrame ( time . Now ( ) , recvt . StreamType , recvt . Content )
c . path . OnFrame ( recvt . TrackId , recvt . StreamType , recvt . Content )
c . rtcpReceivers [ recvt . TrackID ] . ProcessFrame ( time . Now ( ) , recvt . StreamType , recvt . Content )
c . path . OnFrame ( recvt . TrackID , recvt . StreamType , recvt . Content )
case * base . Request :
okc := make ( chan bool )
@ -1465,9 +1460,9 @@ func (c *Client) runRecordTCP() bool {
@@ -1465,9 +1460,9 @@ func (c *Client) runRecordTCP() bool {
case <- receiverReportTicker . C :
now := time . Now ( )
for trackId := range c . streamTracks {
r := c . rtcpReceivers [ trackId ] . Report ( now )
c . conn . WriteFrameTCP ( trackId , gortsplib . StreamTypeRtcp , r )
for trackID := range c . streamTracks {
r := c . rtcpReceivers [ trackID ] . Report ( now )
c . conn . WriteFrameTCP ( trackID , gortsplib . StreamTypeRtcp , r )
}
case <- c . terminate :
@ -1487,31 +1482,31 @@ func (c *Client) runRecordTCP() bool {
@@ -1487,31 +1482,31 @@ func (c *Client) runRecordTCP() bool {
}
}
// OnUdp PublisherFrame implements serverudp.Publisher.
func ( c * Client ) OnUdpPublisherFrame ( trackId int , streamType base . StreamType , buf [ ] byte ) {
// OnUDP PublisherFrame implements serverudp.Publisher.
func ( c * Client ) OnUDPPublisherFrame ( trackID int , streamType base . StreamType , buf [ ] byte ) {
now := time . Now ( )
atomic . StoreInt64 ( c . udpLastFrameTimes [ trackId ] , now . Unix ( ) )
c . rtcpReceivers [ trackId ] . ProcessFrame ( now , streamType , buf )
c . path . OnFrame ( trackId , streamType , buf )
atomic . StoreInt64 ( c . udpLastFrameTimes [ trackID ] , now . Unix ( ) )
c . rtcpReceivers [ trackID ] . ProcessFrame ( now , streamType , buf )
c . path . OnFrame ( trackID , streamType , buf )
}
// OnReaderFrame implements path.Reader.
func ( c * Client ) OnReaderFrame ( trackId int , streamType base . StreamType , buf [ ] byte ) {
track , ok := c . streamTracks [ trackId ]
func ( c * Client ) OnReaderFrame ( trackID int , streamType base . StreamType , buf [ ] byte ) {
track , ok := c . streamTracks [ trackID ]
if ! ok {
return
}
if c . streamProtocol == gortsplib . StreamProtocolUDP {
if streamType == gortsplib . StreamTypeRtp {
c . serverUdp Rtp . Write ( buf , & net . UDPAddr {
c . serverUDP Rtp . Write ( buf , & net . UDPAddr {
IP : c . ip ( ) ,
Zone : c . zone ( ) ,
Port : track . rtpPort ,
} )
} else {
c . serverUdp Rtcp . Write ( buf , & net . UDPAddr {
c . serverUDP Rtcp . Write ( buf , & net . UDPAddr {
IP : c . ip ( ) ,
Zone : c . zone ( ) ,
Port : track . rtcpPort ,
@ -1521,7 +1516,7 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by
@@ -1521,7 +1516,7 @@ func (c *Client) OnReaderFrame(trackId int, streamType base.StreamType, buf []by
} else {
c . tcpWriteMutex . Lock ( )
if c . tcpWriteOk {
c . conn . WriteFrameTCP ( trackId , streamType , buf )
c . conn . WriteFrameTCP ( trackID , streamType , buf )
}
c . tcpWriteMutex . Unlock ( )
}