Browse Source

new options runOnConnect, runOnPublish, runOnRead

pull/31/head v0.8.9
aler9 6 years ago
parent
commit
b837b7afdd
  1. 16
      Makefile
  2. 11
      README.md
  3. 10
      conf.go
  4. 15
      conf.yml
  5. 67
      server-client.go

16
Makefile

@ -55,7 +55,7 @@ test-nodocker:
define DOCKERFILE_RUN define DOCKERFILE_RUN
FROM amd64/$(BASE_IMAGE) FROM amd64/$(BASE_IMAGE)
RUN apk add --no-cache git RUN apk add --no-cache git ffmpeg
WORKDIR /s WORKDIR /s
COPY go.mod go.sum ./ COPY go.mod go.sum ./
RUN go mod download RUN go mod download
@ -71,12 +71,16 @@ define CONFIG_RUN
paths: paths:
all: all:
readUser: test # readUser: test
readPass: tast # readPass: tast
# proxied:
# source: rtsp://192.168.10.1/unicast
# sourceProtocol: udp
original:
runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed
proxied:
source: rtsp://192.168.10.1/unicast
sourceProtocol: udp
endef endef
export CONFIG_RUN export CONFIG_RUN

11
README.md

@ -13,7 +13,7 @@ Features:
* Each stream can have multiple video and audio tracks, encoded in any format * Each stream can have multiple video and audio tracks, encoded in any format
* Publish multiple streams at once, each in a separate path, that can be read by multiple users * Publish multiple streams at once, each in a separate path, that can be read by multiple users
* Supports authentication * Supports authentication
* Supports running a script when a client connects or disconnects * Run custom commands when clients connect, disconnect, read or publish streams (linux only)
* Compatible with Linux, Windows and Mac, does not require any dependency or interpreter, it's a single executable * Compatible with Linux, Windows and Mac, does not require any dependency or interpreter, it's a single executable
## Installation and basic usage ## Installation and basic usage
@ -66,7 +66,7 @@ docker run --rm -it -v $PWD/conf.yml:/conf.yml -p 8554:8554 aler9/rtsp-simple-se
#### Full configuration file #### Full configuration file
To change the configuration, it's enough to edit the file `conf.yml`, provided with the executable. The default configuration is [available here](conf.yml). To change the configuration, it's enough to edit the `conf.yml` file, provided with the executable. The default configuration is [available here](conf.yml).
#### Usage as RTSP Proxy #### Usage as RTSP Proxy
@ -115,9 +115,12 @@ WARNING: RTSP is a plain protocol, and the credentials can be intercepted and re
_rtsp-simple-server_ is an RTSP server: it publishes existing streams and does not touch them. It is not a media server, that is a far more complex and heavy software that can receive existing streams, re-encode them and publish them. _rtsp-simple-server_ is an RTSP server: it publishes existing streams and does not touch them. It is not a media server, that is a far more complex and heavy software that can receive existing streams, re-encode them and publish them.
To change the format, codec or compression of a stream, you can use _FFmpeg_ or _Gstreamer_ together with _rtsp-simple-server_, obtaining the same features of a media server. For instance, if we want to re-encode an existing stream, that is available in the `/original` path, and make the resulting stream available in the `/compressed` path, it is enough to launch _FFmpeg_ in parallel with _rtsp-simple-server_, with the following syntax: To change the format, codec or compression of a stream, you can use _FFmpeg_ or _Gstreamer_ together with _rtsp-simple-server_, obtaining the same features of a media server. For instance, to re-encode an existing stream, that is available in the `/original` path, and publish the resulting stream in the `/compressed` path, edit `conf.yml` and replace everything inside section `paths` with the following content:
``` ```
ffmpeg -i rtsp://localhost:8554/original -c:v libx264 -preset ultrafast -tune zerolatency -b 600k -f rtsp rtsp://localhost:8554/compressed paths:
all:
original:
runOnPublish: ffmpeg -i rtsp://localhost:8554/original -b:a 64k -c:v libx264 -preset ultrafast -b:v 500k -max_muxing_queue_size 1024 -f rtsp rtsp://localhost:8554/compressed
``` ```
#### Client count #### Client count

10
conf.go

@ -22,6 +22,8 @@ type ConfPath struct {
ReadPass string `yaml:"readPass"` ReadPass string `yaml:"readPass"`
ReadIps []string `yaml:"readIps"` ReadIps []string `yaml:"readIps"`
readIpsParsed []interface{} readIpsParsed []interface{}
RunOnPublish string `yaml:"runOnPublish"`
RunOnRead string `yaml:"runOnRead"`
} }
type conf struct { type conf struct {
@ -30,8 +32,7 @@ type conf struct {
RtspPort int `yaml:"rtspPort"` RtspPort int `yaml:"rtspPort"`
RtpPort int `yaml:"rtpPort"` RtpPort int `yaml:"rtpPort"`
RtcpPort int `yaml:"rtcpPort"` RtcpPort int `yaml:"rtcpPort"`
PreScript string `yaml:"preScript"` RunOnConnect string `yaml:"runOnConnect"`
PostScript string `yaml:"postScript"`
ReadTimeout time.Duration `yaml:"readTimeout"` ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"`
StreamDeadAfter time.Duration `yaml:"streamDeadAfter"` StreamDeadAfter time.Duration `yaml:"streamDeadAfter"`
@ -148,6 +149,11 @@ func loadConf(fpath string, stdin io.Reader) (*conf, error) {
} }
for path, pconf := range conf.Paths { for path, pconf := range conf.Paths {
if pconf == nil {
conf.Paths[path] = &ConfPath{}
pconf = conf.Paths[path]
}
if pconf.Source == "" { if pconf.Source == "" {
pconf.Source = "record" pconf.Source = "record"
} }

15
conf.yml

@ -7,10 +7,9 @@ rtspPort: 8554
rtpPort: 8000 rtpPort: 8000
# port of the UDP RTCP listener # port of the UDP RTCP listener
rtcpPort: 8001 rtcpPort: 8001
# script to run when a client connects # command to run when a client connects.
preScript: # this is terminated with SIGINT when a client disconnects.
# script to run when a client disconnects runOnConnect:
postScript:
# timeout of read operations # timeout of read operations
readTimeout: 5s readTimeout: 5s
# timeout of write operations # timeout of write operations
@ -46,3 +45,11 @@ paths:
readPass: readPass:
# IPs or networks (x.x.x.x/24) allowed to read # IPs or networks (x.x.x.x/24) allowed to read
readIps: [] readIps: []
# command to run when a client starts publishing.
# This is terminated with SIGINT when a client stops publishing.
runOnPublish:
# command to run when a clients starts reading.
# This is terminated with SIGINT when a client stops reading.
runOnRead:

67
server-client.go

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"os"
"os/exec" "os/exec"
"strings" "strings"
"time" "time"
@ -125,9 +126,12 @@ func (c *serverClient) publisherSdpParsed() *sdp.SessionDescription {
} }
func (c *serverClient) run() { func (c *serverClient) run() {
if c.p.conf.PreScript != "" { var runOnConnectCmd *exec.Cmd
preScript := exec.Command(c.p.conf.PreScript) if c.p.conf.RunOnConnect != "" {
err := preScript.Run() runOnConnectCmd = exec.Command("/bin/sh", "-c", c.p.conf.RunOnConnect)
runOnConnectCmd.Stdout = os.Stdout
runOnConnectCmd.Stderr = os.Stderr
err := runOnConnectCmd.Start()
if err != nil { if err != nil {
c.log("ERR: %s", err) c.log("ERR: %s", err)
} }
@ -155,15 +159,10 @@ outer:
c.conn.NetConn().Close() // close socket in case it has not been closed yet c.conn.NetConn().Close() // close socket in case it has not been closed yet
func() { if runOnConnectCmd != nil {
if c.p.conf.PostScript != "" { runOnConnectCmd.Process.Signal(os.Interrupt)
postScript := exec.Command(c.p.conf.PostScript) runOnConnectCmd.Wait()
err := postScript.Run() }
if err != nil {
c.log("ERR: %s", err)
}
}
}()
close(c.done) // close() never blocks close(c.done) // close() never blocks
} }
@ -732,7 +731,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}, },
}) })
c.runPlay() c.runPlay(path)
return false return false
case gortsplib.RECORD: case gortsplib.RECORD:
@ -760,7 +759,7 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
}, },
}) })
c.runRecord() c.runRecord(path)
return false return false
case gortsplib.TEARDOWN: case gortsplib.TEARDOWN:
@ -773,7 +772,9 @@ func (c *serverClient) handleRequest(req *gortsplib.Request) bool {
} }
} }
func (c *serverClient) runPlay() { func (c *serverClient) runPlay(path string) {
pconf := c.findConfForPath(path)
if c.streamProtocol == streamProtocolTcp { if c.streamProtocol == streamProtocolTcp {
c.writeBuf = newDoubleBuffer(2048) c.writeBuf = newDoubleBuffer(2048)
c.events = make(chan serverClientEvent) c.events = make(chan serverClientEvent)
@ -790,6 +791,17 @@ func (c *serverClient) runPlay() {
return "tracks" return "tracks"
}(), c.streamProtocol) }(), c.streamProtocol)
var runOnReadCmd *exec.Cmd
if pconf.RunOnRead != "" {
runOnReadCmd = exec.Command("/bin/sh", "-c", pconf.RunOnRead)
runOnReadCmd.Stdout = os.Stdout
runOnReadCmd.Stderr = os.Stderr
err := runOnReadCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == streamProtocolTcp { if c.streamProtocol == streamProtocolTcp {
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
@ -851,9 +863,16 @@ func (c *serverClient) runPlay() {
c.p.events <- programEventClientPlayStop{done, c} c.p.events <- programEventClientPlayStop{done, c}
<-done <-done
} }
if runOnReadCmd != nil {
runOnReadCmd.Process.Signal(os.Interrupt)
runOnReadCmd.Wait()
}
} }
func (c *serverClient) runRecord() { func (c *serverClient) runRecord(path string) {
pconf := c.findConfForPath(path)
c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks)) c.RtcpReceivers = make([]*gortsplib.RtcpReceiver, len(c.streamTracks))
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver() c.RtcpReceivers[trackId] = gortsplib.NewRtcpReceiver()
@ -870,6 +889,17 @@ func (c *serverClient) runRecord() {
return "tracks" return "tracks"
}(), c.streamProtocol) }(), c.streamProtocol)
var runOnPublishCmd *exec.Cmd
if pconf.RunOnPublish != "" {
runOnPublishCmd = exec.Command("/bin/sh", "-c", pconf.RunOnPublish)
runOnPublishCmd.Stdout = os.Stdout
runOnPublishCmd.Stderr = os.Stderr
err := runOnPublishCmd.Start()
if err != nil {
c.log("ERR: %s", err)
}
}
if c.streamProtocol == streamProtocolTcp { if c.streamProtocol == streamProtocolTcp {
frame := &gortsplib.InterleavedFrame{} frame := &gortsplib.InterleavedFrame{}
@ -1013,4 +1043,9 @@ func (c *serverClient) runRecord() {
for trackId := range c.streamTracks { for trackId := range c.streamTracks {
c.RtcpReceivers[trackId].Close() c.RtcpReceivers[trackId].Close()
} }
if runOnPublishCmd != nil {
runOnPublishCmd.Process.Signal(os.Interrupt)
runOnPublishCmd.Wait()
}
} }

Loading…
Cancel
Save