Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'package:dartastic_opentelemetry/src/trace/span.dart';
5 : import 'package:dartastic_opentelemetry/src/trace/span_processor.dart';
6 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
7 :
8 : import 'span_exporter.dart';
9 :
10 : /// A simple SpanProcessor that exports spans synchronously when they end.
11 : ///
12 : /// This processor should only be used for testing or debugging purposes as it
13 : /// blocks until the export is complete.
14 : class SimpleSpanProcessor implements SpanProcessor {
15 : final SpanExporter _spanExporter;
16 : bool _isShutdown = false;
17 : final List<Future<void>> _pendingExports = [];
18 :
19 : /// Creates a new SimpleSpanProcessor that exports spans using the given [SpanExporter].
20 9 : SimpleSpanProcessor(this._spanExporter);
21 :
22 9 : @override
23 : Future<void> onStart(Span span, Context? parentContext) async {
24 9 : if (OTelLog.isDebug()) {
25 9 : OTelLog.debug(
26 45 : 'SimpleSpanProcessor: onStart called for span ${span.spanContext.spanId}, traceId: ${span.spanContext.traceId}');
27 : }
28 : }
29 :
30 9 : @override
31 : Future<void> onEnd(Span span) async {
32 9 : if (OTelLog.isDebug()) {
33 9 : OTelLog.debug(
34 36 : 'SimpleSpanProcessor: onEnd called for span ${span.name} with ID ${span.spanContext.spanId}');
35 : }
36 9 : if (_isShutdown) {
37 2 : if (OTelLog.isDebug()) {
38 2 : OTelLog.debug(
39 : 'SimpleSpanProcessor: Skipping export - processor is shutdown');
40 : }
41 2 : print('SimpleSpanProcessor: Skipping export - processor is shutdown');
42 : return;
43 : }
44 :
45 : // Verify the span has a valid end time
46 9 : if (span.endTime == null) {
47 0 : if (OTelLog.isWarn()) {
48 0 : OTelLog.warn(
49 0 : 'SimpleSpanProcessor: Span ${span.name} with ID ${span.spanContext.spanId} has no end time, which suggests it may not be properly ended');
50 : }
51 : // Continue with export anyway
52 : }
53 :
54 9 : if (OTelLog.isDebug()) {
55 9 : OTelLog.debug(
56 36 : 'SimpleSpanProcessor: Exporting span ${span.spanContext.spanId} with name ${span.name}');
57 : }
58 :
59 : try {
60 : // Create a copy of the span list to avoid concurrent modification issues
61 9 : final spanToExport = [span];
62 9 : if (OTelLog.isDebug()) {
63 9 : OTelLog.debug('SimpleSpanProcessor: Created list of spans to export');
64 : }
65 :
66 18 : final Future<void> pendingExport = _spanExporter.export(spanToExport);
67 18 : _pendingExports.add(pendingExport);
68 9 : if (OTelLog.isDebug()) {
69 9 : OTelLog.debug(
70 : 'SimpleSpanProcessor: Added export to pending exports list');
71 : }
72 :
73 : // Directly await the export for better reliability in tests
74 : try {
75 9 : if (OTelLog.isDebug()) {
76 9 : OTelLog.debug(
77 18 : 'SimpleSpanProcessor: Awaiting export completion for span ${span.name}');
78 : }
79 : await pendingExport;
80 9 : if (OTelLog.isDebug()) {
81 9 : OTelLog.debug(
82 36 : 'SimpleSpanProcessor: Successfully exported span ${span.name} with ID ${span.spanContext.spanId}');
83 : }
84 : } catch (e, stackTrace) {
85 1 : if (OTelLog.isError()) {
86 1 : OTelLog.error(
87 3 : 'SimpleSpanProcessor: Export error while processing span ${span.spanContext.spanId}: $e');
88 2 : OTelLog.error('Stack trace: $stackTrace');
89 : }
90 : } finally {
91 18 : _pendingExports.remove(pendingExport);
92 9 : if (OTelLog.isDebug()) {
93 9 : OTelLog.debug(
94 : 'SimpleSpanProcessor: Removed export from pending list');
95 : }
96 : }
97 : } catch (e, stackTrace) {
98 0 : if (OTelLog.isError()) {
99 0 : OTelLog.error(
100 0 : 'SimpleSpanProcessor: Failed to start export for span ${span.spanContext.spanId}: $e');
101 0 : OTelLog.error('Stack trace: $stackTrace');
102 : }
103 : }
104 : }
105 :
106 0 : @override
107 : Future<void> onNameUpdate(Span span, String newName) async {
108 : // Simple processor doesn't need to do anything for name updates
109 : // since it only processes spans when they end
110 0 : if (OTelLog.isDebug()) {
111 0 : OTelLog.debug(
112 0 : 'SimpleSpanProcessor: Name updated for span ${span.spanContext.spanId} to $newName');
113 : }
114 : }
115 :
116 9 : @override
117 : Future<void> shutdown() async {
118 9 : if (_isShutdown) {
119 6 : if (OTelLog.isDebug()) {
120 6 : OTelLog.debug('SimpleSpanProcessor: Already shut down');
121 : }
122 : return;
123 : }
124 :
125 9 : if (OTelLog.isDebug()) {
126 9 : OTelLog.debug(
127 27 : 'SimpleSpanProcessor: Shutting down - waiting for ${_pendingExports.length} pending exports');
128 : }
129 9 : _isShutdown = true;
130 :
131 : try {
132 18 : if (_pendingExports.isNotEmpty) {
133 0 : if (OTelLog.isDebug()) {
134 0 : OTelLog.debug(
135 0 : 'SimpleSpanProcessor: Waiting for ${_pendingExports.length} pending exports to complete');
136 : }
137 : try {
138 0 : await Future.wait(_pendingExports);
139 0 : if (OTelLog.isDebug()) {
140 0 : OTelLog.debug('SimpleSpanProcessor: All pending exports completed');
141 : }
142 : } catch (e) {
143 0 : if (OTelLog.isError()) {
144 0 : OTelLog.error(
145 0 : 'SimpleSpanProcessor: Error waiting for pending exports: $e');
146 : }
147 : }
148 : }
149 :
150 : try {
151 9 : if (OTelLog.isDebug()) {
152 9 : OTelLog.debug('SimpleSpanProcessor: Shutting down exporter');
153 : }
154 18 : await _spanExporter.shutdown();
155 9 : if (OTelLog.isDebug()) {
156 9 : OTelLog.debug('SimpleSpanProcessor: Exporter shutdown complete');
157 : }
158 : } catch (e) {
159 0 : if (OTelLog.isError()) {
160 0 : OTelLog.error(
161 0 : 'SimpleSpanProcessor: Error shutting down exporter: $e');
162 : }
163 : }
164 :
165 9 : if (OTelLog.isDebug()) {
166 9 : OTelLog.debug('SimpleSpanProcessor: Shutdown complete');
167 : }
168 : } catch (e, stackTrace) {
169 0 : if (OTelLog.isError()) {
170 0 : OTelLog.error('SimpleSpanProcessor: Error during shutdown: $e');
171 0 : OTelLog.error('Stack trace: $stackTrace');
172 : }
173 : }
174 : }
175 :
176 9 : @override
177 : Future<void> forceFlush() async {
178 9 : if (_isShutdown) {
179 2 : if (OTelLog.isDebug()) {
180 2 : OTelLog.debug(
181 : 'SimpleSpanProcessor: Cannot force flush - processor is shut down');
182 : }
183 : return;
184 : }
185 :
186 8 : if (OTelLog.isDebug()) {
187 8 : OTelLog.debug(
188 24 : 'SimpleSpanProcessor: Force flushing - waiting for ${_pendingExports.length} pending exports');
189 : }
190 :
191 : try {
192 16 : if (_pendingExports.isEmpty) {
193 5 : if (OTelLog.isDebug()) {
194 5 : OTelLog.debug('SimpleSpanProcessor: No pending exports to flush');
195 : }
196 : // If there are no pending exports, just force flush the exporter
197 10 : await _spanExporter.forceFlush();
198 : } else {
199 7 : if (OTelLog.isDebug()) {
200 7 : OTelLog.debug(
201 21 : 'SimpleSpanProcessor: Waiting for ${_pendingExports.length} pending exports');
202 : }
203 14 : await Future.wait(_pendingExports);
204 7 : if (OTelLog.isDebug()) {
205 7 : OTelLog.debug('SimpleSpanProcessor: All pending exports completed');
206 : }
207 :
208 : // Also force flush the exporter
209 7 : if (OTelLog.isDebug()) {
210 7 : OTelLog.debug('SimpleSpanProcessor: Force flushing exporter');
211 : }
212 14 : await _spanExporter.forceFlush();
213 : }
214 :
215 8 : if (OTelLog.isDebug()) {
216 8 : OTelLog.debug('SimpleSpanProcessor: Force flush complete');
217 : }
218 : } catch (e, stackTrace) {
219 0 : if (OTelLog.isError()) {
220 0 : OTelLog.error('SimpleSpanProcessor: Error during force flush: $e');
221 0 : OTelLog.error('Stack trace: $stackTrace');
222 : }
223 : }
224 : }
225 : }
|