You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
113 lines
2.1 KiB
113 lines
2.1 KiB
package call |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"server/db" |
|
"server/natsClient" |
|
"server/pb" |
|
"server/util" |
|
"time" |
|
|
|
"github.com/liangdas/mqant/log" |
|
"github.com/olivere/elastic/v7" |
|
) |
|
|
|
func InsertToESGO(index string, params interface{}, id ...string) { |
|
if GetTopicName() == "common" { |
|
data := &WriteData{Index: index, Data: params} |
|
if id != nil { |
|
data.ID = id[0] |
|
} |
|
Go(func() { |
|
AddBulk(data) |
|
}) |
|
} else { |
|
byt, _ := json.Marshal(params) |
|
data := &pb.InnerESBulk{Index: index, Data: string(byt)} |
|
if id != nil { |
|
data.ID = id[0] |
|
} |
|
Go(func() { Publish(natsClient.TopicInnerESBulk, data) }) |
|
} |
|
} |
|
|
|
var ( |
|
Bulk *elastic.BulkService |
|
WriteChan = make(chan *WriteData, 20000) |
|
ESCloseChan = make(chan struct{}) |
|
ESFinishChan = make(chan struct{}) // 完成信号 |
|
MaxSize int64 = 50 * 1024 * 1024 // bulk最大长度 |
|
) |
|
|
|
// WriteData is one document queued for bulk indexing.
type WriteData struct {
	// ID is the document id. When empty, no id is set on the bulk request.
	ID string

	// Index is the name of the index the document is written to.
	Index string

	// Data is the document body passed to the bulk request as-is.
	Data interface{}
}
|
|
|
func InitBulkQueue() { |
|
es := db.ES().C() |
|
Bulk = es.Bulk() |
|
util.Go(func() { |
|
t := time.NewTimer(time.Second) |
|
for { |
|
select { |
|
case <-t.C: |
|
t.Reset(time.Second) |
|
Flush() |
|
case one := <-WriteChan: |
|
e := elastic.NewBulkCreateRequest().Index(one.Index) |
|
if one.ID != "" { |
|
e.Id(one.ID) |
|
} |
|
e.Doc(one.Data) |
|
Bulk.Add(e) |
|
if Bulk.EstimatedSizeInBytes() >= MaxSize { |
|
Flush() |
|
} |
|
case <-ESCloseChan: |
|
log.Debug("module closing") |
|
Flush() |
|
ESFinishChan <- struct{}{} |
|
} |
|
// fmt.Println(total) |
|
} |
|
}) |
|
} |
|
|
|
func AddBulk(w *WriteData) { |
|
WriteChan <- w |
|
} |
|
|
|
func Flush() { |
|
if Bulk.EstimatedSizeInBytes() <= 0 { |
|
return |
|
} |
|
bulk := Bulk |
|
// util.IndexTry(func() error { |
|
util.Go(func() { |
|
_, err := bulk.Do(context.Background()) |
|
if err != nil { |
|
log.Error("err:%v", err) |
|
// return err |
|
} |
|
}) |
|
// return nil |
|
// }) |
|
Bulk = db.ES().C().Bulk() |
|
} |
|
|
|
// 滚动设置新的索引 |
|
func Rollover(aliasName, indexName string) { |
|
if db.ES().IndexExist(indexName) { |
|
return |
|
} |
|
// 不存在创建 |
|
if !db.ES().IndexExist(aliasName) { |
|
db.ES().CreateExist(indexName) |
|
db.ES().AddAlias(aliasName, indexName) |
|
} else { |
|
db.ES().Rollover(aliasName, indexName, nil) |
|
} |
|
}
|
|
|