package gate

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"runtime"

	"github.com/liangdas/mqant/log"
	"github.com/liangdas/mqant/network"
)

// PackQueue is a write queue for a connection. Messages handed to WritePack
// are buffered in a channel and written to the connection's bufio.Writer by
// the goroutine running Flusher.
//
// NOTE(review): status and writeError are read and written from several
// methods in this file without any synchronization, while WritePack, Flusher
// and Close can run on different goroutines — confirm whether callers
// serialize access or whether a lock/atomic is needed.
type PackQueue struct {
	// writeError is the error passed to the most recent Close call;
	// WritePack returns it when it is non-nil.
	writeError error
	// fch carries queued messages from WritePack to Flusher.
	fch chan fs

	// Buffered reader/writer and the connection they belong to.
	r    *bufio.Reader
	w    *bufio.Writer
	conn network.Conn

	MaxPackSize int
	sessionID   string
	// status is one of DISCONNECTED, CONNECTED or CLOSED.
	status int
}

// fs is a single queued write: the payload and whether the queue should be
// closed once the payload has been written.
type fs struct {
	msg         []byte
	shouldClose bool
}

// Connection states stored in PackQueue.status.
const (
	DISCONNECTED = iota
	CONNECTED
	CLOSED
)

var (
	// queueMaxLen is the capacity of every PackQueue's message channel.
	queueMaxLen = 1024
)

// NewPackQueue creates a PackQueue in the CONNECTED state for the given
// session, reader, writer and connection. A MaxPackSize below 1 is replaced
// by the default of 65535.
func NewPackQueue(sessionID string, r *bufio.Reader, w *bufio.Writer, conn network.Conn, MaxPackSize int) *PackQueue {
	if MaxPackSize < 1 {
		MaxPackSize = 65535
	}
	return &PackQueue{
		sessionID:   sessionID,
		MaxPackSize: MaxPackSize,
		r:           r,
		w:           w,
		conn:        conn,
		fch:         make(chan fs, queueMaxLen),
		status:      CONNECTED,
	}
}

// isConnected reports whether the queue is still in the CONNECTED state.
func (queue *PackQueue) isConnected() bool {
	return queue.status == CONNECTED
}

// Flusher drains the message channel, writing and flushing each message to
// the underlying writer. It returns when the queue is no longer connected,
// the channel is closed, a write or flush fails, or a message marked
// shouldClose has been written (in which case the queue is closed first).
//
// It is intended to be called only once per queue.
func (queue *PackQueue) Flusher() {
	for queue.isConnected() {
		f, ok := <-queue.fch
		if !ok {
			// Channel closed by Close/CloseFch.
			return
		}
		// The queue may have been closed while we were blocked on the receive.
		if !queue.isConnected() {
			return
		}
		if err := writeFull(queue.w, f.msg); err != nil {
			log.Error("err:%v", err)
			return
		}
		if queue.w.Buffered() > 0 {
			if err := queue.w.Flush(); err != nil {
				// Report flush failures the same way as write failures
				// instead of dropping them silently.
				log.Error("err:%v", err)
				return
			}
		}
		if f.shouldClose {
			queue.Close(errors.New("shouldclose"))
			return
		}
	}
}

// WritePack queues msg for writing by Flusher. When shouldClose is true the
// queue is closed after msg has been written.
//
// It returns an error, without queueing msg, when:
//   - the channel already holds at least queueMaxLen/2 messages (the queue is
//     then also closed asynchronously with that error),
//   - the queue is not connected,
//   - a previous Close recorded an error, or
//   - the writer reports no available buffer space.
//
// A panic raised while queueing (for example a send on an already closed
// channel) is recovered, converted into the returned error together with a
// stack trace, and the queue is closed with that error.
func (queue *PackQueue) WritePack(msg []byte, shouldClose bool) (err error) {
	defer func() {
		if r := recover(); r != nil {
			buf := make([]byte, 1024)
			l := runtime.Stack(buf, false)
			errstr := string(buf[:l])
			err = fmt.Errorf("WritePack error %v", errstr)
			queue.Close(err)
		}
	}()

	if len(queue.fch) >= queueMaxLen/2 {
		fullErr := fmt.Errorf("session %v write full", queue.sessionID)
		// Close on a separate goroutine so the caller is not blocked by it.
		go func() {
			queue.Close(fullErr)
		}()
		return fullErr
	}
	if !queue.isConnected() {
		return errors.New("disconnect")
	}
	if queue.writeError != nil {
		return queue.writeError
	}
	// NOTE(review): this inspects the writer while Flusher may be writing to
	// it from another goroutine — confirm this unsynchronized read is
	// acceptable.
	if queue.w.Available() <= 0 {
		return fmt.Errorf("bufio.Writer is full")
	}

	queue.fch <- fs{
		shouldClose: shouldClose,
		msg:         msg,
	}
	return err
}

// writeFull writes all of b to w, repeating the write until every byte has
// been accepted. It returns the first write error, or io.ErrShortWrite if a
// write makes no progress without reporting an error.
func writeFull(w *bufio.Writer, b []byte) error {
	written := 0
	// Loop on the cumulative count, not the size of the last write, so that
	// a partial write is followed by exactly the remaining bytes.
	for written < len(b) {
		n, err := w.Write(b[written:])
		if err != nil {
			return err
		}
		if n <= 0 {
			// Guard against spinning forever on a zero-progress write.
			return io.ErrShortWrite
		}
		written += n
	}
	return nil
}

// CloseFch closes the message channel. Closing an already closed channel
// panics; that panic is recovered and ignored so the method can be called
// more than once.
func (queue *PackQueue) CloseFch() {
	defer func() {
		if recover() != nil {
			// close(queue.fch) panicked because the channel was already closed.
		}
	}()
	close(queue.fch) // panics if the channel is already closed
}

// Close records err as the queue's write error, marks the queue CLOSED,
// closes the message channel and, if a connection is set, closes it too.
// It always returns nil.
func (queue *PackQueue) Close(err error) error {
	queue.writeError = err
	// Mark the queue closed before closing the channel so that WritePack's
	// connected check rejects new messages instead of sending on a closed
	// channel.
	queue.status = CLOSED
	queue.CloseFch()
	if queue.conn != nil {
		// Close the connection as well so its file descriptor is not leaked.
		queue.conn.Close()
	}
	return nil
}