印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

435 lines
10 KiB

package gate
import (
"bufio"
"errors"
"fmt"
"io"
"runtime"
"server/call"
"server/config"
"server/pb"
"server/util"
"strconv"
"strings"
"time"
"github.com/liangdas/mqant/gate"
basegate "github.com/liangdas/mqant/gate/base"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module"
timewheel "github.com/liangdas/mqant/module/modules/timer"
"github.com/liangdas/mqant/network"
mqanttools "github.com/liangdas/mqant/utils"
)
// type Agent interface {
// OnInit(gate Gate, conn network.Conn) error
// WriteMsg(topic string, body []byte) error
// Close()
// Run() (err error)
// OnClose() error
// Destroy()
// ConnTime() time.Time
// RevNum() int64
// SendNum() int64
// IsClosed() bool
// ProtocolOK() bool
// GetError() error //连接断开的错误日志
// GetSession() Session
// }
type agent struct {
connTime time.Time
module module.RPCModule
session gate.Session
conn network.Conn
gate gate.Gate
w *bufio.Writer
r *bufio.Reader
heartbeatTimer string
revNum int64
sendNum int64
lastHeartbeat int64
protocol_ok bool
isclose bool
queue *PackQueue
// msgMutex *sync.Mutex
// msgIndex uint32 // 当前消息序列号
// msgMap map[uint32]Msg
}
type Msg struct {
Topic string
Body []byte
}
func (a *agent) OnInit(gate gate.Gate, conn network.Conn) error {
// a.ch = make(chan int, gate.Options().ConcurrentTasks)
a.conn = conn
a.gate = gate
a.r = bufio.NewReaderSize(conn, gate.Options().BufSize)
a.w = bufio.NewWriterSize(conn, gate.Options().BufSize)
a.isclose = false
a.protocol_ok = false
// a.msgMutex = new(sync.Mutex)
// a.msgIndex = 0
// a.msgMap = make(map[uint32]Msg)
return nil
}
// func (a *agent) LoopWrite() {
// for !a.IsClosed() {
// msg, ok := a.msgMap[a.msgIndex]
// if !ok {
// break
// }
// a.WriteMsg(msg.Topic, msg.Body)
// delete(a.msgMap, a.msgIndex)
// a.msgIndex++
// }
// }
func (a *agent) WriteMsg(topic string, body []byte) error {
// log.Debug("write to agent:%v", topic)
if a.conn == nil {
return errors.New("mqtt.Client nil")
}
// a.sendNum++
if a.gate.Options().SendMessageHook != nil {
bb, err := a.gate.Options().SendMessageHook(a.GetSession(), topic, body)
if err != nil {
return err
}
body = bb
}
send := []byte{}
length, err := util.IntToBytes(len(body)+10, 2)
if err != nil {
log.Error("err:%v", err)
return err
}
send = append(send, length...)
ret := strings.Split(topic, ":")
if len(ret) < 2 {
err = fmt.Errorf("invalid protocol:%v", topic)
return err
}
// else if len(ret) == 3 {
// index, err1 := strconv.ParseUint(ret[2], 10, 32)
// if err1 != nil {
// err = fmt.Errorf("invalid protocol:%v", topic)
// return err
// }
// a.msgMutex.Lock()
// if index == 0 { // 说明是第一条消息,直接发送
// a.msgIndex = 1
// a.WriteMsg(ret[0]+":"+ret[1], body)
// a.LoopWrite()
// } else if uint32(index) == a.msgIndex { // 顺序正确,直接发送
// a.msgIndex++
// a.WriteMsg(ret[0]+":"+ret[1], body)
// a.LoopWrite()
// } else {
// a.msgMap[uint32(index)] = Msg{Topic: fmt.Sprintf("%v:%v", ret[0], ret[1]), Body: body}
// }
// a.msgMutex.Unlock()
// return nil
// }
// moduleID := int(pb.ServerType_value[ret[0]])
moduleID := call.GetModuleID(ret[0])
// if err != nil {
// log.Error("err:%v", err)
// return err
// }
// pid := call.GetProtocolNameType(moduleID)[ret[1]]
// protocol, _ := util.IntToBytes(int(pid), 2)
pid, err := strconv.Atoi(ret[1])
if err != nil {
log.Error("err:%v", err)
return err
}
protocol, _ := util.IntToBytes(pid, 2)
module, err := util.IntToBytes(moduleID, 2)
if err != nil {
log.Error("err:%v", err)
return err
}
// log.Debug("发到客户端的proto协议:%v", pid)
send = append(send, module...)
send = append(send, protocol...)
send = append(send, []byte{0, 0, 0, 0}...)
send = append(send, body...)
// a.conn.Write(send)
shouldClose := false
if moduleID == int(pb.ServerType_ServerTypeGate) && pid == int(pb.ServerGateResp_GateRepeatResp) {
shouldClose = true
}
return a.queue.WritePack(send, shouldClose)
}
func (a *agent) Close() {
go func() {
//关闭连接部分情况下会阻塞超时,因此放协程去处理
if a.conn != nil {
a.conn.Close()
}
}()
}
func (a *agent) Run() (err error) {
defer func() {
if err := recover(); err != nil {
buff := make([]byte, 1024)
runtime.Stack(buff, false)
log.Error("conn.serve() panic(%v)\n info:%s", err, string(buff))
}
a.Close()
}()
go func() {
defer func() {
if err := recover(); err != nil {
buff := make([]byte, 1024)
runtime.Stack(buff, false)
log.Error("OverTime panic(%v)\n info:%s", err, string(buff))
}
}()
select {
case <-time.After(a.gate.Options().OverTime):
if a.GetSession() == nil || a.GetSession().GetUserID() == "" {
//超过一段时间还没有建立连接则直接关闭网络连接
a.Close()
}
}
}()
addr := a.conn.RemoteAddr()
sessionID := mqanttools.GenerateID().String()
a.heartbeatTimer = sessionID
// 心跳逻辑
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, sessionID, nil, timewheel.Job(func(arge interface{}) {
a.heartbeat()
}))
a.session, err = basegate.NewSessionByMap(a.module.GetApp(), map[string]interface{}{
"Sessionid": sessionID,
"Network": addr.Network(),
"IP": addr.String(),
"Serverid": a.module.GetServerID(),
"Settings": make(map[string]string),
})
if err != nil {
log.Error("gate create agent fail", err.Error())
return
}
log.Debug("new session:%v", sessionID)
a.queue = NewPackQueue(sessionID, a.r, a.w, a.conn, -1)
util.Go(func() {
a.queue.Flusher()
})
a.session.JudgeGuest(a.gate.GetJudgeGuest())
a.session.CreateTrace() //代码跟踪
//回复客户端 CONNECT
// err = mqtt.WritePack(mqtt.GetConnAckPack(0), a.w)
// if err != nil {
// log.Error("ConnAckPack error %v", err.Error())
// return
// }
a.connTime = time.Now()
a.protocol_ok = true
a.gate.GetAgentLearner().Connect(a) //发送连接成功的事件
a.ReadLoop()
return nil
}
func (age *agent) OnClose() error {
defer func() {
if err := recover(); err != nil {
buff := make([]byte, 1024)
runtime.Stack(buff, false)
log.Error("agent OnClose panic(%v)\n info:%s", err, string(buff))
}
}()
timewheel.GetTimeWheel().RemoveTimer(age.heartbeatTimer)
age.isclose = true
age.gate.GetAgentLearner().DisConnect(age) //发送连接断开的事件
return nil
}
func (a *agent) Destroy() {
if a.conn != nil {
a.conn.Destroy()
}
}
func (a *agent) ConnTime() time.Time {
return a.connTime
}
func (a *agent) RevNum() int64 {
return a.revNum
}
func (a *agent) SendNum() int64 {
return a.sendNum
}
func (a *agent) IsClosed() bool {
return a.isclose
}
func (a *agent) ProtocolOK() bool {
return a.protocol_ok
}
func (a *agent) GetError() error { //连接断开的错误日志
return nil
}
func (a *agent) GetSession() gate.Session {
return a.session
}
func (a *agent) ReadLoop() {
defer a.queue.Close(errors.New("read err"))
for !a.IsClosed() {
// 第一步拿到数据包长度
length, err := a.readInt(2)
if err != nil {
// log.Error("err:%v", err)
return
}
if length > a.gate.Options().BufSize {
log.Error("max bufSize limit:%v,length:%v", a.gate.Options().BufSize, length)
return
}
// 第二步拿到模块协议类型
moduleType, err := a.readInt(2)
if err != nil {
log.Error("err:%v,session:%v", err, a.session.GetSessionID())
return
}
if moduleType == 0 {
// log.Error("invalid moduleType")
return
}
origin := moduleType
if moduleType >= 3000 {
moduleType = call.GetGameOriginID(moduleType)
}
// 第三步拿到协议类型
protocolType, err := a.readInt(2)
if err != nil {
log.Error("err:%v", err)
return
}
if protocolType == 0 {
// log.Error("invalid protocolType")
return
}
// 第四步拿到uid
_, err = a.readInt(4)
if err != nil {
// log.Error("err:%v", err)
return
}
// log.Debug("uid:%v", uid)
// 路由
// moduleName := pb.ModuleType_name[int32(moduleType)]
moduleName := call.GetModuleName(moduleType)
// if moduleName == "" {
// log.Error("unknow moduleType:%v", moduleType)
// return
// }
protocolName := strconv.Itoa(protocolType)
// var protocolName string
// pr := call.GetProtocolType(moduleType)
// protocolName = pr[int32(protocolType)]
// if protocolName == "" {
// log.Error("invalid protocolType:%v", protocolType)
// return
// }
a.lastHeartbeat = time.Now().Unix()
request := []byte{}
if length > 10 {
// 第五步拿到协议数据
request = make([]byte, length-10)
l, err := io.ReadFull(a.r, request)
if err != nil {
log.Error("err:%v", err)
return
}
if l != length-10 {
log.Error("session:%v,pack len:%v,read len:%v", a.session.GetSessionID(), length-10, l)
return
}
}
// 心跳包
if moduleType == int(pb.ServerType_ServerTypeGate) && protocolType == int(pb.ServerGateReq_GatePingReq) {
topic := fmt.Sprintf("%v:%v", int(pb.ServerType_ServerTypeGate), int(pb.ServerGateResp_GatePingResp))
a.session.Send(topic, nil)
continue
}
route := ""
if origin >= 3000 {
route = fmt.Sprintf("%v://%v/%v", moduleName, origin, protocolName)
} else {
route = fmt.Sprintf("%v://modules/%v", moduleName, protocolName)
}
if moduleName == "hall" && protocolType == int(pb.ServerGateReq_GateLoginReq) {
log.Debug("session %v login", a.session.GetSessionID())
}
// log.Debug("route:%v", route)
a.onRoute(route, request)
}
}
func (a *agent) readInt(n int) (ret int, err error) {
tmp := []byte{}
tmp, err = a.readByte(n)
if err != nil {
// log.Error("err:%v", err)
return
}
ret, err = util.BytesToInt(tmp)
return
}
func (a *agent) readByte(n int) (data []byte, err error) {
for i := 0; i < n; i++ {
var tmp byte
tmp, err = a.r.ReadByte()
if err != nil {
// log.Error("err:%v", err)
return
}
data = append(data, tmp)
}
return
}
func (a *agent) onRoute(topic string, data []byte) {
defer util.Recover()
_, result, err := a.gate.GetRouteHandler().OnRoute(a.GetSession(), topic, data)
if err != nil {
log.Error("result:%v,err:%v,topic:%v", result, err, topic)
}
}
func (a *agent) heartbeat() {
if time.Now().Unix()-a.lastHeartbeat > int64(config.GetConfig().Gate.HeartBeat) {
log.Debug("player:%v heatbeat stop....", a.GetSession().GetUserID())
a.Close()
return
}
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, a.heartbeatTimer, nil, timewheel.Job(func(arge interface{}) {
a.heartbeat()
}))
}