You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
5.6 KiB
217 lines
5.6 KiB
|
1 year ago
|
/*
|
||
|
|
*
|
||
|
|
一定要记得在config.json配置这个模块的参数,否则无法使用
|
||
|
|
*/
|
||
|
|
package gate
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
"net/url"
|
||
|
|
"server/call"
|
||
|
|
"server/config"
|
||
|
|
"server/db"
|
||
|
|
"server/natsClient"
|
||
|
|
"server/pb"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
mdb "server/db/mysql"
|
||
|
|
|
||
|
|
"github.com/gogo/protobuf/proto"
|
||
|
|
"github.com/liangdas/mqant/conf"
|
||
|
|
"github.com/liangdas/mqant/gate"
|
||
|
|
"github.com/liangdas/mqant/gate/uriroute"
|
||
|
|
"github.com/liangdas/mqant/log"
|
||
|
|
"github.com/liangdas/mqant/module"
|
||
|
|
"github.com/liangdas/mqant/server"
|
||
|
|
)
|
||
|
|
|
||
|
|
var Module = func() module.Module {
|
||
|
|
this := new(Gate)
|
||
|
|
return this
|
||
|
|
}
|
||
|
|
|
||
|
|
type Gate struct {
|
||
|
|
BaseGate
|
||
|
|
// basegate.Gate //继承
|
||
|
|
Route *uriroute.URIRoute
|
||
|
|
}
|
||
|
|
|
||
|
|
func (g *Gate) GetType() string {
|
||
|
|
//很关键,需要与配置文件中的Module配置对应
|
||
|
|
return "gate"
|
||
|
|
}
|
||
|
|
func (g *Gate) Version() string {
|
||
|
|
//可以在监控时了解代码版本
|
||
|
|
return "1.0.0"
|
||
|
|
}
|
||
|
|
func (g *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
|
||
|
|
|
||
|
|
// 初始化调用
|
||
|
|
call.NewCaller(g)
|
||
|
|
|
||
|
|
route := uriroute.NewURIRoute(g,
|
||
|
|
uriroute.Selector(g.Selector),
|
||
|
|
uriroute.DataParsing(func(topic string, u *url.URL, msg []byte) (bean interface{}, err error) {
|
||
|
|
// 如果客户端消息要加密, 在这里解密
|
||
|
|
// log.Info("msg:%v", msg)
|
||
|
|
// one := new(pb.JoinMatchReq)
|
||
|
|
// fmt.Println(proto.Unmarshal(msg, one))
|
||
|
|
// fmt.Println(one)
|
||
|
|
// var tmp []byte
|
||
|
|
// bean, err = utils.AesDecrypt(msg)
|
||
|
|
// if err != nil {
|
||
|
|
// log.Error("decrypt err:%v,msg:%v", err, msg)
|
||
|
|
// }
|
||
|
|
// bean = msg
|
||
|
|
// str := strings.Split(string(tmp), ",")
|
||
|
|
// res := []byte{}
|
||
|
|
// for _, v := range str {
|
||
|
|
// tmp, _ := strconv.Atoi(v)
|
||
|
|
// res = append(res, byte(tmp))
|
||
|
|
// }
|
||
|
|
// log.Info("res:%v", res)
|
||
|
|
// bean = res
|
||
|
|
// log.Info("bean:%v", bean)
|
||
|
|
return
|
||
|
|
}),
|
||
|
|
// uriroute.CallTimeOut(3*time.Second),
|
||
|
|
)
|
||
|
|
//注意这里一定要用 gate.Gate 而不是 module.BaseModule
|
||
|
|
sec := config.GetConfig().Gate.HeartBeat
|
||
|
|
bufsize := config.GetConfig().Gate.BufSize
|
||
|
|
wsAddr := config.GetConfig().Gate.WSAddr
|
||
|
|
wsPort := config.GetConfig().Gate.WSPort
|
||
|
|
tls := config.GetConfig().Gate.TLS
|
||
|
|
cert := config.GetConfig().Gate.CertFile
|
||
|
|
key := config.GetConfig().Gate.KeyFile
|
||
|
|
fmt.Println("GATE:===================", wsAddr, wsPort)
|
||
|
|
if wsAddr == "" {
|
||
|
|
panic("invalid gate addr")
|
||
|
|
}
|
||
|
|
|
||
|
|
metadata := make(map[string]string)
|
||
|
|
metadata["addr"] = wsAddr
|
||
|
|
options := []server.Option{server.Metadata(metadata)}
|
||
|
|
g.BaseGate.OnInit(g, app, settings,
|
||
|
|
gate.Heartbeat(time.Second*time.Duration(sec)),
|
||
|
|
gate.BufSize(bufsize),
|
||
|
|
gate.SetRouteHandler(route),
|
||
|
|
gate.SetSessionLearner(g),
|
||
|
|
gate.SetStorageHandler(g),
|
||
|
|
gate.WsAddr(wsPort),
|
||
|
|
gate.TLS(tls),
|
||
|
|
gate.CertFile(cert),
|
||
|
|
gate.KeyFile(key),
|
||
|
|
gate.ServerOpts(options),
|
||
|
|
gate.SetSendMessageHook(MessageHook),
|
||
|
|
)
|
||
|
|
g.BaseGate.SetCreateAgent(g.NewAgent)
|
||
|
|
db.InitDB(&mdb.MysqlClient{})
|
||
|
|
loadConfig()
|
||
|
|
call.InitReload(g.App.Transport())
|
||
|
|
}
|
||
|
|
|
||
|
|
func (g *Gate) OnDestroy() {
|
||
|
|
log.Debug("%s OnDestroy", g.GetType())
|
||
|
|
g.BaseModule.OnDestroy()
|
||
|
|
}
|
||
|
|
|
||
|
|
// 当连接建立 并且MQTT协议握手成功
|
||
|
|
func (g *Gate) Connect(session gate.Session) {
|
||
|
|
//log.Info("client connect session %v, ip %s, total num:%d", session.GetSessionID(), session.GetIP(), g.GetGateHandler().GetAgentNum())
|
||
|
|
//_ = session.SetLocalUserData(&sync.Map{})
|
||
|
|
}
|
||
|
|
|
||
|
|
// DisConnect 当连接关闭
|
||
|
|
func (g *Gate) DisConnect(session gate.Session) {
|
||
|
|
uid := session.GetUserIDInt64()
|
||
|
|
if uid <= 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
log.Info("client disconnect session %v, ip %s, userid:%s, total num:%d", session.GetSessionID(), session.GetIP(), session.GetUserID(), g.GetGateHandler().GetAgentNum())
|
||
|
|
|
||
|
|
notify := &pb.ClientDisConnectNotify{UserID: uint32(uid), SessionId: session.GetSessionID()}
|
||
|
|
payload, _ := proto.Marshal(notify)
|
||
|
|
_ = g.App.Transport().Publish(natsClient.TopicClientDisconnect, payload)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (g *Gate) OnRoute(session gate.Session, topic string, msg []byte) (bool, interface{}, error) {
|
||
|
|
return g.Route.OnRoute(session, topic, msg)
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
*
|
||
|
|
存储用户的Session信息
|
||
|
|
Session Bind Userid以后每次设置 settings都会调用一次Storage
|
||
|
|
*/
|
||
|
|
func (g *Gate) Storage(session gate.Session) (err error) {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
*
|
||
|
|
强制删除Session信息
|
||
|
|
*/
|
||
|
|
func (g *Gate) Delete(session gate.Session) (err error) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
*
|
||
|
|
获取用户Session信息
|
||
|
|
用户登录以后会调用Query获取最新信息
|
||
|
|
*/
|
||
|
|
func (g *Gate) Query(Userid string) ([]byte, error) {
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
*
|
||
|
|
用户心跳,一般用户在线时60s发送一次
|
||
|
|
可以用来延长Session信息过期时间
|
||
|
|
*/
|
||
|
|
func (g *Gate) Heartbeat(session gate.Session) {
|
||
|
|
//log.Debug("client heart beat, session:%v, uid:%v", session.GetSessionID(), session.GetUserID())
|
||
|
|
}
|
||
|
|
|
||
|
|
// Selector 客户端路由规则自定义函数
|
||
|
|
func (g *Gate) Selector(session gate.Session, topic string, u *url.URL) (s module.ServerSession, err error) {
|
||
|
|
moduleType := u.Scheme
|
||
|
|
nodeID := u.Hostname()
|
||
|
|
|
||
|
|
//使用自己的
|
||
|
|
if nodeID == "modules" {
|
||
|
|
return g.GetRouteServer(moduleType, call.VersionSelector(nodeID))
|
||
|
|
//取模
|
||
|
|
// } else if nodeID == "cache" {
|
||
|
|
// //缓存
|
||
|
|
// } else if nodeID == "random" {
|
||
|
|
// //随机
|
||
|
|
// } else {
|
||
|
|
//
|
||
|
|
//指定节点规则就是 module://[user:pass@]nodeId/path
|
||
|
|
//方式1
|
||
|
|
//moduleType=fmt.Sprintf("%v@%v",moduleType,u.Hostname())
|
||
|
|
//方式2
|
||
|
|
// serverID := fmt.Sprintf("%v@%v", moduleType, nodeId)
|
||
|
|
}
|
||
|
|
return g.GetRouteServer(moduleType, call.WorkIDSelector(nodeID))
|
||
|
|
}
|
||
|
|
|
||
|
|
// MessageHook 下发消息前的进一步处理
|
||
|
|
func MessageHook(session gate.Session, topic string, msg []byte) ([]byte, error) {
|
||
|
|
// log.Info("msg:%v", msg)
|
||
|
|
// send := utils.AesEncrypt(msg)
|
||
|
|
// log.Info("send:%v", string(send))
|
||
|
|
return msg, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (g *Gate) NewAgent() gate.Agent {
|
||
|
|
a := &agent{
|
||
|
|
module: g.GetModule(),
|
||
|
|
lastHeartbeat: time.Now().Unix(),
|
||
|
|
}
|
||
|
|
return a
|
||
|
|
}
|