You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
176 lines
3.2 KiB
176 lines
3.2 KiB
package gate |
|
|
|
import ( |
|
"bufio" |
|
"errors" |
|
"fmt" |
|
"runtime" |
|
|
|
"github.com/liangdas/mqant/log" |
|
"github.com/liangdas/mqant/network" |
|
) |
|
|
|
// Tcp write queue |
|
type PackQueue struct { |
|
writeError error |
|
// Notice read the error |
|
fch chan fs |
|
// writelock sync.Mutex |
|
// Pack connection |
|
r *bufio.Reader |
|
w *bufio.Writer |
|
|
|
conn network.Conn |
|
|
|
MaxPackSize int |
|
|
|
sessionID string |
|
|
|
status int |
|
} |
|
|
|
type fs struct { |
|
msg []byte |
|
shouldClose bool |
|
} |
|
|
|
const ( |
|
DISCONNECTED = iota |
|
CONNECTED |
|
CLOSED |
|
) |
|
|
|
var ( |
|
queueMaxLen = 1024 |
|
) |
|
|
|
// Init a pack queue |
|
func NewPackQueue(sessionID string, r *bufio.Reader, w *bufio.Writer, conn network.Conn, MaxPackSize int) *PackQueue { |
|
if MaxPackSize < 1 { |
|
MaxPackSize = 65535 |
|
} |
|
return &PackQueue{ |
|
sessionID: sessionID, |
|
MaxPackSize: MaxPackSize, |
|
r: r, |
|
w: w, |
|
conn: conn, |
|
fch: make(chan fs, queueMaxLen), |
|
status: CONNECTED, |
|
} |
|
} |
|
|
|
func (queue *PackQueue) isConnected() bool { |
|
return queue.status == CONNECTED |
|
} |
|
|
|
// Get a read pack queue |
|
// Only call once |
|
func (queue *PackQueue) Flusher() { |
|
for queue.isConnected() { |
|
f, ok := <-queue.fch |
|
if !ok { |
|
break |
|
} |
|
// queue.writelock.Lock() |
|
if !queue.isConnected() { |
|
// queue.writelock.Unlock() |
|
break |
|
} |
|
err := writeFull(queue.w, f.msg) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
break |
|
} |
|
if queue.w.Buffered() > 0 { |
|
if err := queue.w.Flush(); err != nil { |
|
// queue.writelock.Unlock() |
|
break |
|
} |
|
} |
|
if f.shouldClose { |
|
queue.Close(errors.New("shouldclose")) |
|
break |
|
} |
|
// queue.writelock.Unlock() |
|
} |
|
} |
|
|
|
// Write a pack , and get the last error |
|
func (queue *PackQueue) WritePack(msg []byte, shouldClose bool) (err error) { |
|
defer func() { |
|
if r := recover(); r != nil { |
|
buf := make([]byte, 1024) |
|
l := runtime.Stack(buf, false) |
|
errstr := string(buf[:l]) |
|
err = fmt.Errorf("WritePack error %v", errstr) |
|
queue.Close(err) |
|
} |
|
}() |
|
// queue.writelock.Lock() |
|
if len(queue.fch) >= queueMaxLen/2 { |
|
err := fmt.Errorf("session %v write full", queue.sessionID) |
|
go func() { |
|
queue.Close(err) |
|
}() |
|
// queue.writelock.Unlock() |
|
return err |
|
} |
|
if !queue.isConnected() { |
|
// queue.writelock.Unlock() |
|
return errors.New("disconnect") |
|
} |
|
if queue.writeError != nil { |
|
// queue.writelock.Unlock() |
|
return queue.writeError |
|
} |
|
if queue.w.Available() <= 0 { |
|
// queue.writelock.Unlock() |
|
return fmt.Errorf("bufio.Writer is full") |
|
} |
|
// queue.writelock.Unlock() |
|
queue.fch <- fs{ |
|
shouldClose: shouldClose, |
|
msg: msg, |
|
} |
|
// if err != nil { |
|
// Tell listener the error |
|
// Notice the read |
|
// queue.Close(err) |
|
// } |
|
return err |
|
} |
|
|
|
func writeFull(w *bufio.Writer, b []byte) (err error) { |
|
hasRead, n := 0, 0 |
|
for n < len(b) { |
|
n, err = w.Write(b[hasRead:]) |
|
if err != nil { |
|
break |
|
} |
|
hasRead += n |
|
} |
|
return err |
|
} |
|
|
|
func (queue *PackQueue) CloseFch() { |
|
defer func() { |
|
if recover() != nil { |
|
// close(ch) panic occur |
|
} |
|
}() |
|
|
|
close(queue.fch) // panic if ch is closed |
|
} |
|
|
|
// Close the all of queue's channels |
|
func (queue *PackQueue) Close(err error) error { |
|
queue.writeError = err |
|
queue.CloseFch() |
|
queue.status = CLOSED |
|
if queue.conn != nil { |
|
//再关闭一下,防止文件描述符发生泄漏 |
|
queue.conn.Close() |
|
} |
|
return nil |
|
}
|
|
|