Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 :
6 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart'
7 : show OTelLog;
8 :
9 : import 'data/metric_data.dart';
10 : import 'meter_provider.dart';
11 : import 'metric_exporter.dart';
12 :
/// Base contract for components that pull metrics out of a [MeterProvider]
/// and pass them on to a [MetricExporter].
abstract class MetricReader {
  /// Backing storage for [meterProvider].
  ///
  /// Remains null until [registerMeterProvider] has been called.
  MeterProvider? _meterProvider;

  /// The [MeterProvider] attached to this reader, or null when none has been
  /// registered yet.
  MeterProvider? get meterProvider => _meterProvider;

  /// Attaches [provider] to this reader so metrics can be collected from it.
  ///
  /// Calling this again replaces the previously registered provider.
  void registerMeterProvider(MeterProvider provider) {
    _meterProvider = provider;
  }

  /// Triggers a collection of metrics from the [MeterProvider].
  ///
  /// Completes with an object holding the resource and metric information
  /// that was gathered.
  Future<MetricData> collect();

  /// Forces metrics through the associated exporter.
  ///
  /// Completes with true when the flush succeeded and false otherwise.
  Future<bool> forceFlush();

  /// Shuts this reader down.
  ///
  /// Implementations should clean up any resources they hold and perform a
  /// final export.
  Future<bool> shutdown();
}
45 :
/// A [MetricReader] that collects metrics on a fixed interval and hands them
/// to a [MetricExporter].
///
/// The periodic timer is started by the constructor. Ticks that fire while no
/// [MeterProvider] is registered do nothing. Collect/export cycles are
/// serialized, so the exporter is never asked to export by this reader while
/// a previous cycle started by this reader is still running.
class PeriodicExportingMetricReader extends MetricReader {
  /// The exporter to send metrics to.
  final MetricExporter _exporter;

  /// How often to collect and export metrics.
  final Duration _interval;

  /// Maximum time to wait for a single export operation.
  final Duration _timeout;

  /// Timer driving the periodic collection; null once [shutdown] was called.
  Timer? _timer;

  /// Completes when the collect/export cycle that is currently running has
  /// finished; null while no cycle is running.
  Future<void>? _activeCycle;

  /// Result of the first [shutdown] call, reused by every later call.
  Future<bool>? _shutdownResult;

  /// Creates a new PeriodicExportingMetricReader and starts its timer.
  ///
  /// [interval] How often to collect and export metrics (default: 60 seconds).
  /// [timeout] Maximum time to wait for export operations (default: 30
  /// seconds).
  PeriodicExportingMetricReader(
    this._exporter, {
    Duration interval = const Duration(seconds: 60),
    Duration timeout = const Duration(seconds: 30),
  })  : _interval = interval,
        _timeout = timeout {
    _startTimer();
  }

  /// Starts (or restarts) the periodic collection timer.
  void _startTimer() {
    _timer?.cancel();
    _timer = Timer.periodic(_interval, (_) {
      // Drop this tick when the previous cycle is still running. Queuing it
      // would let work pile up behind a slow exporter.
      if (_activeCycle == null) {
        unawaited(_collectAndExport());
      }
    });
  }

  /// Runs one collect/export cycle, waiting first for any cycle that is
  /// already in progress so that exports never run concurrently.
  Future<void> _collectAndExport() async {
    // Several callers may be waiting at once; re-check after every wake-up
    // because another waiter may have claimed the slot first.
    var running = _activeCycle;
    while (running != null) {
      await running;
      running = _activeCycle;
    }

    // Claim the slot before doing any work so re-entrant calls queue up.
    final gate = Completer<void>();
    _activeCycle = gate.future;
    try {
      await _runCycle();
    } finally {
      _activeCycle = null;
      gate.complete();
    }
  }

  /// Collects metrics and exports them when there is anything to export.
  ///
  /// Does nothing when no meter provider is registered. Anything thrown by
  /// collection or export is caught and printed, so this never throws.
  Future<void> _runCycle() async {
    if (meterProvider == null) return;

    try {
      final data = await collect();

      if (data.metrics.isNotEmpty) {
        // Bound the export so a stuck exporter cannot block the reader
        // forever; the underlying export itself is not cancelled.
        await _exporter.export(data).timeout(_timeout, onTimeout: () {
          print('Metric export timed out after $_timeout');
          return false;
        });
      }
    } catch (e) {
      print('Error during metric collection/export: $e');
    }
  }

  /// Collects the current metrics from the registered meter provider.
  ///
  /// Completes with [MetricData.empty] when no provider is registered.
  /// Otherwise completes with the provider's resource and all metrics it
  /// reports.
  @override
  Future<MetricData> collect() async {
    // Read the getter once so the metrics and the resource below are
    // guaranteed to come from the same provider instance.
    final provider = meterProvider;
    if (provider == null) {
      if (OTelLog.isLogMetrics()) {
        OTelLog.logMetric(
            'PeriodicExportingMetricReader: No meter provider registered');
      }
      return MetricData.empty();
    }

    final metrics = await provider.collectAllMetrics();

    if (OTelLog.isLogMetrics()) {
      OTelLog.logMetric(
          'PeriodicExportingMetricReader: Collected ${metrics.length} metrics');
    }

    return MetricData(
      resource: provider.resource,
      metrics: metrics,
    );
  }

  /// Collects and exports immediately, then flushes the exporter.
  ///
  /// Completes with the exporter's flush result, or false when an error was
  /// thrown along the way.
  @override
  Future<bool> forceFlush() async {
    try {
      await _collectAndExport();
      return await _exporter.forceFlush();
    } catch (e) {
      print('Error during forceFlush: $e');
      return false;
    }
  }

  /// Stops the timer, performs one final collect/export and shuts the
  /// exporter down.
  ///
  /// Only the first call does the work; later calls return the result of that
  /// first call, so the exporter is shut down by this reader at most once.
  @override
  Future<bool> shutdown() => _shutdownResult ??= _performShutdown();

  /// Does the actual shutdown work for [shutdown].
  ///
  /// Completes with the exporter's shutdown result, or false when an error
  /// was thrown along the way.
  Future<bool> _performShutdown() async {
    _timer?.cancel();
    _timer = null;

    try {
      // One last cycle so metrics recorded since the last tick are exported.
      await _collectAndExport();
      return await _exporter.shutdown();
    } catch (e) {
      print('Error during shutdown: $e');
      return false;
    }
  }
}
|