You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
3.2 KiB
177 lines
3.2 KiB
|
1 year ago
|
package gate
|
||
|
|
|
||
|
|
import (
|
||
|
|
"bufio"
|
||
|
|
"errors"
|
||
|
|
"fmt"
|
||
|
|
"runtime"
|
||
|
|
|
||
|
|
"github.com/liangdas/mqant/log"
|
||
|
|
"github.com/liangdas/mqant/network"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Tcp write queue
|
||
|
|
type PackQueue struct {
|
||
|
|
writeError error
|
||
|
|
// Notice read the error
|
||
|
|
fch chan fs
|
||
|
|
// writelock sync.Mutex
|
||
|
|
// Pack connection
|
||
|
|
r *bufio.Reader
|
||
|
|
w *bufio.Writer
|
||
|
|
|
||
|
|
conn network.Conn
|
||
|
|
|
||
|
|
MaxPackSize int
|
||
|
|
|
||
|
|
sessionID string
|
||
|
|
|
||
|
|
status int
|
||
|
|
}
|
||
|
|
|
||
|
|
// fs is one entry of the write queue.
type fs struct {
	// msg holds the bytes Flusher writes to the connection.
	msg []byte
	// shouldClose tells Flusher to close the queue after msg has been written.
	shouldClose bool
}
|
||
|
|
|
||
|
|
// Connection states stored in PackQueue.status.
const (
	// DISCONNECTED is 0, the zero value of the status field.
	DISCONNECTED = iota
	// CONNECTED is the state NewPackQueue gives a fresh queue.
	CONNECTED
	// CLOSED is the state set by Close.
	CLOSED
)
|
||
|
|
|
||
|
|
// queueMaxLen is the capacity of each PackQueue's write channel. WritePack
// rejects new packs once the channel holds half of this many entries.
var queueMaxLen = 1024
|
||
|
|
|
||
|
|
// Init a pack queue
|
||
|
|
func NewPackQueue(sessionID string, r *bufio.Reader, w *bufio.Writer, conn network.Conn, MaxPackSize int) *PackQueue {
|
||
|
|
if MaxPackSize < 1 {
|
||
|
|
MaxPackSize = 65535
|
||
|
|
}
|
||
|
|
return &PackQueue{
|
||
|
|
sessionID: sessionID,
|
||
|
|
MaxPackSize: MaxPackSize,
|
||
|
|
r: r,
|
||
|
|
w: w,
|
||
|
|
conn: conn,
|
||
|
|
fch: make(chan fs, queueMaxLen),
|
||
|
|
status: CONNECTED,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (queue *PackQueue) isConnected() bool {
|
||
|
|
return queue.status == CONNECTED
|
||
|
|
}
|
||
|
|
|
||
|
|
// Get a read pack queue
|
||
|
|
// Only call once
|
||
|
|
func (queue *PackQueue) Flusher() {
|
||
|
|
for queue.isConnected() {
|
||
|
|
f, ok := <-queue.fch
|
||
|
|
if !ok {
|
||
|
|
break
|
||
|
|
}
|
||
|
|
// queue.writelock.Lock()
|
||
|
|
if !queue.isConnected() {
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
break
|
||
|
|
}
|
||
|
|
err := writeFull(queue.w, f.msg)
|
||
|
|
if err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
break
|
||
|
|
}
|
||
|
|
if queue.w.Buffered() > 0 {
|
||
|
|
if err := queue.w.Flush(); err != nil {
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
break
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if f.shouldClose {
|
||
|
|
queue.Close(errors.New("shouldclose"))
|
||
|
|
break
|
||
|
|
}
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Write a pack , and get the last error
|
||
|
|
func (queue *PackQueue) WritePack(msg []byte, shouldClose bool) (err error) {
|
||
|
|
defer func() {
|
||
|
|
if r := recover(); r != nil {
|
||
|
|
buf := make([]byte, 1024)
|
||
|
|
l := runtime.Stack(buf, false)
|
||
|
|
errstr := string(buf[:l])
|
||
|
|
err = fmt.Errorf("WritePack error %v", errstr)
|
||
|
|
queue.Close(err)
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
// queue.writelock.Lock()
|
||
|
|
if len(queue.fch) >= queueMaxLen/2 {
|
||
|
|
err := fmt.Errorf("session %v write full", queue.sessionID)
|
||
|
|
go func() {
|
||
|
|
queue.Close(err)
|
||
|
|
}()
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
if !queue.isConnected() {
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
return errors.New("disconnect")
|
||
|
|
}
|
||
|
|
if queue.writeError != nil {
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
return queue.writeError
|
||
|
|
}
|
||
|
|
if queue.w.Available() <= 0 {
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
return fmt.Errorf("bufio.Writer is full")
|
||
|
|
}
|
||
|
|
// queue.writelock.Unlock()
|
||
|
|
queue.fch <- fs{
|
||
|
|
shouldClose: shouldClose,
|
||
|
|
msg: msg,
|
||
|
|
}
|
||
|
|
// if err != nil {
|
||
|
|
// Tell listener the error
|
||
|
|
// Notice the read
|
||
|
|
// queue.Close(err)
|
||
|
|
// }
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
// writeFull writes all of b to w, calling Write again for any remaining
// bytes until everything has been accepted. It returns the first error
// reported by Write, or nil once every byte has been written. An empty b
// results in no Write call and a nil error.
func writeFull(w *bufio.Writer, b []byte) error {
	// Track the cumulative count: the loop must end when the whole slice has
	// been written, not when a single Write call happens to cover len(b).
	for written := 0; written < len(b); {
		n, err := w.Write(b[written:])
		if err != nil {
			return err
		}
		written += n
	}
	return nil
}
|
||
|
|
|
||
|
|
func (queue *PackQueue) CloseFch() {
|
||
|
|
defer func() {
|
||
|
|
if recover() != nil {
|
||
|
|
// close(ch) panic occur
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
close(queue.fch) // panic if ch is closed
|
||
|
|
}
|
||
|
|
|
||
|
|
// Close the all of queue's channels
|
||
|
|
func (queue *PackQueue) Close(err error) error {
|
||
|
|
queue.writeError = err
|
||
|
|
queue.CloseFch()
|
||
|
|
queue.status = CLOSED
|
||
|
|
if queue.conn != nil {
|
||
|
|
//再关闭一下,防止文件描述符发生泄漏
|
||
|
|
queue.conn.Close()
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|