diff --git a/README.md b/README.md index 7246b165..09e7a1c9 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,15 @@ To change the format, codec or compression of a stream, you can use _FFmpeg_ or ffmpeg -i rtsp://localhost:8554/original -c:v libx264 -preset ultrafast -tune zerolatency -b 600k -f rtsp rtsp://localhost:8554/compressed ``` +#### Counting clients + +The current number of clients, publishers and receivers is printed in each log line; for instance, the line: +``` +2020/01/01 00:00:00 [2/1/1] [client 127.0.0.1:44428] OPTION +``` + +means that there are 2 clients, 1 publisher and 1 receiver. + #### Full command-line usage ``` diff --git a/main.go b/main.go index fa63cbe3..ed023cb3 100644 --- a/main.go +++ b/main.go @@ -189,15 +189,17 @@ type args struct { } type program struct { - args args - protocols map[streamProtocol]struct{} - publishIps []interface{} - readIps []interface{} - tcpl *serverTcpListener - udplRtp *serverUdpListener - udplRtcp *serverUdpListener - clients map[*serverClient]struct{} - publishers map[string]*serverClient + args args + protocols map[streamProtocol]struct{} + publishIps []interface{} + readIps []interface{} + tcpl *serverTcpListener + udplRtp *serverUdpListener + udplRtcp *serverUdpListener + clients map[*serverClient]struct{} + publishers map[string]*serverClient + publisherCount int + receiverCount int events chan programEvent done chan struct{} @@ -308,8 +310,6 @@ func newProgram(sargs []string) (*program, error) { return nil, err } - log.Printf("rtsp-simple-server %s", Version) - p := &program{ args: args, protocols: protocols, @@ -321,6 +321,8 @@ func newProgram(sargs []string) (*program, error) { done: make(chan struct{}), } + p.log("rtsp-simple-server %s", Version) + p.udplRtp, err = newServerUdpListener(p, args.rtpPort, _TRACK_FLOW_RTP) if err != nil { return nil, err @@ -344,6 +346,11 @@ func newProgram(sargs []string) (*program, error) { return p, nil } +func (p *program) log(format string, args ...interface{}) { + log.Printf("[%d/%d/%d] "+format, append([]interface{}{len(p.clients), + p.publisherCount, p.receiverCount}, args...)...) +} + func (p *program) run() { outer: for rawEvt := range p.events { @@ -351,6 +358,7 @@ outer: case programEventClientNew: c := newServerClient(p, evt.nconn) p.clients[c] = struct{}{} + c.log("connected") case programEventClientClose: // already deleted @@ -375,6 +383,15 @@ outer: } } + switch evt.client.state { + case _CLIENT_STATE_PLAY: + p.receiverCount -= 1 + + case _CLIENT_STATE_RECORD: + p.publisherCount -= 1 + } + + evt.client.log("disconnected") close(evt.done) case programEventClientGetStreamSdp: @@ -444,14 +461,17 @@ outer: evt.res <- nil case programEventClientPlay2: + p.receiverCount += 1 evt.client.state = _CLIENT_STATE_PLAY evt.res <- nil case programEventClientPause: + p.receiverCount -= 1 evt.client.state = _CLIENT_STATE_PRE_PLAY evt.res <- nil case programEventClientRecord: + p.publisherCount += 1 evt.client.state = _CLIENT_STATE_RECORD evt.res <- nil diff --git a/server-client.go b/server-client.go index e38281c0..266efb22 100644 --- a/server-client.go +++ b/server-client.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "log" "net" "os/exec" "strings" @@ -108,9 +107,7 @@ func (c *serverClient) close() { } func (c *serverClient) log(format string, args ...interface{}) { - // keep remote address outside format, since it can contain % - log.Println("[RTSP client " + c.conn.NetConn().RemoteAddr().String() + "] " + - fmt.Sprintf(format, args...)) + c.p.log("[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...) } func (c *serverClient) ip() net.IP { @@ -122,8 +119,6 @@ func (c *serverClient) zone() string { } func (c *serverClient) run() { - c.log("connected") - if c.p.args.preScript != "" { preScript := exec.Command(c.p.args.preScript) err := preScript.Run() @@ -147,12 +142,15 @@ func (c *serverClient) run() { } } - c.log("disconnected") - if c.udpCheckStreamTicker != nil { c.udpCheckStreamTicker.Stop() } + go func() { + for range c.write { + } + }() + func() { if c.p.args.postScript != "" { postScript := exec.Command(c.p.args.postScript) @@ -682,6 +680,11 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { }, }) + // set state + res = make(chan error) + c.p.events <- programEventClientPlay2{res, c} + <-res + c.log("is receiving on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" @@ -689,11 +692,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return "tracks" }(), c.streamProtocol) - // set state - res = make(chan error) - c.p.events <- programEventClientPlay2{res, c} - <-res - // when protocol is TCP, the RTSP connection becomes a RTP connection if c.streamProtocol == _STREAM_PROTOCOL_TCP { // write RTP frames sequentially @@ -770,6 +768,10 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { }, }) + res := make(chan error) + c.p.events <- programEventClientRecord{res, c} + <-res + c.log("is publishing on path '%s', %d %s via %s", c.path, len(c.streamTracks), func() string { if len(c.streamTracks) == 1 { return "track" @@ -777,10 +779,6 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool { return "tracks" }(), c.streamProtocol) - res := make(chan error) - c.p.events <- programEventClientRecord{res, c} - <-res - // when protocol is TCP, the RTSP connection becomes a RTP connection // receive RTP data and parse it if c.streamProtocol == _STREAM_PROTOCOL_TCP { diff --git a/server-tcpl.go b/server-tcpl.go index 2bafef6a..5d1a6fa3 100644 --- a/server-tcpl.go +++ b/server-tcpl.go @@ -1,7 +1,6 @@ package main import ( - "log" "net" ) @@ -31,7 +30,7 @@ func newServerTcpListener(p *program) (*serverTcpListener, error) { } func (l *serverTcpListener) log(format string, args ...interface{}) { - log.Printf("[TCP listener] "+format, args...) + l.p.log("[TCP listener] "+format, args...) } func (l *serverTcpListener) run() { diff --git a/server-udpl.go b/server-udpl.go index 59c6408a..d8fbfa23 100644 --- a/server-udpl.go +++ b/server-udpl.go @@ -1,7 +1,6 @@ package main import ( - "log" "net" "time" ) @@ -47,7 +46,7 @@ func (l *serverUdpListener) log(format string, args ...interface{}) { } else { label = "RTCP" } - log.Printf("[UDP/"+label+" listener] "+format, args...) + l.p.log("[UDP/"+label+" listener] "+format, args...) } func (l *serverUdpListener) run() {