Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
6 : import 'sampler.dart';
7 :
/// A sampler that limits the number of sampled traces per second.
///
/// Implemented as a token bucket: tokens accrue continuously at
/// `maxTracesPerSecond`, and each sampled trace consumes one token. The
/// bucket holds at most one time window's worth of tokens, but never fewer
/// than one, so rates below one trace per window are still honored instead
/// of dropping every trace.
///
/// A periodic [Timer] is started by the constructor; call [dispose] to
/// cancel it when the sampler is no longer needed.
class RateLimitingSampler implements Sampler {
  final double _maxTracesPerSecond;
  final Duration _timeWindow;
  double _tokenBalance;
  DateTime _lastTokenUpdate;
  late final Timer _tokenReplenishTimer;

  @override
  String get description =>
      'RateLimitingSampler{$_maxTracesPerSecond per second}';

  /// Creates a rate limiting sampler.
  ///
  /// [maxTracesPerSecond] specifies how many traces can be sampled per second.
  /// [timeWindow] specifies how often the token balance is updated (defaults
  /// to 100ms) and bounds how many tokens the bucket can hold.
  ///
  /// Throws an [ArgumentError] if [maxTracesPerSecond] is NaN or not
  /// positive, or if [timeWindow] is zero or negative.
  RateLimitingSampler(
    double maxTracesPerSecond, {
    Duration timeWindow = const Duration(milliseconds: 100),
  })  : _maxTracesPerSecond = maxTracesPerSecond,
        _timeWindow = timeWindow,
        // Start with tokens already available; the first update below clamps
        // this to the bucket capacity.
        _tokenBalance = maxTracesPerSecond,
        _lastTokenUpdate = DateTime.now() {
    // `NaN <= 0` is false, so NaN has to be rejected explicitly.
    if (maxTracesPerSecond.isNaN || maxTracesPerSecond <= 0) {
      throw ArgumentError('maxTracesPerSecond must be positive');
    }
    // Validate before the timer is created so a failed construction never
    // leaves a running timer behind.
    if (timeWindow <= Duration.zero) {
      throw ArgumentError.value(timeWindow, 'timeWindow', 'must be positive');
    }
    _updateTokens();
    _tokenReplenishTimer = Timer.periodic(timeWindow, (_) => _updateTokens());
  }

  /// The bucket capacity: one time window's worth of tokens, but at least one
  /// whole token so that a single trace can always eventually be sampled.
  double get _maxTokens {
    final windowTokens = _maxTracesPerSecond *
        _timeWindow.inMicroseconds /
        Duration.microsecondsPerSecond;
    return windowTokens < 1.0 ? 1.0 : windowTokens;
  }

  /// Adds the tokens earned since the last update and clamps the balance to
  /// the range `[0, _maxTokens]`.
  void _updateTokens() {
    final now = DateTime.now();
    // Microsecond resolution: with millisecond truncation, calls arriving
    // less than 1 ms apart would each add zero tokens while still resetting
    // the timestamp, so the bucket would never refill under load.
    final elapsedMicros = now.difference(_lastTokenUpdate).inMicroseconds;
    _lastTokenUpdate = now;

    // The wall clock can move backwards; never remove tokens because of it.
    final creditedMicros = elapsedMicros > 0 ? elapsedMicros : 0;

    // No floor(): fractional tokens accumulate across updates.
    final tokensToAdd =
        _maxTracesPerSecond * creditedMicros / Duration.microsecondsPerSecond;

    _tokenBalance = (_tokenBalance + tokensToAdd).clamp(0.0, _maxTokens);
  }

  /// Samples the trace if at least one token is available, consuming it;
  /// otherwise drops the trace.
  ///
  /// The decision depends only on the current token balance; none of the
  /// arguments are inspected.
  @override
  SamplingResult shouldSample({
    required Context parentContext,
    required String traceId,
    required String name,
    required SpanKind spanKind,
    required Attributes? attributes,
    required List<SpanLink>? links,
  }) {
    // Credit any tokens earned since the last update before deciding.
    _updateTokens();

    if (_tokenBalance >= 1.0) {
      _tokenBalance -= 1.0;
      return const SamplingResult(
        decision: SamplingDecision.recordAndSample,
        source: SamplingDecisionSource.tracerConfig,
      );
    }

    return const SamplingResult(
      decision: SamplingDecision.drop,
      source: SamplingDecisionSource.tracerConfig,
    );
  }

  /// Cancels the periodic token replenish timer.
  void dispose() {
    _tokenReplenishTimer.cancel();
  }
}
|