Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
5 : import '../../../dartastic_opentelemetry.dart';
6 :
/// An asynchronous SDK instrument that reports monotonically increasing
/// values produced by registered callbacks.
///
/// Typical uses are quantities that only ever grow, such as CPU time, bytes
/// received, or a number of completed operations. Each call to [collect]
/// invokes every registered callback, stores the absolute values they report,
/// and remembers the last value per attribute set so that a decrease (treated
/// as a counter reset) can be told apart from an unchanged value.
class ObservableCounter<T extends num>
    implements APIObservableCounter<T>, SDKInstrument {
  /// The underlying API ObservableCounter that owns the callback list.
  final APIObservableCounter<T> _apiCounter;

  /// The Meter that created this ObservableCounter.
  final Meter _meter;

  /// Storage for the values recorded during the most recent collection.
  final SumStorage<T> _storage = SumStorage<T>(isMonotonic: true);

  /// The last observed value per attribute set, used to detect resets and
  /// unchanged values between collections.
  final Map<Attributes, T> _lastValues = {};

  /// Creates a new ObservableCounter that delegates naming and callback
  /// bookkeeping to [apiCounter] and reads its enabled state from [meter].
  ObservableCounter({
    required APIObservableCounter<T> apiCounter,
    required Meter meter,
  })  : _apiCounter = apiCounter,
        _meter = meter;

  @override
  String get name => _apiCounter.name;

  @override
  String? get unit => _apiCounter.unit;

  @override
  String? get description => _apiCounter.description;

  @override
  bool get enabled {
    // In the SDK, metrics are enabled based on the meter provider's enabled
    // state.
    return _meter.provider.enabled;
  }

  @override
  APIMeter get meter => _meter;

  @override
  List<ObservableCallback<T>> get callbacks => _apiCounter.callbacks;

  /// Registers [callback] with the underlying API counter.
  ///
  /// Returns a registration whose `unregister` both unregisters the API
  /// registration and removes [callback] through this counter.
  @override
  APICallbackRegistration<T> addCallback(ObservableCallback<T> callback) {
    // Register with the API implementation first.
    final registration = _apiCounter.addCallback(callback);

    return _ObservableCounterCallbackRegistration<T>(
      apiRegistration: registration,
      counter: this,
      callback: callback,
    );
  }

  @override
  void removeCallback(ObservableCallback<T> callback) {
    _apiCounter.removeCallback(callback);
  }

  /// Converts [value] to the instrument's numeric type [T].
  ///
  /// `int` and `double` instruments get an explicit conversion; any other
  /// [T] (for example `num`) receives the value unchanged.
  T _coerce(num value) {
    if (T == int) return value.toInt() as T;
    if (T == double) return value.toDouble() as T;
    return value as T;
  }

  /// Gets the current value of the counter for a specific set of attributes.
  ///
  /// If no [attributes] are provided, returns the sum of all points currently
  /// held in storage.
  T getValue([Attributes? attributes]) {
    final num value = attributes == null
        ? _storage
            .collectPoints()
            .fold<num>(0, (sum, point) => sum + point.value)
        : _storage.getValue(attributes);

    return _coerce(value);
  }

  /// Collects measurements from all registered callbacks.
  ///
  /// Returns an empty list when the instrument is disabled or has no
  /// callbacks. Otherwise storage is reset, every callback is invoked, and
  /// each reported value is recorded in storage. Only measurements whose value
  /// differs from the last value seen for the same attributes are returned;
  /// unchanged values are stored but not reported. A callback that throws is
  /// logged and skipped without affecting the remaining callbacks.
  @override
  List<Measurement<T>> collect() {
    if (!enabled) {
      return [];
    }

    final result = <Measurement<T>>[];

    // Work on a snapshot so callbacks may add or remove callbacks safely.
    final callbacksSnapshot = List<ObservableCallback<T>>.from(callbacks);

    // Return early if no callbacks are registered; storage is left untouched.
    if (callbacksSnapshot.isEmpty) {
      return result;
    }

    // Clear previous values so repeated collections do not accumulate.
    _storage.reset();

    for (final callback in callbacksSnapshot) {
      try {
        // Each callback gets its own result object.
        final observableResult = ObservableResult<T>();

        try {
          callback(observableResult as APIObservableResult<T>);
        } catch (e) {
          print('Type error in callback: $e');
          continue;
        }

        for (final measurement in observableResult.measurements) {
          final Object? rawValue = measurement.value;
          final num value = (rawValue is num)
              ? rawValue
              : num.tryParse(rawValue.toString()) ?? 0;
          final attributes =
              measurement.attributes ?? OTelFactory.otelFactory!.attributes();

          // Attribute sets never seen before compare against zero.
          final T lastValue =
              (_lastValues[attributes] ?? (T == int ? 0 : 0.0)) as T;

          // The absolute value is always stored, whether it went up, went
          // down (a counter reset), or stayed the same.
          final current = _coerce(value);
          _storage.record(current, attributes);

          // Report only values that changed. The two-sided comparison is
          // deliberate: a value that compares neither below nor above the
          // last one (equal, or not comparable) is stored but not reported.
          if (value < lastValue || value > lastValue) {
            result.add(measurement);
          }

          // Remember the latest value for the next collection.
          _lastValues[attributes] = current;
        }
      } catch (e) {
        print(
            'Error collecting measurements from ObservableCounter callback: $e');
      }
    }

    return result;
  }

  /// Collects metrics for the SDK metric export.
  ///
  /// Returns an empty list when the instrument is disabled or storage holds
  /// no points; otherwise returns a single cumulative, monotonic sum metric
  /// built from the stored points.
  @override
  List<Metric> collectMetrics() {
    if (!enabled) {
      return [];
    }

    final points = collectPoints();
    if (points.isEmpty) {
      return [];
    }

    return [
      Metric.sum(
        name: name,
        description: description,
        unit: unit,
        temporality: AggregationTemporality.cumulative,
        points: points,
        isMonotonic: true, // Counters are monotonic
      )
    ];
  }

  /// Gets the current points for this counter.
  ///
  /// Returns an empty list when the instrument is disabled; otherwise returns
  /// whatever storage currently holds.
  List<MetricPoint<T>> collectPoints() {
    if (!enabled) {
      return [];
    }

    return _storage.collectPoints();
  }

  /// Resets the counter by clearing storage and the last-value history.
  ///
  /// This is only used for testing.
  void reset() {
    _storage.reset();
    _lastValues.clear();
  }
}
244 :
/// Callback registration handed out by [ObservableCounter.addCallback].
///
/// Pairs the registration returned by the API counter with the SDK counter
/// and callback it belongs to, so that a single [unregister] call tears down
/// both.
class _ObservableCounterCallbackRegistration<T extends num>
    implements APICallbackRegistration<T> {
  _ObservableCounterCallbackRegistration({
    required this.apiRegistration,
    required this.counter,
    required this.callback,
  });

  /// Registration returned by the underlying API counter.
  final APICallbackRegistration<T> apiRegistration;

  /// The SDK counter the callback was added to.
  final ObservableCounter<T> counter;

  /// The callback covered by this registration.
  final ObservableCallback<T> callback;

  /// Unregisters the API registration, then removes [callback] through
  /// [counter] as well for redundancy.
  @override
  void unregister() {
    apiRegistration.unregister();
    counter.removeCallback(callback);
  }
}
|