
Commit e02eb08

Merge pull request #85 from openobserve/topk-documentation
documented approx_topk and approx_topk_distinct
2 parents 2fa3a1a + b78b0fa commit e02eb08

File tree

2 files changed: 274 additions, 3 deletions

docs/sql_reference.md

Lines changed: 273 additions & 2 deletions
@@ -332,6 +332,277 @@ Each row in the result shows:
!!! note
    To avoid unexpected bucket sizes based on internal defaults, always specify the bucket duration explicitly using units.

---

### `approx_topk(field, k)`

**Description:**

- Returns the top `K` most frequent values for a specified field using the **Space-Saving algorithm**, optimized for high-cardinality data.
- Results are approximate due to distributed processing. Globally significant values may be missed if they do not appear in enough partitions' local top-K results.

**Example:**

```sql
SELECT approx_topk(clientip, 10) FROM "default"
```

This returns the 10 most frequently occurring client IP addresses from the `default` stream.
??? info "The Space-Saving Algorithm Explained"
The Space-Saving algorithm enables efficient top-K queries on high-cardinality data by limiting memory usage during distributed query execution. This approach trades exact precision for system stability and performance. <br>

**Problem Statement** <br>

Traditional GROUP BY operations on high-cardinality fields can cause memory exhaustion in distributed systems. Consider this query:

```sql
SELECT clientip, count(*) as cnt
FROM default
GROUP BY clientip
ORDER BY cnt DESC
LIMIT 10
```

**Challenges:**

- The dataset contains 3 million unique client IP addresses.
- The query executes across 60 CPU cores with 60 data partitions.
- Each core maintains hash tables during aggregation across all partitions.
- Memory usage: 3M entries × 60 cores × 60 partitions = 10.8 billion hash table entries.

**Typical Error Message:**
```
Resources exhausted: Failed to allocate additional 63232256 bytes for GroupedHashAggregateStream[20] with 0 bytes already allocated for this reservation - 51510301 bytes remain available for the total pool
```

**Solution: Space-Saving Mechanism** <br>

```sql
SELECT approx_topk(clientip, 10) FROM default
```

Instead of returning all unique values from each partition, each partition returns only its top 10 results. The leader node then aggregates these partial results to compute the final top 10.
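
To make the per-partition step concrete, here is a minimal Space-Saving counter sketched in Python. This is a hypothetical illustration of the algorithm's counter-replacement idea, not OpenObserve's implementation:

```python
# Minimal Space-Saving counter (illustrative sketch, not OpenObserve's code).
# It keeps at most `capacity` counters; when a new value arrives and the table
# is full, the smallest counter is evicted and its count is inherited, which
# bounds memory while possibly overestimating some frequencies.
class SpaceSaving:
    def __init__(self, capacity):
        self.capacity = capacity
        self.counters = {}

    def offer(self, item):
        if item in self.counters:
            self.counters[item] += 1
        elif len(self.counters) < self.capacity:
            self.counters[item] = 1
        else:
            # Evict the current minimum; the newcomer inherits min_count + 1.
            min_item = min(self.counters, key=self.counters.get)
            min_count = self.counters.pop(min_item)
            self.counters[item] = min_count + 1

    def topk(self, k):
        return sorted(self.counters.items(), key=lambda kv: -kv[1])[:k]

# Each partition would feed its rows through such a counter and ship only
# counter.topk(10) to the leader.
partition = SpaceSaving(capacity=100)
for ip in ["10.0.0.5", "10.0.0.5", "192.168.1.100"]:
    partition.offer(ip)
print(partition.topk(2))   # [('10.0.0.5', 2), ('192.168.1.100', 1)]
```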

**Example: Web Server Log Analysis** <br>

**Scenario** <br>
Find the top 10 client IPs by request count from web server logs distributed across 3 follower query nodes.

**Raw Data Distribution** <br>

| Rank | Node 1 | Requests | Node 2 | Requests | Node 3 | Requests |
|------|---------|----------|---------|----------|---------|----------|
| 1 | 192.168.1.100 | 850 | 192.168.1.100 | 920 | 192.168.1.100 | 880 |
| 2 | 10.0.0.5 | 720 | 203.0.113.50 | 780 | 10.0.0.5 | 810 |
| 3 | 203.0.113.50 | 680 | 198.51.100.75 | 740 | 203.0.113.50 | 750 |
| 4 | 172.16.0.10 | 620 | 10.0.0.5 | 700 | 198.51.100.75 | 690 |
| 5 | 192.168.1.200 | 580 | 172.16.0.10 | 660 | 172.16.0.10 | 650 |
| 6 | 198.51.100.75 | 540 | 192.168.1.200 | 640 | 192.168.1.200 | 610 |
| 7 | 10.0.0.25 | 500 | 203.0.113.80 | 600 | 203.0.113.80 | 570 |
| 8 | 172.16.0.30 | 480 | 172.16.0.30 | 580 | 10.0.0.25 | 530 |
| 9 | 203.0.113.80 | 460 | 10.0.0.25 | 560 | 172.16.0.30 | 490 |
| 10 | 192.168.1.150 | 440 | 192.168.1.150 | 520 | 192.168.1.150 | 450 |

**Follower Query Nodes Process Data** <br>

Each follower node executes the query locally and returns only its top 10 results (identical to the raw distribution above, since each node in this example holds exactly 10 IPs):

| Rank | Node 1 → Leader | Requests | Node 2 → Leader | Requests | Node 3 → Leader | Requests |
|------|-----------------|----------|-----------------|----------|-----------------|----------|
| 1 | 192.168.1.100 | 850 | 192.168.1.100 | 920 | 192.168.1.100 | 880 |
| 2 | 10.0.0.5 | 720 | 203.0.113.50 | 780 | 10.0.0.5 | 810 |
| 3 | 203.0.113.50 | 680 | 198.51.100.75 | 740 | 203.0.113.50 | 750 |
| 4 | 172.16.0.10 | 620 | 10.0.0.5 | 700 | 198.51.100.75 | 690 |
| 5 | 192.168.1.200 | 580 | 172.16.0.10 | 660 | 172.16.0.10 | 650 |
| 6 | 198.51.100.75 | 540 | 192.168.1.200 | 640 | 192.168.1.200 | 610 |
| 7 | 10.0.0.25 | 500 | 203.0.113.80 | 600 | 203.0.113.80 | 570 |
| 8 | 172.16.0.30 | 480 | 172.16.0.30 | 580 | 10.0.0.25 | 530 |
| 9 | 203.0.113.80 | 460 | 10.0.0.25 | 560 | 172.16.0.30 | 490 |
| 10 | 192.168.1.150 | 440 | 192.168.1.150 | 520 | 192.168.1.150 | 450 |

**Leader Query Node Aggregates Results** <br>

| Client IP | Node 1 | Node 2 | Node 3 | Total Requests |
|-----------|---------|---------|---------|----------------|
| 192.168.1.100 | 850 | 920 | 880 | **2,650** |
| 10.0.0.5 | 720 | 700 | 810 | **2,230** |
| 203.0.113.50 | 680 | 780 | 750 | **2,210** |
| 198.51.100.75 | 540 | 740 | 690 | **1,970** |
| 172.16.0.10 | 620 | 660 | 650 | **1,930** |
| 192.168.1.200 | 580 | 640 | 610 | **1,830** |
| 203.0.113.80 | 460 | 600 | 570 | **1,630** |
| 10.0.0.25 | 500 | 560 | 530 | **1,590** |
| 172.16.0.30 | 480 | 580 | 490 | **1,550** |
| 192.168.1.150 | 440 | 520 | 450 | **1,410** |

**Final Top 10 Results:**

| Rank | Client IP | Total Requests |
|------|-----------|----------------|
| 1 | 192.168.1.100 | 2,650 |
| 2 | 10.0.0.5 | 2,230 |
| 3 | 203.0.113.50 | 2,210 |
| 4 | 198.51.100.75 | 1,970 |
| 5 | 172.16.0.10 | 1,930 |
| 6 | 192.168.1.200 | 1,830 |
| 7 | 203.0.113.80 | 1,630 |
| 8 | 10.0.0.25 | 1,590 |
| 9 | 172.16.0.30 | 1,550 |
| 10 | 192.168.1.150 | 1,410 |

**Why Results Are Approximate** <br>

Results are approximate because some globally significant IPs might not appear in individual nodes' top 10 lists due to uneven data distribution across nodes. For example, an IP with moderate traffic on every node might have a high global total yet not rank in any single node's top 10, as the snippet below illustrates.
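
A small, hypothetical illustration of this effect (the values and counts are made up): value `x` is the true global leader with a total count of 240, but because it ranks third on every node, it never reaches the leader when each node ships only its top 2:

```python
# Hypothetical per-node counts. "x" totals 240 globally (the true #1),
# yet ranks third on every node, so a per-node top-2 never reports it.
node_counts = [
    {"a": 100, "b": 90, "x": 80},   # node 1
    {"c": 100, "d": 90, "x": 80},   # node 2
    {"e": 100, "f": 90, "x": 80},   # node 3
]
k = 2
merged = {}
for counts in node_counts:
    local_topk = sorted(counts.items(), key=lambda kv: -kv[1])[:k]
    for item, cnt in local_topk:            # the leader sees only these
        merged[item] = merged.get(item, 0) + cnt

print(sorted(merged.items(), key=lambda kv: -kv[1])[:k])
# [('a', 100), ('c', 100)] -- "x" (global count 240) is missing entirely.
```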

**Limitations** <br>

- Results are approximate, not exact.
- Accuracy depends on data distribution across partitions.
- Filter clauses are not currently supported with `approx_topk`.

---
### `approx_topk_distinct(field1, field2, k)`

**Description:**

- Returns the top `K` values of `field1` that have the most unique values of `field2`, where:

    - **field1**: The field to group by and return top results for.
    - **field2**: The field whose distinct values are counted.
    - **k**: The number of top results to return.

- Uses the HyperLogLog algorithm for efficient distinct counting and the Space-Saving algorithm for top-K selection on high-cardinality data.
- Results are approximate due to the probabilistic nature of both algorithms and distributed processing across partitions.

**Example:**

```sql
SELECT approx_topk_distinct(clientip, clientas, 3) FROM "default" ORDER BY _timestamp DESC
```

This returns the top 3 client IP addresses with the most unique user agents.

??? info "The HyperLogLog Algorithm Explained"
**Problem Statement**

Traditional `GROUP BY` operations with `DISTINCT` counts on high-cardinality fields can cause memory exhaustion in distributed systems. Consider this query:

```sql
SELECT clientip, count(distinct clientas) as cnt
FROM default
GROUP BY clientip
ORDER BY cnt DESC
LIMIT 10
```

**Challenges:**

- The dataset contains 3 million unique client IP addresses.
- Each client IP can have thousands of unique user agents (`clientas`).
- Total unique user agents: 100 million values.
- The query executes across 60 CPU cores with 60 data partitions.
- Memory usage for distinct counting: potentially unbounded storage for tracking unique values.
- Combined with grouping, memory requirements grow multiplicatively.

**Typical Error Message:**
```
Resources exhausted: Failed to allocate additional 63232256 bytes for GroupedHashAggregateStream[20] with 0 bytes already allocated for this reservation - 51510301 bytes remain available for the total pool
```

**Solution: HyperLogLog + Space-Saving Mechanism**

```sql
SELECT approx_topk_distinct(clientip, clientas, 10) FROM default
```

**Combined Approach:**

- **HyperLogLog**: Handles distinct counting using a fixed 16 KB data structure per group (see the sketch after this list).
- **Space-Saving**: Limits the number of groups returned from each partition to the top K.
- Each partition returns only its top 10 client IPs (by distinct user agent count) to the leader.
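
The following is a minimal, hypothetical Python sketch of HyperLogLog with 2^14 single-byte registers, matching the fixed ~16 KB per-group footprint mentioned above. Bias corrections for small cardinalities are omitted, and this is not OpenObserve's implementation:

```python
import hashlib

# Minimal HyperLogLog sketch (illustrative; small-range corrections omitted).
# 2^14 one-byte registers ~= 16 KB per group, regardless of how many distinct
# values are observed.
P = 14
M = 1 << P                                  # 16384 registers

class HyperLogLog:
    def __init__(self):
        self.registers = bytearray(M)

    def add(self, value):
        h = int.from_bytes(hashlib.sha1(value.encode()).digest()[:8], "big")
        idx = h & (M - 1)                   # low 14 bits choose a register
        rest = h >> P                       # remaining 50 bits
        rank = 51 - rest.bit_length()       # leading-zero count + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self):
        alpha = 0.7213 / (1 + 1.079 / M)
        return alpha * M * M / sum(2.0 ** -r for r in self.registers)

    def merge(self, other):
        # Register-wise max merges two sketches without double counting,
        # which is what makes per-partition results combinable at the leader.
        for i, r in enumerate(other.registers):
            if r > self.registers[i]:
                self.registers[i] = r

hll = HyperLogLog()
for i in range(100_000):
    hll.add(f"user-agent-{i}")
print(round(hll.count()))                   # close to 100000 (~0.8% typical error)
```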

**Example: Web Server User Agent Analysis**

Find the top 10 client IPs by unique user agent count from web server logs in the `default` stream.

**Raw Data Distribution**

| Node 1 | Distinct User Agents | Node 2 | Distinct User Agents | Node 3 | Distinct User Agents |
|---------|---------------------|---------|---------------------|---------|---------------------|
| 192.168.1.100 | 450 | 192.168.1.100 | 520 | 192.168.1.100 | 480 |
| 10.0.0.5 | 380 | 203.0.113.50 | 420 | 10.0.0.5 | 410 |
| 203.0.113.50 | 350 | 198.51.100.75 | 390 | 203.0.113.50 | 400 |
| 172.16.0.10 | 320 | 10.0.0.5 | 370 | 198.51.100.75 | 370 |
| 192.168.1.200 | 300 | 172.16.0.10 | 350 | 172.16.0.10 | 340 |
| 198.51.100.75 | 280 | 192.168.1.200 | 330 | 192.168.1.200 | 320 |
| 10.0.0.25 | 260 | 203.0.113.80 | 310 | 203.0.113.80 | 300 |
| 172.16.0.30 | 240 | 172.16.0.30 | 290 | 10.0.0.25 | 280 |
| 203.0.113.80 | 220 | 10.0.0.25 | 270 | 172.16.0.30 | 260 |
| 192.168.1.150 | 200 | 192.168.1.150 | 250 | 192.168.1.150 | 240 |

**Note**: Each distinct count is computed using HyperLogLog's 16 KB data structure per client IP.

**Follower Query Nodes Process Data**

Each follower node executes the query locally and returns only its top 10 results:

| Rank | Node 1 → Leader | Distinct Count | Node 2 → Leader | Distinct Count | Node 3 → Leader | Distinct Count |
|------|-----------------|----------------|-----------------|----------------|-----------------|----------------|
| 1 | 192.168.1.100 | 450 | 192.168.1.100 | 520 | 192.168.1.100 | 480 |
| 2 | 10.0.0.5 | 380 | 203.0.113.50 | 420 | 10.0.0.5 | 410 |
| 3 | 203.0.113.50 | 350 | 198.51.100.75 | 390 | 203.0.113.50 | 400 |
| 4 | 172.16.0.10 | 320 | 10.0.0.5 | 370 | 198.51.100.75 | 370 |
| 5 | 192.168.1.200 | 300 | 172.16.0.10 | 350 | 172.16.0.10 | 340 |
| 6 | 198.51.100.75 | 280 | 192.168.1.200 | 330 | 192.168.1.200 | 320 |
| 7 | 10.0.0.25 | 260 | 203.0.113.80 | 310 | 203.0.113.80 | 300 |
| 8 | 172.16.0.30 | 240 | 172.16.0.30 | 290 | 10.0.0.25 | 280 |
| 9 | 203.0.113.80 | 220 | 10.0.0.25 | 270 | 172.16.0.30 | 260 |
| 10 | 192.168.1.150 | 200 | 192.168.1.150 | 250 | 192.168.1.150 | 240 |

**Leader Query Node Aggregates Results**

| Client IP | Node 1 | Node 2 | Node 3 | Total Distinct User Agents |
|-----------|---------|---------|---------|---------------------------|
| 192.168.1.100 | 450 | 520 | 480 | **1,450** |
| 10.0.0.5 | 380 | 370 | 410 | **1,160** |
| 203.0.113.50 | 350 | 420 | 400 | **1,170** |
| 198.51.100.75 | 280 | 390 | 370 | **1,040** |
| 172.16.0.10 | 320 | 350 | 340 | **1,010** |
| 192.168.1.200 | 300 | 330 | 320 | **950** |
| 203.0.113.80 | 220 | 310 | 300 | **830** |
| 10.0.0.25 | 260 | 270 | 280 | **810** |
| 172.16.0.30 | 240 | 290 | 260 | **790** |
| 192.168.1.150 | 200 | 250 | 240 | **690** |

**Final Top 10 Results:**

| Rank | Client IP | Total Distinct User Agents |
|------|-----------|---------------------------|
| 1 | 192.168.1.100 | 1,450 |
| 2 | 203.0.113.50 | 1,170 |
| 3 | 10.0.0.5 | 1,160 |
| 4 | 198.51.100.75 | 1,040 |
| 5 | 172.16.0.10 | 1,010 |
| 6 | 192.168.1.200 | 950 |
| 7 | 203.0.113.80 | 830 |
| 8 | 10.0.0.25 | 810 |
| 9 | 172.16.0.30 | 790 |
| 10 | 192.168.1.150 | 690 |

**Why Results Are Approximate**

Results are approximate due to two factors:

1. **HyperLogLog approximation:** Distinct counts are estimated, not exact (see the illustration after this list).
2. **Space-Saving distribution:** Some globally significant client IPs might not appear in individual nodes' top 10 lists due to uneven data distribution.
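
On the first point, note that the aggregation tables above add per-node counts for simplicity; when the same user agent appears on several nodes, a plain sum would overcount the true union, which is exactly what a mergeable sketch such as HyperLogLog (register-wise max, as in the sketch above) can avoid. A hypothetical set-based illustration:

```python
# Hypothetical per-node user-agent sets; "UA-1" is seen on all three nodes.
node_agents = [
    {"UA-1", "UA-2", "UA-3"},
    {"UA-1", "UA-4"},
    {"UA-1", "UA-5", "UA-6"},
]
summed = sum(len(s) for s in node_agents)        # 8: UA-1 counted three times
true_union = len(set().union(*node_agents))      # 6: each agent counted once
print(summed, true_union)
# A HyperLogLog sketch per node, merged by register-wise max, estimates the
# union (6) directly instead of the inflated sum (8).
```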

**Limitations**

- Results are approximate, not exact.
- Distinct count accuracy depends on the HyperLogLog algorithm's precision.
- Filter clauses are not currently supported with `approx_topk_distinct`.

---

## Related Links

OpenObserve uses [Apache DataFusion](https://datafusion.apache.org/user-guide/sql/index.html) as its query engine. All supported SQL syntax and functions are available through DataFusion.

docs/user-guide/pipelines/use-pipelines.md

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ The above example illustrates a basic pipeline setup. However, pipelines can bec
<br>

## FAQ
**Q**: If I set the frequency to 5 minutes and the current time is 23:03, when will the next runs happen? <br>
**A**: OpenObserve aligns the next run to the nearest upcoming time that is divisible by the frequency, starting from the top of the hour in the configured timezone. This ensures that all runs occur at consistent and predictable intervals.

**Example**<br>
If the current time is 23:03, here is when the next run will occur for different frequencies:
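
As a sketch of this alignment rule (hypothetical Python, assuming runs land on whole multiples of the frequency counted from the top of the hour, with timezone handling omitted):

```python
from datetime import datetime, timedelta

# Next run = first time after `now` that is a whole multiple of
# `freq_minutes` past the top of the hour.
def next_run(now, freq_minutes):
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    elapsed_min = (now - top_of_hour).total_seconds() / 60
    k = int(elapsed_min // freq_minutes) + 1
    return top_of_hour + timedelta(minutes=k * freq_minutes)

now = datetime(2025, 1, 1, 23, 3)
for freq in (5, 10, 15):
    print(freq, next_run(now, freq).strftime("%H:%M"))
# 5 -> 23:05, 10 -> 23:10, 15 -> 23:15
```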
