From b461f15a4cb4a58d43f5804738105fb3ad4ca7ed Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Mon, 13 Feb 2023 12:12:04 +0100 Subject: [PATCH] rpicamera: support changing parameters without interrupting the stream (#1463) --- internal/conf/path.go | 6 +- internal/core/hls_muxer.go | 2 +- internal/core/hls_server.go | 6 +- internal/core/hls_source.go | 35 ++- internal/core/path.go | 54 +++-- internal/core/path_manager.go | 107 ++++++--- internal/core/rpicamera_source.go | 53 ++++- internal/core/rtmp_conn.go | 12 +- internal/core/rtmp_source.go | 32 ++- internal/core/rtsp_session.go | 12 +- internal/core/rtsp_source.go | 42 ++-- internal/core/source_static.go | 128 +++++------ internal/core/webrtc_conn.go | 2 +- internal/core/webrtc_server.go | 2 +- internal/rpicamera/embeddedexe.go | 49 ----- internal/rpicamera/exe/Makefile | 1 + internal/rpicamera/exe/camera.cpp | 203 +++++++++-------- internal/rpicamera/exe/camera.h | 3 +- internal/rpicamera/exe/encoder.c | 4 +- internal/rpicamera/exe/encoder.h | 2 +- internal/rpicamera/exe/main.c | 112 ++++------ internal/rpicamera/exe/parameters.c | 201 +++++++++++------ internal/rpicamera/exe/parameters.h | 20 +- internal/rpicamera/exe/pipe.c | 44 ++++ internal/rpicamera/exe/pipe.h | 12 + internal/rpicamera/exe/sensor_mode.c | 15 +- internal/rpicamera/exe/sensor_mode.h | 2 +- internal/rpicamera/exe/window.c | 12 +- internal/rpicamera/exe/window.h | 2 +- internal/rpicamera/pipe.go | 20 +- internal/rpicamera/rpicamera.go | 266 ++++++++++++++--------- internal/rpicamera/rpicamera_disabled.go | 4 + rtsp-simple-server.yml | 4 +- 33 files changed, 838 insertions(+), 631 deletions(-) delete mode 100644 internal/rpicamera/embeddedexe.go create mode 100644 internal/rpicamera/exe/pipe.c create mode 100644 internal/rpicamera/exe/pipe.h diff --git a/internal/conf/path.go b/internal/conf/path.go index d2741f88..f3ea3b54 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -1,9 +1,9 @@ package conf import ( - "encoding/json" "fmt" gourl "net/url" + "reflect" "regexp" "strings" "time" @@ -320,7 +320,5 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error { // Equal checks whether two PathConfs are equal. func (pconf *PathConf) Equal(other *PathConf) bool { - a, _ := json.Marshal(pconf) - b, _ := json.Marshal(other) - return string(a) == string(b) + return reflect.DeepEqual(pconf, other) } diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index f2cad2eb..1cb07970 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -555,7 +555,7 @@ func (m *hlsMuxer) handleRequest(req *hlsMuxerRequest) func() *hls.MuxerFileResp } func (m *hlsMuxer) authenticate(ctx *gin.Context) error { - pathConf := m.path.Conf() + pathConf := m.path.safeConf() pathIPs := pathConf.ReadIPs pathUser := pathConf.ReadUser pathPass := pathConf.ReadPass diff --git a/internal/core/hls_server.go b/internal/core/hls_server.go index 5d83c29a..08c8652d 100644 --- a/internal/core/hls_server.go +++ b/internal/core/hls_server.go @@ -204,15 +204,15 @@ outer: select { case pa := <-s.chPathSourceReady: if s.alwaysRemux { - s.createMuxer(pa.Name(), "", nil) + s.createMuxer(pa.name, "", nil) } case pa := <-s.chPathSourceNotReady: if s.alwaysRemux { - c, ok := s.muxers[pa.Name()] + c, ok := s.muxers[pa.name] if ok { c.close() - delete(s.muxers, pa.Name()) + delete(s.muxers, pa.name) } } diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 75ba5436..1e58eeb1 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -7,6 +7,7 @@ import ( "github.com/aler9/gortsplib/v2/pkg/format" "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/formatprocessor" "github.com/aler9/rtsp-simple-server/internal/hls" "github.com/aler9/rtsp-simple-server/internal/logger" @@ -19,20 +20,14 @@ type hlsSourceParent interface { } type hlsSource struct { - ur string - fingerprint string - parent hlsSourceParent + parent hlsSourceParent } func newHLSSource( - ur string, - fingerprint string, parent hlsSourceParent, ) *hlsSource { return &hlsSource{ - ur: ur, - fingerprint: fingerprint, - parent: parent, + parent: parent, } } @@ -41,7 +36,7 @@ func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) } // run implements sourceStaticImpl. -func (s *hlsSource) run(ctx context.Context) error { +func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { var stream *stream defer func() { @@ -51,8 +46,8 @@ func (s *hlsSource) run(ctx context.Context) error { }() c, err := hls.NewClient( - s.ur, - s.fingerprint, + cnf.Source, + cnf.SourceFingerprint, s, ) if err != nil { @@ -137,14 +132,18 @@ func (s *hlsSource) run(ctx context.Context) error { c.Start() - select { - case err := <-c.Wait(): - return err + for { + select { + case err := <-c.Wait(): + return err - case <-ctx.Done(): - c.Close() - <-c.Wait() - return nil + case <-reloadConf: + + case <-ctx.Done(): + c.Close() + <-c.Wait() + return nil + } } } diff --git a/internal/core/path.go b/internal/core/path.go index ebcd0a3e..e538708c 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -201,6 +201,7 @@ type path struct { ctx context.Context ctxCancel func() + confMutex sync.RWMutex source source bytesReceived *uint64 stream *stream @@ -217,6 +218,7 @@ type path struct { onDemandPublisherCloseTimer *time.Timer // in + chReloadConf chan *conf.PathConf chSourceStaticSetReady chan pathSourceStaticSetReadyReq chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq chDescribe chan pathDescribeReq @@ -227,6 +229,9 @@ type path struct { chReaderAdd chan pathReaderAddReq chReaderRemove chan pathReaderRemoveReq chAPIPathsList chan pathAPIPathsListSubReq + + // out + done chan struct{} } func newPath( @@ -236,7 +241,7 @@ func newPath( writeTimeout conf.StringDuration, readBufferCount int, confName string, - conf *conf.PathConf, + cnf *conf.PathConf, name string, matches []string, wg *sync.WaitGroup, @@ -251,7 +256,7 @@ func newPath( writeTimeout: writeTimeout, readBufferCount: readBufferCount, confName: confName, - conf: conf, + conf: cnf, name: name, matches: matches, wg: wg, @@ -265,6 +270,7 @@ func newPath( onDemandStaticSourceCloseTimer: newEmptyTimer(), onDemandPublisherReadyTimer: newEmptyTimer(), onDemandPublisherCloseTimer: newEmptyTimer(), + chReloadConf: make(chan *conf.PathConf), chSourceStaticSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq), chDescribe: make(chan pathDescribeReq), @@ -275,6 +281,7 @@ func newPath( chReaderAdd: make(chan pathReaderAddReq), chReaderRemove: make(chan pathReaderRemoveReq), chAPIPathsList: make(chan pathAPIPathsListSubReq), + done: make(chan struct{}), } pa.log(logger.Debug, "created") @@ -289,26 +296,15 @@ func (pa *path) close() { pa.ctxCancel() } +func (pa *path) wait() { + <-pa.done +} + // Log is the main logging function. func (pa *path) log(level logger.Level, format string, args ...interface{}) { pa.parent.log(level, "[path "+pa.name+"] "+format, args...) } -// ConfName returns the configuration name of this path. -func (pa *path) ConfName() string { - return pa.confName -} - -// Conf returns the configuration of this path. -func (pa *path) Conf() *conf.PathConf { - return pa.conf -} - -// Name returns the name of this path. -func (pa *path) Name() string { - return pa.name -} - func (pa *path) hasStaticSource() bool { return strings.HasPrefix(pa.conf.Source, "rtsp://") || strings.HasPrefix(pa.conf.Source, "rtsps://") || @@ -327,7 +323,14 @@ func (pa *path) hasOnDemandPublisher() bool { return pa.conf.RunOnDemand != "" } +func (pa *path) safeConf() *conf.PathConf { + pa.confMutex.RLock() + defer pa.confMutex.RUnlock() + return pa.conf +} + func (pa *path) run() { + defer close(pa.done) defer pa.wg.Done() if pa.conf.Source == "redirect" { @@ -410,6 +413,15 @@ func (pa *path) run() { return fmt.Errorf("not in use") } + case newConf := <-pa.chReloadConf: + if pa.hasStaticSource() { + go pa.source.(*sourceStatic).reloadConf(newConf) + } + + pa.confMutex.Lock() + pa.conf = newConf + pa.confMutex.Unlock() + case req := <-pa.chSourceStaticSetReady: err := pa.sourceSetReady(req.medias, req.generateRTPPackets) if err != nil { @@ -920,6 +932,14 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { close(req.res) } +// reloadConf is called by pathManager. +func (pa *path) reloadConf(newConf *conf.PathConf) { + select { + case pa.chReloadConf <- newConf: + case <-pa.ctx.Done(): + } +} + // sourceStaticSetReady is called by sourceStatic. func (pa *path) sourceStaticSetReady(sourceStaticCtx context.Context, req pathSourceStaticSetReadyReq) { select { diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 3b79113b..78f27c13 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -10,6 +10,24 @@ import ( "github.com/aler9/rtsp-simple-server/internal/logger" ) +func pathConfCanBeUpdated(oldPathConf *conf.PathConf, newPathConf *conf.PathConf) bool { + var copy conf.PathConf + cloneStruct(©, oldPathConf) + + copy.RPICameraBrightness = newPathConf.RPICameraBrightness + copy.RPICameraContrast = newPathConf.RPICameraContrast + copy.RPICameraSaturation = newPathConf.RPICameraSaturation + copy.RPICameraSharpness = newPathConf.RPICameraSharpness + copy.RPICameraExposure = newPathConf.RPICameraExposure + copy.RPICameraAWB = newPathConf.RPICameraAWB + copy.RPICameraDenoise = newPathConf.RPICameraDenoise + copy.RPICameraMetering = newPathConf.RPICameraMetering + copy.RPICameraEV = newPathConf.RPICameraEV + copy.RPICameraFPS = newPathConf.RPICameraFPS + + return newPathConf.Equal(©) +} + type pathManagerHLSServer interface { pathSourceReady(*path) pathSourceNotReady(*path) @@ -29,11 +47,12 @@ type pathManager struct { metrics *metrics parent pathManagerParent - ctx context.Context - ctxCancel func() - wg sync.WaitGroup - hlsServer pathManagerHLSServer - paths map[string]*path + ctx context.Context + ctxCancel func() + wg sync.WaitGroup + hlsServer pathManagerHLSServer + paths map[string]*path + pathsByConf map[string]map[*path]struct{} // in chConfReload chan map[string]*conf.PathConf @@ -72,6 +91,7 @@ func newPathManager( ctx: ctx, ctxCancel: ctxCancel, paths: make(map[string]*path), + pathsByConf: make(map[string]map[*path]struct{}), chConfReload: make(chan map[string]*conf.PathConf), chPathClose: make(chan *path), chPathSourceReady: make(chan *path), @@ -118,36 +138,34 @@ func (pm *pathManager) run() { outer: for { select { - case pathConfs := <-pm.chConfReload: - // remove confs - for pathConfName := range pm.pathConfs { - if _, ok := pathConfs[pathConfName]; !ok { - delete(pm.pathConfs, pathConfName) - } - } - - // update confs - for pathConfName, oldConf := range pm.pathConfs { - if !oldConf.Equal(pathConfs[pathConfName]) { - pm.pathConfs[pathConfName] = pathConfs[pathConfName] - } - } - - // add confs - for pathConfName, pathConf := range pathConfs { - if _, ok := pm.pathConfs[pathConfName]; !ok { - pm.pathConfs[pathConfName] = pathConf + case newPathConfs := <-pm.chConfReload: + for confName, pathConf := range pm.pathConfs { + if newPathConf, ok := newPathConfs[confName]; ok { + // configuration has changed + if !newPathConf.Equal(pathConf) { + if pathConfCanBeUpdated(pathConf, newPathConf) { // paths associated with the configuration can be updated + for pa := range pm.pathsByConf[confName] { + go pa.reloadConf(newPathConf) + } + } else { // paths associated with the configuration must be recreated + for pa := range pm.pathsByConf[confName] { + pm.removePath(pa) + pa.close() + pa.wait() // avoid conflicts between sources + } + } + } + } else { + // configuration has been deleted, remove associated paths + for pa := range pm.pathsByConf[confName] { + pm.removePath(pa) + pa.close() + pa.wait() // avoid conflicts between sources + } } } - // remove paths associated with a conf which doesn't exist anymore - // or has changed - for _, pa := range pm.paths { - if pathConf, ok := pm.pathConfs[pa.ConfName()]; !ok || pathConf != pa.Conf() { - delete(pm.paths, pa.Name()) - pa.close() - } - } + pm.pathConfs = newPathConfs // add new paths for pathConfName, pathConf := range pm.pathConfs { @@ -157,11 +175,10 @@ outer: } case pa := <-pm.chPathClose: - if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa { + if pmpa, ok := pm.paths[pa.name]; !ok || pmpa != pa { continue } - delete(pm.paths, pa.Name()) - pa.close() + pm.removePath(pa) case pa := <-pm.chPathSourceReady: if pm.hlsServer != nil { @@ -278,7 +295,7 @@ func (pm *pathManager) createPath( name string, matches []string, ) { - pm.paths[name] = newPath( + pa := newPath( pm.ctx, pm.rtspAddress, pm.readTimeout, @@ -291,6 +308,21 @@ func (pm *pathManager) createPath( &pm.wg, pm.externalCmdPool, pm) + + pm.paths[name] = pa + + if _, ok := pm.pathsByConf[pathConfName]; !ok { + pm.pathsByConf[pathConfName] = make(map[*path]struct{}) + } + pm.pathsByConf[pathConfName][pa] = struct{}{} +} + +func (pm *pathManager) removePath(pa *path) { + delete(pm.pathsByConf[pa.confName], pa) + if len(pm.pathsByConf[pa.confName]) == 0 { + delete(pm.pathsByConf, pa.confName) + } + delete(pm.paths, pa.name) } func (pm *pathManager) findPathConf(name string) (string, *conf.PathConf, []string, error) { @@ -330,6 +362,7 @@ func (pm *pathManager) pathSourceReady(pa *path) { select { case pm.chPathSourceReady <- pa: case <-pm.ctx.Done(): + case <-pa.ctx.Done(): // in case pathManager is closing the path } } @@ -338,6 +371,7 @@ func (pm *pathManager) pathSourceNotReady(pa *path) { select { case pm.chPathSourceNotReady <- pa: case <-pm.ctx.Done(): + case <-pa.ctx.Done(): // in case pathManager is closing the path } } @@ -346,6 +380,7 @@ func (pm *pathManager) onPathClose(pa *path) { select { case pm.chPathClose <- pa: case <-pm.ctx.Done(): + case <-pa.ctx.Done(): // in case pathManager is closing the path } } diff --git a/internal/core/rpicamera_source.go b/internal/core/rpicamera_source.go index bc52a3bc..a5916944 100644 --- a/internal/core/rpicamera_source.go +++ b/internal/core/rpicamera_source.go @@ -7,11 +7,46 @@ import ( "github.com/aler9/gortsplib/v2/pkg/format" "github.com/aler9/gortsplib/v2/pkg/media" + "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/formatprocessor" "github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/rpicamera" ) +func paramsFromConf(cnf *conf.PathConf) rpicamera.Params { + return rpicamera.Params{ + CameraID: cnf.RPICameraCamID, + Width: cnf.RPICameraWidth, + Height: cnf.RPICameraHeight, + HFlip: cnf.RPICameraHFlip, + VFlip: cnf.RPICameraVFlip, + Brightness: cnf.RPICameraBrightness, + Contrast: cnf.RPICameraContrast, + Saturation: cnf.RPICameraSaturation, + Sharpness: cnf.RPICameraSharpness, + Exposure: cnf.RPICameraExposure, + AWB: cnf.RPICameraAWB, + Denoise: cnf.RPICameraDenoise, + Shutter: cnf.RPICameraShutter, + Metering: cnf.RPICameraMetering, + Gain: cnf.RPICameraGain, + EV: cnf.RPICameraEV, + ROI: cnf.RPICameraROI, + TuningFile: cnf.RPICameraTuningFile, + Mode: cnf.RPICameraMode, + FPS: cnf.RPICameraFPS, + IDRPeriod: cnf.RPICameraIDRPeriod, + Bitrate: cnf.RPICameraBitrate, + Profile: cnf.RPICameraProfile, + Level: cnf.RPICameraLevel, + AfMode: cnf.RPICameraAfMode, + AfRange: cnf.RPICameraAfRange, + AfSpeed: cnf.RPICameraAfSpeed, + LensPosition: cnf.RPICameraLensPosition, + AfWindow: cnf.RPICameraAfWindow, + } +} + type rpiCameraSourceParent interface { log(logger.Level, string, ...interface{}) sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes @@ -19,16 +54,13 @@ type rpiCameraSourceParent interface { } type rpiCameraSource struct { - params rpicamera.Params parent rpiCameraSourceParent } func newRPICameraSource( - params rpicamera.Params, parent rpiCameraSourceParent, ) *rpiCameraSource { return &rpiCameraSource{ - params: params, parent: parent, } } @@ -38,7 +70,7 @@ func (s *rpiCameraSource) Log(level logger.Level, format string, args ...interfa } // run implements sourceStaticImpl. -func (s *rpiCameraSource) run(ctx context.Context) error { +func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { medi := &media.Media{ Type: media.TypeVideo, Formats: []format.Format{&format.H264{ @@ -73,7 +105,7 @@ func (s *rpiCameraSource) run(ctx context.Context) error { } } - cam, err := rpicamera.New(s.params, onData) + cam, err := rpicamera.New(paramsFromConf(cnf), onData) if err != nil { return err } @@ -85,8 +117,15 @@ func (s *rpiCameraSource) run(ctx context.Context) error { } }() - <-ctx.Done() - return nil + for { + select { + case cnf := <-reloadConf: + cam.ReloadParams(paramsFromConf(cnf)) + + case <-ctx.Done(): + return nil + } + } } // apiSourceDescribe implements sourceStaticImpl. diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 6a0b1d25..90d82a98 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -422,14 +422,16 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { defer res.stream.readerRemove(c) c.log(logger.Info, "is reading from path '%s', %s", - path.Name(), sourceMediaInfo(medias)) + path.name, sourceMediaInfo(medias)) - if path.Conf().RunOnRead != "" { + pathConf := path.safeConf() + + if pathConf.RunOnRead != "" { c.log(logger.Info, "runOnRead command started") onReadCmd := externalcmd.NewCmd( c.externalCmdPool, - path.Conf().RunOnRead, - path.Conf().RunOnReadRestart, + pathConf.RunOnRead, + pathConf.RunOnReadRestart, path.externalCmdEnv(), func(co int) { c.log(logger.Info, "runOnRead command exited with code %d", co) @@ -530,7 +532,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } c.log(logger.Info, "is publishing to path '%s', %s", - path.Name(), + path.name, sourceMediaInfo(medias)) // disable write deadline to allow outgoing acknowledges diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index c0700629..6e69323e 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -30,23 +30,17 @@ type rtmpSourceParent interface { } type rtmpSource struct { - ur string - fingerprint string readTimeout conf.StringDuration writeTimeout conf.StringDuration parent rtmpSourceParent } func newRTMPSource( - ur string, - fingerprint string, readTimeout conf.StringDuration, writeTimeout conf.StringDuration, parent rtmpSourceParent, ) *rtmpSource { return &rtmpSource{ - ur: ur, - fingerprint: fingerprint, readTimeout: readTimeout, writeTimeout: writeTimeout, parent: parent, @@ -58,10 +52,10 @@ func (s *rtmpSource) Log(level logger.Level, format string, args ...interface{}) } // run implements sourceStaticImpl. -func (s *rtmpSource) run(ctx context.Context) error { +func (s *rtmpSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { s.Log(logger.Debug, "connecting") - u, err := url.Parse(s.ur) + u, err := url.Parse(cnf.Source) if err != nil { return err } @@ -86,7 +80,7 @@ func (s *rtmpSource) run(ctx context.Context) error { h := sha256.New() h.Write(cs.PeerCertificates[0].Raw) hstr := hex.EncodeToString(h.Sum(nil)) - fingerprintLower := strings.ToLower(s.fingerprint) + fingerprintLower := strings.ToLower(cnf.SourceFingerprint) if hstr != fingerprintLower { return fmt.Errorf("server fingerprint do not match: expected %s, got %s", @@ -213,15 +207,19 @@ func (s *rtmpSource) run(ctx context.Context) error { }() }() - select { - case err := <-readDone: - nconn.Close() - return err + for { + select { + case err := <-readDone: + nconn.Close() + return err + + case <-reloadConf: - case <-ctx.Done(): - nconn.Close() - <-readDone - return nil + case <-ctx.Done(): + nconn.Close() + <-readDone + return nil + } } } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 46a3860a..e40a5487 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -270,16 +270,18 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo if s.session.State() == gortsplib.ServerSessionStatePrePlay { s.log(logger.Info, "is reading from path '%s', with %s, %s", - s.path.Name(), + s.path.name, s.session.SetuppedTransport(), sourceMediaInfo(s.session.SetuppedMedias())) - if s.path.Conf().RunOnRead != "" { + pathConf := s.path.safeConf() + + if pathConf.RunOnRead != "" { s.log(logger.Info, "runOnRead command started") s.onReadCmd = externalcmd.NewCmd( s.externalCmdPool, - s.path.Conf().RunOnRead, - s.path.Conf().RunOnReadRestart, + pathConf.RunOnRead, + pathConf.RunOnReadRestart, s.path.externalCmdEnv(), func(co int) { s.log(logger.Info, "runOnRead command exited with code %d", co) @@ -311,7 +313,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R } s.log(logger.Info, "is publishing to path '%s', with %s, %s", - s.path.Name(), + s.path.name, s.session.SetuppedTransport(), sourceMediaInfo(s.session.AnnouncedMedias())) diff --git a/internal/core/rtsp_source.go b/internal/core/rtsp_source.go index 68441ab8..79f655e0 100644 --- a/internal/core/rtsp_source.go +++ b/internal/core/rtsp_source.go @@ -27,10 +27,6 @@ type rtspSourceParent interface { } type rtspSource struct { - ur string - proto conf.SourceProtocol - anyPortEnable bool - fingerprint string readTimeout conf.StringDuration writeTimeout conf.StringDuration readBufferCount int @@ -38,20 +34,12 @@ type rtspSource struct { } func newRTSPSource( - ur string, - proto conf.SourceProtocol, - anyPortEnable bool, - fingerprint string, readTimeout conf.StringDuration, writeTimeout conf.StringDuration, readBufferCount int, parent rtspSourceParent, ) *rtspSource { return &rtspSource{ - ur: ur, - proto: proto, - anyPortEnable: anyPortEnable, - fingerprint: fingerprint, readTimeout: readTimeout, writeTimeout: writeTimeout, readBufferCount: readBufferCount, @@ -64,18 +52,18 @@ func (s *rtspSource) Log(level logger.Level, format string, args ...interface{}) } // run implements sourceStaticImpl. -func (s *rtspSource) run(ctx context.Context) error { +func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { s.Log(logger.Debug, "connecting") var tlsConfig *tls.Config - if s.fingerprint != "" { + if cnf.SourceFingerprint != "" { tlsConfig = &tls.Config{ InsecureSkipVerify: true, VerifyConnection: func(cs tls.ConnectionState) error { h := sha256.New() h.Write(cs.PeerCertificates[0].Raw) hstr := hex.EncodeToString(h.Sum(nil)) - fingerprintLower := strings.ToLower(s.fingerprint) + fingerprintLower := strings.ToLower(cnf.SourceFingerprint) if hstr != fingerprintLower { return fmt.Errorf("server fingerprint do not match: expected %s, got %s", @@ -88,12 +76,12 @@ func (s *rtspSource) run(ctx context.Context) error { } c := &gortsplib.Client{ - Transport: s.proto.Transport, + Transport: cnf.SourceProtocol.Transport, TLSConfig: tlsConfig, ReadTimeout: time.Duration(s.readTimeout), WriteTimeout: time.Duration(s.writeTimeout), ReadBufferCount: s.readBufferCount, - AnyPortEnable: s.anyPortEnable, + AnyPortEnable: cnf.SourceAnyPortEnable, OnRequest: func(req *base.Request) { s.Log(logger.Debug, "c->s %v", req) }, @@ -105,7 +93,7 @@ func (s *rtspSource) run(ctx context.Context) error { }, } - u, err := url.Parse(s.ur) + u, err := url.Parse(cnf.Source) if err != nil { return err } @@ -238,14 +226,18 @@ func (s *rtspSource) run(ctx context.Context) error { }() }() - select { - case err := <-readErr: - return err + for { + select { + case err := <-readErr: + return err + + case <-reloadConf: - case <-ctx.Done(): - c.Close() - <-readErr - return nil + case <-ctx.Done(): + c.Close() + <-readErr + return nil + } } } diff --git a/internal/core/source_static.go b/internal/core/source_static.go index 46fd93eb..8844c629 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -8,7 +8,6 @@ import ( "github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/logger" - "github.com/aler9/rtsp-simple-server/internal/rpicamera" ) const ( @@ -17,7 +16,7 @@ const ( type sourceStaticImpl interface { Log(logger.Level, string, ...interface{}) - run(context.Context) error + run(context.Context, *conf.PathConf, chan *conf.PathConf) error apiSourceDescribe() interface{} } @@ -29,6 +28,7 @@ type sourceStaticParent interface { // sourceStatic is a static source. type sourceStatic struct { + conf *conf.PathConf parent sourceStaticParent ctx context.Context @@ -36,86 +36,53 @@ type sourceStatic struct { impl sourceStaticImpl running bool - done chan struct{} + // in + chReloadConf chan *conf.PathConf chSourceStaticImplSetReady chan pathSourceStaticSetReadyReq chSourceStaticImplSetNotReady chan pathSourceStaticSetNotReadyReq + + // out + done chan struct{} } func newSourceStatic( - conf *conf.PathConf, + cnf *conf.PathConf, readTimeout conf.StringDuration, writeTimeout conf.StringDuration, readBufferCount int, parent sourceStaticParent, ) *sourceStatic { s := &sourceStatic{ + conf: cnf, parent: parent, + chReloadConf: make(chan *conf.PathConf), chSourceStaticImplSetReady: make(chan pathSourceStaticSetReadyReq), chSourceStaticImplSetNotReady: make(chan pathSourceStaticSetNotReadyReq), } switch { - case strings.HasPrefix(conf.Source, "rtsp://") || - strings.HasPrefix(conf.Source, "rtsps://"): + case strings.HasPrefix(cnf.Source, "rtsp://") || + strings.HasPrefix(cnf.Source, "rtsps://"): s.impl = newRTSPSource( - conf.Source, - conf.SourceProtocol, - conf.SourceAnyPortEnable, - conf.SourceFingerprint, readTimeout, writeTimeout, readBufferCount, s) - case strings.HasPrefix(conf.Source, "rtmp://") || - strings.HasPrefix(conf.Source, "rtmps://"): + case strings.HasPrefix(cnf.Source, "rtmp://") || + strings.HasPrefix(cnf.Source, "rtmps://"): s.impl = newRTMPSource( - conf.Source, - conf.SourceFingerprint, readTimeout, writeTimeout, s) - case strings.HasPrefix(conf.Source, "http://") || - strings.HasPrefix(conf.Source, "https://"): + case strings.HasPrefix(cnf.Source, "http://") || + strings.HasPrefix(cnf.Source, "https://"): s.impl = newHLSSource( - conf.Source, - conf.SourceFingerprint, s) - case conf.Source == "rpiCamera": + case cnf.Source == "rpiCamera": s.impl = newRPICameraSource( - rpicamera.Params{ - CameraID: conf.RPICameraCamID, - Width: conf.RPICameraWidth, - Height: conf.RPICameraHeight, - HFlip: conf.RPICameraHFlip, - VFlip: conf.RPICameraVFlip, - Brightness: conf.RPICameraBrightness, - Contrast: conf.RPICameraContrast, - Saturation: conf.RPICameraSaturation, - Sharpness: conf.RPICameraSharpness, - Exposure: conf.RPICameraExposure, - AWB: conf.RPICameraAWB, - Denoise: conf.RPICameraDenoise, - Shutter: conf.RPICameraShutter, - Metering: conf.RPICameraMetering, - Gain: conf.RPICameraGain, - EV: conf.RPICameraEV, - ROI: conf.RPICameraROI, - TuningFile: conf.RPICameraTuningFile, - Mode: conf.RPICameraMode, - FPS: conf.RPICameraFPS, - IDRPeriod: conf.RPICameraIDRPeriod, - Bitrate: conf.RPICameraBitrate, - Profile: conf.RPICameraProfile, - Level: conf.RPICameraLevel, - AfMode: conf.RPICameraAfMode, - AfRange: conf.RPICameraAfRange, - AfSpeed: conf.RPICameraAfSpeed, - LensPosition: conf.RPICameraLensPosition, - AfWindow: conf.RPICameraAfWindow, - }, s) } @@ -163,33 +130,43 @@ func (s *sourceStatic) log(level logger.Level, format string, args ...interface{ func (s *sourceStatic) run() { defer close(s.done) -outer: - for { - s.runInner() + var innerCtx context.Context + var innerCtxCancel func() + implErr := make(chan error) + innerReloadConf := make(chan *conf.PathConf) - select { - case <-time.After(sourceStaticRetryPause): - case <-s.ctx.Done(): - break outer - } + recreate := func() { + innerCtx, innerCtxCancel = context.WithCancel(context.Background()) + go func() { + implErr <- s.impl.run(innerCtx, s.conf, innerReloadConf) + }() } - s.ctxCancel() -} + recreate() -func (s *sourceStatic) runInner() { - innerCtx, innerCtxCancel := context.WithCancel(context.Background()) - implErr := make(chan error) - go func() { - implErr <- s.impl.run(innerCtx) - }() + recreating := false + recreateTimer := newEmptyTimer() for { select { case err := <-implErr: innerCtxCancel() s.impl.Log(logger.Info, "ERR: %v", err) - return + recreating = true + recreateTimer = time.NewTimer(sourceStaticRetryPause) + + case newConf := <-s.chReloadConf: + s.conf = newConf + if !recreating { + cReloadConf := innerReloadConf + cInnerCtx := innerCtx + go func() { + select { + case cReloadConf <- newConf: + case <-cInnerCtx.Done(): + } + }() + } case req := <-s.chSourceStaticImplSetReady: s.parent.sourceStaticSetReady(s.ctx, req) @@ -197,14 +174,27 @@ func (s *sourceStatic) runInner() { case req := <-s.chSourceStaticImplSetNotReady: s.parent.sourceStaticSetNotReady(s.ctx, req) + case <-recreateTimer.C: + recreate() + recreating = false + case <-s.ctx.Done(): - innerCtxCancel() - <-implErr + if !recreating { + innerCtxCancel() + <-implErr + } return } } } +func (s *sourceStatic) reloadConf(newConf *conf.PathConf) { + select { + case s.chReloadConf <- newConf: + case <-s.ctx.Done(): + } +} + // apiSourceDescribe implements source. func (s *sourceStatic) apiSourceDescribe() interface{} { return s.impl.apiSourceDescribe() diff --git a/internal/core/webrtc_conn.go b/internal/core/webrtc_conn.go index b3da7711..904ea11d 100644 --- a/internal/core/webrtc_conn.go +++ b/internal/core/webrtc_conn.go @@ -507,7 +507,7 @@ outer: defer res.stream.readerRemove(c) c.log(logger.Info, "is reading from path '%s', %s", - path.Name(), sourceMediaInfo(gatherMedias(tracks))) + path.name, sourceMediaInfo(gatherMedias(tracks))) go func() { for { diff --git a/internal/core/webrtc_server.go b/internal/core/webrtc_server.go index 726c6ff1..944bf973 100644 --- a/internal/core/webrtc_server.go +++ b/internal/core/webrtc_server.go @@ -421,7 +421,7 @@ func (s *webRTCServer) newConn(dir string, wsconn *websocket.ServerConn) *webRTC } func (s *webRTCServer) authenticate(pa *path, ctx *gin.Context) error { - pathConf := pa.Conf() + pathConf := pa.safeConf() pathIPs := pathConf.ReadIPs pathUser := pathConf.ReadUser pathPass := pathConf.ReadPass diff --git a/internal/rpicamera/embeddedexe.go b/internal/rpicamera/embeddedexe.go deleted file mode 100644 index edd1add8..00000000 --- a/internal/rpicamera/embeddedexe.go +++ /dev/null @@ -1,49 +0,0 @@ -//go:build rpicamera -// +build rpicamera - -package rpicamera - -import ( - "os" - "os/exec" - "strconv" - "time" -) - -const ( - tempPathPrefix = "/dev/shm/rtspss-embeddedexe-" -) - -type embeddedExe struct { - cmd *exec.Cmd -} - -func newEmbeddedExe(content []byte, env []string) (*embeddedExe, error) { - tempPath := tempPathPrefix + strconv.FormatInt(time.Now().UnixNano(), 10) - - err := os.WriteFile(tempPath, content, 0o755) - if err != nil { - return nil, err - } - - cmd := exec.Command(tempPath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Env = env - - err = cmd.Start() - os.Remove(tempPath) - - if err != nil { - return nil, err - } - - return &embeddedExe{ - cmd: cmd, - }, nil -} - -func (e *embeddedExe) close() { - e.cmd.Process.Kill() - e.cmd.Wait() -} diff --git a/internal/rpicamera/exe/Makefile b/internal/rpicamera/exe/Makefile index 455cc019..ef3344e6 100644 --- a/internal/rpicamera/exe/Makefile +++ b/internal/rpicamera/exe/Makefile @@ -26,6 +26,7 @@ OBJS = \ encoder.o \ main.o \ parameters.o \ + pipe.o \ window.o \ sensor_mode.o diff --git a/internal/rpicamera/exe/camera.cpp b/internal/rpicamera/exe/camera.cpp index 792ffe2d..0714b3b3 100644 --- a/internal/rpicamera/exe/camera.cpp +++ b/internal/rpicamera/exe/camera.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -71,13 +72,15 @@ static PixelFormat mode_to_pixel_format(sensor_mode_t *mode) { } struct CameraPriv { - parameters_t *params; + const parameters_t *params; camera_frame_cb frame_cb; std::unique_ptr camera_manager; std::shared_ptr camera; Stream *video_stream; std::unique_ptr allocator; std::vector> requests; + std::mutex ctrls_mutex; + std::unique_ptr ctrls; }; static int get_v4l2_colorspace(std::optional const &cs) { @@ -87,7 +90,7 @@ static int get_v4l2_colorspace(std::optional const &cs) { return V4L2_COLORSPACE_SMPTE170M; } -bool camera_create(parameters_t *params, camera_frame_cb frame_cb, camera_t **cam) { +bool camera_create(const parameters_t *params, camera_frame_cb frame_cb, camera_t **cam) { // We make sure to set the environment variable before libcamera init setenv("LIBCAMERA_RPI_TUNING_FILE", params->tuning_file, 1); @@ -223,6 +226,13 @@ static void on_request_complete(Request *request) { camp->frame_cb(buffer->planes()[0].fd.get(), size, ts); request->reuse(Request::ReuseFlag::ReuseBuffers); + + { + std::lock_guard lock(camp->ctrls_mutex); + request->controls() = *camp->ctrls; + camp->ctrls->clear(); + } + camp->camera->queueRequest(request); } @@ -236,81 +246,126 @@ int camera_get_mode_colorspace(camera_t *cam) { return get_v4l2_colorspace(camp->video_stream->configuration().colorSpace); } -bool camera_start(camera_t *cam) { - CameraPriv *camp = (CameraPriv *)cam; - - ControlList ctrls = ControlList(controls::controls); - - ctrls.set(controls::Brightness, camp->params->brightness); - ctrls.set(controls::Contrast, camp->params->contrast); - ctrls.set(controls::Saturation, camp->params->saturation); - ctrls.set(controls::Sharpness, camp->params->sharpness); +static void fill_dynamic_controls(ControlList *ctrls, const parameters_t *params) { + ctrls->set(controls::Brightness, params->brightness); + ctrls->set(controls::Contrast, params->contrast); + ctrls->set(controls::Saturation, params->saturation); + ctrls->set(controls::Sharpness, params->sharpness); int exposure_mode; - if (strcmp(camp->params->exposure, "short") == 0) { + if (strcmp(params->exposure, "short") == 0) { exposure_mode = controls::ExposureShort; - } else if (strcmp(camp->params->exposure, "long") == 0) { + } else if (strcmp(params->exposure, "long") == 0) { exposure_mode = controls::ExposureLong; - } else if (strcmp(camp->params->exposure, "custom") == 0) { + } else if (strcmp(params->exposure, "custom") == 0) { exposure_mode = controls::ExposureCustom; } else { exposure_mode = controls::ExposureNormal; } - ctrls.set(controls::AeExposureMode, exposure_mode); + ctrls->set(controls::AeExposureMode, exposure_mode); int awb_mode; - if (strcmp(camp->params->awb, "incandescent") == 0) { + if (strcmp(params->awb, "incandescent") == 0) { awb_mode = controls::AwbIncandescent; - } else if (strcmp(camp->params->awb, "tungsten") == 0) { + } else if (strcmp(params->awb, "tungsten") == 0) { awb_mode = controls::AwbTungsten; - } else if (strcmp(camp->params->awb, "fluorescent") == 0) { + } else if (strcmp(params->awb, "fluorescent") == 0) { awb_mode = controls::AwbFluorescent; - } else if (strcmp(camp->params->awb, "indoor") == 0) { + } else if (strcmp(params->awb, "indoor") == 0) { awb_mode = controls::AwbIndoor; - } else if (strcmp(camp->params->awb, "daylight") == 0) { + } else if (strcmp(params->awb, "daylight") == 0) { awb_mode = controls::AwbDaylight; - } else if (strcmp(camp->params->awb, "cloudy") == 0) { + } else if (strcmp(params->awb, "cloudy") == 0) { awb_mode = controls::AwbCloudy; - } else if (strcmp(camp->params->awb, "custom") == 0) { + } else if (strcmp(params->awb, "custom") == 0) { awb_mode = controls::AwbCustom; } else { awb_mode = controls::AwbAuto; } - ctrls.set(controls::AwbMode, awb_mode); + ctrls->set(controls::AwbMode, awb_mode); int denoise_mode; - if (strcmp(camp->params->denoise, "cdn_off") == 0) { + if (strcmp(params->denoise, "cdn_off") == 0) { denoise_mode = controls::draft::NoiseReductionModeMinimal; - } else if (strcmp(camp->params->denoise, "cdn_hq") == 0) { + } else if (strcmp(params->denoise, "cdn_hq") == 0) { denoise_mode = controls::draft::NoiseReductionModeHighQuality; - } else if (strcmp(camp->params->denoise, "cdn_fast") == 0) { + } else if (strcmp(params->denoise, "cdn_fast") == 0) { denoise_mode = controls::draft::NoiseReductionModeFast; } else { denoise_mode = controls::draft::NoiseReductionModeOff; } - ctrls.set(controls::draft::NoiseReductionMode, denoise_mode); - - if (camp->params->shutter != 0) { - ctrls.set(controls::ExposureTime, camp->params->shutter); - } + ctrls->set(controls::draft::NoiseReductionMode, denoise_mode); int metering_mode; - if (strcmp(camp->params->metering, "spot") == 0) { + if (strcmp(params->metering, "spot") == 0) { metering_mode = controls::MeteringSpot; - } else if (strcmp(camp->params->metering, "matrix") == 0) { + } else if (strcmp(params->metering, "matrix") == 0) { metering_mode = controls::MeteringMatrix; - } else if (strcmp(camp->params->metering, "custom") == 0) { + } else if (strcmp(params->metering, "custom") == 0) { metering_mode = controls::MeteringCustom; } else { metering_mode = controls::MeteringCentreWeighted; } - ctrls.set(controls::AeMeteringMode, metering_mode); + ctrls->set(controls::AeMeteringMode, metering_mode); + + ctrls->set(controls::ExposureValue, params->ev); + + int64_t frame_time = 1000000 / params->fps; + ctrls->set(controls::FrameDurationLimits, Span({ frame_time, frame_time })); +} + +bool camera_start(camera_t *cam) { + CameraPriv *camp = (CameraPriv *)cam; + + camp->ctrls = std::make_unique(controls::controls); + + fill_dynamic_controls(camp->ctrls.get(), camp->params); + + if (camp->params->shutter != 0) { + camp->ctrls->set(controls::ExposureTime, camp->params->shutter); + } if (camp->params->gain > 0) { - ctrls.set(controls::AnalogueGain, camp->params->gain); + camp->ctrls->set(controls::AnalogueGain, camp->params->gain); } - ctrls.set(controls::ExposureValue, camp->params->ev); + if (camp->camera->controls().count(&controls::AfMode) > 0) { + int af_mode; + if (strcmp(camp->params->af_mode, "manual") == 0) { + af_mode = controls::AfModeManual; + } else if (strcmp(camp->params->af_mode, "continuous") == 0) { + af_mode = controls::AfModeContinuous; + } else { + af_mode = controls::AfModeAuto; + } + camp->ctrls->set(controls::AfMode, af_mode); + + if (af_mode == controls::AfModeManual) { + camp->ctrls->set(controls::LensPosition, camp->params->lens_position); + } + } + + if (camp->camera->controls().count(&controls::AfRange) > 0) { + int af_range; + if (strcmp(camp->params->af_range, "macro") == 0) { + af_range = controls::AfRangeMacro; + } else if (strcmp(camp->params->af_range, "full") == 0) { + af_range = controls::AfRangeFull; + } else { + af_range = controls::AfRangeNormal; + } + camp->ctrls->set(controls::AfRange, af_range); + } + + if (camp->camera->controls().count(&controls::AfSpeed) > 0) { + int af_speed; + if (strcmp(camp->params->af_range, "fast") == 0) { + af_speed = controls::AfSpeedFast; + } else { + af_speed = controls::AfSpeedNormal; + } + camp->ctrls->set(controls::AfSpeed, af_speed); + } if (camp->params->roi != NULL) { std::optional opt = camp->camera->properties().get(properties::ScalerCropMaximum); @@ -328,51 +383,9 @@ bool camera_start(camera_t *cam) { camp->params->roi->width * sensor_area.width, camp->params->roi->height * sensor_area.height); crop.translateBy(sensor_area.topLeft()); - ctrls.set(controls::ScalerCrop, crop); - } - - int64_t frame_time = 1000000 / camp->params->fps; - ctrls.set(controls::FrameDurationLimits, Span({ frame_time, frame_time })); - - int af_mode; - if (strcmp(camp->params->af_mode, "manual") == 0) { - af_mode = controls::AfModeManual; - } else if (strcmp(camp->params->af_mode, "auto") == 0) { - af_mode = controls::AfModeAuto; - } else if (strcmp(camp->params->af_mode, "continuous") == 0) { - af_mode = controls::AfModeContinuous; - } else { - af_mode = controls::AfModeManual; - } - ctrls.set(controls::AfMode, af_mode); - - int af_range; - if (strcmp(camp->params->af_range, "normal") == 0) { - af_range = controls::AfRangeNormal; - } else if (strcmp(camp->params->af_range, "macro") == 0) { - af_range = controls::AfRangeMacro; - } else if (strcmp(camp->params->af_range, "full") == 0) { - af_range = controls::AfRangeFull; - } else { - af_range = controls::AfRangeNormal; - } - ctrls.set(controls::AfRange, af_range); - - int af_speed; - if (strcmp(camp->params->af_range, "normal") == 0) { - af_speed = controls::AfSpeedNormal; - } else if (strcmp(camp->params->af_range, "fast") == 0) { - af_speed = controls::AfSpeedFast; - } else { - af_speed = controls::AfSpeedNormal; + camp->ctrls->set(controls::ScalerCrop, crop); } - ctrls.set(controls::AfSpeed, af_speed); - - - // Lens Position - ctrls.set(controls::LensPosition, camp->params->lens_position); - // Af Window if (camp->params->af_window != NULL) { std::optional opt = camp->camera->properties().get(properties::ScalerCropMaximum); Rectangle sensor_area; @@ -385,26 +398,25 @@ bool camera_start(camera_t *cam) { Rectangle afwindows_rectangle[1]; - afwindows_rectangle[0] = Rectangle ( - camp->params->af_window->x * sensor_area.width, - camp->params->af_window->y * sensor_area.height, - camp->params->af_window->width * sensor_area.width, - camp->params->af_window->height * sensor_area.height); + afwindows_rectangle[0] = Rectangle( + camp->params->af_window->x * sensor_area.width, + camp->params->af_window->y * sensor_area.height, + camp->params->af_window->width * sensor_area.width, + camp->params->af_window->height * sensor_area.height); - afwindows_rectangle[0].translateBy(sensor_area.topLeft()); - //activate the AfMeteringWindows - ctrls.set(controls::AfMetering, controls::AfMeteringWindows); - //set window - ctrls.set(controls::AfWindows, afwindows_rectangle); + afwindows_rectangle[0].translateBy(sensor_area.topLeft()); + camp->ctrls->set(controls::AfMetering, controls::AfMeteringWindows); + camp->ctrls->set(controls::AfWindows, afwindows_rectangle); } - - int res = camp->camera->start(&ctrls); + int res = camp->camera->start(camp->ctrls.get()); if (res != 0) { set_error("Camera.start() failed"); return false; } + camp->ctrls->clear(); + camp->camera->requestCompleted.connect(on_request_complete); for (std::unique_ptr &request : camp->requests) { @@ -417,3 +429,10 @@ bool camera_start(camera_t *cam) { return true; } + +void camera_reload_params(camera_t *cam, const parameters_t *params) { + CameraPriv *camp = (CameraPriv *)cam; + + std::lock_guard lock(camp->ctrls_mutex); + fill_dynamic_controls(camp->ctrls.get(), params); +} diff --git a/internal/rpicamera/exe/camera.h b/internal/rpicamera/exe/camera.h index 42191b07..ec213084 100644 --- a/internal/rpicamera/exe/camera.h +++ b/internal/rpicamera/exe/camera.h @@ -10,10 +10,11 @@ extern "C" { #endif const char *camera_get_error(); -bool camera_create(parameters_t *params, camera_frame_cb frame_cb, camera_t **cam); +bool camera_create(const parameters_t *params, camera_frame_cb frame_cb, camera_t **cam); int camera_get_mode_stride(camera_t *cam); int camera_get_mode_colorspace(camera_t *cam); bool camera_start(camera_t *cam); +void camera_reload_params(camera_t *cam, const parameters_t *params); #ifdef __cplusplus } diff --git a/internal/rpicamera/exe/encoder.c b/internal/rpicamera/exe/encoder.c index ef73b351..287c2221 100644 --- a/internal/rpicamera/exe/encoder.c +++ b/internal/rpicamera/exe/encoder.c @@ -33,7 +33,7 @@ const char *encoder_get_error() { } typedef struct { - parameters_t *params; + const parameters_t *params; int fd; void **capture_buffers; int cur_buffer; @@ -112,7 +112,7 @@ static void *output_thread(void *userdata) { return NULL; } -bool encoder_create(parameters_t *params, int stride, int colorspace, encoder_output_cb output_cb, encoder_t **enc) { +bool encoder_create(const parameters_t *params, int stride, int colorspace, encoder_output_cb output_cb, encoder_t **enc) { *enc = malloc(sizeof(encoder_priv_t)); encoder_priv_t *encp = (encoder_priv_t *)(*enc); diff --git a/internal/rpicamera/exe/encoder.h b/internal/rpicamera/exe/encoder.h index a575ecd4..c2d84b9b 100644 --- a/internal/rpicamera/exe/encoder.h +++ b/internal/rpicamera/exe/encoder.h @@ -6,7 +6,7 @@ typedef void encoder_t; typedef void (*encoder_output_cb)(uint64_t ts, const uint8_t *buf, uint64_t size); const char *encoder_get_error(); -bool encoder_create(parameters_t *params, int stride, int colorspace, encoder_output_cb output_cb, encoder_t **enc); +bool encoder_create(const parameters_t *params, int stride, int colorspace, encoder_output_cb output_cb, encoder_t **enc); void encoder_encode(encoder_t *enc, int buffer_fd, size_t size, int64_t timestamp_us); #endif diff --git a/internal/rpicamera/exe/main.c b/internal/rpicamera/exe/main.c index 69519383..ba156157 100644 --- a/internal/rpicamera/exe/main.c +++ b/internal/rpicamera/exe/main.c @@ -3,88 +3,53 @@ #include #include #include -#include -#include -#include #include #include #include "parameters.h" +#include "pipe.h" #include "camera.h" #include "encoder.h" -int pipe_fd; -pthread_mutex_t pipe_mutex; -parameters_t params; -camera_t *cam; +int pipe_video_fd; +pthread_mutex_t pipe_video_mutex; encoder_t *enc; -static void pipe_write_error(int fd, const char *format, ...) { - char buf[256]; - buf[0] = 'e'; - va_list args; - va_start(args, format); - vsnprintf(&buf[1], 255, format, args); - int n = strlen(buf); - write(fd, &n, 4); - write(fd, buf, n); -} - -static void pipe_write_ready(int fd) { - char buf[] = {'r'}; - int n = 1; - write(fd, &n, 4); - write(fd, buf, n); -} - -static void pipe_write_buf(int fd, uint64_t ts, const uint8_t *buf, int n) { - char head[] = {'b'}; - n += 1 + sizeof(uint64_t); - write(fd, &n, 4); - write(fd, head, 1); - write(fd, &ts, sizeof(uint64_t)); - write(fd, buf, n - 1 - sizeof(uint64_t)); -} - static void on_frame(int buffer_fd, uint64_t size, uint64_t timestamp) { encoder_encode(enc, buffer_fd, size, timestamp); } static void on_encoder_output(uint64_t ts, const uint8_t *buf, uint64_t size) { - pthread_mutex_lock(&pipe_mutex); - pipe_write_buf(pipe_fd, ts, buf, size); - pthread_mutex_unlock(&pipe_mutex); -} - -static bool init_siglistener(sigset_t *set) { - sigemptyset(set); - - int res = sigaddset(set, SIGKILL); - if (res == -1) { - return false; - } - - return true; + pthread_mutex_lock(&pipe_video_mutex); + pipe_write_buf(pipe_video_fd, ts, buf, size); + pthread_mutex_unlock(&pipe_video_mutex); } int main() { - pipe_fd = atoi(getenv("PIPE_FD")); + int pipe_conf_fd = atoi(getenv("PIPE_CONF_FD")); + pipe_video_fd = atoi(getenv("PIPE_VIDEO_FD")); - pthread_mutex_init(&pipe_mutex, NULL); - pthread_mutex_lock(&pipe_mutex); + uint8_t *buf; + uint32_t n = pipe_read(pipe_conf_fd, &buf); - bool ok = parameters_load(¶ms); + parameters_t params; + bool ok = parameters_unserialize(¶ms, &buf[1], n-1); + free(buf); if (!ok) { - pipe_write_error(pipe_fd, "parameters_load(): %s", parameters_get_error()); + pipe_write_error(pipe_video_fd, "parameters_unserialize(): %s", parameters_get_error()); return 5; } + pthread_mutex_init(&pipe_video_mutex, NULL); + pthread_mutex_lock(&pipe_video_mutex); + + camera_t *cam; ok = camera_create( ¶ms, on_frame, &cam); if (!ok) { - pipe_write_error(pipe_fd, "camera_create(): %s", camera_get_error()); + pipe_write_error(pipe_video_fd, "camera_create(): %s", camera_get_error()); return 5; } @@ -95,28 +60,41 @@ int main() { on_encoder_output, &enc); if (!ok) { - pipe_write_error(pipe_fd, "encoder_create(): %s", encoder_get_error()); + pipe_write_error(pipe_video_fd, "encoder_create(): %s", encoder_get_error()); return 5; } ok = camera_start(cam); if (!ok) { - pipe_write_error(pipe_fd, "camera_start(): %s", camera_get_error()); + pipe_write_error(pipe_video_fd, "camera_start(): %s", camera_get_error()); return 5; } - sigset_t set; - ok = init_siglistener(&set); - if (!ok) { - pipe_write_error(pipe_fd, "init_siglistener() failed"); - return 5; + pipe_write_ready(pipe_video_fd); + pthread_mutex_unlock(&pipe_video_mutex); + + while (true) { + uint8_t *buf; + uint32_t n = pipe_read(pipe_conf_fd, &buf); + + switch (buf[0]) { + case 'e': + return 0; + + case 'c': + { + parameters_t params; + bool ok = parameters_unserialize(¶ms, &buf[1], n-1); + free(buf); + if (!ok) { + printf("skipping reloading parameters since they are invalid: %s\n", parameters_get_error()); + continue; + } + camera_reload_params(cam, ¶ms); + parameters_destroy(¶ms); + } + } } - pipe_write_ready(pipe_fd); - pthread_mutex_unlock(&pipe_mutex); - - int sig; - sigwait(&set, &sig); - return 0; } diff --git a/internal/rpicamera/exe/parameters.c b/internal/rpicamera/exe/parameters.c index 98741236..f848d27a 100644 --- a/internal/rpicamera/exe/parameters.c +++ b/internal/rpicamera/exe/parameters.c @@ -20,85 +20,142 @@ const char *parameters_get_error() { return errbuf; } -bool parameters_load(parameters_t *params) { - params->camera_id = atoi(getenv("CAMERA_ID")); - params->width = atoi(getenv("WIDTH")); - params->height = atoi(getenv("HEIGHT")); - params->h_flip = (strcmp(getenv("H_FLIP"), "1") == 0); - params->v_flip = (strcmp(getenv("V_FLIP"), "1") == 0); - params->brightness = atof(getenv("BRIGHTNESS")); - params->contrast = atof(getenv("CONTRAST")); - params->saturation = atof(getenv("SATURATION")); - params->sharpness = atof(getenv("SHARPNESS")); - params->exposure = getenv("EXPOSURE"); - params->awb = getenv("AWB"); - params->denoise = getenv("DENOISE"); - params->shutter = atoi(getenv("SHUTTER")); - params->metering = getenv("METERING"); - params->gain = atof(getenv("GAIN")); - params->ev = atof(getenv("EV")); - - if (strlen(getenv("ROI")) != 0) { - bool ok = window_load(getenv("ROI"), ¶ms->roi); - if (!ok) { - set_error("invalid ROI"); - return false; - } - } else { - params->roi = NULL; - } - - params->tuning_file = getenv("TUNING_FILE"); - - if (strlen(getenv("MODE")) != 0) { - bool ok = sensor_mode_load(getenv("MODE"), ¶ms->mode); - if (!ok) { - set_error("invalid sensor mode"); - return false; +bool parameters_unserialize(parameters_t *params, const uint8_t *buf, size_t buf_size) { + char *tmp = malloc(buf_size + 1); + memcpy(tmp, buf, buf_size); + tmp[buf_size] = 0x00; + + char *entry; + + while ((entry = strsep(&tmp, " ")) != NULL) { + char *key = strsep(&entry, "="); + char *val = strsep(&entry, "="); + + if (strcmp(key, "CameraID") == 0) { + params->camera_id = atoi(val); + } else if (strcmp(key, "Width") == 0) { + params->width = atoi(val); + } else if (strcmp(key, "Height") == 0) { + params->height = atoi(val); + } else if (strcmp(key, "HFlip") == 0) { + params->h_flip = (strcmp(val, "1") == 0); + } else if (strcmp(key, "VFlip") == 0) { + params->v_flip = (strcmp(val, "1") == 0); + } else if (strcmp(key, "Brightness") == 0) { + params->brightness = atof(val); + } else if (strcmp(key, "Contrast") == 0) { + params->contrast = atof(val); + } else if (strcmp(key, "Saturation") == 0) { + params->saturation = atof(val); + } else if (strcmp(key, "Sharpness") == 0) { + params->sharpness = atof(val); + } else if (strcmp(key, "Exposure") == 0) { + params->exposure = strdup(val); + } else if (strcmp(key, "AWB") == 0) { + params->awb = strdup(val); + } else if (strcmp(key, "Denoise") == 0) { + params->denoise = strdup(val); + } else if (strcmp(key, "Shutter") == 0) { + params->shutter = atoi(val); + } else if (strcmp(key, "Metering") == 0) { + params->metering = strdup(val); + } else if (strcmp(key, "Gain") == 0) { + params->gain = atof(val); + } else if (strcmp(key, "EV") == 0) { + params->ev = atof(val); + } else if (strcmp(key, "ROI") == 0) { + if (strlen(val) != 0) { + params->roi = malloc(sizeof(window_t)); + bool ok = window_load(val, params->roi); + if (!ok) { + set_error("invalid ROI"); + return false; + } + } else { + params->roi = NULL; + } + } else if (strcmp(key, "TuningFile") == 0) { + params->tuning_file = strdup(val); + } else if (strcmp(key, "Mode") == 0) { + if (strlen(val) != 0) { + params->mode = malloc(sizeof(sensor_mode_t)); + bool ok = sensor_mode_load(val, params->mode); + if (!ok) { + set_error("invalid sensor mode"); + return false; + } + } else { + params->mode = NULL; + } + } else if (strcmp(key, "FPS") == 0) { + params->fps = atoi(val); + } else if (strcmp(key, "IDRPeriod") == 0) { + params->idr_period = atoi(val); + } else if (strcmp(key, "Bitrate") == 0) { + params->bitrate = atoi(val); + } else if (strcmp(key, "Profile") == 0) { + if (strcmp(val, "baseline") == 0) { + params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_BASELINE; + } else if (strcmp(val, "main") == 0) { + params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_MAIN; + } else { + params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_HIGH; + } + } else if (strcmp(key, "Level") == 0) { + if (strcmp(val, "4.0") == 0) { + params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_0; + } else if (strcmp(val, "4.1") == 0) { + params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_1; + } else { + params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_2; + } + } else if (strcmp(key, "AfMode") == 0) { + params->af_mode = strdup(val); + } else if (strcmp(key, "AfRange") == 0) { + params->af_range = strdup(val); + } else if (strcmp(key, "AfSpeed") == 0) { + params->af_speed = strdup(val); + } else if (strcmp(key, "LensPosition") == 0) { + params->lens_position = atof(val); + } else if (strcmp(key, "AfWindow") == 0) { + if (strlen(val) != 0) { + params->af_window = malloc(sizeof(window_t)); + bool ok = window_load(val, params->af_window); + if (!ok) { + set_error("invalid AfWindow"); + return false; + } + } else { + params->af_window = NULL; + } } - } else { - params->mode = NULL; - } - - params->fps = atoi(getenv("FPS")); - params->idr_period = atoi(getenv("IDR_PERIOD")); - params->bitrate = atoi(getenv("BITRATE")); - - const char *profile = getenv("PROFILE"); - if (strcmp(profile, "baseline") == 0) { - params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_BASELINE; - } else if (strcmp(profile, "main") == 0) { - params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_MAIN; - } else { - params->profile = V4L2_MPEG_VIDEO_H264_PROFILE_HIGH; } - const char *level = getenv("LEVEL"); - if (strcmp(level, "4.0") == 0) { - params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_0; - } else if (strcmp(level, "4.1") == 0) { - params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_1; - } else { - params->level = V4L2_MPEG_VIDEO_H264_LEVEL_4_2; - } + free(tmp); params->buffer_count = 6; params->capture_buffer_count = params->buffer_count * 2; - params->af_mode = getenv("AF_MODE"); - params->af_range = getenv("AF_RANGE"); - params->af_speed = getenv("AF_SPEED"); - params->lens_position = atof(getenv("LENS_POSITION")); + return true; +} - if (strlen(getenv("AF_WINDOW")) != 0) { - bool ok = window_load(getenv("AF_WINDOW"), ¶ms->af_window); - if (!ok) { - set_error("invalid AF_WINDOW"); - return false; - } - } else { - params->af_window = NULL; +void parameters_destroy(parameters_t *params) { + free(params->exposure); + free(params->awb); + free(params->denoise); + free(params->metering); + free(params->tuning_file); + free(params->af_mode); + free(params->af_range); + free(params->af_speed); + + if (params->roi != NULL) { + free(params->roi); + } + if (params->mode != NULL) { + free(params->mode); + } + if (params->af_window != NULL) { + free(params->af_window); } - - return true; } diff --git a/internal/rpicamera/exe/parameters.h b/internal/rpicamera/exe/parameters.h index 7971c5d5..c94fc381 100644 --- a/internal/rpicamera/exe/parameters.h +++ b/internal/rpicamera/exe/parameters.h @@ -1,6 +1,7 @@ #ifndef __PARAMETERS_H__ #define __PARAMETERS_H__ +#include #include #include "window.h" @@ -16,24 +17,24 @@ typedef struct { float contrast; float saturation; float sharpness; - const char *exposure; - const char *awb; - const char *denoise; + char *exposure; + char *awb; + char *denoise; unsigned int shutter; - const char *metering; + char *metering; float gain; float ev; window_t *roi; - const char *tuning_file; + char *tuning_file; sensor_mode_t *mode; unsigned int fps; unsigned int idr_period; unsigned int bitrate; unsigned int profile; unsigned int level; - const char *af_mode; - const char *af_range; - const char *af_speed; + char *af_mode; + char *af_range; + char *af_speed; float lens_position; window_t *af_window; @@ -47,7 +48,8 @@ extern "C" { #endif const char *parameters_get_error(); -bool parameters_load(parameters_t *params); +bool parameters_unserialize(parameters_t *params, const uint8_t *buf, size_t buf_size); +void parameters_destroy(parameters_t *params); #ifdef __cplusplus } diff --git a/internal/rpicamera/exe/pipe.c b/internal/rpicamera/exe/pipe.c new file mode 100644 index 00000000..38d2437f --- /dev/null +++ b/internal/rpicamera/exe/pipe.c @@ -0,0 +1,44 @@ +#include +#include +#include +#include +#include +#include + +#include "pipe.h" + +void pipe_write_error(int fd, const char *format, ...) { + char buf[256]; + buf[0] = 'e'; + va_list args; + va_start(args, format); + vsnprintf(&buf[1], 255, format, args); + uint32_t n = strlen(buf); + write(fd, &n, 4); + write(fd, buf, n); +} + +void pipe_write_ready(int fd) { + char buf[] = {'r'}; + uint32_t n = 1; + write(fd, &n, 4); + write(fd, buf, n); +} + +void pipe_write_buf(int fd, uint64_t ts, const uint8_t *buf, uint32_t n) { + char head[] = {'b'}; + n += 1 + sizeof(uint64_t); + write(fd, &n, 4); + write(fd, head, 1); + write(fd, &ts, sizeof(uint64_t)); + write(fd, buf, n - 1 - sizeof(uint64_t)); +} + +uint32_t pipe_read(int fd, uint8_t **pbuf) { + uint32_t n; + read(fd, &n, 4); + + *pbuf = malloc(n); + read(fd, *pbuf, n); + return n; +} diff --git a/internal/rpicamera/exe/pipe.h b/internal/rpicamera/exe/pipe.h new file mode 100644 index 00000000..26630452 --- /dev/null +++ b/internal/rpicamera/exe/pipe.h @@ -0,0 +1,12 @@ +#ifndef __PIPE_H__ +#define __PIPE_H__ + +#include +#include + +void pipe_write_error(int fd, const char *format, ...); +void pipe_write_ready(int fd); +void pipe_write_buf(int fd, uint64_t ts, const uint8_t *buf, uint32_t n); +uint32_t pipe_read(int fd, uint8_t **pbuf); + +#endif diff --git a/internal/rpicamera/exe/sensor_mode.c b/internal/rpicamera/exe/sensor_mode.c index b41663cb..55932e21 100644 --- a/internal/rpicamera/exe/sensor_mode.c +++ b/internal/rpicamera/exe/sensor_mode.c @@ -4,26 +4,23 @@ #include "sensor_mode.h" -bool sensor_mode_load(const char *encoded, sensor_mode_t **mode) { - *mode = malloc(sizeof(sensor_mode_t)); - +bool sensor_mode_load(const char *encoded, sensor_mode_t *mode) { char p; - int n = sscanf(encoded, "%u:%u:%u:%c", &((*mode)->width), &((*mode)->height), &((*mode)->bit_depth), &p); + int n = sscanf(encoded, "%u:%u:%u:%c", &(mode->width), &(mode->height), &(mode->bit_depth), &p); if (n < 2) { - free(*mode); return false; } if (n < 4) { - (*mode)->packed = true; + mode->packed = true; } else if (toupper(p) == 'P') { - (*mode)->packed = true; + mode->packed = true; } else if (toupper(p) == 'U') { - (*mode)->packed = false; + mode->packed = false; } if (n < 3) { - (*mode)->bit_depth = 12; + mode->bit_depth = 12; } return true; diff --git a/internal/rpicamera/exe/sensor_mode.h b/internal/rpicamera/exe/sensor_mode.h index ab91839c..6306ee87 100644 --- a/internal/rpicamera/exe/sensor_mode.h +++ b/internal/rpicamera/exe/sensor_mode.h @@ -10,6 +10,6 @@ typedef struct { bool packed; } sensor_mode_t; -bool sensor_mode_load(const char *encoded, sensor_mode_t **mode); +bool sensor_mode_load(const char *encoded, sensor_mode_t *mode); #endif diff --git a/internal/rpicamera/exe/window.c b/internal/rpicamera/exe/window.c index 7d75b2db..83e65358 100644 --- a/internal/rpicamera/exe/window.c +++ b/internal/rpicamera/exe/window.c @@ -3,7 +3,7 @@ #include "window.h" -bool window_load(const char *encoded, window_t **mode) { +bool window_load(const char *encoded, window_t *window) { float vals[4]; int i = 0; char *token = strtok((char *)encoded, ","); @@ -21,10 +21,10 @@ bool window_load(const char *encoded, window_t **mode) { return false; } - *mode = malloc(sizeof(window_t)); - (*mode)->x = vals[0]; - (*mode)->y = vals[1]; - (*mode)->width = vals[2]; - (*mode)->height = vals[3]; + window->x = vals[0]; + window->y = vals[1]; + window->width = vals[2]; + window->height = vals[3]; + return true; } diff --git a/internal/rpicamera/exe/window.h b/internal/rpicamera/exe/window.h index 3584d3ab..f27e470a 100644 --- a/internal/rpicamera/exe/window.h +++ b/internal/rpicamera/exe/window.h @@ -10,6 +10,6 @@ typedef struct { float height; } window_t; -bool window_load(const char *encoded, window_t **mode); +bool window_load(const char *encoded, window_t *window); #endif diff --git a/internal/rpicamera/pipe.go b/internal/rpicamera/pipe.go index 0eca38d1..4b037e8c 100644 --- a/internal/rpicamera/pipe.go +++ b/internal/rpicamera/pipe.go @@ -4,7 +4,6 @@ package rpicamera import ( - "encoding/binary" "syscall" ) @@ -51,14 +50,14 @@ func (p *pipe) close() { } func (p *pipe) read() ([]byte, error) { - sizebuf := make([]byte, 4) - err := syscallReadAll(p.readFD, sizebuf) + buf := make([]byte, 4) + err := syscallReadAll(p.readFD, buf) if err != nil { return nil, err } - size := int(binary.LittleEndian.Uint32(sizebuf)) - buf := make([]byte, size) + le := int(buf[3])<<24 | int(buf[2])<<16 | int(buf[1])<<8 | int(buf[0]) + buf = make([]byte, le) err = syscallReadAll(p.readFD, buf) if err != nil { @@ -67,3 +66,14 @@ func (p *pipe) read() ([]byte, error) { return buf, nil } + +func (p *pipe) write(byts []byte) error { + le := len(byts) + _, err := syscall.Write(p.writeFD, []byte{byte(le), byte(le >> 8), byte(le >> 16), byte(le >> 24)}) + if err != nil { + return err + } + + _, err = syscall.Write(p.writeFD, byts) + return err +} diff --git a/internal/rpicamera/rpicamera.go b/internal/rpicamera/rpicamera.go index 36315448..c57afeb5 100644 --- a/internal/rpicamera/rpicamera.go +++ b/internal/rpicamera/rpicamera.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "os/exec" + "reflect" "runtime" "strconv" "strings" @@ -16,16 +17,13 @@ import ( "github.com/aler9/gortsplib/v2/pkg/codecs/h264" ) +const ( + tempPathPrefix = "/dev/shm/rtspss-embeddedexe-" +) + //go:embed exe/exe var exeContent []byte -func bool2env(v bool) string { - if v { - return "1" - } - return "0" -} - func getKernelArch() (string, error) { cmd := exec.Command("uname", "-m") @@ -89,11 +87,77 @@ func setupSymlinks() error { return setupSymlink("libcamera-base") } +func removeSymlinks() { + os.Remove("/dev/shm/libcamera-base.so.x.x.x") + os.Remove("/dev/shm/libcamera.so.x.x.x") +} + +func startEmbeddedExe(content []byte, env []string) (*exec.Cmd, error) { + tempPath := tempPathPrefix + strconv.FormatInt(time.Now().UnixNano(), 10) + + err := os.WriteFile(tempPath, content, 0o755) + if err != nil { + return nil, err + } + + cmd := exec.Command(tempPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = env + + err = cmd.Start() + os.Remove(tempPath) + + if err != nil { + return nil, err + } + + return cmd, nil +} + +func serializeParams(p Params) []byte { + rv := reflect.ValueOf(p) + rt := rv.Type() + nf := rv.NumField() + ret := make([]string, nf) + + for i := 0; i < nf; i++ { + entry := rt.Field(i).Name + "=" + f := rv.Field(i) + + switch f.Kind() { + case reflect.Int: + entry += strconv.FormatInt(f.Int(), 10) + + case reflect.Float64: + entry += strconv.FormatFloat(f.Float(), 'f', -1, 64) + + case reflect.String: + entry += f.String() + + case reflect.Bool: + if f.Bool() { + entry += "1" + } else { + entry += "0" + } + + default: + panic("unhandled type") + } + + ret[i] = entry + } + + return []byte(strings.Join(ret, " ")) +} + type RPICamera struct { onData func(time.Duration, [][]byte) - exe *embeddedExe - pipe *pipe + cmd *exec.Cmd + pipeConf *pipe + pipeVideo *pipe waitDone chan error readerDone chan error @@ -113,132 +177,124 @@ func New( return nil, err } - pipe, err := newPipe() + c := &RPICamera{ + onData: onData, + } + + c.pipeConf, err = newPipe() if err != nil { return nil, err } + c.pipeVideo, err = newPipe() + if err != nil { + c.pipeConf.close() + return nil, err + } + env := []string{ "LD_LIBRARY_PATH=/dev/shm", - "PIPE_FD=" + strconv.FormatInt(int64(pipe.writeFD), 10), - "CAMERA_ID=" + strconv.FormatInt(int64(params.CameraID), 10), - "WIDTH=" + strconv.FormatInt(int64(params.Width), 10), - "HEIGHT=" + strconv.FormatInt(int64(params.Height), 10), - "H_FLIP=" + bool2env(params.HFlip), - "V_FLIP=" + bool2env(params.VFlip), - "BRIGHTNESS=" + strconv.FormatFloat(params.Brightness, 'f', -1, 64), - "CONTRAST=" + strconv.FormatFloat(params.Contrast, 'f', -1, 64), - "SATURATION=" + strconv.FormatFloat(params.Saturation, 'f', -1, 64), - "SHARPNESS=" + strconv.FormatFloat(params.Sharpness, 'f', -1, 64), - "EXPOSURE=" + params.Exposure, - "AWB=" + params.AWB, - "DENOISE=" + params.Denoise, - "SHUTTER=" + strconv.FormatInt(int64(params.Shutter), 10), - "METERING=" + params.Metering, - "GAIN=" + strconv.FormatFloat(params.Gain, 'f', -1, 64), - "EV=" + strconv.FormatFloat(params.EV, 'f', -1, 64), - "ROI=" + params.ROI, - "TUNING_FILE=" + params.TuningFile, - "MODE=" + params.Mode, - "FPS=" + strconv.FormatInt(int64(params.FPS), 10), - "IDR_PERIOD=" + strconv.FormatInt(int64(params.IDRPeriod), 10), - "BITRATE=" + strconv.FormatInt(int64(params.Bitrate), 10), - "PROFILE=" + params.Profile, - "LEVEL=" + params.Level, - "AF_MODE=" + params.AfMode, - "AF_RANGE=" + params.AfRange, - "AF_SPEED=" + params.AfSpeed, - "LENS_POSITION=" + strconv.FormatFloat(params.LensPosition, 'f', -1, 64), - "AF_WINDOW=" + params.AfWindow, - } - - exe, err := newEmbeddedExe(exeContent, env) + "PIPE_CONF_FD=" + strconv.FormatInt(int64(c.pipeConf.readFD), 10), + "PIPE_VIDEO_FD=" + strconv.FormatInt(int64(c.pipeVideo.writeFD), 10), + } + + c.cmd, err = startEmbeddedExe(exeContent, env) if err != nil { - pipe.close() + removeSymlinks() + c.pipeConf.close() + c.pipeVideo.close() return nil, err } - waitDone := make(chan error) + removeSymlinks() + + c.pipeConf.write(append([]byte{'c'}, serializeParams(params)...)) + + c.waitDone = make(chan error) go func() { - waitDone <- exe.cmd.Wait() + c.waitDone <- c.cmd.Wait() }() - readerDone := make(chan error) + c.readerDone = make(chan error) go func() { - readerDone <- func() error { - buf, err := pipe.read() - if err != nil { - return err - } - - switch buf[0] { - case 'e': - return fmt.Errorf(string(buf[1:])) - - case 'r': - return nil - - default: - return fmt.Errorf("unexpected output from pipe (%c)", buf[0]) - } - }() + c.readerDone <- c.readReady() }() select { - case <-waitDone: - exe.close() - pipe.close() - <-readerDone + case <-c.waitDone: + c.pipeConf.close() + c.pipeVideo.close() + <-c.readerDone return nil, fmt.Errorf("process exited unexpectedly") - case err := <-readerDone: + case err := <-c.readerDone: if err != nil { - exe.close() - <-waitDone - pipe.close() + c.pipeConf.write([]byte{'e'}) + <-c.waitDone + c.pipeConf.close() + c.pipeVideo.close() return nil, err } } - readerDone = make(chan error) + c.readerDone = make(chan error) go func() { - readerDone <- func() error { - for { - buf, err := pipe.read() - if err != nil { - return err - } - - if buf[0] != 'b' { - return fmt.Errorf("unexpected output from pipe (%c)", buf[0]) - } - - tmp := uint64(buf[8])<<56 | uint64(buf[7])<<48 | uint64(buf[6])<<40 | uint64(buf[5])<<32 | - uint64(buf[4])<<24 | uint64(buf[3])<<16 | uint64(buf[2])<<8 | uint64(buf[1]) - dts := time.Duration(tmp) * time.Microsecond - - nalus, err := h264.AnnexBUnmarshal(buf[9:]) - if err != nil { - return err - } - - onData(dts, nalus) - } - }() + c.readerDone <- c.readData() }() - return &RPICamera{ - onData: onData, - exe: exe, - pipe: pipe, - waitDone: waitDone, - readerDone: readerDone, - }, nil + return c, nil } func (c *RPICamera) Close() { - c.exe.close() + c.pipeConf.write([]byte{'e'}) <-c.waitDone - c.pipe.close() + c.pipeConf.close() + c.pipeVideo.close() <-c.readerDone } + +func (c *RPICamera) ReloadParams(params Params) { + c.pipeConf.write(append([]byte{'c'}, serializeParams(params)...)) +} + +func (c *RPICamera) readReady() error { + buf, err := c.pipeVideo.read() + if err != nil { + return err + } + + switch buf[0] { + case 'e': + return fmt.Errorf(string(buf[1:])) + + case 'r': + return nil + + default: + return fmt.Errorf("unexpected output from video pipe: '0x%.2x'", buf[0]) + } +} + +func (c *RPICamera) readData() error { + for { + buf, err := c.pipeVideo.read() + if err != nil { + return err + } + + if buf[0] != 'b' { + return fmt.Errorf("unexpected output from pipe (%c)", buf[0]) + } + + tmp := uint64(buf[8])<<56 | uint64(buf[7])<<48 | uint64(buf[6])<<40 | uint64(buf[5])<<32 | + uint64(buf[4])<<24 | uint64(buf[3])<<16 | uint64(buf[2])<<8 | uint64(buf[1]) + dts := time.Duration(tmp) * time.Microsecond + + nalus, err := h264.AnnexBUnmarshal(buf[9:]) + if err != nil { + return err + } + + c.onData(dts, nalus) + } +} diff --git a/internal/rpicamera/rpicamera_disabled.go b/internal/rpicamera/rpicamera_disabled.go index 21dd2ebd..499ddf9a 100644 --- a/internal/rpicamera/rpicamera_disabled.go +++ b/internal/rpicamera/rpicamera_disabled.go @@ -23,3 +23,7 @@ func New( // Close closes a RPICamera. func (c *RPICamera) Close() { } + +// ReloadParams reloads the camera parameters. +func (c *RPICamera) ReloadParams(params Params) { +} diff --git a/rtsp-simple-server.yml b/rtsp-simple-server.yml index d39c6f76..894ba744 100644 --- a/rtsp-simple-server.yml +++ b/rtsp-simple-server.yml @@ -328,8 +328,8 @@ paths: # H264 level rpiCameraLevel: '4.1' # Autofocus mode - # values: manual, auto, continuous - rpiCameraAfMode: continuous + # values: auto, manual, continuous + rpiCameraAfMode: auto # Autofocus range # values: normal, macro, full rpiCameraAfRange: normal