Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
5 : import '../../../dartastic_opentelemetry.dart';
6 :
7 : /// ObservableUpDownCounter is an asynchronous instrument that reports additive
8 : /// values when observed.
9 : ///
10 : /// An ObservableUpDownCounter is used to measure a value that increases and
11 : /// decreases where measurements are made by a callback function. For example,
12 : /// number of active requests, queue size, pool size.
13 : class ObservableUpDownCounter<T extends num>
14 : implements APIObservableUpDownCounter<T>, SDKInstrument {
15 : /// The underlying API ObservableUpDownCounter.
16 : final APIObservableUpDownCounter<T> _apiCounter;
17 :
18 : /// The Meter that created this ObservableUpDownCounter.
19 : final Meter _meter;
20 :
21 : /// Storage for accumulating counter measurements.
22 : final SumStorage<T> _storage = SumStorage<T>(isMonotonic: false);
23 :
24 : /// The last observed values, for tracking changes.
25 : final Map<Attributes, T> _lastValues = {};
26 :
27 : /// Creates a new ObservableUpDownCounter instance.
28 3 : ObservableUpDownCounter({
29 : required APIObservableUpDownCounter<T> apiCounter,
30 : required Meter meter,
31 : }) : _apiCounter = apiCounter,
32 : _meter = meter;
33 :
34 3 : @override
35 6 : String get name => _apiCounter.name;
36 :
37 1 : @override
38 2 : String? get unit => _apiCounter.unit;
39 :
40 1 : @override
41 2 : String? get description => _apiCounter.description;
42 :
43 2 : @override
44 : bool get enabled {
45 : // In the SDK, metrics are enabled based on the meter provider's enabled state
46 6 : return _meter.provider.enabled;
47 : }
48 :
49 3 : @override
50 3 : APIMeter get meter => _meter;
51 :
52 1 : @override
53 2 : List<ObservableCallback<T>> get callbacks => _apiCounter.callbacks;
54 :
55 1 : @override
56 : APICallbackRegistration<T> addCallback(ObservableCallback<T> callback) {
57 : // Register with the API implementation first
58 2 : final registration = _apiCounter.addCallback(callback);
59 :
60 : // Return a registration that also unregisters from our list
61 1 : return _ObservableUpDownCounterCallbackRegistration(
62 : apiRegistration: registration,
63 : counter: this,
64 : callback: callback,
65 : );
66 : }
67 :
68 1 : @override
69 : void removeCallback(ObservableCallback<T> callback) {
70 2 : _apiCounter.removeCallback(callback);
71 : }
72 :
73 : /// Gets the current value of the counter for a specific set of attributes.
74 : /// If no attributes are provided, returns the sum of all recorded values.
75 0 : T getValue([Attributes? attributes]) {
76 : final num value;
77 :
78 : if (attributes == null) {
79 : // For no attributes, sum all points
80 0 : value = _storage
81 0 : .collectPoints()
82 0 : .fold<num>(0, (sum, point) => sum + point.value);
83 : } else {
84 : // For specific attributes, get that value
85 0 : value = _storage.getValue(attributes);
86 : }
87 :
88 : // Handle the cast to the generic type
89 0 : if (T == int) return value.toInt() as T;
90 0 : if (T == double) return value.toDouble() as T;
91 : return value as T;
92 : }
93 :
94 : /// Collects measurements from all registered callbacks.
95 1 : @override
96 : List<Measurement<T>> collect() {
97 1 : if (!enabled) {
98 1 : return [];
99 : }
100 :
101 1 : final result = <Measurement<T>>[];
102 2 : final callbackList = List<ObservableCallback<T>>.from(callbacks);
103 :
104 : // Return early if no callbacks registered
105 1 : if (callbackList.isEmpty) {
106 : return result;
107 : }
108 :
109 : // First, clear previous values to prepare for fresh collection
110 : // This is necessary to avoid accumulating values from multiple collections
111 2 : _storage.reset();
112 :
113 : // Call all callbacks
114 2 : for (final callback in callbackList) {
115 : try {
116 : // Create a new observable result for each callback
117 1 : final observableResult = ObservableResult<T>();
118 :
119 : // Call the callback with the observable result
120 : // Cast the parameter to ensure type safety
121 : try {
122 1 : callback(observableResult as APIObservableResult<T>);
123 : } catch (e) {
124 2 : print('Type error in callback: $e');
125 : continue;
126 : }
127 :
128 : // Process the measurements from the observable result
129 2 : for (final measurement in observableResult.measurements) {
130 : // Type checking for the generic parameter
131 1 : final dynamic rawValue = measurement.value;
132 1 : final num value = (rawValue is num)
133 : ? rawValue
134 0 : : num.tryParse(rawValue.toString()) ?? 0;
135 : final attributes =
136 2 : measurement.attributes ?? OTelFactory.otelFactory!.attributes();
137 :
138 : // Per the spec, for ObservableUpDownCounter we record the absolute value
139 : // directly - not the delta
140 : // For SDK storage, convert the num to the appropriate T type
141 1 : if (T == int) {
142 3 : _storage.record(value.toInt() as T, attributes);
143 1 : } else if (T == double) {
144 3 : _storage.record(value.toDouble() as T, attributes);
145 : } else {
146 0 : _storage.record(value as T, attributes);
147 : }
148 :
149 : // Add measurement with the absolute value to the result
150 1 : result.add(measurement);
151 :
152 : // Keep track of the last value for debugging and tracking
153 1 : if (T == int) {
154 3 : _lastValues[attributes] = value.toInt() as T;
155 1 : } else if (T == double) {
156 3 : _lastValues[attributes] = value.toDouble() as T;
157 : } else {
158 0 : _lastValues[attributes] = value as T;
159 : }
160 : }
161 : } catch (e) {
162 0 : print(
163 0 : 'Error collecting measurements from ObservableUpDownCounter callback: $e');
164 : }
165 : }
166 :
167 : return result;
168 : }
169 :
170 : /// Gets the current points for this counter.
171 : /// This is used by the SDK to collect metrics.
172 2 : List<MetricPoint<T>> collectPoints() {
173 2 : if (!enabled) {
174 0 : return [];
175 : }
176 :
177 : // Then return points from storage
178 4 : return _storage.collectPoints();
179 : }
180 :
181 : /// Collects metrics for the SDK metric export.
182 : ///
183 : /// This is called by the MeterProvider during metric collection.
184 2 : @override
185 : List<Metric> collectMetrics() {
186 2 : if (!enabled) {
187 2 : return [];
188 : }
189 :
190 : // Get the points from storage
191 2 : final points = collectPoints();
192 2 : if (points.isEmpty) {
193 1 : return [];
194 : }
195 :
196 : // Create the metric to export
197 1 : return [
198 1 : Metric.sum(
199 1 : name: name,
200 1 : description: description,
201 1 : unit: unit,
202 : temporality: AggregationTemporality.cumulative,
203 : points: points,
204 : isMonotonic: false, // Up/down counters are non-monotonic
205 : )
206 : ];
207 : }
208 :
209 : /// Resets the counter for testing. This is not typically used in production.
210 0 : void reset() {
211 0 : _storage.reset();
212 0 : _lastValues.clear();
213 : }
214 : }
215 :
216 : /// Wrapper for APICallbackRegistration that also handles our internal state.
217 : class _ObservableUpDownCounterCallbackRegistration<T extends num>
218 : implements APICallbackRegistration<T> {
219 : /// The API registration.
220 : final APICallbackRegistration<T> apiRegistration;
221 :
222 : /// The counter this registration is for.
223 : final ObservableUpDownCounter<T> counter;
224 :
225 : /// The callback that was registered.
226 : final ObservableCallback<T> callback;
227 :
228 1 : _ObservableUpDownCounterCallbackRegistration({
229 : required this.apiRegistration,
230 : required this.counter,
231 : required this.callback,
232 : });
233 :
234 1 : @override
235 : void unregister() {
236 : // Unregister from the API implementation
237 2 : apiRegistration.unregister();
238 :
239 : // Also remove from our counter directly for redundancy
240 3 : counter.removeCallback(callback);
241 : }
242 : }
|