page.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package elastic
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "github.com/tidwall/gjson"
  11. elastic "gopkg.in/olivere/elastic.v5"
  12. )
  13. const (
  14. // 分割符
  15. delimiter = ","
  16. // 分页常量
  17. slideUp = 1 // 向上滑动
  18. slideDown = 2 // 向下滑动
  19. positive = 1 // 正序
  20. reverse = 2 // 倒序
  21. )
  22. // SplitTopBottomData 分割分页的top/bottom
  23. func SplitTopBottomData(param string) (base, num, extra int64, err error) {
  24. if param == "" {
  25. return 0.0, 0, 0.0, nil
  26. }
  27. mems := strings.Split(param, delimiter)
  28. switch {
  29. case len(mems) == 0:
  30. return 0.0, 0, 0.0, errors.New("member is not enough")
  31. case len(mems) == 1:
  32. base, _ = strconv.ParseInt(mems[0], 10, 64)
  33. case len(mems) == 2:
  34. if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil {
  35. return 0.0, 0, 0.0, err
  36. } else {
  37. num, _ = strconv.ParseInt(mems[1], 10, 64)
  38. }
  39. case len(mems) >= 3:
  40. if base, err = strconv.ParseInt(mems[0], 10, 64); err != nil {
  41. return 0.0, 0, 0.0, err
  42. } else {
  43. num, _ = strconv.ParseInt(mems[1], 10, 64)
  44. extra, _ = strconv.ParseInt(mems[2], 10, 64)
  45. }
  46. default:
  47. return 0.0, 0, 0.0, nil
  48. }
  49. return
  50. }
  51. // JoinTopBottomData 组装分页的top/bottom
  52. func JoinTopBottomData(base, num, extra int64) string {
  53. return fmt.Sprintf("%d%s%d%s%d", base, delimiter, num, delimiter, extra)
  54. }
  55. // ElasticSort 排序参数结构
  56. type ElasticSort struct {
  57. Field string
  58. Asc bool
  59. }
  60. // ElasticPageParam 分页参数
  61. type ElasticPageParam struct {
  62. Indexs []string
  63. Types []string
  64. Match map[string]string
  65. Query elastic.Query
  66. Sorts []ElasticSort
  67. GeoSort *elastic.GeoDistanceSort
  68. Size uint32 // 单次拉取条数
  69. Slide uint32 // 滑动方向 1: 向上滑动 2:向下滑动
  70. Top string // 页首数据
  71. Bottom string // 页尾数据
  72. SequenceType uint32 // 1:正序 2:反序
  73. BaseFilter string // 基础过滤域, 默认为"timestamp"
  74. ExtraFilter string // 附加过滤域
  75. Includes []string // includes, 要获取的字段名列表
  76. Excludes []string // excludes,不获取的字段名列表
  77. }
  78. // ElasticCountParam 计数参数
  79. type ElasticCountParam struct {
  80. Indexs []string
  81. Types []string
  82. Match map[string]string
  83. Query elastic.Query
  84. }
  85. // ElasticSumParam 取和参数
  86. type ElasticSumParam struct {
  87. Indexs []string
  88. Types []string
  89. Match map[string]string
  90. Query elastic.Query
  91. AggFields map[string]string
  92. }
  93. // ElasticRandomParam 随机取多条数据
  94. type ElasticRandomParam struct {
  95. Indexs []string
  96. Types []string
  97. Query elastic.Query
  98. GeoSort *elastic.GeoDistanceSort
  99. Size uint32 // 单次拉取条数
  100. BaseFilter string // 基础过滤域, 默认为"timestamp"
  101. ExtraFilter string // 附加过滤域
  102. OtherKeys []string // 需要的其它key, 如target
  103. }
  104. // ElasticHit 结果参数
  105. type ElasticHit struct {
  106. Index string
  107. Type string
  108. ID string
  109. Base int64
  110. Extra int64
  111. Source string
  112. }
  113. // PageResult 分页结果
  114. type PageResult struct {
  115. Top string
  116. Bottom string
  117. HasMore bool
  118. TotalCount uint64
  119. Hits []ElasticHit
  120. }
  121. // RandomResult 随机结果
  122. type RandomResult struct {
  123. TotalCount uint64
  124. Hits []ElasticHit
  125. }
  126. // GetPageFromElastic 从elastic中读单面数据
  127. func GetPageFromElastic(param *ElasticPageParam) (result PageResult, err error) {
  128. // 参数检查
  129. if client == nil || param == nil {
  130. return result, fmt.Errorf("client or param is nil.")
  131. }
  132. if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
  133. return result, fmt.Errorf("Invalid es params. %v", param)
  134. }
  135. // 支持多个索引和type查询
  136. searchServer := client.Search()
  137. if len(param.Indexs) > 0 {
  138. searchServer = searchServer.Index(param.Indexs...)
  139. }
  140. if len(param.Types) > 0 {
  141. searchServer = searchServer.Type(param.Types...)
  142. }
  143. // 解析top
  144. topTime, topSameNum, topExtra, err := SplitTopBottomData(param.Top)
  145. if err != nil {
  146. return result, err
  147. }
  148. // 解析bottom
  149. botTime, botSameNum, botExtra, err := SplitTopBottomData(param.Bottom)
  150. if err != nil {
  151. return result, err
  152. }
  153. // 默认为"timestamp"
  154. if param.BaseFilter == "" {
  155. param.BaseFilter = "timestamp"
  156. }
  157. if len(param.Includes) > 0 || len(param.Excludes) > 0 {
  158. fetchSourceContext := elastic.NewFetchSourceContext(true)
  159. if len(param.Includes) > 0 {
  160. fetchSourceContext.Include(param.Includes...)
  161. }
  162. if len(param.Excludes) > 0 {
  163. fetchSourceContext.Exclude(param.Excludes...)
  164. }
  165. searchServer = searchServer.FetchSourceContext(fetchSourceContext)
  166. }
  167. // 组装match条件
  168. // 注意: Match 与 Query 互斥
  169. boolQuery := elastic.NewBoolQuery()
  170. if len(param.Match) == 0 {
  171. if param.Query != nil {
  172. boolQuery = boolQuery.Must(param.Query)
  173. } else {
  174. boolQuery = boolQuery.Must(elastic.NewMatchAllQuery())
  175. }
  176. } else {
  177. for key, value := range param.Match {
  178. boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value))
  179. }
  180. }
  181. // 按具体字段排序
  182. if len(param.Sorts) > 0 {
  183. for _, s := range param.Sorts {
  184. searchServer = searchServer.Sort(s.Field, s.Asc)
  185. }
  186. }
  187. // 位置排序
  188. if param.GeoSort != nil {
  189. searchServer = searchServer.SortBy(param.GeoSort)
  190. }
  191. topRemove := true
  192. removeCount := uint32(0)
  193. count := uint32(0)
  194. positiveFlag := true
  195. switch param.Slide {
  196. case slideDown:
  197. switch param.SequenceType {
  198. case positive:
  199. // 正序向下滑 timestamp >=0
  200. if topTime == 0 {
  201. // timestamp >= 0 正序排列
  202. if param.ExtraFilter != "" {
  203. searchServer = searchServer.Sort(param.ExtraFilter, true)
  204. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
  205. }
  206. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
  207. searchServer = searchServer.Sort(param.BaseFilter, true)
  208. } else {
  209. // timestamp <= top 倒序排列
  210. if param.ExtraFilter != "" {
  211. searchServer = searchServer.Sort(param.ExtraFilter, false)
  212. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(topExtra))
  213. }
  214. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(topTime))
  215. searchServer = searchServer.Sort(param.BaseFilter, false)
  216. positiveFlag = false
  217. }
  218. default:
  219. // 倒序向下滑
  220. if topTime == 0 {
  221. // timestamp >= 0 倒序排列
  222. if param.ExtraFilter != "" {
  223. searchServer = searchServer.Sort(param.ExtraFilter, false)
  224. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
  225. }
  226. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
  227. searchServer = searchServer.Sort(param.BaseFilter, false)
  228. } else {
  229. // timestamp >= top 倒序排列
  230. if param.ExtraFilter != "" {
  231. searchServer = searchServer.Sort(param.ExtraFilter, false)
  232. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(topExtra))
  233. }
  234. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(topTime))
  235. searchServer = searchServer.Sort(param.BaseFilter, false)
  236. }
  237. }
  238. // 删除的数量
  239. removeCount = uint32(topSameNum)
  240. // 需要拉取的条数
  241. count = removeCount + param.Size
  242. // 去掉头部相同数据
  243. topRemove = false
  244. default:
  245. switch param.SequenceType {
  246. case positive:
  247. // 正序向上滑
  248. if botTime == 0 {
  249. // timestamp >= 0 正序排序
  250. if param.ExtraFilter != "" {
  251. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
  252. searchServer = searchServer.Sort(param.ExtraFilter, true)
  253. }
  254. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
  255. searchServer = searchServer.Sort(param.BaseFilter, true)
  256. } else {
  257. // timestamp >= bot 正序排序
  258. if param.ExtraFilter != "" {
  259. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(botExtra))
  260. searchServer = searchServer.Sort(param.ExtraFilter, true)
  261. }
  262. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(botTime))
  263. searchServer = searchServer.Sort(param.BaseFilter, true)
  264. }
  265. default:
  266. // 倒序向上滑
  267. if botTime == 0 {
  268. // timestamp >= 0 倒序排序
  269. if param.ExtraFilter != "" {
  270. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Gte(0))
  271. searchServer = searchServer.Sort(param.ExtraFilter, false)
  272. }
  273. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Gte(0))
  274. searchServer = searchServer.Sort(param.BaseFilter, false)
  275. } else {
  276. // timestamp <= bot 倒序排序
  277. if param.ExtraFilter != "" {
  278. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.ExtraFilter).Lte(botExtra))
  279. searchServer = searchServer.Sort(param.ExtraFilter, false)
  280. }
  281. boolQuery = boolQuery.Filter(elastic.NewRangeQuery(param.BaseFilter).Lte(botTime))
  282. searchServer = searchServer.Sort(param.BaseFilter, false)
  283. }
  284. }
  285. // 删除的数量
  286. removeCount = uint32(botSameNum)
  287. // 需要拉取的条数
  288. count = removeCount + param.Size
  289. }
  290. // 执行搜索
  291. searchServer = searchServer.From(0).Size(int(count))
  292. res, err := searchServer.Query(boolQuery).Do(context.Background())
  293. if err != nil {
  294. return result, err
  295. }
  296. result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
  297. if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
  298. result.HasMore = res.Hits.TotalHits > int64(count)
  299. if res.Hits.TotalHits > int64(removeCount) {
  300. result.TotalCount = uint64(res.Hits.TotalHits - int64(removeCount))
  301. }
  302. for _, hit := range res.Hits.Hits {
  303. if hit.Source != nil {
  304. result.Hits = append(result.Hits, ElasticHit{
  305. ID: hit.Id,
  306. Index: hit.Index,
  307. Type: hit.Type,
  308. Base: gjson.GetBytes(*hit.Source, param.BaseFilter).Int(),
  309. Extra: gjson.GetBytes(*hit.Source, param.ExtraFilter).Int(),
  310. Source: string(*hit.Source),
  311. })
  312. }
  313. }
  314. }
  315. // 计算top与bottom
  316. length := len(result.Hits)
  317. if length > 0 {
  318. // 检查 删除数据比查找到的数据要多直接返回空
  319. if length <= int(removeCount) {
  320. return PageResult{}, nil
  321. }
  322. // 倒序
  323. if !positiveFlag {
  324. //需要倒序结果 对其数据进行正序
  325. for i := 0; i < length/2; i++ {
  326. result.Hits[i], result.Hits[length-1-i] = result.Hits[length-1-i], result.Hits[i]
  327. }
  328. }
  329. topBase := result.Hits[0].Base
  330. topExtra := result.Hits[0].Extra
  331. botBase := result.Hits[length-1].Base
  332. botExtra := result.Hits[length-1].Extra
  333. // 去掉上一次访问过的数据
  334. if topRemove {
  335. // 去掉头部
  336. result.Hits = result.Hits[removeCount:]
  337. } else {
  338. // 去掉尾部
  339. result.Hits = result.Hits[:length-int(removeCount)]
  340. }
  341. // 对重复数据删除后的处理时需要重新计算length与topbase/botbase等, 否则会可能内存越界
  342. length = len(result.Hits)
  343. if length > 0 {
  344. topBase = result.Hits[0].Base
  345. topExtra = result.Hits[0].Extra
  346. botBase = result.Hits[length-1].Base
  347. botExtra = result.Hits[length-1].Extra
  348. }
  349. topNum := int64(0)
  350. bottomNum := int64(0)
  351. for _, item := range result.Hits {
  352. if item.Base == topBase {
  353. topNum += 1
  354. } else {
  355. break
  356. }
  357. }
  358. for i := length - 1; i >= 0; i -= 1 {
  359. if result.Hits[i].Base == botBase {
  360. bottomNum += 1
  361. } else {
  362. break
  363. }
  364. }
  365. result.Top = JoinTopBottomData(topBase, topNum, topExtra)
  366. result.Bottom = JoinTopBottomData(botBase, bottomNum, botExtra)
  367. }
  368. return result, nil
  369. }
  370. // GetPageFromElasticByIndex 索引取分页,类似于mysql的 from to 分页
  371. func GetPageFromElasticByIndex(param *ElasticPageParam) (result PageResult, err error) {
  372. // 参数检查
  373. if client == nil || param == nil {
  374. return result, fmt.Errorf("client or param is nil.")
  375. }
  376. if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
  377. return result, fmt.Errorf("Invalid es params. %v", param)
  378. }
  379. // 支持多个索引和type查询
  380. searchServer := client.Search()
  381. if len(param.Indexs) > 0 {
  382. searchServer = searchServer.Index(param.Indexs...)
  383. }
  384. if len(param.Types) > 0 {
  385. searchServer = searchServer.Type(param.Types...)
  386. }
  387. // 组装match条件
  388. // 注意: Match 与 Query 互斥
  389. boolQuery := elastic.NewBoolQuery()
  390. if len(param.Match) == 0 {
  391. if param.Query != nil {
  392. boolQuery = boolQuery.Must(param.Query)
  393. } else {
  394. boolQuery = boolQuery.Must(elastic.NewMatchAllQuery())
  395. }
  396. } else {
  397. for key, value := range param.Match {
  398. boolQuery = boolQuery.Must(elastic.NewMatchQuery(key, value))
  399. }
  400. }
  401. if len(param.Sorts) > 0 {
  402. for _, s := range param.Sorts {
  403. searchServer = searchServer.Sort(s.Field, s.Asc)
  404. }
  405. }
  406. if param.GeoSort != nil {
  407. searchServer = searchServer.SortBy(param.GeoSort)
  408. }
  409. // 解析top
  410. from, _, _, err := SplitTopBottomData(param.Bottom)
  411. if err != nil {
  412. return result, err
  413. }
  414. searchServer = searchServer.From(int(from)).Size(int(param.Size))
  415. res, err := searchServer.Query(boolQuery).Do(context.Background())
  416. if err != nil {
  417. return result, err
  418. }
  419. result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
  420. if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
  421. result.HasMore = res.Hits.TotalHits > int64(param.Size)
  422. result.TotalCount = uint64(res.Hits.TotalHits)
  423. for _, hit := range res.Hits.Hits {
  424. if hit.Source != nil {
  425. result.Hits = append(result.Hits, ElasticHit{
  426. ID: hit.Id,
  427. Index: hit.Index,
  428. Type: hit.Type,
  429. Source: string(*hit.Source),
  430. })
  431. }
  432. }
  433. }
  434. // 计算top与bottom
  435. len := int64(len(result.Hits))
  436. if len > 0 {
  437. result.Top = JoinTopBottomData(from, 0, 0)
  438. result.Bottom = JoinTopBottomData(from+len, 0, 0)
  439. }
  440. return result, nil
  441. }
  442. // GetSumFromElastic 取和函数
  443. func GetSumFromElastic(param *ElasticSumParam) (map[string]int64, error) {
  444. // 参数检查
  445. if client == nil || param == nil {
  446. return nil, fmt.Errorf("client or param is nil.")
  447. }
  448. if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
  449. return nil, fmt.Errorf("Invalid es params. %v", param)
  450. }
  451. // 支持多个索引和type查询
  452. searchServer := client.Search()
  453. if len(param.Indexs) > 0 {
  454. searchServer = searchServer.Index(param.Indexs...)
  455. }
  456. if len(param.Types) > 0 {
  457. searchServer = searchServer.Type(param.Types...)
  458. }
  459. // 组装要汇总的参数字段
  460. for key, field := range param.AggFields {
  461. agg := elastic.NewSumAggregation().Field(field)
  462. searchServer = searchServer.Aggregation(key, agg)
  463. }
  464. // 开始查询
  465. res, err := searchServer.Query(param.Query).Do(context.Background())
  466. if err != nil {
  467. return nil, err
  468. }
  469. // 取出结果
  470. result := make(map[string]int64, len(param.AggFields))
  471. if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
  472. agg := res.Aggregations
  473. if agg != nil {
  474. for k, v := range agg {
  475. result[k] = gjson.GetBytes(*v, "value").Int()
  476. }
  477. }
  478. }
  479. return result, nil
  480. }
  481. // GetCountFromElastic 计数函数
  482. func GetCountFromElastic(param *ElasticCountParam) (int64, error) {
  483. // 参数检查
  484. if client == nil || param == nil {
  485. return 0, fmt.Errorf("client or param is nil.")
  486. }
  487. if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
  488. return 0, fmt.Errorf("Invalid es params. %v", param)
  489. }
  490. return client.Count(param.Indexs...).Type(param.Types...).Query(param.Query).Do(context.TODO())
  491. }
  492. // GGetRandomFromElastic 随机取多条数据函数
  493. func GetRandomFromElastic(param *ElasticRandomParam) (result RandomResult, err error) {
  494. // 参数检查
  495. if client == nil || param == nil {
  496. return result, fmt.Errorf("client or param is nil.")
  497. }
  498. if len(param.Indexs) <= 0 || len(param.Types) <= 0 {
  499. return result, fmt.Errorf("Invalid es params. %v", param)
  500. }
  501. // 支持多个索引和type查询
  502. searchServer := client.Search()
  503. if len(param.Indexs) > 0 {
  504. searchServer = searchServer.Index(param.Indexs...)
  505. }
  506. if len(param.Types) > 0 {
  507. searchServer = searchServer.Type(param.Types...)
  508. }
  509. if param.GeoSort != nil {
  510. searchServer = searchServer.SortBy(param.GeoSort)
  511. }
  512. searchServer = searchServer.Size(int(param.Size))
  513. res, err := searchServer.Query(param.Query).Do(context.Background())
  514. if err != nil {
  515. return result, err
  516. }
  517. result.Hits = make([]ElasticHit, 0, len(res.Hits.Hits))
  518. if res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
  519. result.TotalCount = uint64(res.Hits.TotalHits)
  520. for _, hit := range res.Hits.Hits {
  521. if hit.Source != nil {
  522. result.Hits = append(result.Hits, ElasticHit{
  523. ID: hit.Id,
  524. Index: hit.Index,
  525. Type: hit.Type,
  526. Source: string(*hit.Source),
  527. })
  528. }
  529. }
  530. }
  531. return result, nil
  532. }