LCOV - code coverage report
Current view: top level - src/metrics/instruments - observable_counter.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 77.8 % 90 70
Test Date: 2025-11-15 13:23:01 Functions: - 0 0

            Line data    Source code
       1              : // Licensed under the Apache License, Version 2.0
       2              : // Copyright 2025, Michael Bushe, All rights reserved.
       3              : 
       4              : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
       5              : import '../../../dartastic_opentelemetry.dart';
       6              : 
       7              : /// ObservableCounter is an asynchronous instrument that reports monotonically
       8              : /// increasing values when observed.
       9              : ///
      10              : /// An ObservableCounter is used to measure monotonically increasing values
      11              : /// where measurements are made by a callback function. For example, CPU time,
      12              : /// bytes received, or number of operations.
      13              : class ObservableCounter<T extends num>
      14              :     implements APIObservableCounter<T>, SDKInstrument {
      15              :   /// The underlying API ObservableCounter.
      16              :   final APIObservableCounter<T> _apiCounter;
      17              : 
      18              :   /// The Meter that created this ObservableCounter.
      19              :   final Meter _meter;
      20              : 
      21              :   /// Storage for accumulating counter measurements.
      22              :   final SumStorage<T> _storage = SumStorage<T>(isMonotonic: true);
      23              : 
      24              :   /// The last observed values, for tracking and detecting resets.
      25              :   final Map<Attributes, T> _lastValues = {};
      26              : 
      27              :   /// Creates a new ObservableCounter instance.
      28            5 :   ObservableCounter({
      29              :     required APIObservableCounter<T> apiCounter,
      30              :     required Meter meter,
      31              :   })  : _apiCounter = apiCounter,
      32              :         _meter = meter;
      33              : 
      34            5 :   @override
      35           10 :   String get name => _apiCounter.name;
      36              : 
      37            1 :   @override
      38            2 :   String? get unit => _apiCounter.unit;
      39              : 
      40            1 :   @override
      41            2 :   String? get description => _apiCounter.description;
      42              : 
      43            3 :   @override
      44              :   bool get enabled {
      45              :     // In the SDK, metrics are enabled based on the meter provider's enabled state
      46            9 :     return _meter.provider.enabled;
      47              :   }
      48              : 
      49            5 :   @override
      50            5 :   APIMeter get meter => _meter;
      51              : 
      52            2 :   @override
      53            4 :   List<ObservableCallback<T>> get callbacks => _apiCounter.callbacks;
      54              : 
      55            2 :   @override
      56              :   APICallbackRegistration<T> addCallback(ObservableCallback<T> callback) {
      57              :     // Register with the API implementation first
      58            4 :     final registration = _apiCounter.addCallback(callback);
      59              : 
      60              :     // Return a registration that also unregisters from our list
      61            2 :     return _ObservableCounterCallbackRegistration<T>(
      62              :       apiRegistration: registration,
      63              :       counter: this,
      64              :       callback: callback,
      65              :     );
      66              :   }
      67              : 
      68            1 :   @override
      69              :   void removeCallback(ObservableCallback<T> callback) {
      70            2 :     _apiCounter.removeCallback(callback);
      71              :   }
      72              : 
      73              :   /// Gets the current value of the counter for a specific set of attributes.
      74              :   /// If no attributes are provided, returns the sum of all recorded values.
      75            0 :   T getValue([Attributes? attributes]) {
      76              :     final num value;
      77              : 
      78              :     if (attributes == null) {
      79              :       // For no attributes, sum all points
      80            0 :       value = _storage
      81            0 :           .collectPoints()
      82            0 :           .fold<num>(0, (sum, point) => sum + point.value);
      83              :     } else {
      84              :       // For specific attributes, get that value
      85            0 :       value = _storage.getValue(attributes);
      86              :     }
      87              : 
      88              :     // Handle the cast to the generic type
      89            0 :     if (T == int) return value.toInt() as T;
      90            0 :     if (T == double) return value.toDouble() as T;
      91              :     return value as T;
      92              :   }
      93              : 
      94              :   /// Collects measurements from all registered callbacks.
      95            2 :   @override
      96              :   List<Measurement<T>> collect() {
      97            2 :     if (!enabled) {
      98            1 :       return [];
      99              :     }
     100              : 
     101            2 :     final result = <Measurement<T>>[];
     102              : 
     103              :     // Get a snapshot of callbacks to avoid concurrent modification issues
     104            4 :     final callbacksSnapshot = List<ObservableCallback<T>>.from(callbacks);
     105              : 
     106              :     // Return early if no callbacks registered
     107            2 :     if (callbacksSnapshot.isEmpty) {
     108              :       return result;
     109              :     }
     110              : 
     111              :     // First, clear previous values to prepare for fresh collection
     112              :     // This is necessary to avoid accumulating values from multiple collections
     113            4 :     _storage.reset();
     114              : 
     115              :     // Call all callbacks
     116            4 :     for (final callback in callbacksSnapshot) {
     117              :       try {
     118              :         // Create a new observable result for each callback
     119            2 :         final observableResult = ObservableResult<T>();
     120              : 
     121              :         // Call the callback with the observable result
     122              :         // Cast the parameter to ensure type safety
     123              :         try {
     124            2 :           callback(observableResult as APIObservableResult<T>);
     125              :         } catch (e) {
     126            0 :           print('Type error in callback: $e');
     127              :           continue;
     128              :         }
     129              : 
     130              :         // Process the measurements from the observable result
     131            4 :         for (final measurement in observableResult.measurements) {
     132              :           // Type checking for the generic parameter
     133            2 :           final dynamic rawValue = measurement.value;
     134            2 :           final num value = (rawValue is num)
     135              :               ? rawValue
     136            0 :               : num.tryParse(rawValue.toString()) ?? 0;
     137              :           final attributes =
     138            4 :               measurement.attributes ?? OTelFactory.otelFactory!.attributes();
     139              : 
     140              :           // Check for monotonicity - current value should be >= last value
     141              :           final T lastValue =
     142            6 :               (_lastValues[attributes] ?? (T == int ? 0 : 0.0)) as T;
     143              : 
     144              :           // If value decreased, it indicates a counter reset
     145            2 :           if (value < lastValue) {
     146              :             // Per spec, for a reset we just record the current value
     147              :             // For SDK storage, convert the num to the appropriate T type
     148            1 :             if (T == int) {
     149            3 :               _storage.record(value.toInt() as T, attributes);
     150            0 :             } else if (T == double) {
     151            0 :               _storage.record(value.toDouble() as T, attributes);
     152              :             } else {
     153            0 :               _storage.record(value as T, attributes);
     154              :             }
     155            1 :             result.add(measurement);
     156            2 :           } else if (value > lastValue) {
     157              :             // Only add measurements with positive deltas
     158              :             // For SDK storage, convert the num to the appropriate T type
     159            2 :             if (T == int) {
     160            6 :               _storage.record(value.toInt() as T, attributes);
     161            1 :             } else if (T == double) {
     162            3 :               _storage.record(value.toDouble() as T, attributes);
     163              :             } else {
     164            0 :               _storage.record(value as T, attributes);
     165              :             }
     166            2 :             result.add(measurement);
     167              :           } else {
     168              :             // For zero deltas, we still record the value in storage for cumulative reporting,
     169              :             // but don't include it in the returned measurements
     170              :             // For SDK storage, convert the num to the appropriate T type
     171            1 :             if (T == int) {
     172            3 :               _storage.record(value.toInt() as T, attributes);
     173            0 :             } else if (T == double) {
     174            0 :               _storage.record(value.toDouble() as T, attributes);
     175              :             } else {
     176            0 :               _storage.record(value as T, attributes);
     177              :             }
     178              :             // Note: The measurement is deliberately not added to the result list
     179              :           }
     180              : 
     181              :           // Store the latest value for next time
     182            2 :           if (T == int) {
     183            6 :             _lastValues[attributes] = value.toInt() as T;
     184            1 :           } else if (T == double) {
     185            3 :             _lastValues[attributes] = value.toDouble() as T;
     186              :           } else {
     187            0 :             _lastValues[attributes] = value as T;
     188              :           }
     189              :         }
     190              :       } catch (e) {
     191            0 :         print(
     192            0 :             'Error collecting measurements from ObservableCounter callback: $e');
     193              :       }
     194              :     }
     195              : 
     196              :     return result;
     197              :   }
     198              : 
     199              :   /// Collects metrics for the SDK metric export.
     200              :   ///
     201              :   /// This is called by the MeterProvider during metric collection.
     202            2 :   @override
     203              :   List<Metric> collectMetrics() {
     204            2 :     if (!enabled) {
     205            2 :       return [];
     206              :     }
     207              : 
     208              :     // Get the points from storage
     209            2 :     final points = collectPoints();
     210            2 :     if (points.isEmpty) {
     211            1 :       return [];
     212              :     }
     213              : 
     214              :     // Create the metric to export
     215            1 :     return [
     216            1 :       Metric.sum(
     217            1 :         name: name,
     218            1 :         description: description,
     219            1 :         unit: unit,
     220              :         temporality: AggregationTemporality.cumulative,
     221              :         points: points,
     222              :         isMonotonic: true, // Counters are monotonic
     223              :       )
     224              :     ];
     225              :   }
     226              : 
     227              :   /// Gets the current points for this counter.
     228              :   /// This is used by the SDK to collect metrics.
     229            2 :   List<MetricPoint<T>> collectPoints() {
     230            2 :     if (!enabled) {
     231            0 :       return [];
     232              :     }
     233              : 
     234              :     // Then return points from storage
     235            4 :     return _storage.collectPoints();
     236              :   }
     237              : 
     238              :   /// Resets the counter. This is only used for testing.
     239            2 :   void reset() {
     240            4 :     _storage.reset();
     241            4 :     _lastValues.clear();
     242              :   }
     243              : }
     244              : 
     245              : /// Wrapper for APICallbackRegistration that also handles our internal state.
     246              : class _ObservableCounterCallbackRegistration<T extends num>
     247              :     implements APICallbackRegistration<T> {
     248              :   /// The API registration.
     249              :   final APICallbackRegistration<T> apiRegistration;
     250              : 
     251              :   /// The counter this registration is for.
     252              :   final ObservableCounter<T> counter;
     253              : 
     254              :   /// The callback that was registered.
     255              :   final ObservableCallback<T> callback;
     256              : 
     257            2 :   _ObservableCounterCallbackRegistration({
     258              :     required this.apiRegistration,
     259              :     required this.counter,
     260              :     required this.callback,
     261              :   });
     262              : 
     263            1 :   @override
     264              :   void unregister() {
     265              :     // Unregister from the API implementation
     266            2 :     apiRegistration.unregister();
     267              : 
     268              :     // Also remove from our counter directly for redundancy
     269            3 :     counter.removeCallback(callback);
     270              :   }
     271              : }
        

Generated by: LCOV version 2.0-1