You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
61 lines
1021 B
61 lines
1021 B
|
1 year ago
|
package es
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"server/util"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/liangdas/mqant/log"
|
||
|
|
"github.com/olivere/elastic/v7"
|
||
|
|
)
|
||
|
|
|
||
|
|
var (
|
||
|
|
Bulk *elastic.BulkService
|
||
|
|
WriteChan = make(chan *WriteData, 10000)
|
||
|
|
)
|
||
|
|
|
||
|
|
type WriteData struct {
|
||
|
|
ID string
|
||
|
|
Index string
|
||
|
|
Data interface{}
|
||
|
|
}
|
||
|
|
|
||
|
|
func InitBulkQueue(es *elastic.Client) {
|
||
|
|
Bulk = es.Bulk()
|
||
|
|
util.Go(func() {
|
||
|
|
t := time.NewTimer(time.Second)
|
||
|
|
for {
|
||
|
|
select {
|
||
|
|
case <-t.C:
|
||
|
|
t.Reset(time.Second)
|
||
|
|
if Bulk.EstimatedSizeInBytes() > 0 {
|
||
|
|
bulk := Bulk
|
||
|
|
// util.IndexTry(func() error {
|
||
|
|
util.Go(func() {
|
||
|
|
_, err := bulk.Do(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
log.Error("err:%v", err)
|
||
|
|
// return err
|
||
|
|
}
|
||
|
|
})
|
||
|
|
// return nil
|
||
|
|
// })
|
||
|
|
Bulk = es.Bulk()
|
||
|
|
}
|
||
|
|
case one := <-WriteChan:
|
||
|
|
e := elastic.NewBulkCreateRequest().Index(one.Index)
|
||
|
|
if one.ID != "" {
|
||
|
|
e.Id(one.ID)
|
||
|
|
}
|
||
|
|
e.Doc(one.Data)
|
||
|
|
Bulk.Add(e)
|
||
|
|
}
|
||
|
|
// fmt.Println(total)
|
||
|
|
}
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func AddBulk(w *WriteData) {
|
||
|
|
WriteChan <- w
|
||
|
|
}
|