package gate import ( "bufio" "errors" "fmt" "io" "runtime" "server/call" "server/config" "server/pb" "server/util" "strconv" "strings" "time" "github.com/liangdas/mqant/gate" basegate "github.com/liangdas/mqant/gate/base" "github.com/liangdas/mqant/log" "github.com/liangdas/mqant/module" timewheel "github.com/liangdas/mqant/module/modules/timer" "github.com/liangdas/mqant/network" mqanttools "github.com/liangdas/mqant/utils" ) // type Agent interface { // OnInit(gate Gate, conn network.Conn) error // WriteMsg(topic string, body []byte) error // Close() // Run() (err error) // OnClose() error // Destroy() // ConnTime() time.Time // RevNum() int64 // SendNum() int64 // IsClosed() bool // ProtocolOK() bool // GetError() error //连接断开的错误日志 // GetSession() Session // } type agent struct { connTime time.Time module module.RPCModule session gate.Session conn network.Conn gate gate.Gate w *bufio.Writer r *bufio.Reader heartbeatTimer string revNum int64 sendNum int64 lastHeartbeat int64 protocol_ok bool isclose bool queue *PackQueue // msgMutex *sync.Mutex // msgIndex uint32 // 当前消息序列号 // msgMap map[uint32]Msg } type Msg struct { Topic string Body []byte } func (a *agent) OnInit(gate gate.Gate, conn network.Conn) error { // a.ch = make(chan int, gate.Options().ConcurrentTasks) a.conn = conn a.gate = gate a.r = bufio.NewReaderSize(conn, gate.Options().BufSize) a.w = bufio.NewWriterSize(conn, gate.Options().BufSize) a.isclose = false a.protocol_ok = false // a.msgMutex = new(sync.Mutex) // a.msgIndex = 0 // a.msgMap = make(map[uint32]Msg) return nil } // func (a *agent) LoopWrite() { // for !a.IsClosed() { // msg, ok := a.msgMap[a.msgIndex] // if !ok { // break // } // a.WriteMsg(msg.Topic, msg.Body) // delete(a.msgMap, a.msgIndex) // a.msgIndex++ // } // } func (a *agent) WriteMsg(topic string, body []byte) error { // log.Debug("write to agent:%v", topic) if a.conn == nil { return errors.New("mqtt.Client nil") } // a.sendNum++ if a.gate.Options().SendMessageHook != nil { bb, err := a.gate.Options().SendMessageHook(a.GetSession(), topic, body) if err != nil { return err } body = bb } send := []byte{} length, err := util.IntToBytes(len(body)+10, 2) if err != nil { log.Error("err:%v", err) return err } send = append(send, length...) ret := strings.Split(topic, ":") if len(ret) < 2 { err = fmt.Errorf("invalid protocol:%v", topic) return err } // else if len(ret) == 3 { // index, err1 := strconv.ParseUint(ret[2], 10, 32) // if err1 != nil { // err = fmt.Errorf("invalid protocol:%v", topic) // return err // } // a.msgMutex.Lock() // if index == 0 { // 说明是第一条消息,直接发送 // a.msgIndex = 1 // a.WriteMsg(ret[0]+":"+ret[1], body) // a.LoopWrite() // } else if uint32(index) == a.msgIndex { // 顺序正确,直接发送 // a.msgIndex++ // a.WriteMsg(ret[0]+":"+ret[1], body) // a.LoopWrite() // } else { // a.msgMap[uint32(index)] = Msg{Topic: fmt.Sprintf("%v:%v", ret[0], ret[1]), Body: body} // } // a.msgMutex.Unlock() // return nil // } // moduleID := int(pb.ServerType_value[ret[0]]) moduleID := call.GetModuleID(ret[0]) // if err != nil { // log.Error("err:%v", err) // return err // } // pid := call.GetProtocolNameType(moduleID)[ret[1]] // protocol, _ := util.IntToBytes(int(pid), 2) pid, err := strconv.Atoi(ret[1]) if err != nil { log.Error("err:%v", err) return err } protocol, _ := util.IntToBytes(pid, 2) module, err := util.IntToBytes(moduleID, 2) if err != nil { log.Error("err:%v", err) return err } // log.Debug("发到客户端的proto协议:%v", pid) send = append(send, module...) send = append(send, protocol...) send = append(send, []byte{0, 0, 0, 0}...) send = append(send, body...) // a.conn.Write(send) shouldClose := false if moduleID == int(pb.ServerType_ServerTypeGate) && pid == int(pb.ServerGateResp_GateRepeatResp) { shouldClose = true } return a.queue.WritePack(send, shouldClose) } func (a *agent) Close() { go func() { //关闭连接部分情况下会阻塞超时,因此放协程去处理 if a.conn != nil { a.conn.Close() } }() } func (a *agent) Run() (err error) { defer func() { if err := recover(); err != nil { buff := make([]byte, 1024) runtime.Stack(buff, false) log.Error("conn.serve() panic(%v)\n info:%s", err, string(buff)) } a.Close() }() go func() { defer func() { if err := recover(); err != nil { buff := make([]byte, 1024) runtime.Stack(buff, false) log.Error("OverTime panic(%v)\n info:%s", err, string(buff)) } }() select { case <-time.After(a.gate.Options().OverTime): if a.GetSession() == nil || a.GetSession().GetUserID() == "" { //超过一段时间还没有建立连接则直接关闭网络连接 a.Close() } } }() addr := a.conn.RemoteAddr() sessionID := mqanttools.GenerateID().String() a.heartbeatTimer = sessionID // 心跳逻辑 timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, sessionID, nil, timewheel.Job(func(arge interface{}) { a.heartbeat() })) a.session, err = basegate.NewSessionByMap(a.module.GetApp(), map[string]interface{}{ "Sessionid": sessionID, "Network": addr.Network(), "IP": addr.String(), "Serverid": a.module.GetServerID(), "Settings": make(map[string]string), }) if err != nil { log.Error("gate create agent fail", err.Error()) return } log.Debug("new session:%v", sessionID) a.queue = NewPackQueue(sessionID, a.r, a.w, a.conn, -1) util.Go(func() { a.queue.Flusher() }) a.session.JudgeGuest(a.gate.GetJudgeGuest()) a.session.CreateTrace() //代码跟踪 //回复客户端 CONNECT // err = mqtt.WritePack(mqtt.GetConnAckPack(0), a.w) // if err != nil { // log.Error("ConnAckPack error %v", err.Error()) // return // } a.connTime = time.Now() a.protocol_ok = true a.gate.GetAgentLearner().Connect(a) //发送连接成功的事件 a.ReadLoop() return nil } func (age *agent) OnClose() error { defer func() { if err := recover(); err != nil { buff := make([]byte, 1024) runtime.Stack(buff, false) log.Error("agent OnClose panic(%v)\n info:%s", err, string(buff)) } }() timewheel.GetTimeWheel().RemoveTimer(age.heartbeatTimer) age.isclose = true age.gate.GetAgentLearner().DisConnect(age) //发送连接断开的事件 return nil } func (a *agent) Destroy() { if a.conn != nil { a.conn.Destroy() } } func (a *agent) ConnTime() time.Time { return a.connTime } func (a *agent) RevNum() int64 { return a.revNum } func (a *agent) SendNum() int64 { return a.sendNum } func (a *agent) IsClosed() bool { return a.isclose } func (a *agent) ProtocolOK() bool { return a.protocol_ok } func (a *agent) GetError() error { //连接断开的错误日志 return nil } func (a *agent) GetSession() gate.Session { return a.session } func (a *agent) ReadLoop() { defer a.queue.Close(errors.New("read err")) for !a.IsClosed() { // 第一步拿到数据包长度 length, err := a.readInt(2) if err != nil { // log.Error("err:%v", err) return } if length > a.gate.Options().BufSize { log.Error("max bufSize limit:%v,length:%v", a.gate.Options().BufSize, length) return } // 第二步拿到模块协议类型 moduleType, err := a.readInt(2) if err != nil { log.Error("err:%v,session:%v", err, a.session.GetSessionID()) return } if moduleType == 0 { // log.Error("invalid moduleType") return } origin := moduleType if moduleType >= 3000 { moduleType = call.GetGameOriginID(moduleType) } // 第三步拿到协议类型 protocolType, err := a.readInt(2) if err != nil { log.Error("err:%v", err) return } if protocolType == 0 { // log.Error("invalid protocolType") return } // 第四步拿到uid _, err = a.readInt(4) if err != nil { // log.Error("err:%v", err) return } // log.Debug("uid:%v", uid) // 路由 // moduleName := pb.ModuleType_name[int32(moduleType)] moduleName := call.GetModuleName(moduleType) // if moduleName == "" { // log.Error("unknow moduleType:%v", moduleType) // return // } protocolName := strconv.Itoa(protocolType) // var protocolName string // pr := call.GetProtocolType(moduleType) // protocolName = pr[int32(protocolType)] // if protocolName == "" { // log.Error("invalid protocolType:%v", protocolType) // return // } a.lastHeartbeat = time.Now().Unix() request := []byte{} if length > 10 { // 第五步拿到协议数据 request = make([]byte, length-10) l, err := io.ReadFull(a.r, request) if err != nil { log.Error("err:%v", err) return } if l != length-10 { log.Error("session:%v,pack len:%v,read len:%v", a.session.GetSessionID(), length-10, l) return } } // 心跳包 if moduleType == int(pb.ServerType_ServerTypeGate) && protocolType == int(pb.ServerGateReq_GatePingReq) { topic := fmt.Sprintf("%v:%v", int(pb.ServerType_ServerTypeGate), int(pb.ServerGateResp_GatePingResp)) a.session.Send(topic, nil) continue } route := "" if origin >= 3000 { route = fmt.Sprintf("%v://%v/%v", moduleName, origin, protocolName) } else { route = fmt.Sprintf("%v://modules/%v", moduleName, protocolName) } if moduleName == "hall" && protocolType == int(pb.ServerGateReq_GateLoginReq) { log.Debug("session %v login", a.session.GetSessionID()) } // log.Debug("route:%v", route) a.onRoute(route, request) } } func (a *agent) readInt(n int) (ret int, err error) { tmp := []byte{} tmp, err = a.readByte(n) if err != nil { // log.Error("err:%v", err) return } ret, err = util.BytesToInt(tmp) return } func (a *agent) readByte(n int) (data []byte, err error) { for i := 0; i < n; i++ { var tmp byte tmp, err = a.r.ReadByte() if err != nil { // log.Error("err:%v", err) return } data = append(data, tmp) } return } func (a *agent) onRoute(topic string, data []byte) { defer util.Recover() _, result, err := a.gate.GetRouteHandler().OnRoute(a.GetSession(), topic, data) if err != nil { log.Error("result:%v,err:%v,topic:%v", result, err, topic) } } func (a *agent) heartbeat() { if time.Now().Unix()-a.lastHeartbeat > int64(config.GetConfig().Gate.HeartBeat) { log.Debug("player:%v heatbeat stop....", a.GetSession().GetUserID()) a.Close() return } timewheel.GetTimeWheel().AddTimerCustom(a.gate.Options().Heartbeat, a.heartbeatTimer, nil, timewheel.Job(func(arge interface{}) { a.heartbeat() })) }