LCOV - code coverage report
Current view: top level - src/metrics/export/otlp/http - otlp_http_metric_exporter.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 0.0 % 191 0
Test Date: 2025-11-15 13:23:01 Functions: - 0 0

            Line data    Source code
       1              : // Licensed under the Apache License, Version 2.0
       2              : // Copyright 2025, Michael Bushe, All rights reserved.
       3              : 
       4              : import 'dart:async';
       5              : import 'dart:io';
       6              : import 'dart:math';
       7              : import 'dart:typed_data';
       8              : 
       9              : import 'package:dartastic_opentelemetry/dartastic_opentelemetry.dart';
      10              : import 'package:dartastic_opentelemetry/src/metrics/export/otlp/metric_transformer.dart';
      11              : import 'package:http/http.dart' as http;
      12              : import 'package:http/io_client.dart';
      13              : 
      14              : import '../../../../../proto/collector/metrics/v1/metrics_service.pb.dart';
      15              : import '../../../../../proto/metrics/v1/metrics.pb.dart' as metrics_pb;
      16              : import '../../../../trace/export/otlp/certificate_utils.dart';
      17              : import '../../../../util/zip/gzip.dart';
      18              : 
      19              : /// An OpenTelemetry metric exporter that exports metrics using OTLP over HTTP/protobuf
      20              : class OtlpHttpMetricExporter implements MetricExporter {
      21              :   static const _retryableStatusCodes = [
      22              :     429, // Too Many Requests
      23              :     503, // Service Unavailable
      24              :   ];
      25              : 
      26              :   final OtlpHttpMetricExporterConfig _config;
      27              :   bool _isShutdown = false;
      28              :   final Random _random = Random();
      29              :   final List<Future<void>> _pendingExports = [];
      30              :   late final http.Client _client;
      31              : 
      32              :   /// Creates a new OTLP HTTP metric exporter with the specified configuration.
      33              :   /// If no configuration is provided, default settings will be used.
      34              :   ///
      35              :   /// @param config Optional configuration for the exporter
      36            0 :   OtlpHttpMetricExporter([OtlpHttpMetricExporterConfig? config])
      37            0 :       : _config = config ?? OtlpHttpMetricExporterConfig() {
      38            0 :     _client = _createHttpClient();
      39              :   }
      40              : 
      41              :   /// Creates an HTTP client with custom certificates if configured.
      42              :   ///
      43              :   /// This method creates an HttpClient with a SecurityContext configured
      44              :   /// with any custom certificates specified in the exporter configuration.
      45            0 :   http.Client _createHttpClient() {
      46              :     // If no certificates are configured, use the default client
      47            0 :     if (_config.certificate == null &&
      48            0 :         _config.clientKey == null &&
      49            0 :         _config.clientCertificate == null) {
      50            0 :       return http.Client();
      51              :     }
      52              : 
      53              :     try {
      54            0 :       final context = CertificateUtils.createSecurityContext(
      55            0 :         certificate: _config.certificate,
      56            0 :         clientKey: _config.clientKey,
      57            0 :         clientCertificate: _config.clientCertificate,
      58              :       );
      59              : 
      60              :       if (context == null) {
      61            0 :         return http.Client();
      62              :       }
      63              : 
      64              :       // Create an HttpClient with the custom SecurityContext
      65            0 :       final httpClient = HttpClient(context: context);
      66              : 
      67              :       // Wrap in IOClient for use with the http package
      68            0 :       return IOClient(httpClient);
      69              :     } catch (e) {
      70            0 :       if (OTelLog.isError()) {
      71            0 :         OTelLog.error(
      72            0 :             'OtlpHttpMetricExporter: Failed to create HTTP client with certificates: $e');
      73              :       }
      74              :       // Fall back to default client on error
      75            0 :       return http.Client();
      76              :     }
      77              :   }
      78              : 
      79            0 :   Duration _calculateJitteredDelay(int retries) {
      80            0 :     final baseMs = _config.baseDelay.inMilliseconds;
      81            0 :     final delay = baseMs * pow(2, retries);
      82            0 :     final jitter = _random.nextDouble() * delay;
      83            0 :     return Duration(milliseconds: (delay + jitter).toInt());
      84              :   }
      85              : 
      86            0 :   String _getEndpointUrl() {
      87              :     // Ensure the endpoint ends with /v1/metrics
      88            0 :     String endpoint = _config.endpoint;
      89            0 :     if (!endpoint.endsWith('/v1/metrics')) {
      90              :       // Ensure there's no trailing slash before adding path
      91            0 :       if (endpoint.endsWith('/')) {
      92            0 :         endpoint = endpoint.substring(0, endpoint.length - 1);
      93              :       }
      94            0 :       endpoint = '$endpoint/v1/metrics';
      95              :     }
      96              :     return endpoint;
      97              :   }
      98              : 
      99            0 :   @override
     100              :   Future<bool> export(MetricData metrics) async {
     101            0 :     if (_isShutdown) {
     102            0 :       throw StateError('Exporter is shutdown');
     103              :     }
     104              : 
     105            0 :     if (metrics.metrics.isEmpty) {
     106            0 :       if (OTelLog.isDebug()) {
     107            0 :         OTelLog.debug('OtlpHttpMetricExporter: No metrics to export');
     108              :       }
     109              :       return true;
     110              :     }
     111              : 
     112            0 :     if (OTelLog.isDebug()) {
     113            0 :       OTelLog.debug(
     114            0 :           'OtlpHttpMetricExporter: Beginning export of ${metrics.metrics.length} metrics');
     115              :     }
     116              : 
     117              :     try {
     118            0 :       final result = await _export(metrics);
     119            0 :       if (OTelLog.isDebug()) {
     120            0 :         OTelLog.debug('OtlpHttpMetricExporter: Export completed successfully');
     121              :       }
     122              :       return result;
     123              :     } catch (e) {
     124            0 :       if (_isShutdown &&
     125            0 :           e is StateError &&
     126            0 :           e.message.contains('shut down during')) {
     127              :         // Gracefully handle the case where shutdown interrupted the export
     128            0 :         if (OTelLog.isDebug()) {
     129            0 :           OTelLog.debug(
     130              :               'OtlpHttpMetricExporter: Export was interrupted by shutdown, suppressing error');
     131              :         }
     132              :         return false;
     133              :       } else {
     134              :         // Re-throw other errors
     135              :         rethrow;
     136              :       }
     137              :     }
     138              :   }
     139              : 
     140            0 :   Future<bool> _export(MetricData metrics) async {
     141            0 :     if (_isShutdown) {
     142            0 :       throw StateError('Exporter was shut down during export');
     143              :     }
     144              : 
     145            0 :     if (OTelLog.isDebug()) {
     146            0 :       OTelLog.debug(
     147            0 :           'OtlpHttpMetricExporter: Attempting to export ${metrics.metrics.length} metrics to ${_config.endpoint}');
     148              :     }
     149              : 
     150              :     var attempts = 0;
     151            0 :     final maxAttempts = _config.maxRetries + 1; // Initial attempt + retries
     152              : 
     153            0 :     while (attempts < maxAttempts) {
     154              :       // Allow the export to continue even during shutdown, so we complete in-flight requests
     155            0 :       final wasShutdownDuringRetry = _isShutdown;
     156              : 
     157              :       try {
     158              :         // Only check for shutdown on retry attempts to ensure in-progress exports can complete
     159            0 :         if (wasShutdownDuringRetry && attempts > 0) {
     160            0 :           if (OTelLog.isDebug()) {
     161            0 :             OTelLog.debug(
     162              :                 'OtlpHttpMetricExporter: Export interrupted by shutdown');
     163              :           }
     164            0 :           throw StateError('Exporter was shut down during export');
     165              :         }
     166              : 
     167            0 :         final success = await _tryExport(metrics);
     168            0 :         if (OTelLog.isDebug()) {
     169            0 :           OTelLog.debug(
     170              :               'OtlpHttpMetricExporter: Successfully exported metrics');
     171              :         }
     172              :         return success;
     173            0 :       } on http.ClientException catch (e, stackTrace) {
     174            0 :         if (OTelLog.isError()) {
     175            0 :           OTelLog.error('OtlpHttpMetricExporter: HTTP error during export: $e');
     176              :         }
     177            0 :         if (OTelLog.isError()) OTelLog.error('Stack trace: $stackTrace');
     178              : 
     179              :         // Check if the exporter was shut down while we were waiting
     180              :         if (wasShutdownDuringRetry) {
     181            0 :           if (OTelLog.isError()) {
     182            0 :             OTelLog.error(
     183              :                 'OtlpHttpMetricExporter: Export interrupted by shutdown');
     184              :           }
     185            0 :           throw StateError('Exporter was shut down during export');
     186              :         }
     187              : 
     188              :         // Handle status code-based retries
     189              :         bool shouldRetry = false;
     190            0 :         if (e.message.contains('status code')) {
     191            0 :           for (final code in _retryableStatusCodes) {
     192            0 :             if (e.message.contains('status code $code')) {
     193              :               shouldRetry = true;
     194              :               break;
     195              :             }
     196              :           }
     197              :         }
     198              : 
     199              :         if (!shouldRetry) {
     200            0 :           if (OTelLog.isError()) {
     201            0 :             OTelLog.error(
     202              :                 'OtlpHttpMetricExporter: Non-retryable HTTP error, stopping retry attempts');
     203              :           }
     204              :           return false;
     205              :         }
     206              : 
     207            0 :         if (attempts >= maxAttempts - 1) {
     208            0 :           if (OTelLog.isError()) {
     209            0 :             OTelLog.error(
     210            0 :                 'OtlpHttpMetricExporter: Max attempts reached ($attempts out of $maxAttempts), giving up');
     211              :           }
     212              :           return false;
     213              :         }
     214              : 
     215            0 :         final delay = _calculateJitteredDelay(attempts);
     216            0 :         if (OTelLog.isDebug()) {
     217            0 :           OTelLog.debug(
     218            0 :               'OtlpHttpMetricExporter: Retrying export after ${delay.inMilliseconds}ms...');
     219              :         }
     220            0 :         await Future<void>.delayed(delay);
     221            0 :         attempts++;
     222              :       } catch (e, stackTrace) {
     223            0 :         if (OTelLog.isError()) {
     224            0 :           OTelLog.error(
     225            0 :               'OtlpHttpMetricExporter: Unexpected error during export: $e');
     226              :         }
     227            0 :         if (OTelLog.isError()) OTelLog.error('Stack trace: $stackTrace');
     228              : 
     229              :         // Check if we should stop retrying due to shutdown
     230              :         if (wasShutdownDuringRetry) {
     231            0 :           throw StateError('Exporter was shut down during export');
     232              :         }
     233              : 
     234            0 :         if (attempts >= maxAttempts - 1) {
     235              :           return false;
     236              :         }
     237              : 
     238            0 :         final delay = _calculateJitteredDelay(attempts);
     239            0 :         if (OTelLog.isDebug()) {
     240            0 :           OTelLog.debug(
     241            0 :               'OtlpHttpMetricExporter: Retrying export after ${delay.inMilliseconds}ms...');
     242              :         }
     243            0 :         await Future<void>.delayed(delay);
     244            0 :         attempts++;
     245              :       }
     246              :     }
     247              : 
     248              :     return false;
     249              :   }
     250              : 
     251            0 :   Future<bool> _tryExport(MetricData metrics) async {
     252            0 :     if (_isShutdown) {
     253            0 :       throw StateError('Exporter is shutdown');
     254              :     }
     255              : 
     256            0 :     if (OTelLog.isLogMetrics()) {
     257            0 :       OTelLog.logMetric(
     258            0 :           "Exporting metrics via HTTP: ${metrics.metrics.length} metrics");
     259              :     }
     260              : 
     261            0 :     if (OTelLog.isDebug()) {
     262            0 :       OTelLog.debug(
     263            0 :           'OtlpHttpMetricExporter: Preparing to export ${metrics.metrics.length} metrics');
     264              :     }
     265              : 
     266            0 :     if (OTelLog.isDebug()) {
     267            0 :       OTelLog.debug('OtlpHttpMetricExporter: Transforming metrics');
     268              :     }
     269              : 
     270              :     // Create the export request
     271            0 :     final request = ExportMetricsServiceRequest();
     272            0 :     final resourceMetrics = metrics_pb.ResourceMetrics();
     273              : 
     274              :     // Add resource
     275            0 :     if (metrics.resource != null) {
     276            0 :       resourceMetrics.resource =
     277            0 :           MetricTransformer.transformResource(metrics.resource!);
     278              :     } else {
     279            0 :       resourceMetrics.resource =
     280            0 :           MetricTransformer.transformResource(OTel.resource(null));
     281              :     }
     282              : 
     283              :     // Create scope metrics
     284            0 :     final scopeMetrics = metrics_pb.ScopeMetrics();
     285              : 
     286              :     // Add instrumentation scope
     287            0 :     scopeMetrics.scope.name = '@dart/dartastic_opentelemetry';
     288            0 :     scopeMetrics.scope.version = '1.0.0';
     289              : 
     290              :     // Add metrics to scope
     291            0 :     for (final metric in metrics.metrics) {
     292            0 :       scopeMetrics.metrics.add(MetricTransformer.transformMetric(metric));
     293              :     }
     294              : 
     295              :     // Add scope metrics to resource metrics
     296            0 :     resourceMetrics.scopeMetrics.add(scopeMetrics);
     297              : 
     298              :     // Add resource metrics to request
     299            0 :     request.resourceMetrics.add(resourceMetrics);
     300              : 
     301            0 :     if (OTelLog.isDebug()) {
     302            0 :       OTelLog.debug('OtlpHttpMetricExporter: Successfully transformed metrics');
     303              :     }
     304              : 
     305              :     // Prepare headers
     306            0 :     final headers = Map<String, String>.from(_config.headers);
     307            0 :     headers['Content-Type'] = 'application/x-protobuf';
     308              : 
     309            0 :     if (_config.compression) {
     310            0 :       headers['Content-Encoding'] = 'gzip';
     311              :     }
     312              : 
     313              :     // Convert protobuf to bytes
     314            0 :     final Uint8List messageBytes = request.writeToBuffer();
     315              :     Uint8List bodyBytes = messageBytes;
     316              : 
     317              :     // Apply gzip compression if configured
     318            0 :     if (_config.compression) {
     319            0 :       final gzip = GZip();
     320            0 :       final compressedBytes = await gzip.compress(messageBytes);
     321            0 :       bodyBytes = Uint8List.fromList(compressedBytes);
     322              :     }
     323              : 
     324              :     // Get the endpoint URL with the correct path
     325            0 :     final endpointUrl = _getEndpointUrl();
     326            0 :     if (OTelLog.isDebug()) {
     327            0 :       OTelLog.debug(
     328            0 :           'OtlpHttpMetricExporter: Sending export request to $endpointUrl');
     329              :     }
     330              : 
     331              :     try {
     332            0 :       final http.Response response = await _client
     333            0 :           .post(
     334            0 :             Uri.parse(endpointUrl),
     335              :             headers: headers,
     336              :             body: bodyBytes,
     337              :           )
     338            0 :           .timeout(_config.timeout);
     339              : 
     340            0 :       if (response.statusCode >= 200 && response.statusCode < 300) {
     341            0 :         if (OTelLog.isDebug()) {
     342            0 :           OTelLog.debug(
     343              :               'OtlpHttpMetricExporter: Export request completed successfully');
     344              :         }
     345              :         return true;
     346              :       } else {
     347              :         final String errorMessage =
     348            0 :             'OtlpHttpMetricExporter: Export request failed with status code ${response.statusCode}';
     349            0 :         if (OTelLog.isError()) OTelLog.error(errorMessage);
     350            0 :         throw http.ClientException(errorMessage);
     351              :       }
     352              :     } catch (e, stackTrace) {
     353            0 :       if (OTelLog.isError()) {
     354            0 :         OTelLog.error('OtlpHttpMetricExporter: Export request failed: $e');
     355            0 :         OTelLog.error('Stack trace: $stackTrace');
     356              :       }
     357              :       return false;
     358              :     }
     359              :   }
     360              : 
     361            0 :   @override
     362              :   Future<bool> forceFlush() async {
     363            0 :     if (OTelLog.isDebug()) {
     364            0 :       OTelLog.debug('OtlpHttpMetricExporter: Force flush requested');
     365              :     }
     366            0 :     if (_isShutdown) {
     367            0 :       if (OTelLog.isDebug()) {
     368            0 :         OTelLog.debug(
     369              :             'OtlpHttpMetricExporter: Exporter is already shut down, nothing to flush');
     370              :       }
     371              :       return true;
     372              :     }
     373              : 
     374              :     // Wait for any pending export operations to complete
     375            0 :     if (_pendingExports.isNotEmpty) {
     376            0 :       if (OTelLog.isDebug()) {
     377            0 :         OTelLog.debug(
     378            0 :             'OtlpHttpMetricExporter: Waiting for ${_pendingExports.length} pending exports to complete');
     379              :       }
     380              :       try {
     381            0 :         await Future.wait(_pendingExports);
     382            0 :         if (OTelLog.isDebug()) {
     383            0 :           OTelLog.debug(
     384              :               'OtlpHttpMetricExporter: All pending exports completed');
     385              :         }
     386              :         return true;
     387              :       } catch (e) {
     388            0 :         if (OTelLog.isError()) {
     389            0 :           OTelLog.error('OtlpHttpMetricExporter: Error during force flush: $e');
     390              :         }
     391              :         return false;
     392              :       }
     393              :     } else {
     394            0 :       if (OTelLog.isDebug()) {
     395            0 :         OTelLog.debug('OtlpHttpMetricExporter: No pending exports to flush');
     396              :       }
     397              :       return true;
     398              :     }
     399              :   }
     400              : 
     401            0 :   @override
     402              :   Future<bool> shutdown() async {
     403            0 :     if (OTelLog.isDebug()) {
     404            0 :       OTelLog.debug('OtlpHttpMetricExporter: Shutdown requested');
     405              :     }
     406            0 :     if (_isShutdown) {
     407              :       return true;
     408              :     }
     409            0 :     if (OTelLog.isDebug()) {
     410            0 :       OTelLog.debug(
     411            0 :           'OtlpHttpMetricExporter: Shutting down - waiting for ${_pendingExports.length} pending exports');
     412              :     }
     413              : 
     414              :     // Set shutdown flag first
     415            0 :     _isShutdown = true;
     416              : 
     417              :     // Create a safe copy of pending exports to avoid concurrent modification
     418            0 :     final pendingExportsCopy = List<Future<void>>.of(_pendingExports);
     419              : 
     420              :     // Wait for pending exports but don't start any new ones
     421              :     // Use a timeout to prevent hanging if exports take too long
     422            0 :     if (pendingExportsCopy.isNotEmpty) {
     423            0 :       if (OTelLog.isDebug()) {
     424            0 :         OTelLog.debug(
     425            0 :             'OtlpHttpMetricExporter: Waiting for ${pendingExportsCopy.length} pending exports with timeout');
     426              :       }
     427              :       try {
     428              :         // Use a generous timeout but don't wait forever
     429            0 :         await Future.wait(pendingExportsCopy)
     430            0 :             .timeout(const Duration(seconds: 10), onTimeout: () {
     431            0 :           if (OTelLog.isDebug()) {
     432            0 :             OTelLog.debug(
     433              :                 'OtlpHttpMetricExporter: Timeout waiting for exports to complete');
     434              :           }
     435            0 :           return Future.value([]);
     436              :         });
     437              :       } catch (e) {
     438            0 :         if (OTelLog.isDebug()) {
     439            0 :           OTelLog.debug(
     440            0 :               'OtlpHttpMetricExporter: Error during shutdown while waiting for exports: $e');
     441              :         }
     442              :         // Don't return false here - we still want to close the client
     443              :       }
     444              :     }
     445              : 
     446              :     // Close the HTTP client to release resources
     447            0 :     _client.close();
     448              : 
     449            0 :     if (OTelLog.isDebug()) {
     450            0 :       OTelLog.debug('OtlpHttpMetricExporter: Shutdown complete');
     451              :     }
     452              :     return true;
     453              :   }
     454              : }
        

Generated by: LCOV version 2.0-1