印度包网
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

177 lines
3.2 KiB

1 year ago
package gate
import (
"bufio"
"errors"
"fmt"
"runtime"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/network"
)
// PackQueue is the outbound write queue for a single connection: WritePack
// enqueues messages on fch and Flusher drains them into w.
type PackQueue struct {
// writeError holds the error passed to the most recent Close call;
// WritePack returns it when it is non-nil.
writeError error
// fch carries queued messages from WritePack to Flusher.
fch chan fs
// writelock sync.Mutex
// r and w are the buffered reader and writer supplied to NewPackQueue.
// Flusher writes to and flushes w; r is stored but not read in the
// visible part of this file.
r *bufio.Reader
w *bufio.Writer
// conn is the underlying connection; Close closes it when it is non-nil.
conn network.Conn
// MaxPackSize is the value given to NewPackQueue, or 65535 when that
// value was below 1.
MaxPackSize int
// sessionID is included in the "write full" error built by WritePack.
sessionID string
// status is one of DISCONNECTED, CONNECTED or CLOSED.
status int
}
// fs is one entry in the write queue.
type fs struct {
// msg is the raw bytes Flusher writes to the buffered writer.
msg []byte
// shouldClose makes Flusher close the queue after msg has been written
// and flushed.
shouldClose bool
}
// Connection states stored in PackQueue.status.
const (
	// DISCONNECTED is the zero value of the status field.
	DISCONNECTED = 0
	// CONNECTED is the state NewPackQueue assigns to a fresh queue.
	CONNECTED = 1
	// CLOSED is the state assigned by Close.
	CLOSED = 2
)
// queueMaxLen is the buffer capacity of each PackQueue's message channel.
// WritePack rejects messages once the channel holds half this many entries.
var queueMaxLen = 1024
// NewPackQueue returns a PackQueue in the CONNECTED state for the given
// session, wrapping the supplied buffered reader/writer and connection.
// A MaxPackSize below 1 is replaced by 65535. The message channel is created
// with a buffer of queueMaxLen entries.
func NewPackQueue(sessionID string, r *bufio.Reader, w *bufio.Writer, conn network.Conn, MaxPackSize int) *PackQueue {
	const defaultMaxPackSize = 65535

	limit := MaxPackSize
	if limit < 1 {
		limit = defaultMaxPackSize
	}

	queue := new(PackQueue)
	queue.sessionID = sessionID
	queue.MaxPackSize = limit
	queue.r = r
	queue.w = w
	queue.conn = conn
	queue.fch = make(chan fs, queueMaxLen)
	queue.status = CONNECTED
	return queue
}
// isConnected reports whether the queue's status is CONNECTED.
func (queue *PackQueue) isConnected() bool {
	current := queue.status
	return current == CONNECTED
}
// Flusher drains the write queue: each queued message is written to the
// buffered writer and then flushed. It blocks until the channel is closed,
// the queue is no longer connected, a write or flush fails, or a message
// flagged shouldClose has been sent.
//
// On a write or flush failure the error is logged and the queue is closed
// with that error. A bufio.Writer keeps returning its first error, so nothing
// further could be sent; closing makes later WritePack calls fail immediately
// instead of piling messages into a queue nobody drains.
//
// Call it only once per queue.
func (queue *PackQueue) Flusher() {
	for queue.isConnected() {
		f, ok := <-queue.fch
		if !ok {
			// The channel was closed; there is nothing left to send.
			return
		}
		if !queue.isConnected() {
			// The queue was closed while this goroutine waited; drop the message.
			return
		}
		if err := writeFull(queue.w, f.msg); err != nil {
			log.Error("err:%v", err)
			queue.Close(err)
			return
		}
		if queue.w.Buffered() > 0 {
			if err := queue.w.Flush(); err != nil {
				log.Error("err:%v", err)
				queue.Close(err)
				return
			}
		}
		if f.shouldClose {
			queue.Close(errors.New("shouldclose"))
			return
		}
	}
}
// WritePack queues msg for delivery by Flusher. When shouldClose is true the
// queue is closed after msg has been written.
//
// The message is rejected with an error, checked in this order, when:
//   - the channel already holds at least queueMaxLen/2 entries (the queue is
//     then closed from a separate goroutine),
//   - the queue is not connected,
//   - a write error has been recorded on the queue,
//   - the buffered writer reports no available space.
//
// A panic raised while queueing (for example a send on a channel that has
// already been closed) is recovered; the returned error then carries the
// stack trace and the queue is closed with it.
func (queue *PackQueue) WritePack(msg []byte, shouldClose bool) (err error) {
	defer func() {
		if recover() == nil {
			return
		}
		stack := make([]byte, 1024)
		size := runtime.Stack(stack, false)
		err = fmt.Errorf("WritePack error %v", string(stack[:size]))
		queue.Close(err)
	}()

	switch {
	case len(queue.fch) >= queueMaxLen/2:
		fullErr := fmt.Errorf("session %v write full", queue.sessionID)
		// Close in the background so the caller is not held up by it.
		go queue.Close(fullErr)
		return fullErr
	case !queue.isConnected():
		return errors.New("disconnect")
	case queue.writeError != nil:
		return queue.writeError
	case queue.w.Available() <= 0:
		return fmt.Errorf("bufio.Writer is full")
	}

	queue.fch <- fs{msg: msg, shouldClose: shouldClose}
	return nil
}
// writeFull writes all of b to w, repeating Write until every byte has been
// accepted or a write fails. It returns the first write error, or nil once
// the whole slice has been written. An empty b is a no-op.
func writeFull(w *bufio.Writer, b []byte) error {
	// Loop on the cumulative count; the size of the last write alone says
	// nothing about whether the whole slice has gone out.
	written := 0
	for written < len(b) {
		n, err := w.Write(b[written:])
		if err != nil {
			return err
		}
		written += n
	}
	return nil
}
// CloseFch closes the queue's message channel. close panics when the channel
// has already been closed; that panic is recovered and discarded here, so
// calling CloseFch more than once is harmless.
func (queue *PackQueue) CloseFch() {
	defer func() {
		// Deliberately discard any panic raised by close below.
		_ = recover()
	}()
	close(queue.fch)
}
// Close shuts the queue down: it records err as the queue's write error,
// closes the message channel, sets the status to CLOSED and, when a
// connection is attached, closes that connection too. It always returns nil.
func (queue *PackQueue) Close(err error) error {
	queue.writeError = err
	queue.CloseFch()
	queue.status = CLOSED

	if conn := queue.conn; conn != nil {
		// Close the connection once more here so its file descriptor cannot leak.
		conn.Close()
	}
	return nil
}