You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
216 lines
5.6 KiB
216 lines
5.6 KiB
/* |
|
* |
|
一定要记得在confin.json配置这个模块的参数,否则无法使用 |
|
*/ |
|
package gate |
|
|
|
import ( |
|
"fmt" |
|
"net/url" |
|
"server/call" |
|
"server/config" |
|
"server/db" |
|
"server/natsClient" |
|
"server/pb" |
|
"time" |
|
|
|
mdb "server/db/mysql" |
|
|
|
"github.com/gogo/protobuf/proto" |
|
"github.com/liangdas/mqant/conf" |
|
"github.com/liangdas/mqant/gate" |
|
"github.com/liangdas/mqant/gate/uriroute" |
|
"github.com/liangdas/mqant/log" |
|
"github.com/liangdas/mqant/module" |
|
"github.com/liangdas/mqant/server" |
|
) |
|
|
|
var Module = func() module.Module { |
|
this := new(Gate) |
|
return this |
|
} |
|
|
|
type Gate struct { |
|
BaseGate |
|
// basegate.Gate //继承 |
|
Route *uriroute.URIRoute |
|
} |
|
|
|
func (g *Gate) GetType() string { |
|
//很关键,需要与配置文件中的Module配置对应 |
|
return "gate" |
|
} |
|
func (g *Gate) Version() string { |
|
//可以在监控时了解代码版本 |
|
return "1.0.0" |
|
} |
|
func (g *Gate) OnInit(app module.App, settings *conf.ModuleSettings) { |
|
|
|
// 初始化调用 |
|
call.NewCaller(g) |
|
|
|
route := uriroute.NewURIRoute(g, |
|
uriroute.Selector(g.Selector), |
|
uriroute.DataParsing(func(topic string, u *url.URL, msg []byte) (bean interface{}, err error) { |
|
// 如果客户端消息要加密, 在这里解密 |
|
// log.Info("msg:%v", msg) |
|
// one := new(pb.JoinMatchReq) |
|
// fmt.Println(proto.Unmarshal(msg, one)) |
|
// fmt.Println(one) |
|
// var tmp []byte |
|
// bean, err = utils.AesDecrypt(msg) |
|
// if err != nil { |
|
// log.Error("decrypt err:%v,msg:%v", err, msg) |
|
// } |
|
// bean = msg |
|
// str := strings.Split(string(tmp), ",") |
|
// res := []byte{} |
|
// for _, v := range str { |
|
// tmp, _ := strconv.Atoi(v) |
|
// res = append(res, byte(tmp)) |
|
// } |
|
// log.Info("res:%v", res) |
|
// bean = res |
|
// log.Info("bean:%v", bean) |
|
return |
|
}), |
|
// uriroute.CallTimeOut(3*time.Second), |
|
) |
|
//注意这里一定要用 gate.Gate 而不是 module.BaseModule |
|
sec := config.GetConfig().Gate.HeartBeat |
|
bufsize := config.GetConfig().Gate.BufSize |
|
wsAddr := config.GetConfig().Gate.WSAddr |
|
wsPort := config.GetConfig().Gate.WSPort |
|
tls := config.GetConfig().Gate.TLS |
|
cert := config.GetConfig().Gate.CertFile |
|
key := config.GetConfig().Gate.KeyFile |
|
fmt.Println("GATE:===================", wsAddr, wsPort) |
|
if wsAddr == "" { |
|
panic("invalid gate addr") |
|
} |
|
|
|
metadata := make(map[string]string) |
|
metadata["addr"] = wsAddr |
|
options := []server.Option{server.Metadata(metadata)} |
|
g.BaseGate.OnInit(g, app, settings, |
|
gate.Heartbeat(time.Second*time.Duration(sec)), |
|
gate.BufSize(bufsize), |
|
gate.SetRouteHandler(route), |
|
gate.SetSessionLearner(g), |
|
gate.SetStorageHandler(g), |
|
gate.WsAddr(wsPort), |
|
gate.TLS(tls), |
|
gate.CertFile(cert), |
|
gate.KeyFile(key), |
|
gate.ServerOpts(options), |
|
gate.SetSendMessageHook(MessageHook), |
|
) |
|
g.BaseGate.SetCreateAgent(g.NewAgent) |
|
db.InitDB(&mdb.MysqlClient{}) |
|
loadConfig() |
|
call.InitReload(g.App.Transport()) |
|
} |
|
|
|
func (g *Gate) OnDestroy() { |
|
log.Debug("%s OnDestroy", g.GetType()) |
|
g.BaseModule.OnDestroy() |
|
} |
|
|
|
// 当连接建立 并且MQTT协议握手成功 |
|
func (g *Gate) Connect(session gate.Session) { |
|
//log.Info("client connect session %v, ip %s, total num:%d", session.GetSessionID(), session.GetIP(), g.GetGateHandler().GetAgentNum()) |
|
//_ = session.SetLocalUserData(&sync.Map{}) |
|
} |
|
|
|
// DisConnect 当连接关闭 |
|
func (g *Gate) DisConnect(session gate.Session) { |
|
uid := session.GetUserIDInt64() |
|
if uid <= 0 { |
|
return |
|
} |
|
|
|
log.Info("client disconnect session %v, ip %s, userid:%s, total num:%d", session.GetSessionID(), session.GetIP(), session.GetUserID(), g.GetGateHandler().GetAgentNum()) |
|
|
|
notify := &pb.ClientDisConnectNotify{UserID: uint32(uid), SessionId: session.GetSessionID()} |
|
payload, _ := proto.Marshal(notify) |
|
_ = g.App.Transport().Publish(natsClient.TopicClientDisconnect, payload) |
|
} |
|
|
|
func (g *Gate) OnRoute(session gate.Session, topic string, msg []byte) (bool, interface{}, error) { |
|
return g.Route.OnRoute(session, topic, msg) |
|
} |
|
|
|
/* |
|
* |
|
存储用户的Session信息 |
|
Session Bind Userid以后每次设置 settings都会调用一次Storage |
|
*/ |
|
func (g *Gate) Storage(session gate.Session) (err error) { |
|
return nil |
|
} |
|
|
|
/* |
|
* |
|
强制删除Session信息 |
|
*/ |
|
func (g *Gate) Delete(session gate.Session) (err error) { |
|
return |
|
} |
|
|
|
/* |
|
* |
|
获取用户Session信息 |
|
用户登录以后会调用Query获取最新信息 |
|
*/ |
|
func (g *Gate) Query(Userid string) ([]byte, error) { |
|
return nil, nil |
|
} |
|
|
|
/* |
|
* |
|
用户心跳,一般用户在线时60s发送一次 |
|
可以用来延长Session信息过期时间 |
|
*/ |
|
func (g *Gate) Heartbeat(session gate.Session) { |
|
//log.Debug("client heart beat, session:%v, uid:%v", session.GetSessionID(), session.GetUserID()) |
|
} |
|
|
|
// Selector 客户端路由规则自定义函数 |
|
func (g *Gate) Selector(session gate.Session, topic string, u *url.URL) (s module.ServerSession, err error) { |
|
moduleType := u.Scheme |
|
nodeID := u.Hostname() |
|
|
|
//使用自己的 |
|
if nodeID == "modules" { |
|
return g.GetRouteServer(moduleType, call.VersionSelector(nodeID)) |
|
//取模 |
|
// } else if nodeID == "cache" { |
|
// //缓存 |
|
// } else if nodeID == "random" { |
|
// //随机 |
|
// } else { |
|
// |
|
//指定节点规则就是 module://[user:pass@]nodeId/path |
|
//方式1 |
|
//moduleType=fmt.Sprintf("%v@%v",moduleType,u.Hostname()) |
|
//方式2 |
|
// serverID := fmt.Sprintf("%v@%v", moduleType, nodeId) |
|
} |
|
return g.GetRouteServer(moduleType, call.WorkIDSelector(nodeID)) |
|
} |
|
|
|
// MessageHook 下发消息前的进一步处理 |
|
func MessageHook(session gate.Session, topic string, msg []byte) ([]byte, error) { |
|
// log.Info("msg:%v", msg) |
|
// send := utils.AesEncrypt(msg) |
|
// log.Info("send:%v", string(send)) |
|
return msg, nil |
|
} |
|
|
|
func (g *Gate) NewAgent() gate.Agent { |
|
a := &agent{ |
|
module: g.GetModule(), |
|
lastHeartbeat: time.Now().Unix(), |
|
} |
|
return a |
|
}
|
|
|