package call

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"mime/multipart"
	"net/http"
	"net/url"
	"server/common"
	"server/config"
	"server/db"
	"server/util"
	"strconv"
	"strings"
	"time"

	"github.com/liangdas/mqant/log"
)

const (
	// onlineReportInterval is the spacing of the wall-clock slots on which the
	// online reporters fire.
	onlineReportInterval = 5 * time.Minute

	// adjustInspectDeviceURL is the endpoint queried by IsOrganic and IsOrganic2.
	adjustInspectDeviceURL = "https://api.adjust.com/device_service/api/v1/inspect_device"

	// adjustInspectAuthHeader is the Authorization header sent to the endpoint
	// above.
	// NOTE(review): this credential is committed to source. The original code
	// carried a commented-out variant that read the URL and token from
	// config.GetConfig().Web.Adjust.APIURL / APIToken; consider restoring it.
	adjustInspectAuthHeader = "Bearer _2ZGYezfdZ7yD2he-zWx"
)

// nextOnlineSlot returns the next 5-minute wall-clock slot strictly after the
// slot that contains now, together with the delay from now until that slot.
func nextOnlineSlot(now time.Time) (slot time.Time, wait time.Duration) {
	h, m, _ := now.Clock()
	m -= m % 5 // round the minute down to a multiple of five
	current := util.GetZeroTime(now).
		Add(time.Duration(h) * time.Hour).
		Add(time.Duration(m) * time.Minute)
	slot = current.Add(onlineReportInterval)
	return slot, slot.Sub(now)
}

// InitOnline starts the online-player reporting loop.
//
// It schedules a timer for the next 5-minute slot; when the timer fires it
// calls WriteOnline with that slot's Unix timestamp and then re-arms itself by
// calling InitOnline again. f supplies the records to report.
func InitOnline(f func() []*common.ESNewOnline) {
	slot, wait := nextOnlineSlot(time.Now())
	time.AfterFunc(wait, func() {
		WriteOnline(slot.Unix(), f)
		InitOnline(f)
	})
}

// WriteOnline writes the online statistics returned by f.
//
// Every record whose Total is non-zero gets its Time field set to time and is
// handed to InsertToESGO for the common.ESIndexBackPlayerOnline index. Records
// with Total == 0 are skipped.
func WriteOnline(time int64, f func() []*common.ESNewOnline) {
	data := f()
	for i := range data {
		if data[i].Total == 0 {
			continue
		}
		data[i].Time = time
		db.ES().InsertToESGO(common.ESIndexBackPlayerOnline, data[i])
	}
}

// FBEvents is one entry of the JSON array sent in the "data" form field by
// UploadFB.
type FBEvents struct {
	EventName string `json:"event_name"`
	EventTime int64  `json:"event_time"`
	UserData  struct {
		Em              string `json:"em,omitempty"`
		Ph              string `json:"ph,omitempty"`
		FN              string `json:"fn,omitempty"`
		LN              string `json:"ln,omitempty"`
		Country         string `json:"country,omitempty"`
		IP              string `json:"client_ip_address,omitempty"`
		ClientUserAgent string `json:"client_user_agent"`
		FBC             string `json:"fbc,omitempty"`
		FBP             string `json:"fbp,omitempty"`
		ExternalID      string `json:"external_id,omitempty"`
		CT              string `json:"ct,omitempty"` // city, hashed
		ST              string `json:"st,omitempty"` // state, hashed
		ZP              string `json:"zp,omitempty"` // postal code, hashed
	} `json:"user_data"`
	CustomData struct {
		Currency string `json:"currency"`
		Value    int64  `json:"value"`
	} `json:"custom_data"`
	ActionSource string `json:"action_source"`
	// AppData struct {
	// 	AdvertiserTrackingEnabled  int      `json:"advertiser_tracking_enabled"`
	// 	ApplicationTrackingEnabled int      `json:"application_tracking_enabled"`
	// 	Extinfo                    []string `json:"extinfo"`
	// } `json:"app_data"`
}

// FBEvent identifies which event UploadFB reports.
type FBEvent int

const (
	// FBEventRegist is reported as "CompleteRegistration".
	FBEventRegist FBEvent = iota
	// FBEventPurchase is reported as "Purchase".
	FBEventPurchase
)

// GetName returns the event name string sent for f, or "" for an unknown
// value.
func (f FBEvent) GetName() string {
	switch f {
	case FBEventRegist:
		return "CompleteRegistration"
	case FBEventPurchase:
		return "Purchase"
	default:
		return ""
	}
}

// UploadFB reports an event for player uid to the configured FB API URL.
//
// Nothing is sent unless the player's channel exists, has a pixel id and an
// access token, config.GetBase().AD.FBAPIURL is set, and the channel's
// ADUpload equals common.ADFB. amount is only sent for FBEventPurchase.
//
// Missing e-mail, phone and name values fall back to a randomly selected
// pay_info row, and a missing FBC falls back to a randomly selected
// player_ad_data row. The HTTP exchange runs inside util.IndexTry.
func UploadFB(uid int, event FBEvent, amount int64) {
	u := &common.PlayerDBInfo{Id: uid}
	db.Mysql().Get(u)
	channel := GetChannelByID(u.ChannelID)
	if channel == nil || channel.FBPixelID == "" || channel.FBAccessToken == "" || config.GetBase().AD.FBAPIURL == "" {
		return
	}
	if channel.ADUpload != common.ADFB {
		return
	}

	pi := &common.PayInfo{UID: uid}
	db.Mysql().Get(pi)
	randPi := &common.PayInfo{}
	db.Mysql().C().Raw("SELECT * FROM pay_info ORDER BY RAND() LIMIT 1").Scan(randPi)
	pa := &common.PlayerADData{UID: uid}
	db.Mysql().GetLast(pa)
	if pa.FBC == "" {
		pa = &common.PlayerADData{ChannelID: u.ChannelID}
		db.Mysql().C().Raw("SELECT * FROM player_ad_data ORDER BY RAND() LIMIT 1").Scan(pa)
	}

	// Collect the player's identity fields, substituting the random row's
	// values for anything the player has not provided.
	em := pi.Email
	if em == "" {
		em = randPi.Email
	}
	ph := u.Mobile
	if ph == "" {
		ph = randPi.Mobile
	}
	fn, ln := util.FormatUserName(pi.AccountName)
	if fn == "" {
		fn, ln = util.FormatUserName(randPi.AccountName)
	}
	ua := "$CLIENT_USER_AGENT"
	if pa.UserAgent != "" {
		ua = pa.UserAgent
	}

	// Resolve city / state / postal code from the player's IP; on lookup
	// failure the three fields stay empty and are omitted from the payload.
	var ct, st, zp string
	if ipinfo, err := SearchIP(u.IP); err != nil {
		log.Error("err:%v", err)
	} else {
		ct = util.CalculateSHA256(strings.ToLower(ipinfo.City.Names["en"]))
		if len(ipinfo.Subdivisions) > 0 {
			st = util.CalculateSHA256(strings.ToLower(ipinfo.Subdivisions[0].Names["en"]))
		}
		zp = util.CalculateSHA256(strings.ToLower(ipinfo.Postal.Code))
	}

	util.IndexTry(func() error {
		// Build the event payload.
		ev := FBEvents{
			EventName:    event.GetName(),
			EventTime:    time.Now().Unix(),
			ActionSource: "website",
		}
		ev.UserData.Em = util.CalculateSHA256(strings.ToLower(em))
		// NOTE(review): "91" is presumably a country calling code matching the
		// hard-coded "in" country below, while the purchase currency is "brl"
		// — confirm these are intended to differ.
		ev.UserData.Ph = util.CalculateSHA256("91" + ph)
		ev.UserData.FN = util.CalculateSHA256(strings.ToLower(fn))
		ev.UserData.LN = util.CalculateSHA256(strings.ToLower(ln))
		ev.UserData.IP = u.IP
		ev.UserData.ClientUserAgent = ua
		ev.UserData.FBC = pa.FBC
		ev.UserData.FBP = pa.FBP
		ev.UserData.Country = util.CalculateSHA256("in")
		ev.UserData.ExternalID = util.CalculateSHA256(strconv.Itoa(uid))
		ev.UserData.ST = st
		ev.UserData.CT = ct
		ev.UserData.ZP = zp
		if event == FBEventPurchase {
			ev.CustomData.Currency = "brl"
			ev.CustomData.Value = amount
		}

		evJSON, err := json.Marshal([]FBEvents{ev})
		if err != nil {
			log.Error("err:%v", err)
			return err
		}

		// Encode the payload and the token as multipart form fields.
		var requestBody bytes.Buffer
		multipartWriter := multipart.NewWriter(&requestBody)
		if err := multipartWriter.WriteField("data", string(evJSON)); err != nil {
			log.Error("Error writing text field:%v", err)
			return err
		}
		if err := multipartWriter.WriteField("access_token", channel.FBAccessToken); err != nil {
			log.Error("Error writing text field:%v", err)
			return err
		}
		// Close writes the terminating boundary; without it the body is not a
		// complete multipart message.
		if err := multipartWriter.Close(); err != nil {
			log.Error("err:%v", err)
			return err
		}

		endpoint := config.GetBase().AD.FBAPIURL + channel.FBPixelID + "/events?access_token=" + channel.FBAccessToken
		req, err := http.NewRequest("POST", endpoint, &requestBody)
		if err != nil {
			log.Error("err:%v", err)
			return err
		}
		req.Header.Set("Content-Type", multipartWriter.FormDataContentType())

		client := &http.Client{Timeout: 10 * time.Second}
		log.Debug("UploadFB:%+v", req)
		resp, err := client.Do(req)
		if err != nil {
			log.Error("err:%v", err)
			return err
		}
		defer resp.Body.Close()

		ret, err := io.ReadAll(resp.Body)
		if err != nil {
			log.Error("http read body err:%v", err)
			return err
		}
		log.Debug("fbres:%v", string(ret))
		return nil
	})
}

// UploadAdjust reports an event for player u to the Adjust URL taken from
// config.GetConfig().Web.Adjust.URL.
//
// Nothing is sent unless the player's channel exists, has an AdjustAppToken,
// the player has an ADID or a GPSADID, and the channel's ADUpload equals
// common.ADJust. Every entry of param is added to the query string. The HTTP
// exchange runs inside util.IndexTry.
func UploadAdjust(event common.AdjustEventType, u *common.PlayerDBInfo, param map[string]string) {
	channel := GetChannelByID(u.ChannelID)
	if channel == nil || (u.ADID == "" && u.GPSADID == "") || channel.AdjustAppToken == "" {
		return
	}
	if channel.ADUpload != common.ADJust {
		return
	}
	util.IndexTry(func() error {
		// url.Parse returns a nil *URL on failure, so the error must be
		// checked before RawQuery is assigned.
		reqURL, err := url.Parse(config.GetConfig().Web.Adjust.URL)
		if err != nil {
			log.Error("err:%v", err)
			return err
		}
		params := url.Values{}
		params.Add("app_token", channel.AdjustAppToken)
		params.Add("event_token", channel.GetAdjustEventID(int(event)))
		params.Add("s2s", "1")
		params.Add("created_at", strconv.FormatInt(time.Now().Unix(), 10))
		params.Add("adid", u.ADID)
		params.Add("gps_adid", u.GPSADID)
		params.Add("web_uuid", u.GPSADID)
		for k, v := range param {
			params.Add(k, v)
		}
		reqURL.RawQuery = params.Encode()

		req, err := http.NewRequest("GET", reqURL.String(), nil)
		if err != nil {
			log.Error("err:%v", err)
			return err
		}
		req.Header.Set("Authorization", "Bearer "+channel.AdjustAuth)

		client := &http.Client{Timeout: 5 * time.Second}
		log.Debug("UploadAdjust:%+v", req)
		res, err := client.Do(req)
		if err != nil {
			log.Error("http post call err:%v", err)
			return err
		}
		defer res.Body.Close()

		ret, err := io.ReadAll(res.Body)
		if err != nil {
			log.Error("http read body err:%v", err)
			return err
		}
		log.Debug("adjustRes:%v", string(ret))
		return nil
	})
}

// IsOrganic reports whether the device gpsID is organic traffic according to
// the Adjust inspect_device endpoint.
//
// count is the maximum number of attempts: while the answer is "organic" and
// attempts remain, the lookup is repeated after one second. An HTTP 404 is
// treated as organic. Request, transport and decode failures yield false.
// The outcome is recorded in the common.ESIndexBackABLog index under the id
// "<cid>_<gpsID>".
func IsOrganic(gpsID, appToken string, cid int, count int) (is bool) {
	return inspectOrganic(gpsID, appToken, cid, count, false)
}

// IsOrganic2 is the stricter variant of IsOrganic: in addition to an HTTP 404,
// every request, transport, read or decode failure is also treated as organic.
// count and the logging behave as in IsOrganic.
func IsOrganic2(gpsID, appToken string, cid int, count int) (is bool) {
	return inspectOrganic(gpsID, appToken, cid, count, true)
}

// inspectOrganic implements IsOrganic (strict == false) and IsOrganic2
// (strict == true). strict is the value assigned to the result whenever the
// lookup fails before a tracker name could be read.
func inspectOrganic(gpsID, appToken string, cid, count int, strict bool) (is bool) {
	count--

	// The URL is a constant that always parses, so the error is ignored.
	reqURL, _ := url.Parse(adjustInspectDeviceURL)
	params := url.Values{}
	params.Add("advertising_id", gpsID)
	params.Add("app_token", appToken)
	reqURL.RawQuery = params.Encode()

	var lg string
	// Record the outcome on exit. An organic result with attempts remaining
	// is not recorded by this frame.
	defer func() {
		if count > 0 && is {
			return
		}
		db.ES().InsertToESByIDGO(common.ESIndexBackABLog, fmt.Sprintf("%d_%s", cid, gpsID), &common.ESABLog{
			Channel:  cid,
			Time:     time.Now().Unix(),
			DeviceID: gpsID,
			IsA:      is,
			Log:      lg,
		})
	}()

	req, err := http.NewRequest("GET", reqURL.String(), nil)
	if err != nil {
		log.Error("err:%v", err)
		is, lg = strict, err.Error()
		return
	}
	req.Header.Set("Authorization", adjustInspectAuthHeader)

	client := &http.Client{Timeout: 5 * time.Second}
	log.Debug("check organic:%+v", req)
	res, err := client.Do(req)
	if err != nil {
		log.Error("http post call err:%v", err)
		is, lg = strict, err.Error()
		return
	}
	// Deferred before any early return so the 404 path also releases the body.
	defer res.Body.Close()

	// Compare the numeric code: res.Status has the form "404 Not Found" and
	// therefore never equals http.StatusText(http.StatusNotFound).
	if res.StatusCode == http.StatusNotFound {
		is, lg = true, "http not found"
		return
	}

	ret, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Error("http read body err:%v", err)
		is, lg = strict, err.Error()
		return
	}
	if strict {
		log.Debug("AdjustDeviceResp2:%v", string(ret))
	} else {
		log.Debug("AdjustDeviceResp:%v", string(ret))
	}

	ad := new(AdjustDeviceResp)
	if err := json.Unmarshal(ret, ad); err != nil {
		log.Error("err:%v", err)
		is, lg = strict, string(ret)
	}
	if !is {
		is = ad.TrackerName == "Organic"
		if is {
			lg = "Organic"
		}
	}

	if is && count > 0 {
		time.Sleep(time.Second)
		return inspectOrganic(gpsID, appToken, cid, count, strict)
	}
	return
}

// AdjustDeviceResp holds the fields decoded from the inspect_device response.
type AdjustDeviceResp struct {
	Adid          string `json:"Adid"`
	AdvertisingID string `json:"AdvertisingId"`
	Tracker       string `json:"Tracker"`
	TrackerName   string `json:"TrackerName"`
	// ClickTime                   time.Time `json:"ClickTime"`
	// InstallTime                 time.Time `json:"InstallTime"`
	// LastAppVersion              string    `json:"LastAppVersion"`
	// LastAppVersionShort         string    `json:"LastAppVersionShort"`
	// LastSessionTime             time.Time `json:"LastSessionTime"`
	// LastEventTimes              struct {
	// 	Zcqz1Y time.Time `json:"zcqz1y"`
	// } `json:"LastEventTimes"`
	// PushToken                   string `json:"PushToken"`
	// State                       string `json:"State"`
	// InstallState                string `json:"InstallState"`
	// SignatureAcceptanceStatus   string `json:"SignatureAcceptanceStatus"`
	// SignatureVerificationResult string `json:"SignatureVerificationResult"`
}

// WriteRealOnline starts the loop that writes per-room real-time online
// counts to Redis.
//
// A timer is armed for the next 5-minute slot. When it fires, every entry of
// f() is written under the key common.GetRedisKeyOnlineKey(field) for map key
// 0 (or below), and under "<field>:<i>" for map key i > 0. A key that does not
// exist yet is created with HSet and given an expiry; an existing key has its
// "Total" and "New" hash fields incremented. The function then re-arms itself.
func WriteRealOnline(field string, f func() map[int]*common.RedisRealOnline) {
	_, next := nextOnlineSlot(time.Now())
	log.Debug("next real:%v", next)
	time.AfterFunc(next, func() {
		for i, v := range f() {
			keyField := field
			if i > 0 {
				keyField += ":" + strconv.Itoa(i)
			}
			key := common.GetRedisKeyOnlineKey(keyField)

			if !db.Redis().Exist(key) {
				if err := db.Redis().HSet(key, v); err != nil {
					log.Error("err:%v", err)
					continue
				}
				// NOTE(review): next is the delay that was computed when this
				// timer was armed, so on the first tick after startup it can
				// be under two seconds, making this TTL zero or negative —
				// confirm the intended expiry.
				if err := db.Redis().Expire(key, next-2*time.Second); err != nil {
					log.Error("err:%v", err)
				}
				continue
			}

			if err := db.Redis().HIncrBy(key, "Total", int64(v.Total)); err != nil {
				log.Error("err:%v", err)
				continue
			}
			if err := db.Redis().HIncrBy(key, "New", int64(v.New)); err != nil {
				log.Error("err:%v", err)
			}
		}
		WriteRealOnline(field, f)
	})
}

// KwaiReq is the request body posted by UploadKwai.
type KwaiReq struct {
	Clickid         string `json:"clickid"`
	EventName       string `json:"event_name"`
	PixelID         string `json:"pixelId"`
	AccessToken     string `json:"access_token"`
	TestFlag        bool   `json:"testFlag"`
	TrackFlag       bool   `json:"trackFlag"`
	IsAttributed    int    `json:"is_attributed"`   // attribution flag; must be hard-coded to 1
	Mmpcode         string `json:"mmpcode"`         // data-source marker; must be hard-coded to "PL"
	PixelSdkVersion string `json:"pixelSdkVersion"` // version info; must be hard-coded to "9.9.9"
	Properties      string `json:"properties"`
}

// KwaiResp is the response body decoded by UploadKwai.
type KwaiResp struct {
	Result   int    `json:"result"`    // returned status code
	ErrorMsg string `json:"error_msg"` // returned message detail
}

// KwaiProperties is JSON-encoded into KwaiReq.Properties for purchase events.
type KwaiProperties struct {
	Currency string  `json:"currency"`
	Price    float64 `json:"price"`
}

// Event identifiers accepted by GetKwaiEventName and UploadKwai.
const (
	KwaiEventRegist = iota
	KwaiEventPay
)

// GetKwaiEventName returns the event name string for event, or "" for an
// unknown value.
func GetKwaiEventName(event int) string {
	switch event {
	case KwaiEventRegist:
		return "EVENT_COMPLETE_REGISTRATION"
	case KwaiEventPay:
		return "EVENT_PURCHASE"
	}
	return ""
}

// UploadKwai reports an event for player uid to config.GetBase().AD.KwaiAPIURL.
//
// Nothing is sent unless the player's channel exists, the guard below passes,
// and the channel's ADUpload equals common.ADKwai. A missing FBC (used as the
// click id) falls back to a randomly selected player_ad_data row. amount is
// only used for KwaiEventPay. The HTTP exchange runs inside util.IndexTry.
func UploadKwai(uid, event int, amount int64) {
	u := &common.PlayerDBInfo{Id: uid}
	db.Mysql().Get(u)
	channel := GetChannelByID(u.ChannelID)
	// NOTE(review): this guard tests the Adjust identifiers (ADID, GPSADID,
	// AdjustAppToken) although the request below uses FBPixelID and
	// FBAccessToken — confirm it is intended.
	if channel == nil || (u.ADID == "" && u.GPSADID == "") || channel.AdjustAppToken == "" {
		return
	}
	if channel.ADUpload != common.ADKwai {
		return
	}

	pa := &common.PlayerADData{UID: uid}
	db.Mysql().GetLast(pa)
	if pa.FBC == "" {
		pa = &common.PlayerADData{ChannelID: u.ChannelID}
		db.Mysql().C().Raw("SELECT * FROM player_ad_data ORDER BY RAND() LIMIT 1").Scan(pa)
	}

	send := &KwaiReq{
		Clickid:         pa.FBC,
		EventName:       GetKwaiEventName(event),
		PixelID:         channel.FBPixelID,
		AccessToken:     channel.FBAccessToken,
		TestFlag:        false,
		TrackFlag:       !config.GetBase().Release,
		IsAttributed:    1,
		Mmpcode:         "PL",
		PixelSdkVersion: "9.9.9",
	}
	if event == KwaiEventPay {
		pro := &KwaiProperties{
			Currency: "brl",
			// amount/1e6 is an integer division, so the price is truncated
			// to two decimal places before the conversion to float.
			Price: float64(amount/1e6) / 100,
		}
		// Marshalling a struct of a string and a finite float cannot fail.
		byt, _ := json.Marshal(pro)
		send.Properties = string(byt)
	}

	util.IndexTry(func() error {
		ret := &KwaiResp{}
		return util.HttpPost(config.GetBase().AD.KwaiAPIURL, send, ret, nil)
	})
}