印度包网
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

648 lines
20 KiB

1 year ago
package es
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"reflect"
"server/common"
"server/util"
"time"
"github.com/liangdas/mqant/log"
"github.com/olivere/elastic/v7"
)
// InsertToES writes params as a new document into index without specifying a
// document ID. params must be a value that can be serialised as JSON.
// A failed write is logged and the error is returned.
func (ES *EsClient) InsertToES(index string, params interface{}) error {
	req := ES.client.Index().Index(index).BodyJson(params)
	if _, err := req.Do(context.Background()); err != nil {
		log.Error("index:%v,write:%v,err:%v", index, params, err)
		return err
	}
	return nil
}
// InsertToESGO queues params for writing into index: the document is wrapped in
// a WriteData and passed to AddBulk from a function handed to util.Go
// (presumably run on a separate goroutine — verify against util.Go).
// Nothing is reported back to the caller.
// NOTE(review): an earlier version retried the write through util.IndexTry;
// any retry behaviour now depends on AddBulk — confirm.
func (ES *EsClient) InsertToESGO(index string, params interface{}) {
	doc := &WriteData{Index: index, Data: params}
	util.Go(func() {
		AddBulk(doc)
	})
}
// InsertToESByID writes params into index under the explicit document ID id.
// params must be a value that can be serialised as JSON.
// A failed write is logged and the error is returned.
func (ES *EsClient) InsertToESByID(index, id string, params interface{}) error {
	req := ES.client.Index().Index(index).Id(id).BodyJson(params)
	if _, err := req.Do(context.Background()); err != nil {
		log.Error("index:%v,write:%v,err:%v", index, params, err)
		return err
	}
	return nil
}
// InsertToESByIDGO queues params for writing into index under document ID id:
// the document is wrapped in a WriteData and passed to AddBulk from a function
// handed to util.Go (presumably run on a separate goroutine — verify against
// util.Go). Nothing is reported back to the caller.
// NOTE(review): an earlier version retried the write through util.IndexTry;
// any retry behaviour now depends on AddBulk — confirm.
func (ES *EsClient) InsertToESByIDGO(index, id string, params interface{}) {
	doc := &WriteData{ID: id, Index: index, Data: params}
	util.Go(func() {
		AddBulk(doc)
	})
}
// Count returns how many documents in index match q. Any error from the count
// request is swallowed and reported as 0.
func (ES *EsClient) Count(index string, q elastic.Query) int64 {
	total, err := ES.client.Count(index).Query(q).Do(context.Background())
	if err == nil {
		return total
	}
	return 0
}
// CountCard counts the distinct values of field among the documents in index
// that match q, using a cardinality aggregation. No hits are requested.
// Any request or decoding error is logged and reported as 0.
func (ES *EsClient) CountCard(index, field string, q elastic.Query) int64 {
	const aggName = "Total"

	cardinality := elastic.NewCardinalityAggregation().Field(field)
	search := ES.C().Search().Index(index).Query(q).Aggregation(aggName, cardinality).Size(0)

	result, err := search.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return 0
	}

	parsed := make(map[string]int64)
	if err = json.Unmarshal(result.Aggregations[aggName], &parsed); err != nil {
		log.Error("err:%v", err)
		return 0
	}
	return parsed["value"]
}
// Update modifies the document id in index. When val is an *elastic.Script the
// script is executed against the document; any other value (a map or struct)
// is sent as a partial document.
func (ES *EsClient) Update(index, id string, val interface{}) (res *elastic.UpdateResponse, err error) {
	req := ES.client.Update().Index(index).Id(id)
	if script, isScript := val.(*elastic.Script); isScript {
		return req.Script(script).Do(context.Background())
	}
	// Anything that is not a script is treated as a partial document.
	return req.Doc(val).Do(context.Background())
}
// DeleteByID removes the document id from index and returns any error from the
// delete request.
func (ES *EsClient) DeleteByID(index string, id string) error {
	req := ES.client.Delete().Index(index).Id(id)
	if _, err := req.Do(context.Background()); err != nil {
		return err
	}
	return nil
}
// DeleteByQuery issues a delete-by-query for q against indices. The request is
// sent with wait-for-completion disabled, so a nil error does not mean the
// deletion has finished. Errors are logged and returned.
func (ES EsClient) DeleteByQuery(indices string, q elastic.Query) (err error) {
	req := ES.client.DeleteByQuery(indices).Query(q).WaitForCompletion(false)
	if _, err = req.Do(context.Background()); err != nil {
		log.Error("err:%v", err)
		return err
	}
	return nil
}
// DeleteIndex deletes the named indices and returns the raw response from the
// delete-index request.
func (ES *EsClient) DeleteIndex(index ...string) (*elastic.IndicesDeleteResponse, error) {
	req := ES.client.DeleteIndex(index...)
	return req.Do(context.Background())
}
// C exposes the underlying *elastic.Client for requests that have no dedicated
// wrapper on EsClient.
func (ES *EsClient) C() *elastic.Client {
	underlying := ES.client
	return underlying
}
// QueryList runs q against index and appends one page of hits to the slice
// that kind points to.
//
// Parameters:
//   - page, num: page number and page size; the search starts at offset
//     page*num. The offset must not be negative and num must not be zero.
//   - kind: a non-nil pointer to a slice (e.g. *[]T or *[]*T). Each hit's
//     _source is JSON-decoded into a new element. When the element is a struct,
//     or a pointer to a struct, that has a string field named ID, the field is
//     overwritten with the document's _id.
//   - sort: optional (field string, ascending bool) pairs. A trailing unpaired
//     value is ignored.
//
// The returned number is the result of ES.Count for the same index and query,
// not the number of hits appended. A not-found error from the search yields
// (0, nil). The search is bounded by a 2 second timeout. If any hit fails to
// decode, nothing is appended and the decode error is returned.
func (ES *EsClient) QueryList(index string, page, num int, q elastic.Query, kind interface{}, sort ...interface{}) (int64, error) {
	from := page * num
	if from < 0 || num == 0 {
		return 0, errors.New("invalid page or num")
	}

	// Validate the destination fully up front: a nil kind, a nil pointer or a
	// pointer to a non-slice would otherwise panic inside reflect further down.
	dst := reflect.ValueOf(kind)
	if dst.Kind() != reflect.Ptr || dst.IsNil() || dst.Elem().Kind() != reflect.Slice {
		return 0, errors.New("invalid kind")
	}
	elemType := dst.Elem().Type().Elem()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	search := ES.client.Search(index).Query(q)
	for i := 0; i+1 < len(sort); i += 2 {
		field, okField := sort[i].(string)
		asc, okAsc := sort[i+1].(bool)
		if !okField || !okAsc {
			// Log the slice as one value; expanding it would not match the single verb.
			log.Error("invalid sort:%v", sort)
			return 0, errors.New("invalid sort")
		}
		search.Sort(field, asc)
	}

	searchRes, err := search.From(from).Size(num).Do(ctx)
	if elastic.IsNotFound(err) {
		return 0, nil
	}
	if err != nil {
		log.Error("search error:%v", err)
		return 0, err
	}

	var hits []*elastic.SearchHit
	if searchRes != nil && searchRes.Hits != nil {
		hits = searchRes.Hits.Hits
	}

	items := make([]reflect.Value, 0, len(hits))
	for _, hit := range hits {
		item := reflect.New(elemType)
		if err := json.Unmarshal(hit.Source, item.Interface()); err != nil {
			return 0, err
		}

		// Mirror the document _id into a string field named ID, looking through
		// one level of pointer so both []T and []*T elements are handled.
		target := item.Elem()
		if target.Kind() == reflect.Ptr {
			target = target.Elem()
		}
		if target.Kind() == reflect.Struct {
			if id := target.FieldByName("ID"); id.Kind() == reflect.String && id.CanSet() {
				id.SetString(hit.Id)
			}
		}
		items = append(items, item.Elem())
	}
	dst.Elem().Set(reflect.Append(dst.Elem(), items...))

	return ES.Count(index, q), nil
}
// SearchAfter runs q against index sorted by _seq_no and appends up to num hits
// to the slice that kind points to, for cursor-style paging.
//
// Parameters:
//   - after: cursor from the previous call, passed to the search-after clause;
//     nil starts from the beginning.
//   - num: maximum number of hits to fetch.
//   - kind: a non-nil pointer to a slice (e.g. *[]T or *[]*T). Each hit's
//     _source is JSON-decoded into a new element. When the element is a struct,
//     or a pointer to a struct, that has a string field named ID, the field is
//     overwritten with the document's _id.
//   - seqSort: true sorts _seq_no ascending, false descending.
//
// The returned number is the _seq_no of the last hit (0 when there are no hits
// or the hit carries no sequence number), intended as the next after value.
// A not-found error from the search yields (0, nil). The search is bounded by
// a 5 second timeout. If any hit fails to decode, nothing is appended and the
// decode error is returned.
func (ES *EsClient) SearchAfter(index string, after interface{}, num int, q elastic.Query, kind interface{}, seqSort bool) (int64, error) {
	// Validate the destination fully up front: a nil kind, a nil pointer or a
	// pointer to a non-slice would otherwise panic inside reflect further down.
	dst := reflect.ValueOf(kind)
	if dst.Kind() != reflect.Ptr || dst.IsNil() || dst.Elem().Kind() != reflect.Slice {
		return 0, errors.New("invalid kind")
	}
	elemType := dst.Elem().Type().Elem()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	search := ES.client.Search(index).Query(q).Sort("_seq_no", seqSort).Size(num).SeqNoPrimaryTerm(true)
	if after != nil {
		search.SearchAfter(after)
	}

	searchRes, err := search.Do(ctx)
	if elastic.IsNotFound(err) {
		return 0, nil
	}
	if err != nil {
		log.Error("search error:%v", err)
		return 0, err
	}

	var hits []*elastic.SearchHit
	if searchRes != nil && searchRes.Hits != nil {
		hits = searchRes.Hits.Hits
	}

	items := make([]reflect.Value, 0, len(hits))
	for _, hit := range hits {
		item := reflect.New(elemType)
		if err := json.Unmarshal(hit.Source, item.Interface()); err != nil {
			return 0, err
		}

		// Mirror the document _id into a string field named ID, looking through
		// one level of pointer so both []T and []*T elements are handled.
		target := item.Elem()
		if target.Kind() == reflect.Ptr {
			target = target.Elem()
		}
		if target.Kind() == reflect.Struct {
			if id := target.FieldByName("ID"); id.Kind() == reflect.String && id.CanSet() {
				id.SetString(hit.Id)
			}
		}
		items = append(items, item.Elem())
	}
	dst.Elem().Set(reflect.Append(dst.Elem(), items...))

	// The cursor is the sequence number of the last hit; SeqNo is a pointer and
	// is only dereferenced when present.
	var resAfter int64
	if n := len(hits); n > 0 && hits[n-1].SeqNo != nil {
		resAfter = *hits[n-1].SeqNo
	}
	return resAfter, nil
}
// UpdateByScript runs the inline script against every document in index that
// matches q and returns how many documents were updated. The request is
// bounded by a 2 second timeout; errors are logged and returned with a zero
// count.
func (ES *EsClient) UpdateByScript(index string, q elastic.Query, script string) (updated int64, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	req := ES.client.UpdateByQuery(index).Query(q).Script(elastic.NewScriptInline(script))
	res, doErr := req.Do(ctx)
	if doErr != nil {
		log.Error("err:%v", doErr)
		return 0, doErr
	}
	return res.Updated, nil
}
// QueryOne fetches the first document in index matching q and JSON-decodes its
// _source into kind.
//
// Parameters:
//   - kind: a non-nil pointer to the destination value. When it points to a
//     struct with a string field named ID, that field is overwritten with the
//     document's _id.
//   - sort: optional; when given it must start with a (field string,
//     ascending bool) pair. Values after the first pair are ignored.
//
// A not-found error or an empty result returns nil and leaves kind untouched.
// Invalid kind or sort arguments return an error instead of panicking.
func (ES *EsClient) QueryOne(index string, q elastic.Query, kind interface{}, sort ...interface{}) error {
	k := reflect.ValueOf(kind)
	if k.Kind() != reflect.Ptr || k.IsNil() {
		return errors.New("invalid kind")
	}

	s := ES.client.Search(index).Query(q).Size(1)
	if len(sort) > 0 {
		// Checked assertions: a lone value or a wrongly typed pair used to panic.
		if len(sort) < 2 {
			log.Error("invalid sort:%v", sort)
			return errors.New("invalid sort")
		}
		field, okField := sort[0].(string)
		asc, okAsc := sort[1].(bool)
		if !okField || !okAsc {
			log.Error("invalid sort:%v", sort)
			return errors.New("invalid sort")
		}
		s.Sort(field, asc)
	}

	ret, err := s.Do(context.Background())
	if err != nil && !elastic.IsNotFound(err) {
		log.Error("err:%v", err)
		return err
	}
	if ret == nil || ret.Hits == nil || len(ret.Hits.Hits) == 0 {
		return nil
	}

	hit := ret.Hits.Hits[0]
	if err = json.Unmarshal(hit.Source, kind); err != nil {
		log.Error("err:%v,source:%v", err, string(hit.Source))
		return err
	}
	if dst := k.Elem(); dst.Kind() == reflect.Struct {
		if id := dst.FieldByName("ID"); id.Kind() == reflect.String && id.CanSet() {
			id.SetString(hit.Id)
		}
	}
	return nil
}
// QueryOneRandom searches index with q, picks one of the returned hits at
// random and JSON-decodes its _source into kind.
//
// Parameters:
//   - kind: a non-nil pointer to the destination value. When it points to a
//     struct with a string field named ID, that field is overwritten with the
//     document's _id.
//   - sort: optional; when given it must start with a (field string,
//     ascending bool) pair. Values after the first pair are ignored.
//
// No size is set on the request, so the random choice is made only among the
// hits Elasticsearch returns by default, not among all matching documents.
// Unlike QueryOne, a not-found error is returned to the caller (without being
// logged). An empty result returns nil and leaves kind untouched. Invalid kind
// or sort arguments return an error instead of panicking.
func (ES *EsClient) QueryOneRandom(index string, q elastic.Query, kind interface{}, sort ...interface{}) error {
	k := reflect.ValueOf(kind)
	if k.Kind() != reflect.Ptr || k.IsNil() {
		return errors.New("invalid kind")
	}

	s := ES.client.Search(index).Query(q)
	if len(sort) > 0 {
		// Checked assertions: a lone value or a wrongly typed pair used to panic.
		if len(sort) < 2 {
			log.Error("invalid sort:%v", sort)
			return errors.New("invalid sort")
		}
		field, okField := sort[0].(string)
		asc, okAsc := sort[1].(bool)
		if !okField || !okAsc {
			log.Error("invalid sort:%v", sort)
			return errors.New("invalid sort")
		}
		s.Sort(field, asc)
	}

	ret, err := s.Do(context.Background())
	if err != nil {
		if !elastic.IsNotFound(err) {
			log.Error("err:%v", err)
		}
		return err
	}
	if ret == nil || ret.Hits == nil || len(ret.Hits.Hits) == 0 {
		return nil
	}

	hit := ret.Hits.Hits[rand.Intn(len(ret.Hits.Hits))]
	if err = json.Unmarshal(hit.Source, kind); err != nil {
		// Include the error itself, matching the message QueryOne logs.
		log.Error("err:%v,source:%v", err, string(hit.Source))
		return err
	}
	if dst := k.Elem(); dst.Kind() == reflect.Struct {
		if id := dst.FieldByName("ID"); id.Kind() == reflect.String && id.CanSet() {
			id.SetString(hit.Id)
		}
	}
	return nil
}
// Upsert updates the document id in index and, when uVal is supplied, inserts
// it if it does not exist yet.
//
// Parameters:
//   - val: an *elastic.Script to run against the document, or any other value
//     (map or struct) to send as a partial document.
//   - uVal: optional. uVal[0] is the document to insert when id does not
//     exist. uVal[1], if present, must be a bool and sets the scripted-upsert
//     flag on the request. With no uVal this is a plain update.
//
// A non-bool uVal[1] returns an error instead of panicking.
func (ES *EsClient) Upsert(index, id string, val interface{}, uVal ...interface{}) (ret *elastic.UpdateResponse, err error) {
	ctx := context.Background()

	s := ES.client.Update().Index(index).Id(id)
	if script, isScript := val.(*elastic.Script); isScript {
		s.Script(script)
	} else {
		// struct or map
		s.Doc(val)
	}

	// len() rather than a nil check so an empty, non-nil slice is also a plain update.
	if len(uVal) == 0 {
		return s.Do(ctx)
	}

	s.Upsert(uVal[0])
	if len(uVal) > 1 {
		scripted, ok := uVal[1].(bool)
		if !ok {
			return nil, errors.New("invalid scripted upsert flag")
		}
		s.ScriptedUpsert(scripted)
	}
	return s.Do(ctx)
}
// IncrBy adds val to field of the document id in index using an update script:
// when the field is null it is set to val, otherwise val is added to it.
// When uVal is non-nil it is supplied as the upsert document with scripted
// upsert enabled; when uVal is nil the request is a plain scripted update.
// NOTE(review): field and val are formatted directly into the script source,
// so callers must not pass untrusted input here.
func (ES *EsClient) IncrBy(index, id, field string, val interface{}, uVal interface{}) (ret *elastic.UpdateResponse, err error) {
	source := fmt.Sprintf("if(ctx._source.%v==null){ctx._source.%v=%v}else{ctx._source.%v+=%v}", field, field, val, field, val)
	req := ES.client.Update().Index(index).Id(id).Script(elastic.NewScript(source))
	if uVal != nil {
		req = req.Upsert(uVal).ScriptedUpsert(true)
	}
	return req.Do(context.Background())
}
// Refresh issues a refresh request for index with a one second timeout.
// Errors are only logged; a not-found error is ignored silently.
func (ES *EsClient) Refresh(index string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if _, err := ES.client.Refresh(index).Do(ctx); err != nil {
		if elastic.IsNotFound(err) {
			return
		}
		log.Error("refresh err:%v", err)
	}
}
// GroupBy groups the documents in index matching q by field with a terms
// aggregation named "total" and decodes the result into common.GroupBuckets.
// A positive size is set as the aggregation size; otherwise no size is set and
// the Elasticsearch default applies (10 per the original note).
// Request and decoding errors are logged and returned.
func (ES *EsClient) GroupBy(index, field string, q elastic.Query, size int) (*common.GroupBuckets, error) {
	terms := elastic.NewTermsAggregation().Field(field)
	if size > 0 {
		terms.Size(size)
	}

	search := ES.C().Search().Index(index).Query(q).Aggregation("total", terms)
	result, err := search.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}

	buckets := new(common.GroupBuckets)
	if err = json.Unmarshal(result.Aggregations["total"], &buckets); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return buckets, nil
}
// GroupBySub groups the documents in index matching q by field (terms
// aggregation "total") and, inside each group, by sub (terms aggregation
// "sub1"). The result is decoded into common.Group2CardBuckets.
//
// A positive size is set as the size of the outer terms aggregation, matching
// GroupBy and GroupByCard; otherwise no size is set and the Elasticsearch
// default applies (10 per the original note). Previously size was applied to
// the search hit count, which this function never reads, so it had no effect
// on the returned buckets.
// Request and decoding errors are logged and returned.
func (ES *EsClient) GroupBySub(index, field, sub string, q elastic.Query, size int) (*common.Group2CardBuckets, error) {
	inner := elastic.NewTermsAggregation().Field(sub)
	outer := elastic.NewTermsAggregation().Field(field).SubAggregation("sub1", inner)
	if size > 0 {
		outer.Size(size)
	}

	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", outer).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}

	res := new(common.Group2CardBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], &res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}
// GroupByCard groups the documents in index matching q by field (terms
// aggregation "total") and, inside each group, counts the distinct values of
// sub (cardinality aggregation "sub"). The result is decoded into
// common.GroupCardBuckets.
// A positive size is set as the terms aggregation size; otherwise no size is
// set and the Elasticsearch default applies (10 per the original note).
// Request and decoding errors are logged and returned.
func (ES *EsClient) GroupByCard(index, field, sub string, q elastic.Query, size int) (*common.GroupCardBuckets, error) {
	distinct := elastic.NewCardinalityAggregation().Field(sub)
	terms := elastic.NewTermsAggregation().Field(field).SubAggregation("sub", distinct)
	if size > 0 {
		terms.Size(size)
	}

	search := ES.C().Search().Index(index).Query(q).Aggregation("total", terms)
	result, err := search.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}

	buckets := new(common.GroupCardBuckets)
	if err = json.Unmarshal(result.Aggregations["total"], &buckets); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return buckets, nil
}
// GroupBy2Card groups the documents in index matching q by field (terms
// aggregation "total"), then by sub1 (terms aggregation "sub1"), and counts
// the distinct values of sub2 inside each inner group (cardinality
// aggregation "sub2"). The result is decoded into common.Group2CardBuckets.
//
// A positive size is set as the size of the outer terms aggregation, matching
// GroupBy and GroupByCard; otherwise no size is set and the Elasticsearch
// default applies (10 per the original note). Previously size was applied to
// the search hit count, which this function never reads, so it had no effect
// on the returned buckets.
// Request and decoding errors are logged and returned.
func (ES *EsClient) GroupBy2Card(index, field, sub1, sub2 string, q elastic.Query, size int) (*common.Group2CardBuckets, error) {
	distinct := elastic.NewCardinalityAggregation().Field(sub2)
	inner := elastic.NewTermsAggregation().Field(sub1).SubAggregation("sub2", distinct)
	outer := elastic.NewTermsAggregation().Field(field).SubAggregation("sub1", inner)
	if size > 0 {
		outer.Size(size)
	}

	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", outer).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}

	res := new(common.Group2CardBuckets)
	if err := json.Unmarshal(result.Aggregations["total"], &res); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return res, nil
}
// GroupSumBy groups the documents in index matching q by field (terms
// aggregation "total") and decodes the raw aggregation JSON into ret.
//
// Parameters:
//   - ret: a non-nil pointer to the value that receives the decoded
//     aggregation. Anything else is rejected by json.Unmarshal with an error;
//     previously such a value was silently left unfilled.
//   - order, or: when order is non-empty the buckets are ordered by the
//     aggregation with that name, ascending when or is true.
//   - size: number of buckets requested; 0 means 5000.
//   - sumField: each listed field gets a sum sub-aggregation named after the
//     field itself.
//
// Every bucket also carries a "Top" top-hits sub-aggregation of size 1.
// Request and decoding errors are logged and returned.
func (ES *EsClient) GroupSumBy(index, field string, q elastic.Query, ret interface{}, order string, or bool, size int, sumField ...string) error {
	agg := elastic.NewTermsAggregation().Field(field)
	for _, name := range sumField {
		agg.SubAggregation(name, elastic.NewSumAggregation().Field(name))
	}
	if order != "" {
		agg.OrderByAggregation(order, or)
	}
	agg.SubAggregation("Top", elastic.NewTopHitsAggregation().From(0).Size(1))
	if size == 0 {
		size = 5000
	}
	agg.Size(size)

	result, err := ES.C().Search().Index(index).Query(q).Aggregation("total", agg).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return err
	}

	// Decode straight into the caller's pointer. Taking the address of the
	// interface variable would hide a non-pointer ret from json.Unmarshal.
	if err := json.Unmarshal(result.Aggregations["total"], ret); err != nil {
		log.Error("err:%v", err)
		return err
	}
	return nil
}
// Group2SumBy groups the documents in index matching q by field (terms
// aggregation "total"), then by sub1 (terms aggregation "sub1"), and sums sub2
// inside each inner group (sum aggregation "sub2"). The result is decoded into
// common.Group2SumBuckets.
// When order is non-empty the outer buckets are ordered by the aggregation
// with that name, ascending when or is true. size is the number of outer
// buckets requested; 0 means 5000.
// Request and decoding errors are logged and returned.
func (ES *EsClient) Group2SumBy(index, field, sub1, sub2 string, q elastic.Query, order string, or bool, size int) (*common.Group2SumBuckets, error) {
	total := elastic.NewSumAggregation().Field(sub2)
	inner := elastic.NewTermsAggregation().Field(sub1).SubAggregation("sub2", total)
	outer := elastic.NewTermsAggregation().Field(field).SubAggregation("sub1", inner)
	if order != "" {
		outer.OrderByAggregation(order, or)
	}

	bucketCount := size
	if bucketCount == 0 {
		bucketCount = 5000
	}
	outer.Size(bucketCount)

	search := ES.C().Search().Index(index).Query(q).Aggregation("total", outer)
	result, err := search.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return nil, err
	}

	buckets := new(common.Group2SumBuckets)
	if err = json.Unmarshal(result.Aggregations["total"], &buckets); err != nil {
		log.Error("err:%v", err)
		return nil, err
	}
	return buckets, nil
}
// SumBy returns the sum of field over the documents in index matching q, using
// a sum aggregation named "total". A request error is logged and returned with
// 0. A decoding error is logged and returned together with whatever value was
// decoded.
func (ES *EsClient) SumBy(index, field string, q elastic.Query) (float64, error) {
	sum := elastic.NewSumAggregation().Field(field)
	resp, err := ES.C().Search().Index(index).Query(q).Aggregation("total", sum).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return 0, err
	}

	out := new(common.SumByResult)
	err = json.Unmarshal(resp.Aggregations["total"], &out)
	if err != nil {
		log.Error("err:%v", err)
	}
	return out.Value, err
}
// SumByInt64 returns the sum of field over the documents in index matching q,
// converted to int64. It uses a sum aggregation named "total" and requests no
// hits. A request error is logged and reported as 0. A decoding error is
// logged and whatever value was decoded is still returned.
func (ES *EsClient) SumByInt64(index, field string, q elastic.Query) int64 {
	sum := elastic.NewSumAggregation().Field(field)
	resp, err := ES.C().Search().Index(index).Query(q).Aggregation("total", sum).Size(0).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return 0
	}

	out := new(common.SumByResult)
	if err = json.Unmarshal(resp.Aggregations["total"], &out); err != nil {
		log.Error("err:%v", err)
	}
	return int64(out.Value)
}
// AVG returns the average of field over the documents in index matching q,
// using an avg aggregation named "total". A request error is logged and
// returned with 0. A decoding error is logged and returned together with
// whatever value was decoded.
func (ES *EsClient) AVG(index, field string, q elastic.Query) (float64, error) {
	avg := elastic.NewAvgAggregation().Field(field)
	resp, err := ES.C().Search().Index(index).Query(q).Aggregation("total", avg).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return 0, err
	}

	out := new(common.SumByResult)
	err = json.Unmarshal(resp.Aggregations["total"], &out)
	if err != nil {
		log.Error("err:%v", err)
	}
	return out.Value, err
}
// Exist reports whether the document id exists in index. A request error is
// logged and reported as false.
func (ES *EsClient) Exist(index, id string) bool {
	found, err := ES.C().Exists().Index(index).Id(id).Do(context.Background())
	if err == nil {
		return found
	}
	log.Error("err:%v", err)
	return false
}
3 months ago
// IndexExist reports whether index exists. A request error is logged and
// reported as false.
func (ES *EsClient) IndexExist(index string) bool {
	found, err := ES.C().IndexExists(index).Do(context.Background())
	if err == nil {
		return found
	}
	log.Error("err:%v", err)
	return false
}
// CreateExist sends a create-index request for index. Despite its name it does
// not check for existence first. A failed request is logged and the error is
// returned.
func (ES *EsClient) CreateExist(index string) error {
	req := ES.C().CreateIndex(index)
	if _, err := req.Do(context.Background()); err != nil {
		log.Error("err:%v", err)
		return err
	}
	return nil
}
// AddAlias adds aliasName as an alias of indexName and marks that index as the
// alias's write index.
//
// It returns the request error, or a descriptive error when the request
// succeeded but was not acknowledged. Previously the unacknowledged case
// logged "err:<nil>" and returned nil, so callers could not detect it.
func (ES *EsClient) AddAlias(aliasName, indexName string) error {
	action := elastic.NewAliasAddAction(aliasName).Index(indexName).IsWriteIndex(true)
	res, err := ES.client.Alias().Action(action).Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return err
	}
	if res == nil || !res.Acknowledged {
		err = fmt.Errorf("add alias %q to index %q: not acknowledged", aliasName, indexName)
		log.Error("err:%v", err)
		return err
	}
	return nil
}
// Rollover asks Elasticsearch to roll the alias aliasName over to a new index
// named indexName. Every entry of conditon is added as a rollover condition
// (name -> value); a nil or empty map adds none.
//
// A request error is logged and returned. A response that is not acknowledged
// is logged but still returns nil, as before.
// NOTE(review): presumably "not acknowledged" is also what Elasticsearch
// reports when the conditions are not met, which is why it is not treated as
// an error here — confirm against the rollover API documentation.
func (ES *EsClient) Rollover(aliasName, indexName string, conditon map[string]interface{}) error {
	req := ES.client.RolloverIndex(aliasName).NewIndex(indexName)
	// Ranging over a nil map is a no-op, so no nil check is needed.
	for name, value := range conditon {
		req.AddCondition(name, value)
	}

	res, err := req.Do(context.Background())
	if err != nil {
		log.Error("err:%v", err)
		return err
	}
	if res == nil || !res.Acknowledged {
		// Say what happened instead of printing a nil error.
		log.Error("rollover not acknowledged, alias:%v,index:%v", aliasName, indexName)
	}
	return nil
}