/* * 一定要记得在confin.json配置这个模块的参数,否则无法使用 */ package gate import ( "fmt" "net/url" "server/call" "server/config" "server/db" "server/natsClient" "server/pb" "time" mdb "server/db/mysql" "github.com/gogo/protobuf/proto" "github.com/liangdas/mqant/conf" "github.com/liangdas/mqant/gate" "github.com/liangdas/mqant/gate/uriroute" "github.com/liangdas/mqant/log" "github.com/liangdas/mqant/module" "github.com/liangdas/mqant/server" ) var Module = func() module.Module { this := new(Gate) return this } type Gate struct { BaseGate // basegate.Gate //继承 Route *uriroute.URIRoute } func (g *Gate) GetType() string { //很关键,需要与配置文件中的Module配置对应 return "gate" } func (g *Gate) Version() string { //可以在监控时了解代码版本 return "1.0.0" } func (g *Gate) OnInit(app module.App, settings *conf.ModuleSettings) { // 初始化调用 call.NewCaller(g) route := uriroute.NewURIRoute(g, uriroute.Selector(g.Selector), uriroute.DataParsing(func(topic string, u *url.URL, msg []byte) (bean interface{}, err error) { // 如果客户端消息要加密, 在这里解密 // log.Info("msg:%v", msg) // one := new(pb.JoinMatchReq) // fmt.Println(proto.Unmarshal(msg, one)) // fmt.Println(one) // var tmp []byte // bean, err = utils.AesDecrypt(msg) // if err != nil { // log.Error("decrypt err:%v,msg:%v", err, msg) // } // bean = msg // str := strings.Split(string(tmp), ",") // res := []byte{} // for _, v := range str { // tmp, _ := strconv.Atoi(v) // res = append(res, byte(tmp)) // } // log.Info("res:%v", res) // bean = res // log.Info("bean:%v", bean) return }), // uriroute.CallTimeOut(3*time.Second), ) //注意这里一定要用 gate.Gate 而不是 module.BaseModule sec := config.GetConfig().Gate.HeartBeat bufsize := config.GetConfig().Gate.BufSize wsAddr := config.GetConfig().Gate.WSAddr wsPort := config.GetConfig().Gate.WSPort tls := config.GetConfig().Gate.TLS cert := config.GetConfig().Gate.CertFile key := config.GetConfig().Gate.KeyFile fmt.Println("GATE:===================", wsAddr, wsPort) if wsAddr == "" { panic("invalid gate addr") } metadata := make(map[string]string) metadata["addr"] = wsAddr options := []server.Option{server.Metadata(metadata)} g.BaseGate.OnInit(g, app, settings, gate.Heartbeat(time.Second*time.Duration(sec)), gate.BufSize(bufsize), gate.SetRouteHandler(route), gate.SetSessionLearner(g), gate.SetStorageHandler(g), gate.WsAddr(wsPort), gate.TLS(tls), gate.CertFile(cert), gate.KeyFile(key), gate.ServerOpts(options), gate.SetSendMessageHook(MessageHook), ) g.BaseGate.SetCreateAgent(g.NewAgent) db.InitDB(&mdb.MysqlClient{}) loadConfig() call.InitReload(g.App.Transport()) } func (g *Gate) OnDestroy() { log.Debug("%s OnDestroy", g.GetType()) g.BaseModule.OnDestroy() } // 当连接建立 并且MQTT协议握手成功 func (g *Gate) Connect(session gate.Session) { //log.Info("client connect session %v, ip %s, total num:%d", session.GetSessionID(), session.GetIP(), g.GetGateHandler().GetAgentNum()) //_ = session.SetLocalUserData(&sync.Map{}) } // DisConnect 当连接关闭 func (g *Gate) DisConnect(session gate.Session) { uid := session.GetUserIDInt64() if uid <= 0 { return } log.Info("client disconnect session %v, ip %s, userid:%s, total num:%d", session.GetSessionID(), session.GetIP(), session.GetUserID(), g.GetGateHandler().GetAgentNum()) notify := &pb.ClientDisConnectNotify{UserID: uint32(uid), SessionId: session.GetSessionID()} payload, _ := proto.Marshal(notify) _ = g.App.Transport().Publish(natsClient.TopicClientDisconnect, payload) } func (g *Gate) OnRoute(session gate.Session, topic string, msg []byte) (bool, interface{}, error) { return g.Route.OnRoute(session, topic, msg) } /* * 存储用户的Session信息 Session Bind Userid以后每次设置 settings都会调用一次Storage */ func (g *Gate) Storage(session gate.Session) (err error) { return nil } /* * 强制删除Session信息 */ func (g *Gate) Delete(session gate.Session) (err error) { return } /* * 获取用户Session信息 用户登录以后会调用Query获取最新信息 */ func (g *Gate) Query(Userid string) ([]byte, error) { return nil, nil } /* * 用户心跳,一般用户在线时60s发送一次 可以用来延长Session信息过期时间 */ func (g *Gate) Heartbeat(session gate.Session) { //log.Debug("client heart beat, session:%v, uid:%v", session.GetSessionID(), session.GetUserID()) } // Selector 客户端路由规则自定义函数 func (g *Gate) Selector(session gate.Session, topic string, u *url.URL) (s module.ServerSession, err error) { moduleType := u.Scheme nodeID := u.Hostname() //使用自己的 if nodeID == "modules" { return g.GetRouteServer(moduleType, call.VersionSelector(nodeID)) //取模 // } else if nodeID == "cache" { // //缓存 // } else if nodeID == "random" { // //随机 // } else { // //指定节点规则就是 module://[user:pass@]nodeId/path //方式1 //moduleType=fmt.Sprintf("%v@%v",moduleType,u.Hostname()) //方式2 // serverID := fmt.Sprintf("%v@%v", moduleType, nodeId) } return g.GetRouteServer(moduleType, call.WorkIDSelector(nodeID)) } // MessageHook 下发消息前的进一步处理 func MessageHook(session gate.Session, topic string, msg []byte) ([]byte, error) { // log.Info("msg:%v", msg) // send := utils.AesEncrypt(msg) // log.Info("send:%v", string(send)) return msg, nil } func (g *Gate) NewAgent() gate.Agent { a := &agent{ module: g.GetModule(), lastHeartbeat: time.Now().Unix(), } return a }