|
|
|
@ -22,10 +22,6 @@ import (
@@ -22,10 +22,6 @@ import (
|
|
|
|
|
"github.com/grafov/m3u8" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// If we try to upload a playlist but it is not yet on disk
|
|
|
|
|
// then keep a reference to it here.
|
|
|
|
|
var _queuedPlaylistUpdates = make(map[string]string) |
|
|
|
|
|
|
|
|
|
// S3Storage is the s3 implementation of a storage provider.
|
|
|
|
|
type S3Storage struct { |
|
|
|
|
sess *session.Session |
|
|
|
@ -38,9 +34,20 @@ type S3Storage struct {
@@ -38,9 +34,20 @@ type S3Storage struct {
|
|
|
|
|
s3AccessKey string |
|
|
|
|
s3Secret string |
|
|
|
|
s3ACL string |
|
|
|
|
|
|
|
|
|
// If we try to upload a playlist but it is not yet on disk
|
|
|
|
|
// then keep a reference to it here.
|
|
|
|
|
queuedPlaylistUpdates map[string]string |
|
|
|
|
|
|
|
|
|
uploader *s3manager.Uploader |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var _uploader *s3manager.Uploader |
|
|
|
|
// NewS3Storage returns a new S3Storage instance.
|
|
|
|
|
func NewS3Storage() *S3Storage { |
|
|
|
|
return &S3Storage{ |
|
|
|
|
queuedPlaylistUpdates: make(map[string]string), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Setup sets up the s3 storage for saving the video to s3.
|
|
|
|
|
func (s *S3Storage) Setup() error { |
|
|
|
@ -63,7 +70,7 @@ func (s *S3Storage) Setup() error {
@@ -63,7 +70,7 @@ func (s *S3Storage) Setup() error {
|
|
|
|
|
|
|
|
|
|
s.sess = s.connectAWS() |
|
|
|
|
|
|
|
|
|
_uploader = s3manager.NewUploader(s.sess) |
|
|
|
|
s.uploader = s3manager.NewUploader(s.sess) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -93,7 +100,7 @@ func (s *S3Storage) SegmentWritten(localFilePath string) {
@@ -93,7 +100,7 @@ func (s *S3Storage) SegmentWritten(localFilePath string) {
|
|
|
|
|
// them are in sync.
|
|
|
|
|
playlistPath := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8") |
|
|
|
|
if _, err := s.Save(playlistPath, 0); err != nil { |
|
|
|
|
_queuedPlaylistUpdates[playlistPath] = playlistPath |
|
|
|
|
s.queuedPlaylistUpdates[playlistPath] = playlistPath |
|
|
|
|
if pErr, ok := err.(*os.PathError); ok { |
|
|
|
|
log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.") |
|
|
|
|
return |
|
|
|
@ -106,12 +113,12 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
@@ -106,12 +113,12 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
|
|
|
|
|
// We are uploading the variant playlist after uploading the segment
|
|
|
|
|
// to make sure we're not referring to files in a playlist that don't
|
|
|
|
|
// yet exist. See SegmentWritten.
|
|
|
|
|
if _, ok := _queuedPlaylistUpdates[localFilePath]; ok { |
|
|
|
|
if _, ok := s.queuedPlaylistUpdates[localFilePath]; ok { |
|
|
|
|
if _, err := s.Save(localFilePath, 0); err != nil { |
|
|
|
|
log.Errorln(err) |
|
|
|
|
_queuedPlaylistUpdates[localFilePath] = localFilePath |
|
|
|
|
s.queuedPlaylistUpdates[localFilePath] = localFilePath |
|
|
|
|
} |
|
|
|
|
delete(_queuedPlaylistUpdates, localFilePath) |
|
|
|
|
delete(s.queuedPlaylistUpdates, localFilePath) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -152,7 +159,7 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
@@ -152,7 +159,7 @@ func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
|
|
|
|
|
uploadInput.ACL = aws.String("public-read") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
response, err := _uploader.Upload(uploadInput) |
|
|
|
|
response, err := s.uploader.Upload(uploadInput) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
log.Traceln("error uploading:", filePath, err.Error()) |
|
|
|
|