Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
import 'dart:async';
import 'dart:io' show SecurityContext;
import 'dart:math';

import 'package:dartastic_opentelemetry/src/trace/span.dart';
import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart'
    show OTelLog;
import 'package:grpc/grpc.dart';

import '../../../../proto/opentelemetry_proto_dart.dart' as proto;
import '../../span_logger.dart';
import '../span_exporter.dart';
import 'certificate_utils.dart';
import 'otlp_grpc_span_exporter_config.dart';
import 'span_transformer.dart';
18 :
/// An OpenTelemetry span exporter that exports spans using OTLP over gRPC.
///
/// This exporter sends trace data to an OpenTelemetry collector or compatible
/// backend using the OpenTelemetry Protocol (OTLP) over gRPC. It supports:
/// - Retrying transient failures with jittered exponential backoff
/// - Secure and insecure connections, including custom CA and client
///   certificates (mTLS)
/// - Custom headers and timeouts
/// - Gzip compression of export requests
class OtlpGrpcSpanExporter implements SpanExporter {
  /// gRPC status codes treated as transient and therefore retried.
  ///
  /// `deadlineExceeded` is deliberately excluded: it means the request already
  /// consumed its whole timeout.
  static const _retryableStatusCodes = [
    StatusCode.resourceExhausted, // Maps to HTTP 429
    StatusCode.unavailable, // Maps to HTTP 503
  ];

  /// Port used when the configured endpoint does not name one.
  static const _defaultPort = 4317;

  /// Host used when the configured endpoint has an empty host, and in place
  /// of `localhost` for more reliable connections.
  static const _defaultHost = '127.0.0.1';

  /// Matches a leading URL scheme such as `http://` or `https://`.
  static final _schemePattern = RegExp(r'^[a-zA-Z][a-zA-Z0-9+.-]*://');

  /// ALPN protocols set on a custom [SecurityContext] so that HTTP/2 (`h2`),
  /// which gRPC requires, is negotiated during the TLS handshake.
  static const _alpnProtocols = ['grpc-exp', 'h2'];

  final OtlpGrpcExporterConfig _config;
  ClientChannel? _channel;
  proto.TraceServiceClient? _traceService;
  bool _isShutdown = false;
  bool _initialized = false;
  final Random _random = Random();

  /// Futures of exports that have started but not yet finished; awaited by
  /// [forceFlush] and [shutdown].
  final List<Future<void>> _pendingExports = [];

  /// Creates a new OtlpGrpcSpanExporter with the specified configuration.
  ///
  /// If [config] is omitted, a default [OtlpGrpcExporterConfig] is used.
  OtlpGrpcSpanExporter([OtlpGrpcExporterConfig? config])
      : _config = config ?? OtlpGrpcExporterConfig();

  /// Creates channel credentials based on configuration.
  ///
  /// Returns insecure credentials when `insecure` is set. Otherwise returns
  /// TLS credentials. When a CA certificate and/or client key/certificate is
  /// configured, these are loaded via [CertificateUtils] and the resulting
  /// security context is used for the connection, which enables mTLS. If
  /// loading fails, the error is logged and default TLS credentials are used.
  ChannelCredentials _createChannelCredentials() {
    if (_config.insecure) {
      return const ChannelCredentials.insecure();
    }

    // If no custom certificates are provided, use default secure credentials
    if (_config.certificate == null &&
        _config.clientKey == null &&
        _config.clientCertificate == null) {
      return const ChannelCredentials.secure();
    }

    try {
      final context = CertificateUtils.createSecurityContext(
        certificate: _config.certificate,
        clientKey: _config.clientKey,
        clientCertificate: _config.clientCertificate,
      );

      if (context == null) {
        return const ChannelCredentials.secure();
      }

      // `ChannelCredentials.secure` cannot take a SecurityContext, so the
      // context must be handed over explicitly; otherwise the configured
      // certificates would never reach the TLS connection.
      context.setAlpnProtocols(_alpnProtocols, false);
      return _SecurityContextCredentials(context);
    } catch (e) {
      if (OTelLog.isError()) {
        OTelLog.error('OtlpGrpcSpanExporter: Failed to load certificates: $e');
      }
      // Fall back to default secure credentials on error
      return const ChannelCredentials.secure();
    }
  }

  /// Shuts down the current gRPC channel, if any, and releases it.
  ///
  /// The channel fields are cleared before awaiting the shutdown, so that
  /// concurrent callers build a fresh channel instead of issuing calls on one
  /// that is closing. A graceful [ClientChannel.shutdown] is tried first. If it
  /// fails, [ClientChannel.terminate] force-closes the channel. Errors are
  /// logged, never thrown.
  Future<void> _cleanupChannel() async {
    final channel = _channel;
    if (channel == null) {
      return;
    }
    _channel = null;
    _traceService = null;

    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpGrpcSpanExporter: Shutting down existing channel');
    }

    try {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Attempting graceful channel shutdown');
      }
      // The returned future completes once the shutdown has finished, so no
      // extra delay is needed afterwards.
      await channel.shutdown();
    } catch (e) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Error during graceful shutdown: $e');
      }
      // Graceful shutdown failed; force-close so the connection is released.
      try {
        if (OTelLog.isDebug()) {
          OTelLog.debug('OtlpGrpcSpanExporter: Terminating channel');
        }
        await channel.terminate();
      } catch (e) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpGrpcSpanExporter: Error shutting down existing channel: $e');
        }
      }
    }
  }

  /// (Re)creates the gRPC channel and trace service client.
  ///
  /// Any existing channel is shut down first. The endpoint may be a bare
  /// `host[:port]` or may carry a URL scheme and path (e.g.
  /// `https://collector:4317/`). An empty host means 127.0.0.1, `localhost` is
  /// replaced by 127.0.0.1, and a missing port means 4317. Does nothing when
  /// the exporter is shut down. Setup errors are logged and rethrown.
  Future<void> _setupChannel() async {
    if (_isShutdown) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Not setting up channel - exporter is shut down');
      }
      return;
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Setting up gRPC channel with endpoint ${_config.endpoint}');
    }

    // First, clean up any existing channel
    await _cleanupChannel();

    // shutdown() may have started while the old channel was closing; a channel
    // created now might never be released.
    if (_isShutdown) {
      return;
    }

    try {
      final authority =
          _config.endpoint.trim().replaceFirst(_schemePattern, '');
      // Parse under a scheme with no registered default port so that explicit
      // ports such as :80 or :443 are kept rather than normalized away. Uri
      // also copes with trailing paths and bracketed IPv6 literals.
      final uri = Uri.parse('grpc://$authority');
      var host = uri.host.isEmpty ? _defaultHost : uri.host;
      final port = uri.hasPort ? uri.port : _defaultPort;

      // Replace localhost with 127.0.0.1 for more reliable connections
      if (host == 'localhost') {
        host = _defaultHost;
      }

      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Setting up gRPC channel to $host:$port');
      }

      // `??=` keeps a channel that a concurrent setup may already have created
      // after our cleanup, instead of overwriting (and leaking) it.
      final channel = _channel ??= ClientChannel(
        host,
        port: port,
        options: ChannelOptions(
          credentials: _createChannelCredentials(),
          connectTimeout: const Duration(seconds: 5),
          // Keep connection alive better
          idleTimeout: const Duration(seconds: 30),
          codecRegistry: CodecRegistry(codecs: const [
            GzipCodec(),
            IdentityCodec(),
          ]),
        ),
      );

      _traceService = proto.TraceServiceClient(channel);
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Successfully created gRPC channel and trace service');
      }
    } catch (e, stackTrace) {
      if (OTelLog.isError()) {
        OTelLog.error('OtlpGrpcSpanExporter: Failed to setup gRPC channel: $e');
        OTelLog.error('Stack trace: $stackTrace');
      }
      rethrow;
    }
  }

  /// Makes sure a channel and trace service client exist, creating them on
  /// demand.
  ///
  /// Throws [StateError] if the exporter has been shut down.
  Future<void> _ensureChannel() async {
    if (_isShutdown) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Not ensuring channel - exporter is shut down');
      }
      throw StateError('Exporter is shutdown');
    }

    if (_initialized && _channel != null && _traceService != null) {
      return;
    }

    _initialized = true;
    if (_channel == null || _traceService == null) {
      await _setupChannel();
    }
  }

  /// Returns the backoff to wait before retry number [retries] (0-based).
  ///
  /// The configured base delay doubles with each retry, and a random jitter of
  /// up to the same amount is added. The result lies in `[d, 2d)`, where
  /// `d = baseDelay * 2^retries`.
  Duration _calculateJitteredDelay(int retries) {
    final baseMs = _config.baseDelay.inMilliseconds;
    final delay = baseMs * pow(2, retries);
    final jitter = _random.nextDouble() * delay;
    return Duration(milliseconds: (delay + jitter).toInt());
  }

  /// Performs a single export attempt for [spans], without retrying.
  ///
  /// Ensures that a channel exists, converts the spans into an OTLP request,
  /// and sends it with the configured headers, timeout, and compression. On
  /// `unavailable`, `unknown`, or `internal` gRPC errors, the channel is
  /// discarded so that the next attempt reconnects. Every error is logged and
  /// rethrown.
  Future<void> _tryExport(List<Span> spans) async {
    await _ensureChannel();
    if (_isShutdown) {
      throw StateError('Exporter is shutdown');
    }
    if (OTelLog.isLogSpans()) {
      logSpans(spans, "Exporting spans.");
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Preparing to export ${spans.length} spans');
      for (final span in spans) {
        OTelLog.debug(
            '  Span: ${span.name}, spanId: ${span.spanContext.spanId}, traceId: ${span.spanContext.traceId}');
      }
      OTelLog.debug('OtlpGrpcSpanExporter: Transforming ${spans.length} spans');
    }

    final request = OtlpSpanTransformer.transformSpans(spans);

    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpGrpcSpanExporter: Successfully transformed spans');
      for (final rs in request.resourceSpans) {
        OTelLog.debug('  ResourceSpan:');
        if (rs.hasResource()) {
          OTelLog.debug('    Resource attributes:');
          for (final attr in rs.resource.attributes) {
            OTelLog.debug('      ${attr.key}: ${attr.value}');
          }
        }
        for (final ss in rs.scopeSpans) {
          OTelLog.debug('    ScopeSpan:');
          for (final span in ss.spans) {
            OTelLog.debug('      Span: ${span.name}');
            OTelLog.debug('        TraceId: ${span.traceId}');
            OTelLog.debug('        SpanId: ${span.spanId}');
          }
        }
      }
    }

    final options = CallOptions(
      timeout: _config.timeout,
      metadata: Map<String, String>.from(_config.headers),
      // Compression must be requested through the codec; a hand-set
      // `grpc-encoding` header alone leaves the payload uncompressed.
      compression: _config.compression ? const GzipCodec() : null,
    );

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Sending export request to ${_config.endpoint}');
    }
    try {
      final traceService = _traceService;
      if (traceService == null) {
        throw StateError(
            'Trace service is null, channel may not be properly initialized');
      }

      final response = await traceService.export(request, options: options);
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Export request completed successfully');
        OTelLog.debug('OtlpGrpcSpanExporter: Response: $response');
      }
    } catch (e, stackTrace) {
      if (OTelLog.isError()) {
        OTelLog.error('OtlpGrpcSpanExporter: Export request failed: $e');
        OTelLog.error('Stack trace: $stackTrace');
      }

      // If we have a channel error, force the next attempt to reconnect.
      if (e is GrpcError &&
          (e.code == StatusCode.unavailable ||
              e.code == StatusCode.unknown ||
              e.code == StatusCode.internal)) {
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpGrpcSpanExporter: Channel error detected, recreating channel');
        }
        await _cleanupChannel();
        _initialized = false;
      }

      rethrow;
    }
  }

  /// Exports [spans] to the configured OTLP endpoint.
  ///
  /// Returns immediately for an empty list. Throws [StateError] if the
  /// exporter has already been shut down. If a concurrent [shutdown]
  /// interrupts the export, it completes normally instead of throwing. Other
  /// failures, including a [GrpcError] once retries are exhausted, are
  /// rethrown.
  @override
  Future<void> export(List<Span> spans) async {
    if (_isShutdown) {
      throw StateError('Exporter is shutdown');
    }

    if (spans.isEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpGrpcSpanExporter: No spans to export');
      }
      return;
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Beginning export of ${spans.length} spans');
    }
    final exportFuture = _export(spans);

    // Track the pending export so forceFlush/shutdown can wait for it.
    _pendingExports.add(exportFuture);
    try {
      await exportFuture;
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpGrpcSpanExporter: Export completed successfully');
      }
    } catch (e) {
      if (_isShutdown &&
          e is StateError &&
          e.message.contains('shut down during')) {
        // Gracefully handle the case where shutdown interrupted the export
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpGrpcSpanExporter: Export was interrupted by shutdown, suppressing error');
        }
      } else {
        rethrow;
      }
    } finally {
      _pendingExports.remove(exportFuture);
    }
  }

  /// Exports [spans], retrying transient failures up to `maxRetries` times.
  ///
  /// At least one attempt is always made. A non-retryable [GrpcError] and the
  /// error from the final attempt are rethrown. Once shutdown has begun, an
  /// attempt already in flight may finish, but no further retry is started:
  /// a [StateError] whose message contains "shut down during" is thrown
  /// instead.
  Future<void> _export(List<Span> spans) async {
    if (_isShutdown) {
      throw StateError('Exporter was shut down during export');
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Attempting to export ${spans.length} spans to ${_config.endpoint}');
    }

    final maxAttempts = _config.maxRetries + 1; // Initial attempt + retries

    for (var attempt = 0;; attempt++) {
      // Shutdown may have begun while we were backing off.
      if (attempt > 0 && _isShutdown) {
        if (OTelLog.isDebug()) {
          OTelLog.debug('OtlpGrpcSpanExporter: Export interrupted by shutdown');
        }
        throw StateError('Exporter was shut down during export');
      }

      try {
        await _tryExport(spans);
        if (OTelLog.isDebug()) {
          OTelLog.debug('OtlpGrpcSpanExporter: Successfully exported spans');
        }
        return;
      } on GrpcError catch (e, stackTrace) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpGrpcSpanExporter: gRPC error during export: ${e.code} - ${e.message}');
          OTelLog.error('Stack trace: $stackTrace');
        }

        if (!_retryableStatusCodes.contains(e.code)) {
          if (OTelLog.isError()) {
            OTelLog.error(
                'OtlpGrpcSpanExporter: Non-retryable gRPC error (${e.code}), stopping retry attempts');
          }
          rethrow;
        }

        if (attempt >= maxAttempts - 1) {
          if (OTelLog.isError()) {
            OTelLog.error(
                'OtlpGrpcSpanExporter: Max attempts reached (${attempt + 1} out of $maxAttempts), giving up');
          }
          rethrow;
        }
      } catch (e, stackTrace) {
        if (OTelLog.isError()) {
          OTelLog.error(
              'OtlpGrpcSpanExporter: Unexpected error during export: $e');
          OTelLog.error('Stack trace: $stackTrace');
        }

        if (attempt >= maxAttempts - 1) {
          rethrow;
        }
      }

      // The failed attempt will be retried, unless shutdown has started; in
      // that case, give up now instead of sleeping through the backoff first.
      if (_isShutdown) {
        if (OTelLog.isDebug()) {
          OTelLog.debug('OtlpGrpcSpanExporter: Export interrupted by shutdown');
        }
        throw StateError('Exporter was shut down during export');
      }
      await _backoffAndReconnect(attempt);
    }
  }

  /// Waits for the jittered backoff of retry [attempt], then rebuilds the
  /// channel (unless the exporter was shut down in the meantime) so that the
  /// retry does not reuse a connection that just failed.
  Future<void> _backoffAndReconnect(int attempt) async {
    final delay = _calculateJitteredDelay(attempt);
    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Retrying export after ${delay.inMilliseconds}ms...');
    }
    await Future<void>.delayed(delay);
    if (!_isShutdown) {
      await _setupChannel();
    }
  }

  /// Waits for all in-flight exports to finish.
  ///
  /// This exporter does not buffer spans, so flushing only means awaiting the
  /// exports that have already started. Export errors are logged, not
  /// rethrown. Does nothing after [shutdown].
  @override
  Future<void> forceFlush() async {
    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpGrpcSpanExporter: Force flush requested');
    }
    if (_isShutdown) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Exporter is already shut down, nothing to flush');
      }
      return;
    }

    if (_pendingExports.isEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpGrpcSpanExporter: No pending exports to flush');
      }
      return;
    }

    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Waiting for ${_pendingExports.length} pending exports to complete');
    }
    try {
      await Future.wait(List<Future<void>>.of(_pendingExports));
      if (OTelLog.isDebug()) {
        OTelLog.debug('OtlpGrpcSpanExporter: All pending exports completed');
      }
    } catch (e) {
      if (OTelLog.isError()) {
        OTelLog.error('OtlpGrpcSpanExporter: Error during force flush: $e');
      }
    }
  }

  /// Shuts the exporter down.
  ///
  /// New exports are rejected immediately. In-flight exports are awaited for
  /// up to 10 seconds; their errors are logged at debug level. The gRPC
  /// channel is then closed. Calling this more than once has no further
  /// effect.
  @override
  Future<void> shutdown() async {
    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpGrpcSpanExporter: Shutdown requested');
    }
    if (_isShutdown) {
      return;
    }
    if (OTelLog.isDebug()) {
      OTelLog.debug(
          'OtlpGrpcSpanExporter: Shutting down - waiting for ${_pendingExports.length} pending exports');
    }

    // Set shutdown flag first so no new exports or retries start.
    _isShutdown = true;

    // Snapshot: exports remove themselves from the list as they finish.
    final pendingExportsCopy = List<Future<void>>.of(_pendingExports);

    if (pendingExportsCopy.isNotEmpty) {
      if (OTelLog.isDebug()) {
        OTelLog.debug(
            'OtlpGrpcSpanExporter: Waiting for ${pendingExportsCopy.length} pending exports with timeout');
      }
      try {
        // Use a generous timeout but don't wait forever
        await Future.wait(pendingExportsCopy).timeout(
          const Duration(seconds: 10),
          onTimeout: () {
            if (OTelLog.isDebug()) {
              OTelLog.debug(
                  'OtlpGrpcSpanExporter: Timeout waiting for exports to complete');
            }
            return const <void>[];
          },
        );
      } catch (e) {
        if (OTelLog.isDebug()) {
          OTelLog.debug(
              'OtlpGrpcSpanExporter: Error during shutdown while waiting for exports: $e');
        }
      }
    }

    // Clean up channel resources
    await _cleanupChannel();

    if (OTelLog.isDebug()) {
      OTelLog.debug('OtlpGrpcSpanExporter: Shutdown complete');
    }
  }
}

/// TLS channel credentials backed by a caller-supplied [SecurityContext].
///
/// `ChannelCredentials.secure` accepts only raw CA certificate bytes, so a
/// context holding custom trusted roots and/or a client certificate chain and
/// key (mTLS) is provided by overriding [ChannelCredentials.securityContext].
class _SecurityContextCredentials extends ChannelCredentials {
  final SecurityContext _context;

  _SecurityContextCredentials(this._context) : super.secure();

  @override
  SecurityContext get securityContext => _context;
}
|