|
|
|
@ -1,24 +1,25 @@
@@ -1,24 +1,25 @@
|
|
|
|
|
package rtmp |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
"net" |
|
|
|
|
"os" |
|
|
|
|
"strings" |
|
|
|
|
"syscall" |
|
|
|
|
"time" |
|
|
|
|
"unsafe" |
|
|
|
|
|
|
|
|
|
"github.com/nareix/joy4/av/avutil" |
|
|
|
|
"github.com/nareix/joy4/format/ts" |
|
|
|
|
"github.com/nareix/joy5/av" |
|
|
|
|
"github.com/nareix/joy5/format/flv" |
|
|
|
|
log "github.com/sirupsen/logrus" |
|
|
|
|
|
|
|
|
|
"github.com/gabek/owncast/config" |
|
|
|
|
"github.com/gabek/owncast/core" |
|
|
|
|
"github.com/gabek/owncast/core/ffmpeg" |
|
|
|
|
"github.com/gabek/owncast/utils" |
|
|
|
|
|
|
|
|
|
"github.com/nareix/joy4/format" |
|
|
|
|
"github.com/nareix/joy4/format/rtmp" |
|
|
|
|
"github.com/nareix/joy5/codec/h264" |
|
|
|
|
"github.com/nareix/joy5/format/rtmp" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
@ -26,36 +27,53 @@ var (
@@ -26,36 +27,53 @@ var (
|
|
|
|
|
_isConnected = false |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
format.RegisterAll() |
|
|
|
|
} |
|
|
|
|
var _transcoder ffmpeg.Transcoder |
|
|
|
|
var _pipe *os.File |
|
|
|
|
|
|
|
|
|
//Start starts the rtmp service, listening on port 1935
|
|
|
|
|
func Start() { |
|
|
|
|
port := 1935 |
|
|
|
|
server := &rtmp.Server{} |
|
|
|
|
s := rtmp.NewServer() |
|
|
|
|
var lis net.Listener |
|
|
|
|
var error error |
|
|
|
|
if lis, error = net.Listen("tcp", fmt.Sprintf(":%d", port)); error != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { |
|
|
|
|
es := rtmp.EventString[e] |
|
|
|
|
log.Traceln(unsafe.Pointer(c), nc.LocalAddr(), nc.RemoteAddr(), es) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
server.HandlePublish = handlePublish |
|
|
|
|
s.HandleConn = HandleConn |
|
|
|
|
|
|
|
|
|
error := server.ListenAndServe() |
|
|
|
|
if error != nil { |
|
|
|
|
log.Panicln(error) |
|
|
|
|
} |
|
|
|
|
log.Infof("RTMP server is listening for incoming stream on port: %d", port) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
nc, err := lis.Accept() |
|
|
|
|
if err != nil { |
|
|
|
|
time.Sleep(time.Second) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
go s.HandleNetConn(nc) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func handlePublish(conn *rtmp.Conn) { |
|
|
|
|
func HandleConn(c *rtmp.Conn, nc net.Conn) { |
|
|
|
|
if _isConnected { |
|
|
|
|
log.Errorln("stream already running; can not overtake an existing stream") |
|
|
|
|
conn.Close() |
|
|
|
|
nc.Close() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
streamingKeyComponents := strings.Split(conn.URL.Path, "/") |
|
|
|
|
streamingKeyComponents := strings.Split(c.URL.Path, "/") |
|
|
|
|
streamingKey := streamingKeyComponents[len(streamingKeyComponents)-1] |
|
|
|
|
if streamingKey != config.Config.VideoSettings.StreamingKey { |
|
|
|
|
log.Errorln("invalid streaming key; rejecting incoming stream") |
|
|
|
|
conn.Close() |
|
|
|
|
nc.Close() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -63,67 +81,50 @@ func handlePublish(conn *rtmp.Conn) {
@@ -63,67 +81,50 @@ func handlePublish(conn *rtmp.Conn) {
|
|
|
|
|
|
|
|
|
|
pipePath := utils.GetTemporaryPipePath() |
|
|
|
|
syscall.Mkfifo(pipePath, 0666) |
|
|
|
|
transcoder := ffmpeg.NewTranscoder() |
|
|
|
|
go transcoder.Start() |
|
|
|
|
|
|
|
|
|
_transcoder = ffmpeg.NewTranscoder() |
|
|
|
|
go _transcoder.Start() |
|
|
|
|
|
|
|
|
|
_isConnected = true |
|
|
|
|
core.SetStreamAsConnected() |
|
|
|
|
|
|
|
|
|
f, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe) |
|
|
|
|
f, err := os.OpenFile(pipePath, os.O_RDWR, os.ModeNamedPipe) |
|
|
|
|
_pipe = f |
|
|
|
|
fmt.Println(pipePath) |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Is this too fast? Are there downsides to peeking
|
|
|
|
|
// into the stream so frequently?
|
|
|
|
|
ticker := time.NewTicker(500 * time.Millisecond) |
|
|
|
|
go func() { |
|
|
|
|
w := flv.NewMuxer(f) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ticker.C: |
|
|
|
|
error := connCheck(conn.NetConn()) |
|
|
|
|
if error == io.EOF { |
|
|
|
|
handleDisconnect(conn) |
|
|
|
|
} |
|
|
|
|
pkt, err := c.ReadPacket() |
|
|
|
|
if err != nil { |
|
|
|
|
if err == io.EOF { |
|
|
|
|
handleDisconnect(nc) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
muxer := ts.NewMuxer(f) |
|
|
|
|
avutil.CopyFile(muxer, conn) |
|
|
|
|
|
|
|
|
|
if pkt.Type == av.H264 { |
|
|
|
|
nalus, _ := h264.SplitNALUs(pkt.Data) |
|
|
|
|
annexb := h264.JoinNALUsAnnexb(nalus) |
|
|
|
|
avcc := h264.JoinNALUsAVCC([][]byte{annexb}) |
|
|
|
|
pkt.Data = avcc |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Proactively check if the RTMP connection is still active or not.
|
|
|
|
|
// Taken from https://stackoverflow.com/a/58664631.
|
|
|
|
|
func connCheck(conn net.Conn) error { |
|
|
|
|
var sysErr error = nil |
|
|
|
|
rc, err := conn.(syscall.Conn).SyscallConn() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
err = rc.Read(func(fd uintptr) bool { |
|
|
|
|
var buf []byte = []byte{0} |
|
|
|
|
n, _, err := syscall.Recvfrom(int(fd), buf, syscall.MSG_PEEK|syscall.MSG_DONTWAIT) |
|
|
|
|
switch { |
|
|
|
|
case n == 0 && err == nil: |
|
|
|
|
sysErr = io.EOF |
|
|
|
|
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK: |
|
|
|
|
sysErr = nil |
|
|
|
|
default: |
|
|
|
|
sysErr = err |
|
|
|
|
} |
|
|
|
|
return true |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
if err := w.WritePacket(pkt); err != nil { |
|
|
|
|
panic(err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return sysErr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func handleDisconnect(conn *rtmp.Conn) { |
|
|
|
|
func handleDisconnect(conn net.Conn) { |
|
|
|
|
log.Infoln("RTMP disconnected.") |
|
|
|
|
conn.Close() |
|
|
|
|
_pipe.Close() |
|
|
|
|
_isConnected = false |
|
|
|
|
_transcoder.Stop() |
|
|
|
|
core.SetStreamAsDisconnected() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|