|
|
|
|
@ -146,7 +146,7 @@ type Converter struct {
@@ -146,7 +146,7 @@ type Converter struct {
|
|
|
|
|
tsByName map[string]*tsFile |
|
|
|
|
tsDeleteCount int |
|
|
|
|
tsMutex sync.RWMutex |
|
|
|
|
lastRequestTime int64 |
|
|
|
|
lastRequestTime *int64 |
|
|
|
|
|
|
|
|
|
// in
|
|
|
|
|
request chan Request |
|
|
|
|
@ -176,9 +176,12 @@ func New(
@@ -176,9 +176,12 @@ func New(
|
|
|
|
|
parent: parent, |
|
|
|
|
ctx: ctx, |
|
|
|
|
ctxCancel: ctxCancel, |
|
|
|
|
lastRequestTime: time.Now().Unix(), |
|
|
|
|
tsByName: make(map[string]*tsFile), |
|
|
|
|
request: make(chan Request), |
|
|
|
|
lastRequestTime: func() *int64 { |
|
|
|
|
v := time.Now().Unix() |
|
|
|
|
return &v |
|
|
|
|
}(), |
|
|
|
|
tsByName: make(map[string]*tsFile), |
|
|
|
|
request: make(chan Request), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
c.log(logger.Info, "opened") |
|
|
|
|
@ -534,7 +537,7 @@ func (c *Converter) runInner(innerCtx context.Context) error {
@@ -534,7 +537,7 @@ func (c *Converter) runInner(innerCtx context.Context) error {
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-closeCheckTicker.C: |
|
|
|
|
t := time.Unix(atomic.LoadInt64(&c.lastRequestTime), 0) |
|
|
|
|
t := time.Unix(atomic.LoadInt64(c.lastRequestTime), 0) |
|
|
|
|
if time.Since(t) >= closeAfterInactivity { |
|
|
|
|
c.ringBuffer.Close() |
|
|
|
|
<-writerDone |
|
|
|
|
@ -563,7 +566,7 @@ func (c *Converter) runRequestHandler(terminate chan struct{}, done chan struct{
@@ -563,7 +566,7 @@ func (c *Converter) runRequestHandler(terminate chan struct{}, done chan struct{
|
|
|
|
|
case preq := <-c.request: |
|
|
|
|
req := preq |
|
|
|
|
|
|
|
|
|
atomic.StoreInt64(&c.lastRequestTime, time.Now().Unix()) |
|
|
|
|
atomic.StoreInt64(c.lastRequestTime, time.Now().Unix()) |
|
|
|
|
|
|
|
|
|
conf := c.path.Conf() |
|
|
|
|
|
|
|
|
|
|