|
|
|
|
package gate
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bufio"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"runtime"
|
|
|
|
|
"server/call"
|
|
|
|
|
"server/config"
|
|
|
|
|
"server/pb"
|
|
|
|
|
"server/util"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/liangdas/mqant/gate"
|
|
|
|
|
basegate "github.com/liangdas/mqant/gate/base"
|
|
|
|
|
"github.com/liangdas/mqant/log"
|
|
|
|
|
"github.com/liangdas/mqant/module"
|
|
|
|
|
timewheel "github.com/liangdas/mqant/module/modules/timer"
|
|
|
|
|
"github.com/liangdas/mqant/network"
|
|
|
|
|
mqanttools "github.com/liangdas/mqant/utils"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// type Agent interface {
|
|
|
|
|
// OnInit(gate Gate, conn network.Conn) error
|
|
|
|
|
// WriteMsg(topic string, body []byte) error
|
|
|
|
|
// Close()
|
|
|
|
|
// Run() (err error)
|
|
|
|
|
// OnClose() error
|
|
|
|
|
// Destroy()
|
|
|
|
|
// ConnTime() time.Time
|
|
|
|
|
// RevNum() int64
|
|
|
|
|
// SendNum() int64
|
|
|
|
|
// IsClosed() bool
|
|
|
|
|
// ProtocolOK() bool
|
|
|
|
|
// GetError() error //连接断开的错误日志
|
|
|
|
|
// GetSession() Session
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
type agent struct {
|
|
|
|
|
connTime time.Time
|
|
|
|
|
module module.RPCModule
|
|
|
|
|
session gate.Session
|
|
|
|
|
conn network.Conn
|
|
|
|
|
gate gate.Gate
|
|
|
|
|
w *bufio.Writer
|
|
|
|
|
r *bufio.Reader
|
|
|
|
|
heartbeatTimer string
|
|
|
|
|
revNum int64
|
|
|
|
|
sendNum int64
|
|
|
|
|
lastHeartbeat int64
|
|
|
|
|
protocol_ok bool
|
|
|
|
|
isclose bool
|
|
|
|
|
queue *PackQueue
|
|
|
|
|
// msgMutex *sync.Mutex
|
|
|
|
|
// msgIndex uint32 // 当前消息序列号
|
|
|
|
|
// msgMap map[uint32]Msg
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Msg struct {
|
|
|
|
|
Topic string
|
|
|
|
|
Body []byte
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) OnInit(gate gate.Gate, conn network.Conn) error {
|
|
|
|
|
// a.ch = make(chan int, gate.Options().ConcurrentTasks)
|
|
|
|
|
a.conn = conn
|
|
|
|
|
a.gate = gate
|
|
|
|
|
a.r = bufio.NewReaderSize(conn, gate.Options().BufSize)
|
|
|
|
|
a.w = bufio.NewWriterSize(conn, gate.Options().BufSize)
|
|
|
|
|
a.isclose = false
|
|
|
|
|
a.protocol_ok = false
|
|
|
|
|
// a.msgMutex = new(sync.Mutex)
|
|
|
|
|
// a.msgIndex = 0
|
|
|
|
|
// a.msgMap = make(map[uint32]Msg)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// func (a *agent) LoopWrite() {
|
|
|
|
|
// for !a.IsClosed() {
|
|
|
|
|
// msg, ok := a.msgMap[a.msgIndex]
|
|
|
|
|
// if !ok {
|
|
|
|
|
// break
|
|
|
|
|
// }
|
|
|
|
|
// a.WriteMsg(msg.Topic, msg.Body)
|
|
|
|
|
// delete(a.msgMap, a.msgIndex)
|
|
|
|
|
// a.msgIndex++
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (a *agent) WriteMsg(topic string, body []byte) error {
|
|
|
|
|
// log.Debug("write to agent:%v", topic)
|
|
|
|
|
if a.conn == nil {
|
|
|
|
|
return errors.New("mqtt.Client nil")
|
|
|
|
|
}
|
|
|
|
|
// a.sendNum++
|
|
|
|
|
if a.gate.Options().SendMessageHook != nil {
|
|
|
|
|
bb, err := a.gate.Options().SendMessageHook(a.GetSession(), topic, body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
body = bb
|
|
|
|
|
}
|
|
|
|
|
send := []byte{}
|
|
|
|
|
length, err := util.IntToBytes(len(body)+10, 2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
send = append(send, length...)
|
|
|
|
|
ret := strings.Split(topic, ":")
|
|
|
|
|
if len(ret) < 2 {
|
|
|
|
|
err = fmt.Errorf("invalid protocol:%v", topic)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// else if len(ret) == 3 {
|
|
|
|
|
// index, err1 := strconv.ParseUint(ret[2], 10, 32)
|
|
|
|
|
// if err1 != nil {
|
|
|
|
|
// err = fmt.Errorf("invalid protocol:%v", topic)
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
// a.msgMutex.Lock()
|
|
|
|
|
// if index == 0 { // 说明是第一条消息,直接发送
|
|
|
|
|
// a.msgIndex = 1
|
|
|
|
|
// a.WriteMsg(ret[0]+":"+ret[1], body)
|
|
|
|
|
// a.LoopWrite()
|
|
|
|
|
// } else if uint32(index) == a.msgIndex { // 顺序正确,直接发送
|
|
|
|
|
// a.msgIndex++
|
|
|
|
|
// a.WriteMsg(ret[0]+":"+ret[1], body)
|
|
|
|
|
// a.LoopWrite()
|
|
|
|
|
// } else {
|
|
|
|
|
// a.msgMap[uint32(index)] = Msg{Topic: fmt.Sprintf("%v:%v", ret[0], ret[1]), Body: body}
|
|
|
|
|
// }
|
|
|
|
|
// a.msgMutex.Unlock()
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// moduleID := int(pb.ServerType_value[ret[0]])
|
|
|
|
|
moduleID := call.GetModuleID(ret[0])
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// log.Error("err:%v", err)
|
|
|
|
|
// return err
|
|
|
|
|
// }
|
|
|
|
|
// pid := call.GetProtocolNameType(moduleID)[ret[1]]
|
|
|
|
|
// protocol, _ := util.IntToBytes(int(pid), 2)
|
|
|
|
|
pid, err := strconv.Atoi(ret[1])
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
protocol, _ := util.IntToBytes(pid, 2)
|
|
|
|
|
|
|
|
|
|
module, err := util.IntToBytes(moduleID, 2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v", err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// log.Debug("发到客户端的proto协议:%v", pid)
|
|
|
|
|
send = append(send, module...)
|
|
|
|
|
send = append(send, protocol...)
|
|
|
|
|
send = append(send, []byte{0, 0, 0, 0}...)
|
|
|
|
|
send = append(send, body...)
|
|
|
|
|
|
|
|
|
|
// a.conn.Write(send)
|
|
|
|
|
|
|
|
|
|
shouldClose := false
|
|
|
|
|
if moduleID == int(pb.ServerType_ServerTypeGate) && pid == int(pb.ServerGateResp_GateRepeatResp) {
|
|
|
|
|
shouldClose = true
|
|
|
|
|
}
|
|
|
|
|
return a.queue.WritePack(send, shouldClose)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) Close() {
|
|
|
|
|
go func() {
|
|
|
|
|
//关闭连接部分情况下会阻塞超时,因此放协程去处理
|
|
|
|
|
if a.conn != nil {
|
|
|
|
|
a.conn.Close()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) Run() (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
buff := make([]byte, 1024)
|
|
|
|
|
runtime.Stack(buff, false)
|
|
|
|
|
log.Error("conn.serve() panic(%v)\n info:%s", err, string(buff))
|
|
|
|
|
}
|
|
|
|
|
a.Close()
|
|
|
|
|
|
|
|
|
|
}()
|
|
|
|
|
go func() {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
buff := make([]byte, 1024)
|
|
|
|
|
runtime.Stack(buff, false)
|
|
|
|
|
log.Error("OverTime panic(%v)\n info:%s", err, string(buff))
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
select {
|
|
|
|
|
case <-time.After(a.gate.Options().OverTime):
|
|
|
|
|
if a.GetSession() == nil || a.GetSession().GetUserID() == "" {
|
|
|
|
|
//超过一段时间还没有建立连接则直接关闭网络连接
|
|
|
|
|
a.Close()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
addr := a.conn.RemoteAddr()
|
|
|
|
|
sessionID := mqanttools.GenerateID().String()
|
|
|
|
|
a.heartbeatTimer = sessionID
|
|
|
|
|
// 心跳逻辑
|
|
|
|
|
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, sessionID, nil, timewheel.Job(func(arge interface{}) {
|
|
|
|
|
a.heartbeat()
|
|
|
|
|
}))
|
|
|
|
|
a.session, err = basegate.NewSessionByMap(a.module.GetApp(), map[string]interface{}{
|
|
|
|
|
"Sessionid": sessionID,
|
|
|
|
|
"Network": addr.Network(),
|
|
|
|
|
"IP": addr.String(),
|
|
|
|
|
"Serverid": a.module.GetServerID(),
|
|
|
|
|
"Settings": make(map[string]string),
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("gate create agent fail", err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
log.Debug("new session:%v", sessionID)
|
|
|
|
|
a.queue = NewPackQueue(sessionID, a.r, a.w, a.conn, -1)
|
|
|
|
|
util.Go(func() {
|
|
|
|
|
a.queue.Flusher()
|
|
|
|
|
})
|
|
|
|
|
a.session.JudgeGuest(a.gate.GetJudgeGuest())
|
|
|
|
|
a.session.CreateTrace() //代码跟踪
|
|
|
|
|
//回复客户端 CONNECT
|
|
|
|
|
// err = mqtt.WritePack(mqtt.GetConnAckPack(0), a.w)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// log.Error("ConnAckPack error %v", err.Error())
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
a.connTime = time.Now()
|
|
|
|
|
a.protocol_ok = true
|
|
|
|
|
a.gate.GetAgentLearner().Connect(a) //发送连接成功的事件
|
|
|
|
|
a.ReadLoop()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (age *agent) OnClose() error {
|
|
|
|
|
defer func() {
|
|
|
|
|
if err := recover(); err != nil {
|
|
|
|
|
buff := make([]byte, 1024)
|
|
|
|
|
runtime.Stack(buff, false)
|
|
|
|
|
log.Error("agent OnClose panic(%v)\n info:%s", err, string(buff))
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
timewheel.GetTimeWheel().RemoveTimer(age.heartbeatTimer)
|
|
|
|
|
|
|
|
|
|
age.isclose = true
|
|
|
|
|
age.gate.GetAgentLearner().DisConnect(age) //发送连接断开的事件
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) Destroy() {
|
|
|
|
|
if a.conn != nil {
|
|
|
|
|
a.conn.Destroy()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) ConnTime() time.Time {
|
|
|
|
|
return a.connTime
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) RevNum() int64 {
|
|
|
|
|
return a.revNum
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) SendNum() int64 {
|
|
|
|
|
return a.sendNum
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) IsClosed() bool {
|
|
|
|
|
return a.isclose
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) ProtocolOK() bool {
|
|
|
|
|
return a.protocol_ok
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) GetError() error { //连接断开的错误日志
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
func (a *agent) GetSession() gate.Session {
|
|
|
|
|
return a.session
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) ReadLoop() {
|
|
|
|
|
defer a.queue.Close(errors.New("read err"))
|
|
|
|
|
for !a.IsClosed() {
|
|
|
|
|
// 第一步拿到数据包长度
|
|
|
|
|
length, err := a.readInt(2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// log.Error("read int err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if length > a.gate.Options().BufSize {
|
|
|
|
|
log.Error("max bufSize limit:%v,length:%v", a.gate.Options().BufSize, length)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// 第二步拿到模块协议类型
|
|
|
|
|
moduleType, err := a.readInt(2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v,session:%v", err, a.session.GetSessionID())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if moduleType == 0 {
|
|
|
|
|
// log.Error("invalid moduleType")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
origin := moduleType
|
|
|
|
|
if moduleType >= 3000 {
|
|
|
|
|
moduleType = call.GetGameOriginID(moduleType)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 第三步拿到协议类型
|
|
|
|
|
protocolType, err := a.readInt(2)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if protocolType == 0 {
|
|
|
|
|
// log.Error("invalid protocolType")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 第四步拿到uid
|
|
|
|
|
_, err = a.readInt(4)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// log.Error("get uid err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// log.Debug("uid:%v", uid)
|
|
|
|
|
|
|
|
|
|
// 路由
|
|
|
|
|
// moduleName := pb.ModuleType_name[int32(moduleType)]
|
|
|
|
|
moduleName := call.GetModuleName(moduleType)
|
|
|
|
|
// if moduleName == "" {
|
|
|
|
|
// log.Error("unknow moduleType:%v", moduleType)
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
protocolName := strconv.Itoa(protocolType)
|
|
|
|
|
// var protocolName string
|
|
|
|
|
// pr := call.GetProtocolType(moduleType)
|
|
|
|
|
// protocolName = pr[int32(protocolType)]
|
|
|
|
|
// if protocolName == "" {
|
|
|
|
|
// log.Error("invalid protocolType:%v", protocolType)
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
a.lastHeartbeat = time.Now().Unix()
|
|
|
|
|
request := []byte{}
|
|
|
|
|
if length > 10 {
|
|
|
|
|
// 第五步拿到协议数据
|
|
|
|
|
request = make([]byte, length-10)
|
|
|
|
|
l, err := io.ReadFull(a.r, request)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if l != length-10 {
|
|
|
|
|
log.Error("session:%v,pack len:%v,read len:%v", a.session.GetSessionID(), length-10, l)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 心跳包
|
|
|
|
|
if moduleType == int(pb.ServerType_ServerTypeGate) && protocolType == int(pb.ServerGateReq_GatePingReq) {
|
|
|
|
|
topic := fmt.Sprintf("%v:%v", int(pb.ServerType_ServerTypeGate), int(pb.ServerGateResp_GatePingResp))
|
|
|
|
|
a.session.Send(topic, nil)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
route := ""
|
|
|
|
|
if origin >= 3000 {
|
|
|
|
|
route = fmt.Sprintf("%v://%v/%v", moduleName, origin, protocolName)
|
|
|
|
|
} else {
|
|
|
|
|
route = fmt.Sprintf("%v://modules/%v", moduleName, protocolName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if moduleName == "hall" && protocolType == int(pb.ServerGateReq_GateLoginReq) {
|
|
|
|
|
log.Debug("session %v login", a.session.GetSessionID())
|
|
|
|
|
}
|
|
|
|
|
// log.Debug("route:%v", route)
|
|
|
|
|
a.onRoute(route, request)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) readInt(n int) (ret int, err error) {
|
|
|
|
|
tmp := []byte{}
|
|
|
|
|
tmp, err = a.readByte(n)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// log.Error("err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
ret, err = util.BytesToInt(tmp)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) readByte(n int) (data []byte, err error) {
|
|
|
|
|
for i := 0; i < n; i++ {
|
|
|
|
|
var tmp byte
|
|
|
|
|
tmp, err = a.r.ReadByte()
|
|
|
|
|
if err != nil {
|
|
|
|
|
// log.Error("err:%v", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
data = append(data, tmp)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) onRoute(topic string, data []byte) {
|
|
|
|
|
defer util.Recover()
|
|
|
|
|
_, result, err := a.gate.GetRouteHandler().OnRoute(a.GetSession(), topic, data)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error("result:%v,err:%v,topic:%v", result, err, topic)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *agent) heartbeat() {
|
|
|
|
|
if time.Now().Unix()-a.lastHeartbeat > int64(config.GetConfig().Gate.HeartBeat) {
|
|
|
|
|
log.Debug("player:%v heatbeat stop....", a.GetSession().GetUserID())
|
|
|
|
|
a.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, a.heartbeatTimer, nil, timewheel.Job(func(arge interface{}) {
|
|
|
|
|
a.heartbeat()
|
|
|
|
|
}))
|
|
|
|
|
}
|