You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
114 lines
2.1 KiB
114 lines
2.1 KiB
|
3 months ago
|
package call
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"server/db"
|
||
|
|
"server/natsClient"
|
||
|
|
"server/pb"
|
||
|
|
"server/util"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/liangdas/mqant/log"
|
||
|
|
"github.com/olivere/elastic/v7"
|
||
|
|
)
|
||
|
|
|
||
|
|
func InsertToESGO(index string, params interface{}, id ...string) {
|
||
|
|
if GetTopicName() == "common" {
|
||
|
|
data := &WriteData{Index: index, Data: params}
|
||
|
|
if id != nil {
|
||
|
|
data.ID = id[0]
|
||
|
|
}
|
||
|
|
Go(func() {
|
||
|
|
AddBulk(data)
|
||
|
|
})
|
||
|
|
} else {
|
||
|
|
byt, _ := json.Marshal(params)
|
||
|
|
data := &pb.InnerESBulk{Index: index, Data: string(byt)}
|
||
|
|
if id != nil {
|
||
|
|
data.ID = id[0]
|
||
|
|
}
|
||
|
|
Go(func() { Publish(natsClient.TopicInnerESBulk, data) })
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
var (
|
||
|
|
Bulk *elastic.BulkService
|
||
|
|
WriteChan = make(chan *WriteData, 20000)
|
||
|
|
ESCloseChan = make(chan struct{})
|
||
|
|
ESFinishChan = make(chan struct{}) // 完成信号
|
||
|
|
MaxSize int64 = 50 * 1024 * 1024 // bulk最大长度
|
||
|
|
)
|
||
|
|
|
||
|
|
type WriteData struct {
|
||
|
|
ID string
|
||
|
|
Index string
|
||
|
|
Data interface{}
|
||
|
|
}
|
||
|
|
|
||
|
|
func InitBulkQueue() {
|
||
|
|
es := db.ES().C()
|
||
|
|
Bulk = es.Bulk()
|
||
|
|
util.Go(func() {
|
||
|
|
t := time.NewTimer(time.Second)
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-t.C:
|
||
|
|
t.Reset(time.Second)
|
||
|
|
Flush()
|
||
|
|
case one := <-WriteChan:
|
||
|
|
e := elastic.NewBulkCreateRequest().Index(one.Index)
|
||
|
|
if one.ID != "" {
|
||
|
|
e.Id(one.ID)
|
||
|
|
}
|
||
|
|
e.Doc(one.Data)
|
||
|
|
Bulk.Add(e)
|
||
|
|
if Bulk.EstimatedSizeInBytes() >= MaxSize {
|
||
|
|
Flush()
|
||
|
|
}
|
||
|
|
case <-ESCloseChan:
|
||
|
|
log.Debug("module closing")
|
||
|
|
Flush()
|
||
|
|
ESFinishChan <- struct{}{}
|
||
|
|
}
|
||
|
|
// fmt.Println(total)
|
||
|
|
}
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func AddBulk(w *WriteData) {
|
||
|
|
WriteChan <- w
|
||
|
|
}
|
||
|
|
|
||
|
|
func Flush() {
|
||
|
|
if Bulk.EstimatedSizeInBytes() <= 0 {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
bulk := Bulk
|
||
|
|
// util.IndexTry(func() error {
|
||
|
|
util.Go(func() {
|
||
|
|
_, err := bulk.Do(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
// return err
|
||
|
|
}
|
||
|
|
})
|
||
|
|
// return nil
|
||
|
|
// })
|
||
|
|
Bulk = db.ES().C().Bulk()
|
||
|
|
}
|
||
|
|
|
||
|
|
// 滚动设置新的索引
|
||
|
|
func Rollover(aliasName, indexName string) {
|
||
|
|
if db.ES().IndexExist(indexName) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
// 不存在创建
|
||
|
|
if !db.ES().IndexExist(aliasName) {
|
||
|
|
db.ES().CreateExist(indexName)
|
||
|
|
db.ES().AddAlias(aliasName, indexName)
|
||
|
|
} else {
|
||
|
|
db.ES().Rollover(aliasName, indexName, nil)
|
||
|
|
}
|
||
|
|
}
|