Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
5 : import '../../../dartastic_opentelemetry.dart';
6 :
/// An asynchronous instrument that reports non-additive value(s) when it is
/// observed.
///
/// Use an [ObservableGauge] to measure a non-additive current value that
/// cannot be calculated synchronously. Values are produced by the registered
/// callbacks, which are invoked each time [collect] is called.
class ObservableGauge<T extends num>
    implements APIObservableGauge<T>, SDKInstrument {
  /// The underlying API ObservableGauge.
  ///
  /// Instrument metadata ([name], [unit], [description]) and the callback
  /// list are delegated to it.
  final APIObservableGauge<T> _apiGaugeDelegate;

  /// The Meter that created this ObservableGauge.
  final Meter _meter;

  /// Storage for the values recorded while running [collect].
  final GaugeStorage<T> _storage = GaugeStorage<T>();

  /// Creates a new ObservableGauge that wraps [apiGauge] and belongs to
  /// [meter].
  ObservableGauge({
    required APIObservableGauge<T> apiGauge,
    required Meter meter,
  })  : _apiGaugeDelegate = apiGauge,
        _meter = meter;

  @override
  String get name => _apiGaugeDelegate.name;

  @override
  String? get unit => _apiGaugeDelegate.unit;

  @override
  String? get description => _apiGaugeDelegate.description;

  /// Whether the provider of the owning meter is enabled.
  @override
  bool get enabled => _meter.provider.enabled;

  @override
  APIMeter get meter => _meter;

  @override
  List<ObservableCallback<T>> get callbacks => _apiGaugeDelegate.callbacks;

  /// Registers [callback] with the API delegate.
  ///
  /// Returns a registration whose `unregister` removes the callback again.
  @override
  APICallbackRegistration<T> addCallback(ObservableCallback<T> callback) {
    // Register with the API implementation first, then wrap its registration
    // so unregistering also goes through this gauge.
    final registration = _apiGaugeDelegate.addCallback(callback);

    return _ObservableGaugeCallbackRegistration<T>(
      apiRegistration: registration,
      gauge: this,
      callback: callback,
    );
  }

  @override
  void removeCallback(ObservableCallback<T> callback) {
    _apiGaugeDelegate.removeCallback(callback);
  }

  /// Converts [value] to the instrument's numeric type [T].
  ///
  /// The conversion is explicit for `int` and `double` because a `num` whose
  /// runtime type is the other one cannot simply be cast to [T].
  T _coerce(num value) {
    if (T == int) return value.toInt() as T;
    if (T == double) return value.toDouble() as T;
    return value as T;
  }

  /// Gets the current value of the gauge for a specific set of attributes.
  ///
  /// If no [attributes] are provided, returns the average of all recorded
  /// values, or zero when nothing has been recorded. The result is converted
  /// to [T], so for an `int` gauge the average is truncated.
  T getValue([Attributes? attributes]) {
    if (attributes != null) {
      // For specific attributes, return the value stored for them.
      final num stored = _storage.getValue(attributes);
      return _coerce(stored);
    }

    final points = _storage.collectPoints();
    if (points.isEmpty) {
      return _coerce(0);
    }

    final total =
        points.fold<num>(0, (sum, point) => sum + (point.value as num));
    return _coerce(total / points.length);
  }

  /// Collects measurements from all registered callbacks.
  ///
  /// Each callback receives a fresh [ObservableResult]. Every measurement it
  /// reports is recorded in storage and added to the returned list. A
  /// callback that throws is reported and skipped; the remaining callbacks
  /// still run. Returns an empty list when the instrument is not [enabled].
  @override
  List<Measurement<T>> collect() {
    if (!enabled) {
      return [];
    }

    final result = <Measurement<T>>[];

    // Iterate over a snapshot so the callback list can change while the
    // callbacks are running.
    final callbacksSnapshot = List<ObservableCallback<T>>.from(callbacks);

    for (final callback in callbacksSnapshot) {
      try {
        // Create a new observable result for each callback.
        final observableResult = ObservableResult<T>();

        try {
          callback(observableResult as APIObservableResult<T>);
        } catch (e) {
          // Anything thrown by user code lands here, not only cast failures,
          // so the message does not claim a particular error type.
          print('Error invoking ObservableGauge callback: $e');
          continue;
        }

        for (final measurement in observableResult.measurements) {
          final attributes =
              measurement.attributes ?? OTelFactory.otelFactory!.attributes();

          // Record the reported value in SDK storage, converted to T.
          _storage.record(_coerce(measurement.value), attributes);
          result.add(measurement);
        }
      } catch (e) {
        print(
            'Error collecting measurements from ObservableGauge callback: $e');
      }
    }

    return result;
  }

  /// Collects metrics for the SDK metric export.
  ///
  /// This is called by the MeterProvider during metric collection. Returns an
  /// empty list when the instrument is not [enabled] or has no stored points;
  /// otherwise returns a single gauge [Metric] built from those points.
  @override
  List<Metric> collectMetrics() {
    if (!enabled) {
      return [];
    }

    final points = collectPoints();
    if (points.isEmpty) {
      return [];
    }

    return [
      Metric.gauge(
        name: name,
        description: description,
        unit: unit,
        points: points,
      )
    ];
  }

  /// Gets the current points for this gauge.
  ///
  /// This is used by the SDK to collect metrics. Returns an empty list when
  /// the instrument is not [enabled].
  List<MetricPoint<T>> collectPoints() {
    if (!enabled) {
      return [];
    }

    return _storage.collectPoints();
  }
}
189 :
/// Registration handle returned by [ObservableGauge.addCallback].
///
/// It wraps the registration produced by the API delegate so that
/// [unregister] also removes the callback through the owning gauge.
class _ObservableGaugeCallbackRegistration<T extends num>
    implements APICallbackRegistration<T> {
  /// Creates a registration tying [callback] to [gauge] and to the
  /// delegate's own [apiRegistration].
  _ObservableGaugeCallbackRegistration({
    required this.apiRegistration,
    required this.gauge,
    required this.callback,
  });

  /// The registration obtained from the API delegate.
  final APICallbackRegistration<T> apiRegistration;

  /// The gauge on which [callback] was registered.
  final ObservableGauge<T> gauge;

  /// The callback this registration refers to.
  final ObservableCallback<T> callback;

  /// Unregisters through the API registration, then asks the gauge to remove
  /// the callback as well.
  @override
  void unregister() {
    // The API registration goes first; the gauge-level removal afterwards is
    // a redundant safety net.
    apiRegistration.unregister();
    gauge.removeCallback(callback);
  }
}
|