Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 : import 'dart:io';
6 : import 'dart:math';
7 : import 'dart:typed_data';
8 :
9 : import 'package:dartastic_opentelemetry/dartastic_opentelemetry.dart';
10 : import 'package:dartastic_opentelemetry/src/metrics/export/otlp/metric_transformer.dart';
11 : import 'package:http/http.dart' as http;
12 : import 'package:http/io_client.dart';
13 :
14 : import '../../../../../proto/collector/metrics/v1/metrics_service.pb.dart';
15 : import '../../../../../proto/metrics/v1/metrics.pb.dart' as metrics_pb;
16 : import '../../../../trace/export/otlp/certificate_utils.dart';
17 : import '../../../../util/zip/gzip.dart';
18 :
/// An OpenTelemetry metric exporter that exports metrics using OTLP over
/// HTTP/protobuf.
///
/// Each call to [export] serializes the supplied metrics into an
/// `ExportMetricsServiceRequest`, optionally gzip-compresses it, and POSTs it
/// to the configured endpoint (with `/v1/metrics` appended when missing).
/// Responses carrying a retryable status code are retried with exponential
/// backoff and jitter, up to the configured number of retries.
class OtlpHttpMetricExporter implements MetricExporter {
  /// HTTP status codes that the OTLP/HTTP specification marks as retryable.
  static const _retryableStatusCodes = [
    429, // Too Many Requests
    502, // Bad Gateway
    503, // Service Unavailable
    504, // Gateway Timeout
  ];

  final OtlpHttpMetricExporterConfig _config;
  bool _isShutdown = false;
  final Random _random = Random();

  /// Exports currently in flight; awaited by [forceFlush] and [shutdown].
  final List<Future<void>> _pendingExports = [];
  late final http.Client _client;

  /// Creates a new OTLP HTTP metric exporter with the specified configuration.
  ///
  /// If no [config] is provided, a default [OtlpHttpMetricExporterConfig] is
  /// used.
  OtlpHttpMetricExporter([OtlpHttpMetricExporterConfig? config])
      : _config = config ?? OtlpHttpMetricExporterConfig() {
    _client = _createHttpClient();
  }

  /// Creates the HTTP client, applying custom certificates if configured.
  ///
  /// When any of the certificate settings is present, a [SecurityContext] is
  /// requested from [CertificateUtils] and wrapped in an [IOClient]. If no
  /// certificate is configured, no context is produced, or building the
  /// context fails, the default [http.Client] is returned instead.
  http.Client _createHttpClient() {
    // If no certificates are configured, use the default client.
    if (_config.certificate == null &&
        _config.clientKey == null &&
        _config.clientCertificate == null) {
      return http.Client();
    }

    try {
      final context = CertificateUtils.createSecurityContext(
        certificate: _config.certificate,
        clientKey: _config.clientKey,
        clientCertificate: _config.clientCertificate,
      );

      if (context == null) {
        return http.Client();
      }

      // Wrap a dart:io HttpClient carrying the custom SecurityContext so it
      // can be used through the http package API.
      return IOClient(HttpClient(context: context));
    } catch (e) {
      if (OTelLog.isError()) {
        OTelLog.error(
            'OtlpHttpMetricExporter: Failed to create HTTP client with certificates: $e');
      }
      // Fall back to the default client on error.
      return http.Client();
    }
  }

  /// Returns the backoff delay to wait after the failed attempt number
  /// [retries] (zero-based).
  ///
  /// The delay is `baseDelay * 2^retries` plus a random jitter of up to the
  /// same amount.
  Duration _calculateJitteredDelay(int retries) {
    final baseMs = _config.baseDelay.inMilliseconds;
    final delay = baseMs * pow(2, retries);
    final jitter = _random.nextDouble() * delay;
    return Duration(milliseconds: (delay + jitter).toInt());
  }

  /// Returns the configured endpoint, guaranteed to end with `/v1/metrics`.
  String _getEndpointUrl() {
    var endpoint = _config.endpoint;
    if (!endpoint.endsWith('/v1/metrics')) {
      // Drop a trailing slash so the appended path has no double slash.
      if (endpoint.endsWith('/')) {
        endpoint = endpoint.substring(0, endpoint.length - 1);
      }
      endpoint = '$endpoint/v1/metrics';
    }
    return endpoint;
  }

  /// Whether [error] reports one of the [_retryableStatusCodes].
  ///
  /// Relies on the `status code <n>` wording produced by [_tryExport].
  bool _isRetryable(http.ClientException error) {
    final message = error.message;
    return _retryableStatusCodes
        .any((code) => message.contains('status code $code'));
  }

  /// Exports [metrics] to the configured OTLP endpoint.
  ///
  /// Returns `true` when the collector accepted the data (or there was
  /// nothing to send) and `false` when the export failed or was interrupted
  /// by [shutdown].
  ///
  /// Throws a [StateError] if the exporter is already shut down when called.
  @override
  Future<bool> export(MetricData metrics) async {
    if (_isShutdown) {
      throw StateError('Exporter is shutdown');
    }

    if (metrics.metrics.isEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpHttpMetricExporter: No metrics to export');
      }
      return true;
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Beginning export of ${metrics.metrics.length} metrics');
    }

    // Register the in-flight export so forceFlush() and shutdown() can wait
    // for it instead of closing the client underneath it.
    final pending = _export(metrics);
    _pendingExports.add(pending);
    try {
      final result = await pending;
      if (result && OTelLog.isDebug()) {
        OTelLog.debug('OtlpHttpMetricExporter: Export completed successfully');
      }
      return result;
    } on StateError catch (e) {
      if (_isShutdown && e.message.contains('shut down during')) {
        // Gracefully handle the case where shutdown interrupted the export.
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpHttpMetricExporter: Export was interrupted by shutdown, suppressing error');
        }
        return false;
      }
      rethrow;
    } finally {
      _pendingExports.remove(pending);
    }
  }

  /// Runs the attempt/retry loop for a single export.
  ///
  /// The first attempt always runs; before every retry (and before every
  /// backoff sleep) the shutdown flag is re-read so a shutdown aborts the
  /// export promptly with a [StateError] whose message contains
  /// `shut down during`, which [export] translates into `false`.
  Future<bool> _export(MetricData metrics) async {
    if (_isShutdown) {
      throw StateError('Exporter was shut down during export');
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Attempting to export ${metrics.metrics.length} metrics to ${_config.endpoint}');
    }

    final maxAttempts = _config.maxRetries + 1; // Initial attempt + retries

    for (var attempt = 0; attempt < maxAttempts; attempt++) {
      // Only retries are cancelled by shutdown; an already-started first
      // attempt is allowed to complete.
      if (attempt > 0 && _isShutdown) {
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpHttpMetricExporter: Export interrupted by shutdown');
        }
        throw StateError('Exporter was shut down during export');
      }

      try {
        final success = await _tryExport(metrics);
        if (success && OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpHttpMetricExporter: Successfully exported metrics');
        }
        return success;
      } on http.ClientException catch (e, stackTrace) {
        if (OTelLog.isError()) {
          OTelLog.error('OtlpHttpMetricExporter: HTTP error during export: $e');
          OTelLog.error('Stack trace: $stackTrace');
        }

        if (!_isRetryable(e)) {
          if (OTelLog.isError()) {
            OTelLog.error(
                'OtlpHttpMetricExporter: Non-retryable HTTP error, stopping retry attempts');
          }
          return false;
        }
      } catch (e, stackTrace) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpHttpMetricExporter: Unexpected error during export: $e');
          OTelLog.error('Stack trace: $stackTrace');
        }
      }

      // The attempt failed in a retryable way. Use the live shutdown flag
      // (not a snapshot taken before the request) so we never sleep through
      // a backoff after shutdown was requested.
      if (_isShutdown) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpHttpMetricExporter: Export interrupted by shutdown');
        }
        throw StateError('Exporter was shut down during export');
      }

      if (attempt >= maxAttempts - 1) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpHttpMetricExporter: Max attempts reached (${attempt + 1} out of $maxAttempts), giving up');
        }
        return false;
      }

      final delay = _calculateJitteredDelay(attempt);
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpHttpMetricExporter: Retrying export after ${delay.inMilliseconds}ms...');
      }
      await Future<void>.delayed(delay);
    }

    return false;
  }

  /// Performs a single export attempt.
  ///
  /// Returns `true` on a 2xx response and `false` when the request could not
  /// be sent or timed out. Throws an [http.ClientException] whose message
  /// contains `status code <n>` for any non-2xx response so that [_export]
  /// can decide whether to retry.
  Future<bool> _tryExport(MetricData metrics) async {
    if (_isShutdown) {
      throw StateError('Exporter is shutdown');
    }

    if (OTelLog.isLogMetrics()) {
      OTelLog.logMetric(
          "Exporting metrics via HTTP: ${metrics.metrics.length} metrics");
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Preparing to export ${metrics.metrics.length} metrics');
      OTelLog.debug('OtlpHttpMetricExporter: Transforming metrics');
    }

    // Build the request: one ResourceMetrics holding one ScopeMetrics that
    // carries every metric of this batch.
    final resource = metrics.resource ?? OTel.resource(null);
    final resourceMetrics = metrics_pb.ResourceMetrics()
      ..resource = MetricTransformer.transformResource(resource);

    final scopeMetrics = metrics_pb.ScopeMetrics();
    scopeMetrics.scope.name = '@dart/dartastic_opentelemetry';
    scopeMetrics.scope.version = '1.0.0';
    for (final metric in metrics.metrics) {
      scopeMetrics.metrics.add(MetricTransformer.transformMetric(metric));
    }
    resourceMetrics.scopeMetrics.add(scopeMetrics);

    final request = ExportMetricsServiceRequest();
    request.resourceMetrics.add(resourceMetrics);

    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpHttpMetricExporter: Successfully transformed metrics');
    }

    // Prepare headers; copy so the configured map is never mutated.
    final headers = Map<String, String>.from(_config.headers);
    headers['Content-Type'] = 'application/x-protobuf';

    // Serialize, then gzip the payload if compression is enabled.
    final Uint8List messageBytes = request.writeToBuffer();
    var bodyBytes = messageBytes;
    if (_config.compression) {
      headers['Content-Encoding'] = 'gzip';
      final compressedBytes = await GZip().compress(messageBytes);
      bodyBytes = Uint8List.fromList(compressedBytes);
    }

    final endpointUrl = _getEndpointUrl();
    final endpointUri = Uri.parse(endpointUrl);
    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Sending export request to $endpointUrl');
    }

    final http.Response response;
    try {
      response = await _client
          .post(endpointUri, headers: headers, body: bodyBytes)
          .timeout(_config.timeout);
    } catch (e, stackTrace) {
      // Transport failure or timeout: report as a failed, non-retried export.
      if (OTelLog.isError()) {
        OTelLog.error('OtlpHttpMetricExporter: Export request failed: $e');
        OTelLog.error('Stack trace: $stackTrace');
      }
      return false;
    }

    if (response.statusCode >= 200 && response.statusCode < 300) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpHttpMetricExporter: Export request completed successfully');
      }
      return true;
    }

    // Thrown outside the try above on purpose: the status code must reach
    // _export so that retryable responses are actually retried.
    final errorMessage =
        'OtlpHttpMetricExporter: Export request failed with status code ${response.statusCode}';
    if (OTelLog.isError()) OTelLog.error(errorMessage);
    throw http.ClientException(errorMessage, endpointUri);
  }

  /// Waits for all in-flight exports to finish.
  ///
  /// Returns `true` when there was nothing to flush, the exporter is already
  /// shut down, or every pending export completed; returns `false` if a
  /// pending export completed with an error.
  @override
  Future<bool> forceFlush() async {
    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpHttpMetricExporter: Force flush requested');
    }
    if (_isShutdown) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpHttpMetricExporter: Exporter is already shut down, nothing to flush');
      }
      return true;
    }

    if (_pendingExports.isEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpHttpMetricExporter: No pending exports to flush');
      }
      return true;
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Waiting for ${_pendingExports.length} pending exports to complete');
    }
    try {
      // Snapshot the list: exports remove themselves as they complete.
      await Future.wait(List<Future<void>>.of(_pendingExports));
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpHttpMetricExporter: All pending exports completed');
      }
      return true;
    } catch (e) {
      if (OTelLog.isError()) {
        OTelLog.error('OtlpHttpMetricExporter: Error during force flush: $e');
      }
      return false;
    }
  }

  /// Shuts the exporter down and releases the HTTP client.
  ///
  /// Marks the exporter as shut down (so no new exports or retries start),
  /// waits up to ten seconds for in-flight exports, then closes the client.
  /// Calling it again is a no-op. Always returns `true`.
  @override
  Future<bool> shutdown() async {
    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpHttpMetricExporter: Shutdown requested');
    }
    if (_isShutdown) {
      return true;
    }
    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpHttpMetricExporter: Shutting down - waiting for ${_pendingExports.length} pending exports');
    }

    // Set the flag first so nothing new starts while we drain.
    _isShutdown = true;

    // Snapshot the list: exports remove themselves as they complete.
    final pendingExportsCopy = List<Future<void>>.of(_pendingExports);

    if (pendingExportsCopy.isNotEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpHttpMetricExporter: Waiting for ${pendingExportsCopy.length} pending exports with timeout');
      }
      try {
        // Use a generous timeout but don't wait forever.
        await Future.wait(pendingExportsCopy)
            .timeout(const Duration(seconds: 10), onTimeout: () {
          if (OTelLog.isDebug()) {
            OTelLog.debug(
                'OtlpHttpMetricExporter: Timeout waiting for exports to complete');
          }
          return <void>[];
        });
      } catch (e) {
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpHttpMetricExporter: Error during shutdown while waiting for exports: $e');
        }
        // Don't return false here - we still want to close the client.
      }
    }

    // Close the HTTP client to release resources.
    _client.close();

    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpHttpMetricExporter: Shutdown complete');
    }
    return true;
  }
}
|