Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 : import 'dart:io';
6 : import 'dart:math';
7 : import 'dart:typed_data';
8 :
9 : import 'package:dartastic_opentelemetry/src/trace/span.dart';
10 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart'
11 : show OTelLog;
12 : import 'package:http/http.dart' as http;
13 : import 'package:http/io_client.dart';
14 :
15 : import '../../../../util/zip/gzip.dart';
16 : import '../../../span_logger.dart';
17 : import '../../span_exporter.dart';
18 : import '../certificate_utils.dart';
19 : import '../span_transformer.dart';
20 : import 'otlp_http_span_exporter_config.dart';
21 :
22 : /// An OpenTelemetry span exporter that exports spans using OTLP over HTTP/protobuf
23 : class OtlpHttpSpanExporter implements SpanExporter {
24 : static const _retryableStatusCodes = [
25 : 429, // Too Many Requests
26 : 503, // Service Unavailable
27 : ];
28 :
29 : final OtlpHttpExporterConfig _config;
30 : bool _isShutdown = false;
31 : final Random _random = Random();
32 : final List<Future<void>> _pendingExports = [];
33 : late final http.Client _client;
34 :
35 : /// Creates a new OTLP HTTP span exporter with the specified configuration.
36 : /// If no configuration is provided, default settings will be used.
37 : ///
38 : /// @param config Optional configuration for the exporter
39 72 : OtlpHttpSpanExporter([OtlpHttpExporterConfig? config])
40 0 : : _config = config ?? OtlpHttpExporterConfig() {
41 72 : if (OTelLog.isDebug()) {
42 72 : OTelLog.debug(
43 216 : 'OtlpHttpSpanExporter: Created with endpoint: ${_config.endpoint}');
44 72 : OTelLog.debug(
45 288 : 'OtlpHttpSpanExporter: Configured headers count: ${_config.headers.length}');
46 216 : _config.headers.forEach((key, value) {
47 0 : if (key.toLowerCase() == 'authorization') {
48 0 : OTelLog.debug(' $key: [REDACTED - length: ${value.length}]');
49 : } else {
50 0 : OTelLog.debug(' $key: $value');
51 : }
52 : });
53 : }
54 144 : _client = _createHttpClient();
55 : }
56 :
57 : /// Creates an HTTP client with custom certificates if configured.
58 : ///
59 : /// This method creates an HttpClient with a SecurityContext configured
60 : /// with any custom certificates specified in the exporter configuration.
61 72 : http.Client _createHttpClient() {
62 : // If no certificates are configured, use the default client
63 144 : if (_config.certificate == null &&
64 144 : _config.clientKey == null &&
65 144 : _config.clientCertificate == null) {
66 72 : return http.Client();
67 : }
68 :
69 : try {
70 0 : final context = CertificateUtils.createSecurityContext(
71 0 : certificate: _config.certificate,
72 0 : clientKey: _config.clientKey,
73 0 : clientCertificate: _config.clientCertificate,
74 : );
75 :
76 : if (context == null) {
77 0 : return http.Client();
78 : }
79 :
80 : // Create an HttpClient with the custom SecurityContext
81 0 : final httpClient = HttpClient(context: context);
82 :
83 : // Wrap in IOClient for use with the http package
84 0 : return IOClient(httpClient);
85 : } catch (e) {
86 0 : if (OTelLog.isError()) {
87 0 : OTelLog.error(
88 0 : 'OtlpHttpSpanExporter: Failed to create HTTP client with certificates: $e');
89 : }
90 : // Fall back to default client on error
91 0 : return http.Client();
92 : }
93 : }
94 :
95 0 : Duration _calculateJitteredDelay(int retries) {
96 0 : final baseMs = _config.baseDelay.inMilliseconds;
97 0 : final delay = baseMs * pow(2, retries);
98 0 : final jitter = _random.nextDouble() * delay;
99 0 : return Duration(milliseconds: (delay + jitter).toInt());
100 : }
101 :
102 15 : String _getEndpointUrl() {
103 : // Ensure the endpoint ends with /v1/traces
104 30 : String endpoint = _config.endpoint;
105 15 : if (!endpoint.endsWith('/v1/traces')) {
106 : // Ensure there's no trailing slash before adding path
107 15 : if (endpoint.endsWith('/')) {
108 0 : endpoint = endpoint.substring(0, endpoint.length - 1);
109 : }
110 15 : endpoint = '$endpoint/v1/traces';
111 : }
112 : return endpoint;
113 : }
114 :
115 15 : Future<void> _tryExport(List<Span> spans) async {
116 15 : if (_isShutdown) {
117 0 : throw StateError('Exporter is shutdown');
118 : }
119 :
120 15 : if (OTelLog.isLogSpans()) {
121 15 : logSpans(spans, "Exporting spans via HTTP.");
122 : }
123 :
124 15 : if (OTelLog.isDebug()) {
125 15 : OTelLog.debug(
126 30 : 'OtlpHttpSpanExporter: Preparing to export ${spans.length} spans');
127 30 : for (var span in spans) {
128 15 : OTelLog.debug(
129 90 : ' Span: ${span.name}, spanId: ${span.spanContext.spanId}, traceId: ${span.spanContext.traceId}');
130 : }
131 : }
132 :
133 15 : if (OTelLog.isDebug()) {
134 45 : OTelLog.debug('OtlpHttpSpanExporter: Transforming ${spans.length} spans');
135 : }
136 15 : final request = OtlpSpanTransformer.transformSpans(spans);
137 15 : if (OTelLog.isDebug()) {
138 15 : OTelLog.debug('OtlpHttpSpanExporter: Successfully transformed spans');
139 : }
140 :
141 : // Prepare headers
142 45 : final headers = Map<String, String>.from(_config.headers);
143 15 : headers['Content-Type'] = 'application/x-protobuf';
144 :
145 30 : if (_config.compression) {
146 0 : headers['Content-Encoding'] = 'gzip';
147 : }
148 :
149 : // Convert protobuf to bytes
150 15 : final Uint8List messageBytes = request.writeToBuffer();
151 : Uint8List bodyBytes = messageBytes;
152 :
153 : // Apply gzip compression if configured
154 30 : if (_config.compression) {
155 0 : final gZip = GZip();
156 0 : final listInt = await gZip.compress(messageBytes);
157 0 : bodyBytes = Uint8List.fromList(listInt);
158 : }
159 :
160 : // Get the endpoint URL with the correct path
161 15 : final endpointUrl = _getEndpointUrl();
162 15 : if (OTelLog.isDebug()) {
163 15 : OTelLog.debug(
164 15 : 'OtlpHttpSpanExporter: Sending export request to $endpointUrl');
165 15 : OTelLog.debug('OtlpHttpSpanExporter: Request headers:');
166 30 : headers.forEach((key, value) {
167 : // Mask authorization header value for security, but show it exists
168 30 : if (key.toLowerCase() == 'authorization') {
169 0 : OTelLog.debug(' $key: [REDACTED - length: ${value.length}]');
170 : } else {
171 30 : OTelLog.debug(' $key: $value');
172 : }
173 : });
174 : }
175 :
176 : try {
177 15 : final http.Response response = await _client
178 15 : .post(
179 15 : Uri.parse(endpointUrl),
180 : headers: headers,
181 : body: bodyBytes,
182 : )
183 45 : .timeout(_config.timeout);
184 :
185 0 : if (response.statusCode >= 200 && response.statusCode < 300) {
186 0 : if (OTelLog.isDebug()) {
187 0 : OTelLog.debug(
188 : 'OtlpHttpSpanExporter: Export request completed successfully');
189 : }
190 : } else {
191 : final String errorMessage =
192 0 : 'OtlpHttpSpanExporter: Export request failed with status code ${response.statusCode}';
193 0 : if (OTelLog.isError()) OTelLog.error(errorMessage);
194 0 : throw http.ClientException(errorMessage);
195 : }
196 : } catch (e, stackTrace) {
197 15 : if (OTelLog.isError()) {
198 30 : OTelLog.error('OtlpHttpSpanExporter: Export request failed: $e');
199 30 : OTelLog.error('Stack trace: $stackTrace');
200 : }
201 : rethrow;
202 : }
203 : }
204 :
205 15 : @override
206 : Future<void> export(List<Span> spans) async {
207 15 : if (_isShutdown) {
208 0 : throw StateError('Exporter is shutdown');
209 : }
210 :
211 15 : if (spans.isEmpty) {
212 0 : if (OTelLog.isDebug()) {
213 0 : OTelLog.debug('OtlpHttpSpanExporter: No spans to export');
214 : }
215 : return;
216 : }
217 :
218 15 : if (OTelLog.isDebug()) {
219 15 : OTelLog.debug(
220 30 : 'OtlpHttpSpanExporter: Beginning export of ${spans.length} spans');
221 : }
222 15 : final exportFuture = _export(spans);
223 :
224 : // Track the pending export but don't throw if it fails during shutdown
225 30 : _pendingExports.add(exportFuture);
226 : try {
227 : await exportFuture;
228 0 : if (OTelLog.isDebug()) {
229 0 : OTelLog.debug('OtlpHttpSpanExporter: Export completed successfully');
230 : }
231 : } catch (e) {
232 15 : if (_isShutdown &&
233 0 : e is StateError &&
234 0 : e.message.contains('shut down during')) {
235 : // Gracefully handle the case where shutdown interrupted the export
236 0 : if (OTelLog.isDebug()) {
237 0 : OTelLog.debug(
238 : 'OtlpHttpSpanExporter: Export was interrupted by shutdown, suppressing error');
239 : }
240 : } else {
241 : // Re-throw other errors
242 : rethrow;
243 : }
244 : } finally {
245 30 : _pendingExports.remove(exportFuture);
246 : }
247 : }
248 :
249 15 : Future<void> _export(List<Span> spans) async {
250 15 : if (_isShutdown) {
251 0 : throw StateError('Exporter was shut down during export');
252 : }
253 :
254 15 : if (OTelLog.isDebug()) {
255 15 : OTelLog.debug(
256 60 : 'OtlpHttpSpanExporter: Attempting to export ${spans.length} spans to ${_config.endpoint}');
257 : }
258 :
259 : var attempts = 0;
260 45 : final maxAttempts = _config.maxRetries + 1; // Initial attempt + retries
261 :
262 15 : while (attempts < maxAttempts) {
263 : // Allow the export to continue even during shutdown, so we complete in-flight requests
264 15 : final wasShutdownDuringRetry = _isShutdown;
265 :
266 : try {
267 : // Only check for shutdown on retry attempts to ensure in-progress exports can complete
268 0 : if (wasShutdownDuringRetry && attempts > 0) {
269 0 : if (OTelLog.isDebug()) {
270 0 : OTelLog.debug(
271 : 'OtlpHttpSpanExporter: Export interrupted by shutdown');
272 : }
273 0 : throw StateError('Exporter was shut down during export');
274 : }
275 :
276 15 : await _tryExport(spans);
277 0 : if (OTelLog.isDebug()) {
278 0 : OTelLog.debug('OtlpHttpSpanExporter: Successfully exported spans');
279 : }
280 : return;
281 15 : } on http.ClientException catch (e, stackTrace) {
282 15 : if (OTelLog.isError()) {
283 30 : OTelLog.error('OtlpHttpSpanExporter: HTTP error during export: $e');
284 : }
285 45 : if (OTelLog.isError()) OTelLog.error('Stack trace: $stackTrace');
286 :
287 : // Check if the exporter was shut down while we were waiting
288 : if (wasShutdownDuringRetry) {
289 0 : if (OTelLog.isError()) {
290 0 : OTelLog.error(
291 : 'OtlpHttpSpanExporter: Export interrupted by shutdown');
292 : }
293 0 : throw StateError('Exporter was shut down during export');
294 : }
295 :
296 : // Handle status code-based retries
297 : bool shouldRetry = false;
298 30 : if (e.message.contains('status code')) {
299 0 : for (final code in _retryableStatusCodes) {
300 0 : if (e.message.contains('status code $code')) {
301 : shouldRetry = true;
302 : break;
303 : }
304 : }
305 : }
306 :
307 : if (!shouldRetry) {
308 15 : if (OTelLog.isError()) {
309 15 : OTelLog.error(
310 : 'OtlpHttpSpanExporter: Non-retryable HTTP error, stopping retry attempts');
311 : }
312 : rethrow;
313 : }
314 :
315 0 : if (attempts >= maxAttempts - 1) {
316 0 : if (OTelLog.isError()) {
317 0 : OTelLog.error(
318 0 : 'OtlpHttpSpanExporter: Max attempts reached ($attempts out of $maxAttempts), giving up');
319 : }
320 : rethrow;
321 : }
322 :
323 0 : final delay = _calculateJitteredDelay(attempts);
324 0 : if (OTelLog.isDebug()) {
325 0 : OTelLog.debug(
326 0 : 'OtlpHttpSpanExporter: Retrying export after ${delay.inMilliseconds}ms...');
327 : }
328 0 : await Future<void>.delayed(delay);
329 0 : attempts++;
330 : } catch (e, stackTrace) {
331 0 : if (OTelLog.isError()) {
332 0 : OTelLog.error(
333 0 : 'OtlpHttpSpanExporter: Unexpected error during export: $e');
334 : }
335 0 : if (OTelLog.isError()) OTelLog.error('Stack trace: $stackTrace');
336 :
337 : // Check if we should stop retrying due to shutdown
338 : if (wasShutdownDuringRetry) {
339 0 : throw StateError('Exporter was shut down during export');
340 : }
341 :
342 0 : if (attempts >= maxAttempts - 1) {
343 : rethrow;
344 : }
345 :
346 0 : final delay = _calculateJitteredDelay(attempts);
347 0 : if (OTelLog.isDebug()) {
348 0 : OTelLog.debug(
349 0 : 'OtlpHttpSpanExporter: Retrying export after ${delay.inMilliseconds}ms...');
350 : }
351 0 : await Future<void>.delayed(delay);
352 0 : attempts++;
353 : }
354 : }
355 : }
356 :
357 : /// Force flush any pending spans
358 0 : @override
359 : Future<void> forceFlush() async {
360 0 : if (OTelLog.isDebug()) {
361 0 : OTelLog.debug('OtlpHttpSpanExporter: Force flush requested');
362 : }
363 0 : if (_isShutdown) {
364 0 : if (OTelLog.isDebug()) {
365 0 : OTelLog.debug(
366 : 'OtlpHttpSpanExporter: Exporter is already shut down, nothing to flush');
367 : }
368 : return;
369 : }
370 :
371 : // Wait for any pending export operations to complete
372 0 : if (_pendingExports.isNotEmpty) {
373 0 : if (OTelLog.isDebug()) {
374 0 : OTelLog.debug(
375 0 : 'OtlpHttpSpanExporter: Waiting for ${_pendingExports.length} pending exports to complete');
376 : }
377 : try {
378 0 : await Future.wait(_pendingExports);
379 0 : if (OTelLog.isDebug()) {
380 0 : OTelLog.debug('OtlpHttpSpanExporter: All pending exports completed');
381 : }
382 : } catch (e) {
383 0 : if (OTelLog.isError()) {
384 0 : OTelLog.error('OtlpHttpSpanExporter: Error during force flush: $e');
385 : }
386 : }
387 : } else {
388 0 : if (OTelLog.isDebug()) {
389 0 : OTelLog.debug('OtlpHttpSpanExporter: No pending exports to flush');
390 : }
391 : }
392 : }
393 :
394 71 : @override
395 : Future<void> shutdown() async {
396 71 : if (OTelLog.isDebug()) {
397 70 : OTelLog.debug('OtlpHttpSpanExporter: Shutdown requested');
398 : }
399 71 : if (_isShutdown) {
400 : return;
401 : }
402 71 : if (OTelLog.isDebug()) {
403 70 : OTelLog.debug(
404 210 : 'OtlpHttpSpanExporter: Shutting down - waiting for ${_pendingExports.length} pending exports');
405 : }
406 :
407 : // Set shutdown flag first
408 71 : _isShutdown = true;
409 :
410 : // Create a safe copy of pending exports to avoid concurrent modification
411 142 : final pendingExportsCopy = List<Future<void>>.of(_pendingExports);
412 :
413 : // Wait for pending exports but don't start any new ones
414 : // Use a timeout to prevent hanging if exports take too long
415 71 : if (pendingExportsCopy.isNotEmpty) {
416 0 : if (OTelLog.isDebug()) {
417 0 : OTelLog.debug(
418 0 : 'OtlpHttpSpanExporter: Waiting for ${pendingExportsCopy.length} pending exports with timeout');
419 : }
420 : try {
421 : // Use a generous timeout but don't wait forever
422 0 : await Future.wait(pendingExportsCopy)
423 0 : .timeout(const Duration(seconds: 10), onTimeout: () {
424 0 : if (OTelLog.isDebug()) {
425 0 : OTelLog.debug(
426 : 'OtlpHttpSpanExporter: Timeout waiting for exports to complete');
427 : }
428 0 : return Future.value([]);
429 : });
430 : } catch (e) {
431 0 : if (OTelLog.isDebug()) {
432 0 : OTelLog.debug(
433 0 : 'OtlpHttpSpanExporter: Error during shutdown while waiting for exports: $e');
434 : }
435 : }
436 : }
437 :
438 : // Close the HTTP client to release resources
439 142 : _client.close();
440 :
441 71 : if (OTelLog.isDebug()) {
442 70 : OTelLog.debug('OtlpHttpSpanExporter: Shutdown complete');
443 : }
444 : }
445 : }
|