|
|
|
@ -14,8 +14,8 @@ type writerChunkStream struct {
@@ -14,8 +14,8 @@ type writerChunkStream struct {
|
|
|
|
|
lastMessageStreamID *uint32 |
|
|
|
|
lastType *uint8 |
|
|
|
|
lastBodyLen *uint32 |
|
|
|
|
lastTimestamp *time.Duration |
|
|
|
|
lastTimestampDelta *time.Duration |
|
|
|
|
lastTimestamp *int64 |
|
|
|
|
lastTimestampDelta *int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error { |
|
|
|
@ -46,9 +46,13 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
@@ -46,9 +46,13 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
|
|
|
|
|
pos := uint32(0) |
|
|
|
|
firstChunk := true |
|
|
|
|
|
|
|
|
|
var timestampDelta *time.Duration |
|
|
|
|
// convert timestamp to milliseconds before splitting message in chunks
|
|
|
|
|
/// otherwise timestampDelta gets messed up.
|
|
|
|
|
timestamp := int64(msg.Timestamp / time.Millisecond) |
|
|
|
|
|
|
|
|
|
var timestampDelta *int64 |
|
|
|
|
if wc.lastTimestamp != nil { |
|
|
|
|
diff := msg.Timestamp - *wc.lastTimestamp |
|
|
|
|
diff := timestamp - *wc.lastTimestamp |
|
|
|
|
|
|
|
|
|
// use delta only if it is positive
|
|
|
|
|
if diff >= 0 { |
|
|
|
@ -69,7 +73,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
@@ -69,7 +73,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
|
|
|
|
|
case wc.lastMessageStreamID == nil || timestampDelta == nil || *wc.lastMessageStreamID != msg.MessageStreamID: |
|
|
|
|
err := wc.writeChunk(&chunk.Chunk0{ |
|
|
|
|
ChunkStreamID: msg.ChunkStreamID, |
|
|
|
|
Timestamp: uint32(msg.Timestamp / time.Millisecond), |
|
|
|
|
Timestamp: uint32(timestamp), |
|
|
|
|
Type: msg.Type, |
|
|
|
|
MessageStreamID: msg.MessageStreamID, |
|
|
|
|
BodyLen: (bodyLen), |
|
|
|
@ -82,7 +86,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
@@ -82,7 +86,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
|
|
|
|
|
case *wc.lastType != msg.Type || *wc.lastBodyLen != bodyLen: |
|
|
|
|
err := wc.writeChunk(&chunk.Chunk1{ |
|
|
|
|
ChunkStreamID: msg.ChunkStreamID, |
|
|
|
|
TimestampDelta: uint32(*timestampDelta / time.Millisecond), |
|
|
|
|
TimestampDelta: uint32(*timestampDelta), |
|
|
|
|
Type: msg.Type, |
|
|
|
|
BodyLen: (bodyLen), |
|
|
|
|
Body: msg.Body[pos : pos+chunkBodyLen], |
|
|
|
@ -94,7 +98,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
@@ -94,7 +98,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
|
|
|
|
|
case wc.lastTimestampDelta == nil || *wc.lastTimestampDelta != *timestampDelta: |
|
|
|
|
err := wc.writeChunk(&chunk.Chunk2{ |
|
|
|
|
ChunkStreamID: msg.ChunkStreamID, |
|
|
|
|
TimestampDelta: uint32(*timestampDelta / time.Millisecond), |
|
|
|
|
TimestampDelta: uint32(*timestampDelta), |
|
|
|
|
Body: msg.Body[pos : pos+chunkBodyLen], |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
@ -117,7 +121,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
@@ -117,7 +121,7 @@ func (wc *writerChunkStream) writeMessage(msg *Message) error {
|
|
|
|
|
wc.lastType = &v2 |
|
|
|
|
v3 := bodyLen |
|
|
|
|
wc.lastBodyLen = &v3 |
|
|
|
|
v4 := msg.Timestamp |
|
|
|
|
v4 := timestamp |
|
|
|
|
wc.lastTimestamp = &v4 |
|
|
|
|
|
|
|
|
|
if timestampDelta != nil { |
|
|
|
|