印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

232 lines
5.7 KiB

1 year ago
package hall
import (
"fmt"
"server/call"
"server/common"
"server/db"
"server/natsClient"
"server/pb"
"github.com/gogo/protobuf/proto"
"github.com/liangdas/mqant/gate"
"github.com/liangdas/mqant/log"
"github.com/nats-io/nats.go"
)
func initNats(conn *nats.Conn) {
// 广播
natsClient.NewCommonNatsImp(conn, natsClient.TopicBroadcast, broadcastNatsImp)
// 接受广播请求
natsClient.NewCommonNatsImp(conn, natsClient.TopicBroadcastReq, broadcastReqNatsImp)
// 断线
natsClient.NewCommonNatsImp(conn, natsClient.TopicClientDisconnect, disconnectNatsImp)
// 配置更新
call.InitReload(conn)
// 后台发邮件
natsClient.NewCommonNatsImp(conn, natsClient.TopicBackRefreshMail, refreshMailNatsImp)
// 操作玩家
natsClient.NewCommonNatsImp(conn, natsClient.TopicInnerOptPlayer, optPlayer)
// 只监听充值操作
natsClient.NewCommonNatsImp(conn, natsClient.TopicInnerRefreshGold, refreshPlayerGold)
// 广播任意协议
natsClient.NewCommonNatsImp(conn, natsClient.TopicBroadcastAll, broadcastAllNatsImp)
}
// natsImp nats对象
type natsImp struct {
m *Hall
}
func (h *Hall) NewNatsImp(topic string) *natsImp {
n := new(natsImp)
base := natsClient.NewNatsClient()
base.Topic = topic
base.NatsImp = n
base.Conn = h.App.Transport()
n.m = h
go func() {
err := base.OnRequestHandle()
if err != nil {
log.Error("NewNatsImp OnRequestHandle error:%v", err)
}
}()
return n
}
// OnMsgCallBack 订阅回调
func (n *natsImp) OnMsgCallBack(nm *nats.Msg) {
m := new(pb.NewPlayerLogin)
if err := proto.Unmarshal(nm.Data, m); err != nil {
log.Error("error:%v", err)
return
}
// 过滤自己发布的消息
if m.HallServerID == n.m.GetServerID() {
return
}
if v, ok := playerIdMap.Load(m.Uid); ok {
p := v.(*player)
call.SendSS(p.session, int(pb.ServerGateResp_GateRepeatResp), nil)
log.Debug("redPointTipsNotifyNatsImp player duplicate login, uid:%d, old session:%s", p.db.Id, p.session.GetSessionID())
// 在另外一个大厅登陆的, 所以这里要删除
delPlayerByID(p.db.Id)
// db.Redis().Delkey(common.GetRedisKeyToken(p.token))
}
}
func disconnectNatsImp(data []byte) {
notify := &pb.ClientDisConnectNotify{}
err := proto.Unmarshal(data, notify)
if err != nil {
log.Error("clientDissconnectNatsImp OnMsgCallBack proto Unmarshal error %v", err)
return
}
log.Debug("disconnect notify:%+v", notify)
delPlayerBySession(notify.SessionId)
p := getPlayerById(int(notify.UserID))
if p != nil && p.session.GetSessionID() == notify.SessionId {
delPlayerByID(int(notify.UserID))
call.UpdateUserXInfo(&common.PlayerDBInfo{Id: p.db.Id}, map[string]interface{}{"online": 2})
}
}
func broadcastNatsImp(data []byte) {
var session gate.Session
sids := ""
playerIdMap.Range(func(key, value interface{}) bool {
p := value.(*player)
if session == nil {
session = p.session
}
sids += p.session.GetSessionID() + ","
return true
})
if sids == "" {
return
}
sids = sids[:len(sids)-1]
session.SendBatch(sids, fmt.Sprintf("%v:%v", int(pb.ServerType_ServerTypeCommon), int(pb.ServerCommonResp_CommonBroadcastResp)), data)
}
func broadcastReqNatsImp(data []byte) {
msg := new(pb.InnerBroadcast)
if err := proto.Unmarshal(data, msg); err != nil {
log.Error("err:%v", err)
return
}
log.Debug("broadcastreq:%+v", *msg)
one := &oneBroadcast{
id: int(msg.ID),
priority: int(msg.Priority),
frequency: int(msg.Frequency),
interval: int(msg.Interval),
content: msg.Content,
}
one.broadcast()
}
func refreshMailNatsImp(data []byte) {
one := new(pb.InnerRefreshMail)
if err := proto.Unmarshal(data, one); err != nil {
log.Error("err:%v", err)
return
}
log.Debug("back refresh mail...%+v", *one)
if len(one.UIDs) == 0 {
playerIdMap.Range(func(key, value interface{}) bool {
p := value.(*player)
p.PushRedPoint()
return true
})
} else {
playerIdMap.Range(func(key, value interface{}) bool {
p := value.(*player)
contain := false
for _, v := range one.UIDs {
if v == uint32(p.db.Id) {
contain = true
break
}
}
if !contain {
return true
}
call.UpsertRedPointAndNotify(p.db.Id, 1, call.ModuleMail)
// p.PushRedPoint()
return true
})
}
}
func optPlayer(data []byte) {
one := new(pb.InnerOptPlayer)
if err := proto.Unmarshal(data, one); err != nil {
log.Error("err:%v", err)
return
}
log.Debug("back opt player...%+v", *one)
switch one.Opt {
case common.OptPlayerTypeKick:
p, ok := playerIdMap.Load(int(one.UID))
if !ok {
return
}
player := p.(*player)
db.Redis().Delkey(common.GetRedisKeyToken(player.token))
db.Redis().Delkey(common.GetRedisKeyUser(player.db.Id))
player.session.Close()
case common.OptPlayerTypeDisconnect:
p, ok := playerIdMap.Load(int(one.UID))
if !ok {
return
}
player := p.(*player)
player.session.Close()
}
}
func refreshPlayerGold(data []byte) {
msg := new(pb.InnerRefreshGold)
if err := proto.Unmarshal(data, msg); err != nil {
log.Error("err:%v", err)
return
}
if msg.Event != uint32(common.CurrencyEventReCharge) && msg.Event != common.CurrencyEventGMRecharge {
return
}
p := getPlayerById(int(msg.UID))
if p == nil {
return
}
log.Debug("player %v recharge", p.db.Id)
p.isRecharge = true
}
func broadcastAllNatsImp(data []byte) {
inner := &pb.InnerBroadcastAll{}
if err := proto.Unmarshal(data, inner); err != nil {
log.Error("err:%v", err)
return
}
var session gate.Session
sids := ""
playerIdMap.Range(func(key, value interface{}) bool {
p := value.(*player)
if session == nil {
session = p.session
}
sids += p.session.GetSessionID() + ","
return true
})
if sids == "" {
return
}
sids = sids[:len(sids)-1]
session.SendBatch(sids, fmt.Sprintf("%v:%v", int(inner.ProtocolID), int(inner.ProtocolType)), inner.Data)
}