Line data Source code
1 : // Licensed under the Apache License, Version 2.0
2 : // Copyright 2025, Michael Bushe, All rights reserved.
3 :
4 : import 'dart:async';
5 : import 'dart:collection';
6 :
7 : import 'package:dartastic_opentelemetry/src/trace/span.dart';
8 : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
9 : import 'package:synchronized/synchronized.dart';
10 :
11 : import '../span_processor.dart';
12 : import 'span_exporter.dart';
13 :
14 : /// Configuration for the [BatchSpanProcessor].
15 : ///
16 : /// This class configures how the batch span processor behaves, including
17 : /// queue size limits, export scheduling, and batch size parameters.
18 : class BatchSpanProcessorConfig {
19 : /// The maximum queue size for spans. After this is reached,
20 : /// spans will be dropped.
21 : final int maxQueueSize;
22 :
23 : /// The delay between two consecutive exports.
24 : final Duration scheduleDelay;
25 :
26 : /// The maximum batch size of spans that can be exported at once.
27 : final int maxExportBatchSize;
28 :
29 : /// The amount of time to wait for an export to complete before timing out.
30 : final Duration exportTimeout;
31 :
32 : /// Creates a new configuration for a [BatchSpanProcessor].
33 : ///
34 : /// [maxQueueSize] The maximum number of spans that can be queued for export. Default is 2048.
35 : /// If this limit is reached, additional spans will be dropped.
36 : /// [scheduleDelay] The time interval between two consecutive exports. Default is 5 seconds.
37 : /// This controls how frequently batches are sent to the exporter.
38 : /// [maxExportBatchSize] The maximum number of spans to export in a single batch. Default is 512.
39 : /// This helps control resource usage during export operations.
40 : /// [exportTimeout] The maximum time to wait for an export operation to complete. Default is 30 seconds.
41 : /// After this time, export operations will be considered failed.
42 81 : const BatchSpanProcessorConfig({
43 : this.maxQueueSize = 2048,
44 : this.scheduleDelay = const Duration(milliseconds: 5000),
45 : this.maxExportBatchSize = 512,
46 : this.exportTimeout = const Duration(seconds: 30),
47 : });
48 : }
49 :
50 : /// A [SpanProcessor] that batches spans before export.
51 : ///
52 : /// This processor collects finished spans in a queue and exports them in batches
53 : /// at regular intervals, improving efficiency compared to exporting each span
54 : /// individually. Spans are added to a queue when they end, and periodically sent
55 : /// to the configured exporter in batches according to the configured schedule.
56 : ///
57 : /// The batch behavior can be tuned using [BatchSpanProcessorConfig] to control
58 : /// batch size, queue limits, and export timing.
59 : class BatchSpanProcessor implements SpanProcessor {
60 : /// The exporter used to send spans to the backend
61 : final SpanExporter exporter;
62 :
63 : /// Configuration for the batch processor behavior
64 : final BatchSpanProcessorConfig _config;
65 :
66 : /// Queue of spans waiting to be exported
67 : final Queue<Span> _spanQueue = Queue<Span>();
68 :
69 : /// Whether the processor has been shut down
70 : bool _isShutdown = false;
71 :
72 : /// Timer for scheduling periodic exports
73 : Timer? _timer;
74 :
75 : /// Lock for synchronizing queue access
76 : final _lock = Lock();
77 :
78 : /// Creates a new BatchSpanProcessor with the specified exporter and configuration.
79 : ///
80 : /// The BatchSpanProcessor collects finished spans in a queue and exports them in batches
81 : /// at regular intervals. This improves efficiency compared to exporting each span individually.
82 : ///
83 : /// A timer is started when this processor is created based on the [config]'s scheduleDelay.
84 : /// The timer triggers periodic batch exports of completed spans to the configured exporter.
85 : ///
86 : /// When the maximum queue size is reached, new spans will be dropped and not exported.
87 : ///
88 : /// This processor does not modify spans on start or when their names are updated,
89 : /// it only processes spans when they end.
90 : ///
91 : /// If an error occurs during export, it will be logged but not propagated.
92 : ///
93 : /// [exporter] The SpanExporter to use for exporting batches of spans
94 : /// [config] Optional configuration for the batch processor
95 72 : BatchSpanProcessor(this.exporter, [BatchSpanProcessorConfig? config])
96 : : _config = config ?? const BatchSpanProcessorConfig() {
97 297 : _timer = Timer.periodic(_config.scheduleDelay, (_) async {
98 : try {
99 9 : await _exportBatch();
100 : } catch (e) {
101 0 : if (OTelLog.isError()) OTelLog.error('Error in batch export timer: $e');
102 : }
103 : });
104 : }
105 :
106 19 : @override
107 : Future<void> onEnd(Span span) async {
108 19 : if (_isShutdown) {
109 : return;
110 : }
111 :
112 57 : return _lock.synchronized(() {
113 95 : if (_spanQueue.length >= _config.maxQueueSize) {
114 0 : if (OTelLog.isDebug()) {
115 0 : OTelLog.debug('BatchSpanProcessor queue full - dropping span');
116 : }
117 : return;
118 : }
119 38 : _spanQueue.add(span);
120 : });
121 : }
122 :
123 23 : @override
124 : Future<void> onStart(Span span, Context? parentContext) async {
125 : // Nothing to do on start
126 : }
127 :
128 0 : @override
129 : Future<void> onNameUpdate(Span span, String newName) async {
130 : // Nothing to do on name update
131 : }
132 :
133 : /// Exports a batch of spans from the queue to the configured exporter.
134 : ///
135 : /// This method acquires a lock on the queue, extracts spans up to the maximum batch size,
136 : /// and then sends them to the exporter. If an error occurs during export, it is logged
137 : /// but not propagated (no retry mechanism is implemented by default).
138 : ///
139 : /// @return A future that completes when the export is finished
140 68 : Future<void> _exportBatch() async {
141 68 : if (_isShutdown) {
142 : return;
143 : }
144 :
145 68 : final List<Span> spansToExport = [];
146 :
147 204 : await _lock.synchronized(() {
148 340 : final batchSize = _spanQueue.length > _config.maxExportBatchSize
149 2 : ? _config.maxExportBatchSize
150 136 : : _spanQueue.length;
151 :
152 83 : for (var i = 0; i < batchSize; i++) {
153 30 : if (_spanQueue.isEmpty) break;
154 45 : spansToExport.add(_spanQueue.removeFirst());
155 : }
156 : });
157 :
158 68 : if (spansToExport.isEmpty) {
159 : return;
160 : }
161 :
162 : try {
163 30 : await exporter.export(spansToExport);
164 : } catch (e) {
165 15 : if (OTelLog.isError()) {
166 30 : OTelLog.error('Error exporting batch of spans: $e');
167 : }
168 : // Consider implementing retry logic here
169 : }
170 : }
171 :
172 71 : @override
173 : Future<void> forceFlush() async {
174 71 : if (_isShutdown) {
175 : return;
176 : }
177 :
178 67 : await _exportBatch();
179 : }
180 :
181 71 : @override
182 : Future<void> shutdown() async {
183 71 : if (_isShutdown) {
184 : return;
185 : }
186 :
187 71 : _isShutdown = true;
188 142 : _timer?.cancel();
189 :
190 : // Export any remaining spans
191 71 : await forceFlush();
192 :
193 : // Shutdown the exporter
194 142 : await exporter.shutdown();
195 : }
196 : }
|