印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

216 lines
5.6 KiB

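/*
Package gate: remember to configure this module's parameters in the JSON
configuration file (the original note spells it "confin.json", presumably a
typo for config.json); otherwise the module cannot be used.
*/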
package gate
import (
"fmt"
"net/url"
"server/call"
"server/config"
"server/db"
"server/natsClient"
"server/pb"
"time"
mdb "server/db/mysql"
"github.com/gogo/protobuf/proto"
"github.com/liangdas/mqant/conf"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/gate/uriroute"
"github.com/liangdas/mqant/log"
"github.com/liangdas/mqant/module"
"github.com/liangdas/mqant/server"
)
// Module is the factory for this package's module: each call returns a
// freshly allocated, zero-valued Gate as a module.Module.
var Module = func() module.Module {
	return &Gate{}
}
// Gate is the gateway module type. It embeds BaseGate and carries the URI
// router that OnRoute delegates to.
type Gate struct {
	// BaseGate is embedded so its methods are promoted onto Gate
	// (the original note reads "basegate.Gate, inherit").
	BaseGate

	// Route is the URI router used by OnRoute.
	Route *uriroute.URIRoute
}
// GetType returns the module type name.
//
// Important: the value must match the Module entry in the configuration file.
func (g *Gate) GetType() string {
	const moduleType = "gate"
	return moduleType
}
// Version returns the code version string, which makes the running build
// identifiable when monitoring.
func (g *Gate) Version() string {
	const version = "1.0.0"
	return version
}
// OnInit initialises the gate module.
//
// In order, it:
//  1. passes the module to call.NewCaller;
//  2. builds the URI router (using g.Selector for routing) and stores it in
//     g.Route so that OnRoute has a router to delegate to;
//  3. reads the gate settings (heartbeat, buffer size, address, port, TLS,
//     certificate and key files) from config.GetConfig().Gate;
//  4. initialises the embedded BaseGate with those settings and installs
//     g.NewAgent as the agent factory;
//  5. initialises the database client, calls loadConfig, and calls
//     call.InitReload with the application's transport.
//
// It panics if the configured gate address (Gate.WSAddr) is empty, because
// the method has no error result to report the misconfiguration through.
func (g *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
	call.NewCaller(g)

	route := uriroute.NewURIRoute(g,
		uriroute.Selector(g.Selector),
		uriroute.DataParsing(func(topic string, u *url.URL, msg []byte) (interface{}, error) {
			// Extension point: if client messages are encrypted, decrypt
			// msg here and return the decoded bean. Returning (nil, nil)
			// means no custom parsing is applied.
			return nil, nil
		}),
		// uriroute.CallTimeOut(3*time.Second),
	)
	// Keep the router on the struct: OnRoute dereferences g.Route, which
	// would otherwise stay nil and panic on first use.
	g.Route = route

	// Read the configuration once instead of re-fetching it per field.
	cfg := config.GetConfig()
	heartbeatSec := cfg.Gate.HeartBeat
	bufSize := cfg.Gate.BufSize
	wsAddr := cfg.Gate.WSAddr
	wsPort := cfg.Gate.WSPort
	useTLS := cfg.Gate.TLS
	certFile := cfg.Gate.CertFile
	keyFile := cfg.Gate.KeyFile

	fmt.Println("GATE:===================", wsAddr, wsPort)
	if wsAddr == "" {
		panic("invalid gate addr")
	}

	// The address is attached as server metadata under the "addr" key;
	// the listener itself is configured from wsPort below.
	metadata := map[string]string{"addr": wsAddr}
	options := []server.Option{server.Metadata(metadata)}

	// Note: initialisation must go through the gate's OnInit here, not
	// module.BaseModule's.
	g.BaseGate.OnInit(g, app, settings,
		gate.Heartbeat(time.Second*time.Duration(heartbeatSec)),
		gate.BufSize(bufSize),
		gate.SetRouteHandler(route),
		gate.SetSessionLearner(g),
		gate.SetStorageHandler(g),
		gate.WsAddr(wsPort),
		gate.TLS(useTLS),
		gate.CertFile(certFile),
		gate.KeyFile(keyFile),
		gate.ServerOpts(options),
		gate.SetSendMessageHook(MessageHook),
	)
	g.BaseGate.SetCreateAgent(g.NewAgent)

	db.InitDB(&mdb.MysqlClient{})
	loadConfig()
	call.InitReload(g.App.Transport())
}
// OnDestroy logs that the module is being destroyed and then runs the
// embedded BaseModule's OnDestroy.
func (g *Gate) OnDestroy() {
	moduleType := g.GetType()
	log.Debug("%s OnDestroy", moduleType)
	g.BaseModule.OnDestroy()
}
// Connect is the hook invoked once a connection has been established and
// the MQTT protocol handshake has succeeded. It currently does nothing.
func (g *Gate) Connect(session gate.Session) {
	// Disabled diagnostics / per-session setup, kept for reference:
	//log.Info("client connect session %v, ip %s, total num:%d", session.GetSessionID(), session.GetIP(), g.GetGateHandler().GetAgentNum())
	//_ = session.SetLocalUserData(&sync.Map{})
}
// DisConnect is the hook invoked when a client connection is closed.
//
// Sessions whose user ID is not positive are ignored. For all other
// sessions it logs the disconnect and publishes a pb.ClientDisConnectNotify
// on natsClient.TopicClientDisconnect. Failures are logged rather than
// returned, because this hook has no error result.
func (g *Gate) DisConnect(session gate.Session) {
	// Largest user ID that fits the notification's uint32 UserID field.
	const maxUserID = 1<<32 - 1

	uid := session.GetUserIDInt64()
	if uid <= 0 {
		return
	}

	sessionID := session.GetSessionID()
	log.Info("client disconnect session %v, ip %s, userid:%s, total num:%d", sessionID, session.GetIP(), session.GetUserID(), g.GetGateHandler().GetAgentNum())

	// Converting an out-of-range ID would silently wrap and name a different
	// user in the notification, so refuse to publish it.
	if uid > maxUserID {
		log.Error("client disconnect: user id %d overflows uint32, notify skipped, session %v", uid, sessionID)
		return
	}

	notify := &pb.ClientDisConnectNotify{UserID: uint32(uid), SessionId: sessionID}
	payload, err := proto.Marshal(notify)
	if err != nil {
		log.Error("client disconnect: marshal notify for user %d: %v", uid, err)
		return
	}
	if err := g.App.Transport().Publish(natsClient.TopicClientDisconnect, payload); err != nil {
		log.Error("client disconnect: publish notify for user %d: %v", uid, err)
	}
}
// OnRoute hands the incoming client message to the URI router held in
// g.Route and returns that router's three results unchanged.
func (g *Gate) OnRoute(session gate.Session, topic string, msg []byte) (bool, interface{}, error) {
	router := g.Route
	return router.OnRoute(session, topic, msg)
}
// Storage is the hook for persisting a user's session information.
//
// Per the original note, once a session is bound to a user ID, Storage is
// called every time the session settings are set. This implementation
// stores nothing and always returns nil.
func (g *Gate) Storage(session gate.Session) (err error) {
	return nil
}
// Delete is the hook for forcibly removing a session's stored information.
// This implementation removes nothing and always returns nil.
func (g *Gate) Delete(session gate.Session) (err error) {
	return nil
}
// Query is the hook for fetching a user's stored session information.
//
// Per the original note, it is called after a user logs in to obtain the
// latest data. This implementation has no storage behind it and always
// returns (nil, nil).
func (g *Gate) Query(Userid string) ([]byte, error) {
	return nil, nil
}
// Heartbeat is the hook invoked on a user heartbeat.
//
// Per the original note, an online user typically sends one every 60s, and
// the hook can be used to extend the expiry of stored session information.
// It currently does nothing.
func (g *Gate) Heartbeat(session gate.Session) {
	// Disabled diagnostics, kept for reference:
	//log.Debug("client heart beat, session:%v, uid:%v", session.GetSessionID(), session.GetUserID())
}
// Selector is the custom routing rule for client requests.
//
// The URL scheme is used as the target module type and the URL hostname as
// the node identifier. When the hostname is "modules" the route server is
// looked up with call.VersionSelector; for any other hostname it is looked
// up with call.WorkIDSelector.
//
// Notes carried over from the original (not implemented here): "cache" and
// "random" hostnames were sketched as further strategies, and a specific
// node could be addressed as module://[user:pass@]nodeId/path.
func (g *Gate) Selector(session gate.Session, topic string, u *url.URL) (s module.ServerSession, err error) {
	switch nodeID := u.Hostname(); nodeID {
	case "modules":
		// Use our own selection rule.
		return g.GetRouteServer(u.Scheme, call.VersionSelector(nodeID))
	default:
		return g.GetRouteServer(u.Scheme, call.WorkIDSelector(nodeID))
	}
}
// MessageHook is the hook applied to a message just before it is sent down
// to the client. It currently returns msg unchanged with a nil error.
func MessageHook(session gate.Session, topic string, msg []byte) ([]byte, error) {
	// Disabled outbound encryption, kept for reference:
	// log.Info("msg:%v", msg)
	// send := utils.AesEncrypt(msg)
	// log.Info("send:%v", string(send))
	return msg, nil
}
// NewAgent builds a new agent bound to the value returned by g.GetModule,
// with its last-heartbeat timestamp set to the current Unix time in seconds.
func (g *Gate) NewAgent() gate.Agent {
	owner := g.GetModule()
	return &agent{
		module:        owner,
		lastHeartbeat: time.Now().Unix(),
	}
}