123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598 |
- // Copyright 2019 github.com. All rights reserved.
- // Use of this source code is governed by github.com.
- package elastic
- import (
- "context"
- "errors"
- "fmt"
- "strconv"
- "strings"
- "github.com/tidwall/gjson"
- elastic "gopkg.in/olivere/elastic.v5"
- )
// delimiter separates the fields of an encoded top/bottom paging cursor.
const delimiter = ","

// Slide directions, matched against ElasticPageParam.Slide.
const (
	slideUp   = iota + 1 // 1: slide up
	slideDown            // 2: slide down
)

// Sort orders, matched against ElasticPageParam.SequenceType.
const (
	positive = iota + 1 // 1: ascending ("positive") order
	reverse             // 2: descending ("reverse") order
)
- // SplitTopBottomData 分割分页的top/bottom
- func SplitTopBottomData(param string) (base, num, extra int64, err error) {
- if param == "" {
- return 0.0, 0, 0.0, nil
- }
- mems := strings.Split(param, delimiter)
- switch {
- case len(mems) == 0:
- return 0.0, 0, 0.0, errors.New("member is not enough")
- case len(mems) == 1:
- base, _ = strconv.ParseInt(mems[0], 10, 64)
- case len(mems) == 2:
- if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil {
- return 0.0, 0, 0.0, err
- } else {
- num, _ = strconv.ParseInt(mems[1], 10, 64)
- }
- case len(mems) >= 3:
- if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil {
- return 0.0, 0, 0.0, err
- } else {
- num, _ = strconv.ParseInt(mems[1], 10, 64)
- extra, _ = strconv.ParseInt(mems[2], 10, 64)
- }
- default:
- return 0.0, 0, 0.0, nil
- }
- return
- }
- // JoinTopBottomData 组装分页的top/bottom
- func JoinTopBottomData(base, num, extra int64) string {
- return fmt.Sprintf("%d%s%d%s%d", base, delimiter, num, delimiter, extra)
- }
// ElasticSort describes one sort criterion of a search request.
type ElasticSort struct {
	// Field is the document field to sort on.
	Field string
	// Asc is passed as the ascending flag of the search service's Sort
	// call: true sorts ascending, false descending.
	Asc bool
}
- // ElasticPageParam 分页参数
- type ElasticPageParam struct {
- Indexs []string
- Types []string
- Match map[string]string
- Query elastic.Query
- Sorts []ElasticSort
- GeoSort *elastic.GeoDistanceSort
- Size uint32 // 单次拉取条数
- Slide uint32 // 滑动方向 1: 向上滑动 2:向下滑动
- Top string // 页首数据
- Bottom string // 页尾数据
- SequenceType uint32 // 1:正序 2:反序
- BaseFilter string // 基础过滤域, 默认为"timestamp"
- ExtraFilter string // 附加过滤域
- Includes []string // includes, 要获取的字段名列表
- Excludes []string // excludes,不获取的字段名列表
- }
- // ElasticCountParam 计数参数
- type ElasticCountParam struct {
- Indexs []string
- Types []string
- Match map[string]string
- Query elastic.Query
- }
- // ElasticSumParam 取和参数
- type ElasticSumParam struct {
- Indexs []string
- Types []string
- Match map[string]string
- Query elastic.Query
- AggFields map[string]string
- }
- // ElasticRandomParam 随机取多条数据
- type ElasticRandomParam struct {
- Indexs []string
- Types []string
- Query elastic.Query
- GeoSort *elastic.GeoDistanceSort
- Size uint32 // 单次拉取条数
- BaseFilter string // 基础过滤域, 默认为"timestamp"
- ExtraFilter string // 附加过滤域
- OtherKeys []string // 需要的其它key, 如target
- }
// ElasticHit is one document returned by a search.
type ElasticHit struct {
	// Index, Type and ID identify the document; they are copied from the
	// corresponding fields of the search hit.
	Index, Type, ID string
	// Base is the integer value of the BaseFilter field read from the
	// document source. Only GetPageFromElastic fills it in.
	Base int64
	// Extra is the integer value of the ExtraFilter field read from the
	// document source. Only GetPageFromElastic fills it in.
	Extra int64
	// Source is the raw JSON source of the document.
	Source string
}
- // PageResult 分页结果
- type PageResult struct {
- Top string
- Bottom string
- HasMore bool
- TotalCount uint64
- Hits []ElasticHit
- }
- // RandomResult 随机结果
- type RandomResult struct {
- TotalCount uint64
- Hits []ElasticHit
- }
- // GetPageFromElastic 从elastic中读单面数据
- func GetPageFromElastic(param *ElasticPageParam) (result PageResult, err error) {
- // 参数检查
- if client == nil || param == nil {
- return result, fmt.Errorf("client or param is nil.")
- }
- if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
- return result, fmt.Errorf("Invalid es params. %v", param)
- }
- // 支持多个索引和type查询
- searchServer := client.Search()
- if len(param.Indexs) > 0 {
- searchServer = searchServer.Index(param.Indexs...)
- }
- if len(param.Types) > 0 {
- searchServer = searchServer.Type(param.Types...)
- }
- // 解析top
- topTime, topSameNum, topExtra, err := SplitTopBottomData(param.Top)
- if err != nil {
- return result, err
- }
- // 解析bottom
- botTime, botSameNum, botExtra, err := SplitTopBottomData(param.Bottom)
- if err != nil {
- return result, err
- }
- // 默认为"timestamp"
- if param.BaseFilter == "" {
- param.BaseFilter = "timestamp"
- }
- if len(param.Includes) > 0 || len(param.Excludes) > 0 {
- fetchSourceContext := elastic.NewFetchSourceContext(true)
- if len(param.Includes) > 0 {
- fetchSourceContext.Include(param.Includes...)
- }
- if len(param.Excludes) > 0 {
- fetchSourceContext.Exclude(param.Excludes...)
- }
- searchServer = searchServer.FetchSourceContext(fetchSourceContext)
- }
- // 组装match条件
- // 注意: Match 与 Query 互斥
- boolQuery := elastic.NewBoolQuery()
- if len(param.Match) == 0 {
- if param.Query != nil {
- boolQuery = boolQuery.Must(param.Query)
- } else {
- boolQuery = boolQuery.Must(elastic.NewMatchAllQuery())
- }
- } else {
- for key, value := range param.Match {
- boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value))
- }
- }
- // 按具体字段排序
- if len(param.Sorts) > 0 {
- for _, s := range param.Sorts {
- searchServer = searchServer.Sort(s.Field, s.Asc)
- }
- }
- // 位置排序
- if param.GeoSort != nil {
- searchServer = searchServer.SortBy(param.GeoSort)
- }
- topRemove := true
- removeCount := uint32(0)
- count := uint32(0)
- positiveFlag := true
- switch param.Slide {
- case slideDown:
- switch param.SequenceType {
- case positive:
- // 正序向下滑 timestamp >=0
- if topTime == 0 {
- // timestamp >= 0 正序排列
- if param.ExtraFilter != "" {
- searchServer = searchServer.Sort(param.ExtraFilter, true)
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
- searchServer = searchServer.Sort(param.BaseFilter, true)
- } else {
- // timestamp <= top 倒序排列
- if param.ExtraFilter != "" {
- searchServer = searchServer.Sort(param.ExtraFilter, false)
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(topExtra))
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(topTime))
- searchServer = searchServer.Sort(param.BaseFilter, false)
- positiveFlag = false
- }
- default:
- // 倒序向下滑
- if topTime == 0 {
- // timestamp >= 0 倒序排列
- if param.ExtraFilter != "" {
- searchServer = searchServer.Sort(param.ExtraFilter, false)
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
- searchServer = searchServer.Sort(param.BaseFilter, false)
- } else {
- // timestamp >= top 倒序排列
- if param.ExtraFilter != "" {
- searchServer = searchServer.Sort(param.ExtraFilter, false)
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(topExtra))
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(topTime))
- searchServer = searchServer.Sort(param.BaseFilter, false)
- }
- }
- // 删除的数量
- removeCount = uint32(topSameNum)
- // 需要拉取的条数
- count = removeCount + param.Size
- // 去掉头部相同数据
- topRemove = false
- default:
- switch param.SequenceType {
- case positive:
- // 正序向上滑
- if botTime == 0 {
- // timestamp >= 0 正序排序
- if param.ExtraFilter != "" {
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
- searchServer = searchServer.Sort(param.ExtraFilter, true)
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
- searchServer = searchServer.Sort(param.BaseFilter, true)
- } else {
- // timestamp >= bot 正序排序
- if param.ExtraFilter != "" {
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(botExtra))
- searchServer = searchServer.Sort(param.ExtraFilter, true)
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(botTime))
- searchServer = searchServer.Sort(param.BaseFilter, true)
- }
- default:
- // 倒序向上滑
- if botTime == 0 {
- // timestamp >= 0 倒序排序
- if param.ExtraFilter != "" {
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
- searchServer = searchServer.Sort(param.ExtraFilter, false)
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
- searchServer = searchServer.Sort(param.BaseFilter, false)
- } else {
- // timestamp <= bot 倒序排序
- if param.ExtraFilter != "" {
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(botExtra))
- searchServer = searchServer.Sort(param.ExtraFilter, false)
- }
- boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(botTime))
- searchServer = searchServer.Sort(param.BaseFilter, false)
- }
- }
- // 删除的数量
- removeCount = uint32(botSameNum)
- // 需要拉取的条数
- count = removeCount + param.Size
- }
- // 执行搜索
- searchServer = searchServer.From(0).Size(int(count))
- res, err := searchServer.Query(boolQuery).Do(context.Background())
- if err != nil {
- return result, err
- }
- result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
- if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
- result.HasMore = res.Hits.TotalHits > int64(count)
- if res.Hits.TotalHits > int64(removeCount) {
- result.TotalCount = uint64(res.Hits.TotalHits - int64(removeCount))
- }
- for _, hit := range res.Hits.Hits {
- if hit.Source != nil {
- result.Hits = append(result.Hits, ElasticHit{
- ID: hit.Id,
- Index: hit.Index,
- Type: hit.Type,
- Base: gjson.GetBytes(*hit.Source, param.BaseFilter).Int(),
- Extra: gjson.GetBytes(*hit.Source, param.ExtraFilter).Int(),
- Source: string(*hit.Source),
- })
- }
- }
- }
- // 计算top与bottom
- length := len(result.Hits)
- if length > 0 {
- // 检查 删除数据比查找到的数据要多直接返回空
- if length <= int(removeCount) {
- return PageResult{}, nil
- }
- // 倒序
- if !positiveFlag {
- //需要倒序结果 对其数据进行正序
- for i := 0; i < length/2; i++ {
- result.Hits[i], result.Hits[length-1-i] = result.Hits[length-1-i], result.Hits[i]
- }
- }
- topBase := result.Hits[0].Base
- topExtra := result.Hits[0].Extra
- botBase := result.Hits[length-1].Base
- botExtra := result.Hits[length-1].Extra
- // 去掉上一次访问过的数据
- if topRemove {
- // 去掉头部
- result.Hits = result.Hits[removeCount:]
- } else {
- // 去掉尾部
- result.Hits = result.Hits[:length-int(removeCount)]
- }
- // 对重复数据删除后的处理时需要重新计算length与topbase/botbase等, 否则会可能内存越界
- length = len(result.Hits)
- if length > 0 {
- topBase = result.Hits[0].Base
- topExtra = result.Hits[0].Extra
- botBase = result.Hits[length-1].Base
- botExtra = result.Hits[length-1].Extra
- }
- topNum := int64(0)
- bottomNum := int64(0)
- for _, item := range result.Hits {
- if item.Base == topBase {
- topNum += 1
- } else {
- break
- }
- }
- for i := length - 1; i >= 0; i -= 1 {
- if result.Hits[i].Base == botBase {
- bottomNum += 1
- } else {
- break
- }
- }
- result.Top = JoinTopBottomData(topBase, topNum, topExtra)
- result.Bottom = JoinTopBottomData(botBase, bottomNum, botExtra)
- }
- return result, nil
- }
- // GetPageFromElasticByIndex 索引取分页,类似于mysql的 from to 分页
- func GetPageFromElasticByIndex(param *ElasticPageParam) (result PageResult, err error) {
- // 参数检查
- if client == nil || param == nil {
- return result, fmt.Errorf("client or param is nil.")
- }
- if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
- return result, fmt.Errorf("Invalid es params. %v", param)
- }
- // 支持多个索引和type查询
- searchServer := client.Search()
- if len(param.Indexs) > 0 {
- searchServer = searchServer.Index(param.Indexs...)
- }
- if len(param.Types) > 0 {
- searchServer = searchServer.Type(param.Types...)
- }
- // 组装match条件
- // 注意: Match 与 Query 互斥
- boolQuery := elastic.NewBoolQuery()
- if len(param.Match) == 0 {
- if param.Query != nil {
- boolQuery = boolQuery.Must(param.Query)
- } else {
- boolQuery = boolQuery.Must(elastic.NewMatchAllQuery())
- }
- } else {
- for key, value := range param.Match {
- boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value))
- }
- }
- if len(param.Sorts) > 0 {
- for _, s := range param.Sorts {
- searchServer = searchServer.Sort(s.Field, s.Asc)
- }
- }
- if param.GeoSort != nil {
- searchServer = searchServer.SortBy(param.GeoSort)
- }
- // 解析top
- from, _, _, err := SplitTopBottomData(param.Bottom)
- if err != nil {
- return result, err
- }
- searchServer = searchServer.From(int(from)).Size(int(param.Size))
- res, err := searchServer.Query(boolQuery).Do(context.Background())
- if err != nil {
- return result, err
- }
- result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
- if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
- result.HasMore = res.Hits.TotalHits > int64(param.Size)
- result.TotalCount = uint64(res.Hits.TotalHits)
- for _, hit := range res.Hits.Hits {
- if hit.Source != nil {
- result.Hits = append(result.Hits, ElasticHit{
- ID: hit.Id,
- Index: hit.Index,
- Type: hit.Type,
- Source: string(*hit.Source),
- })
- }
- }
- }
- // 计算top与bottom
- len := int64(len(result.Hits))
- if len > 0 {
- result.Top = JoinTopBottomData(from, 0, 0)
- result.Bottom = JoinTopBottomData(from+len, 0, 0)
- }
- return result, nil
- }
- // GetSumFromElastic 取和函数
- func GetSumFromElastic(param *ElasticSumParam) (map[string]int64, error) {
- // 参数检查
- if client == nil || param == nil {
- return nil, fmt.Errorf("client or param is nil.")
- }
- if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
- return nil, fmt.Errorf("Invalid es params. %v", param)
- }
- // 支持多个索引和type查询
- searchServer := client.Search()
- if len(param.Indexs) > 0 {
- searchServer = searchServer.Index(param.Indexs...)
- }
- if len(param.Types) > 0 {
- searchServer = searchServer.Type(param.Types...)
- }
- // 组装要汇总的参数字段
- for key, field := range param.AggFields {
- agg := elastic.NewSumAggregation().Field(field)
- searchServer = searchServer.Aggregation(key, agg)
- }
- // 开始查询
- res, err := searchServer.Query(param.Query).Do(context.Background())
- if err != nil {
- return nil, err
- }
- // 取出结果
- result := make(map[string]int64, len(param.AggFields))
- if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
- agg := res.Aggregations
- if agg != nil {
- for k, v := range agg {
- result[k] = gjson.GetBytes(*v, "value").Int()
- }
- }
- }
- return result, nil
- }
- // GetCountFromElastic 计数函数
- func GetCountFromElastic(param *ElasticCountParam) (int64, error) {
- // 参数检查
- if client == nil || param == nil {
- return 0, fmt.Errorf("client or param is nil.")
- }
- if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
- return 0, fmt.Errorf("Invalid es params. %v", param)
- }
- return client.Count(param.Indexs...).Type(param.Types...).Query(param.Query).Do(context.TODO())
- }
- // GGetRandomFromElastic 随机取多条数据函数
- func GetRandomFromElastic(param *ElasticRandomParam) (result RandomResult, err error) {
- // 参数检查
- if client == nil || param == nil {
- return result, fmt.Errorf("client or param is nil.")
- }
- if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
- return result, fmt.Errorf("Invalid es params. %v", param)
- }
- // 支持多个索引和type查询
- searchServer := client.Search()
- if len(param.Indexs) > 0 {
- searchServer = searchServer.Index(param.Indexs...)
- }
- if len(param.Types) > 0 {
- searchServer = searchServer.Type(param.Types...)
- }
- if param.GeoSort != nil {
- searchServer = searchServer.SortBy(param.GeoSort)
- }
- searchServer = searchServer.Size(int(param.Size))
- res, err := searchServer.Query(param.Query).Do(context.Background())
- if err != nil {
- return result, err
- }
- result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
- if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
- result.TotalCount = uint64(res.Hits.TotalHits)
- for _, hit := range res.Hits.Hits {
- if hit.Source != nil {
- result.Hits = append(result.Hits, ElasticHit{
- ID: hit.Id,
- Index: hit.Index,
- Type: hit.Type,
- Source: string(*hit.Source),
- })
- }
- }
- }
- return result, nil
- }
|