You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
435 lines
10 KiB
435 lines
10 KiB
package gate |
|
|
|
import ( |
|
"bufio" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"runtime" |
|
"server/call" |
|
"server/config" |
|
"server/pb" |
|
"server/util" |
|
"strconv" |
|
"strings" |
|
"time" |
|
|
|
"github.com/liangdas/mqant/gate" |
|
basegate "github.com/liangdas/mqant/gate/base" |
|
"github.com/liangdas/mqant/log" |
|
"github.com/liangdas/mqant/module" |
|
timewheel "github.com/liangdas/mqant/module/modules/timer" |
|
"github.com/liangdas/mqant/network" |
|
mqanttools "github.com/liangdas/mqant/utils" |
|
) |
|
|
|
// type Agent interface { |
|
// OnInit(gate Gate, conn network.Conn) error |
|
// WriteMsg(topic string, body []byte) error |
|
// Close() |
|
// Run() (err error) |
|
// OnClose() error |
|
// Destroy() |
|
// ConnTime() time.Time |
|
// RevNum() int64 |
|
// SendNum() int64 |
|
// IsClosed() bool |
|
// ProtocolOK() bool |
|
// GetError() error //连接断开的错误日志 |
|
// GetSession() Session |
|
// } |
|
|
|
type agent struct { |
|
connTime time.Time |
|
module module.RPCModule |
|
session gate.Session |
|
conn network.Conn |
|
gate gate.Gate |
|
w *bufio.Writer |
|
r *bufio.Reader |
|
heartbeatTimer string |
|
revNum int64 |
|
sendNum int64 |
|
lastHeartbeat int64 |
|
protocol_ok bool |
|
isclose bool |
|
queue *PackQueue |
|
// msgMutex *sync.Mutex |
|
// msgIndex uint32 // 当前消息序列号 |
|
// msgMap map[uint32]Msg |
|
} |
|
|
|
type Msg struct { |
|
Topic string |
|
Body []byte |
|
} |
|
|
|
func (a *agent) OnInit(gate gate.Gate, conn network.Conn) error { |
|
// a.ch = make(chan int, gate.Options().ConcurrentTasks) |
|
a.conn = conn |
|
a.gate = gate |
|
a.r = bufio.NewReaderSize(conn, gate.Options().BufSize) |
|
a.w = bufio.NewWriterSize(conn, gate.Options().BufSize) |
|
a.isclose = false |
|
a.protocol_ok = false |
|
// a.msgMutex = new(sync.Mutex) |
|
// a.msgIndex = 0 |
|
// a.msgMap = make(map[uint32]Msg) |
|
return nil |
|
} |
|
|
|
// func (a *agent) LoopWrite() { |
|
// for !a.IsClosed() { |
|
// msg, ok := a.msgMap[a.msgIndex] |
|
// if !ok { |
|
// break |
|
// } |
|
// a.WriteMsg(msg.Topic, msg.Body) |
|
// delete(a.msgMap, a.msgIndex) |
|
// a.msgIndex++ |
|
// } |
|
// } |
|
|
|
func (a *agent) WriteMsg(topic string, body []byte) error { |
|
// log.Debug("write to agent:%v", topic) |
|
if a.conn == nil { |
|
return errors.New("mqtt.Client nil") |
|
} |
|
// a.sendNum++ |
|
if a.gate.Options().SendMessageHook != nil { |
|
bb, err := a.gate.Options().SendMessageHook(a.GetSession(), topic, body) |
|
if err != nil { |
|
return err |
|
} |
|
body = bb |
|
} |
|
send := []byte{} |
|
length, err := util.IntToBytes(len(body)+10, 2) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
send = append(send, length...) |
|
ret := strings.Split(topic, ":") |
|
if len(ret) < 2 { |
|
err = fmt.Errorf("invalid protocol:%v", topic) |
|
return err |
|
} |
|
// else if len(ret) == 3 { |
|
// index, err1 := strconv.ParseUint(ret[2], 10, 32) |
|
// if err1 != nil { |
|
// err = fmt.Errorf("invalid protocol:%v", topic) |
|
// return err |
|
// } |
|
// a.msgMutex.Lock() |
|
// if index == 0 { // 说明是第一条消息,直接发送 |
|
// a.msgIndex = 1 |
|
// a.WriteMsg(ret[0]+":"+ret[1], body) |
|
// a.LoopWrite() |
|
// } else if uint32(index) == a.msgIndex { // 顺序正确,直接发送 |
|
// a.msgIndex++ |
|
// a.WriteMsg(ret[0]+":"+ret[1], body) |
|
// a.LoopWrite() |
|
// } else { |
|
// a.msgMap[uint32(index)] = Msg{Topic: fmt.Sprintf("%v:%v", ret[0], ret[1]), Body: body} |
|
// } |
|
// a.msgMutex.Unlock() |
|
// return nil |
|
// } |
|
|
|
// moduleID := int(pb.ServerType_value[ret[0]]) |
|
moduleID := call.GetModuleID(ret[0]) |
|
// if err != nil { |
|
// log.Error("err:%v", err) |
|
// return err |
|
// } |
|
// pid := call.GetProtocolNameType(moduleID)[ret[1]] |
|
// protocol, _ := util.IntToBytes(int(pid), 2) |
|
pid, err := strconv.Atoi(ret[1]) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
protocol, _ := util.IntToBytes(pid, 2) |
|
|
|
module, err := util.IntToBytes(moduleID, 2) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return err |
|
} |
|
// log.Debug("发到客户端的proto协议:%v", pid) |
|
send = append(send, module...) |
|
send = append(send, protocol...) |
|
send = append(send, []byte{0, 0, 0, 0}...) |
|
send = append(send, body...) |
|
|
|
// a.conn.Write(send) |
|
|
|
shouldClose := false |
|
if moduleID == int(pb.ServerType_ServerTypeGate) && pid == int(pb.ServerGateResp_GateRepeatResp) { |
|
shouldClose = true |
|
} |
|
return a.queue.WritePack(send, shouldClose) |
|
} |
|
|
|
func (a *agent) Close() { |
|
go func() { |
|
//关闭连接部分情况下会阻塞超时,因此放协程去处理 |
|
if a.conn != nil { |
|
a.conn.Close() |
|
} |
|
}() |
|
} |
|
|
|
func (a *agent) Run() (err error) { |
|
defer func() { |
|
if err := recover(); err != nil { |
|
buff := make([]byte, 1024) |
|
runtime.Stack(buff, false) |
|
log.Error("conn.serve() panic(%v)\n info:%s", err, string(buff)) |
|
} |
|
a.Close() |
|
|
|
}() |
|
go func() { |
|
defer func() { |
|
if err := recover(); err != nil { |
|
buff := make([]byte, 1024) |
|
runtime.Stack(buff, false) |
|
log.Error("OverTime panic(%v)\n info:%s", err, string(buff)) |
|
} |
|
}() |
|
select { |
|
case <-time.After(a.gate.Options().OverTime): |
|
if a.GetSession() == nil || a.GetSession().GetUserID() == "" { |
|
//超过一段时间还没有建立连接则直接关闭网络连接 |
|
a.Close() |
|
} |
|
|
|
} |
|
}() |
|
|
|
addr := a.conn.RemoteAddr() |
|
sessionID := mqanttools.GenerateID().String() |
|
a.heartbeatTimer = sessionID |
|
// 心跳逻辑 |
|
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, sessionID, nil, timewheel.Job(func(arge interface{}) { |
|
a.heartbeat() |
|
})) |
|
a.session, err = basegate.NewSessionByMap(a.module.GetApp(), map[string]interface{}{ |
|
"Sessionid": sessionID, |
|
"Network": addr.Network(), |
|
"IP": addr.String(), |
|
"Serverid": a.module.GetServerID(), |
|
"Settings": make(map[string]string), |
|
}) |
|
if err != nil { |
|
log.Error("gate create agent fail", err.Error()) |
|
return |
|
} |
|
log.Debug("new session:%v", sessionID) |
|
a.queue = NewPackQueue(sessionID, a.r, a.w, a.conn, -1) |
|
util.Go(func() { |
|
a.queue.Flusher() |
|
}) |
|
a.session.JudgeGuest(a.gate.GetJudgeGuest()) |
|
a.session.CreateTrace() //代码跟踪 |
|
//回复客户端 CONNECT |
|
// err = mqtt.WritePack(mqtt.GetConnAckPack(0), a.w) |
|
// if err != nil { |
|
// log.Error("ConnAckPack error %v", err.Error()) |
|
// return |
|
// } |
|
a.connTime = time.Now() |
|
a.protocol_ok = true |
|
a.gate.GetAgentLearner().Connect(a) //发送连接成功的事件 |
|
a.ReadLoop() |
|
return nil |
|
} |
|
|
|
func (age *agent) OnClose() error { |
|
defer func() { |
|
if err := recover(); err != nil { |
|
buff := make([]byte, 1024) |
|
runtime.Stack(buff, false) |
|
log.Error("agent OnClose panic(%v)\n info:%s", err, string(buff)) |
|
} |
|
}() |
|
timewheel.GetTimeWheel().RemoveTimer(age.heartbeatTimer) |
|
|
|
age.isclose = true |
|
age.gate.GetAgentLearner().DisConnect(age) //发送连接断开的事件 |
|
return nil |
|
} |
|
|
|
func (a *agent) Destroy() { |
|
if a.conn != nil { |
|
a.conn.Destroy() |
|
} |
|
} |
|
func (a *agent) ConnTime() time.Time { |
|
return a.connTime |
|
} |
|
func (a *agent) RevNum() int64 { |
|
return a.revNum |
|
} |
|
func (a *agent) SendNum() int64 { |
|
return a.sendNum |
|
} |
|
func (a *agent) IsClosed() bool { |
|
return a.isclose |
|
} |
|
func (a *agent) ProtocolOK() bool { |
|
return a.protocol_ok |
|
} |
|
func (a *agent) GetError() error { //连接断开的错误日志 |
|
return nil |
|
} |
|
func (a *agent) GetSession() gate.Session { |
|
return a.session |
|
} |
|
|
|
func (a *agent) ReadLoop() { |
|
defer a.queue.Close(errors.New("read err")) |
|
for !a.IsClosed() { |
|
// 第一步拿到数据包长度 |
|
length, err := a.readInt(2) |
|
if err != nil { |
|
// log.Error("read int err:%v", err) |
|
return |
|
} |
|
if length > a.gate.Options().BufSize { |
|
log.Error("max bufSize limit:%v,length:%v", a.gate.Options().BufSize, length) |
|
return |
|
} |
|
// 第二步拿到模块协议类型 |
|
moduleType, err := a.readInt(2) |
|
if err != nil { |
|
log.Error("err:%v,session:%v", err, a.session.GetSessionID()) |
|
return |
|
} |
|
if moduleType == 0 { |
|
// log.Error("invalid moduleType") |
|
return |
|
} |
|
origin := moduleType |
|
if moduleType >= 3000 { |
|
moduleType = call.GetGameOriginID(moduleType) |
|
} |
|
|
|
// 第三步拿到协议类型 |
|
protocolType, err := a.readInt(2) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return |
|
} |
|
if protocolType == 0 { |
|
// log.Error("invalid protocolType") |
|
return |
|
} |
|
|
|
// 第四步拿到uid |
|
_, err = a.readInt(4) |
|
if err != nil { |
|
// log.Error("get uid err:%v", err) |
|
return |
|
} |
|
// log.Debug("uid:%v", uid) |
|
|
|
// 路由 |
|
// moduleName := pb.ModuleType_name[int32(moduleType)] |
|
moduleName := call.GetModuleName(moduleType) |
|
// if moduleName == "" { |
|
// log.Error("unknow moduleType:%v", moduleType) |
|
// return |
|
// } |
|
|
|
protocolName := strconv.Itoa(protocolType) |
|
// var protocolName string |
|
// pr := call.GetProtocolType(moduleType) |
|
// protocolName = pr[int32(protocolType)] |
|
// if protocolName == "" { |
|
// log.Error("invalid protocolType:%v", protocolType) |
|
// return |
|
// } |
|
|
|
a.lastHeartbeat = time.Now().Unix() |
|
request := []byte{} |
|
if length > 10 { |
|
// 第五步拿到协议数据 |
|
request = make([]byte, length-10) |
|
l, err := io.ReadFull(a.r, request) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
return |
|
} |
|
if l != length-10 { |
|
log.Error("session:%v,pack len:%v,read len:%v", a.session.GetSessionID(), length-10, l) |
|
return |
|
} |
|
} |
|
|
|
// 心跳包 |
|
if moduleType == int(pb.ServerType_ServerTypeGate) && protocolType == int(pb.ServerGateReq_GatePingReq) { |
|
topic := fmt.Sprintf("%v:%v", int(pb.ServerType_ServerTypeGate), int(pb.ServerGateResp_GatePingResp)) |
|
a.session.Send(topic, nil) |
|
continue |
|
} |
|
|
|
route := "" |
|
if origin >= 3000 { |
|
route = fmt.Sprintf("%v://%v/%v", moduleName, origin, protocolName) |
|
} else { |
|
route = fmt.Sprintf("%v://modules/%v", moduleName, protocolName) |
|
} |
|
|
|
if moduleName == "hall" && protocolType == int(pb.ServerGateReq_GateLoginReq) { |
|
log.Debug("session %v login", a.session.GetSessionID()) |
|
} |
|
// log.Debug("route:%v", route) |
|
a.onRoute(route, request) |
|
} |
|
} |
|
|
|
func (a *agent) readInt(n int) (ret int, err error) { |
|
tmp := []byte{} |
|
tmp, err = a.readByte(n) |
|
if err != nil { |
|
// log.Error("err:%v", err) |
|
return |
|
} |
|
ret, err = util.BytesToInt(tmp) |
|
return |
|
} |
|
|
|
func (a *agent) readByte(n int) (data []byte, err error) { |
|
for i := 0; i < n; i++ { |
|
var tmp byte |
|
tmp, err = a.r.ReadByte() |
|
if err != nil { |
|
// log.Error("err:%v", err) |
|
return |
|
} |
|
data = append(data, tmp) |
|
} |
|
return |
|
} |
|
|
|
func (a *agent) onRoute(topic string, data []byte) { |
|
defer util.Recover() |
|
_, result, err := a.gate.GetRouteHandler().OnRoute(a.GetSession(), topic, data) |
|
if err != nil { |
|
log.Error("result:%v,err:%v,topic:%v", result, err, topic) |
|
} |
|
} |
|
|
|
func (a *agent) heartbeat() { |
|
if time.Now().Unix()-a.lastHeartbeat > int64(config.GetConfig().Gate.HeartBeat) { |
|
log.Debug("player:%v heatbeat stop....", a.GetSession().GetUserID()) |
|
a.Close() |
|
return |
|
} |
|
timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, a.heartbeatTimer, nil, timewheel.Job(func(arge interface{}) { |
|
a.heartbeat() |
|
})) |
|
}
|
|
|