LCOV - code coverage report
Current view: top level - src/trace/export - batch_span_processor.dart (source / functions) Coverage Total Hit
Test: lcov.info Lines: 89.2 % 37 33
Test Date: 2025-11-15 13:23:01 Functions: - 0 0

            Line data    Source code
       1              : // Licensed under the Apache License, Version 2.0
       2              : // Copyright 2025, Michael Bushe, All rights reserved.
       3              : 
       4              : import 'dart:async';
       5              : import 'dart:collection';
       6              : 
       7              : import 'package:dartastic_opentelemetry/src/trace/span.dart';
       8              : import 'package:dartastic_opentelemetry_api/dartastic_opentelemetry_api.dart';
       9              : import 'package:synchronized/synchronized.dart';
      10              : 
      11              : import '../span_processor.dart';
      12              : import 'span_exporter.dart';
      13              : 
      14              : /// Configuration for the [BatchSpanProcessor].
      15              : ///
      16              : /// This class configures how the batch span processor behaves, including
      17              : /// queue size limits, export scheduling, and batch size parameters.
      18              : class BatchSpanProcessorConfig {
      19              :   /// The maximum queue size for spans. After this is reached,
      20              :   /// spans will be dropped.
      21              :   final int maxQueueSize;
      22              : 
      23              :   /// The delay between two consecutive exports.
      24              :   final Duration scheduleDelay;
      25              : 
      26              :   /// The maximum batch size of spans that can be exported at once.
      27              :   final int maxExportBatchSize;
      28              : 
      29              :   /// The amount of time to wait for an export to complete before timing out.
      30              :   final Duration exportTimeout;
      31              : 
      32              :   /// Creates a new configuration for a [BatchSpanProcessor].
      33              :   ///
      34              :   /// [maxQueueSize] The maximum number of spans that can be queued for export. Default is 2048.
      35              :   ///    If this limit is reached, additional spans will be dropped.
      36              :   /// [scheduleDelay] The time interval between two consecutive exports. Default is 5 seconds.
      37              :   ///    This controls how frequently batches are sent to the exporter.
      38              :   /// [maxExportBatchSize] The maximum number of spans to export in a single batch. Default is 512.
      39              :   ///    This helps control resource usage during export operations.
      40              :   /// [exportTimeout] The maximum time to wait for an export operation to complete. Default is 30 seconds.
      41              :   ///    After this time, export operations will be considered failed.
      42           81 :   const BatchSpanProcessorConfig({
      43              :     this.maxQueueSize = 2048,
      44              :     this.scheduleDelay = const Duration(milliseconds: 5000),
      45              :     this.maxExportBatchSize = 512,
      46              :     this.exportTimeout = const Duration(seconds: 30),
      47              :   });
      48              : }
      49              : 
      50              : /// A [SpanProcessor] that batches spans before export.
      51              : ///
      52              : /// This processor collects finished spans in a queue and exports them in batches
      53              : /// at regular intervals, improving efficiency compared to exporting each span
      54              : /// individually. Spans are added to a queue when they end, and periodically sent
      55              : /// to the configured exporter in batches according to the configured schedule.
      56              : ///
      57              : /// The batch behavior can be tuned using [BatchSpanProcessorConfig] to control
      58              : /// batch size, queue limits, and export timing.
      59              : class BatchSpanProcessor implements SpanProcessor {
      60              :   /// The exporter used to send spans to the backend
      61              :   final SpanExporter exporter;
      62              : 
      63              :   /// Configuration for the batch processor behavior
      64              :   final BatchSpanProcessorConfig _config;
      65              : 
      66              :   /// Queue of spans waiting to be exported
      67              :   final Queue<Span> _spanQueue = Queue<Span>();
      68              : 
      69              :   /// Whether the processor has been shut down
      70              :   bool _isShutdown = false;
      71              : 
      72              :   /// Timer for scheduling periodic exports
      73              :   Timer? _timer;
      74              : 
      75              :   /// Lock for synchronizing queue access
      76              :   final _lock = Lock();
      77              : 
      78              :   /// Creates a new BatchSpanProcessor with the specified exporter and configuration.
      79              :   ///
      80              :   /// The BatchSpanProcessor collects finished spans in a queue and exports them in batches
      81              :   /// at regular intervals. This improves efficiency compared to exporting each span individually.
      82              :   ///
      83              :   /// A timer is started when this processor is created based on the [config]'s scheduleDelay.
      84              :   /// The timer triggers periodic batch exports of completed spans to the configured exporter.
      85              :   ///
      86              :   /// When the maximum queue size is reached, new spans will be dropped and not exported.
      87              :   ///
      88              :   /// This processor does not modify spans on start or when their names are updated,
      89              :   /// it only processes spans when they end.
      90              :   ///
      91              :   /// If an error occurs during export, it will be logged but not propagated.
      92              :   ///
      93              :   /// [exporter] The SpanExporter to use for exporting batches of spans
      94              :   /// [config] Optional configuration for the batch processor
      95           72 :   BatchSpanProcessor(this.exporter, [BatchSpanProcessorConfig? config])
      96              :       : _config = config ?? const BatchSpanProcessorConfig() {
      97          297 :     _timer = Timer.periodic(_config.scheduleDelay, (_) async {
      98              :       try {
      99            9 :         await _exportBatch();
     100              :       } catch (e) {
     101            0 :         if (OTelLog.isError()) OTelLog.error('Error in batch export timer: $e');
     102              :       }
     103              :     });
     104              :   }
     105              : 
     106           19 :   @override
     107              :   Future<void> onEnd(Span span) async {
     108           19 :     if (_isShutdown) {
     109              :       return;
     110              :     }
     111              : 
     112           57 :     return _lock.synchronized(() {
     113           95 :       if (_spanQueue.length >= _config.maxQueueSize) {
     114            0 :         if (OTelLog.isDebug()) {
     115            0 :           OTelLog.debug('BatchSpanProcessor queue full - dropping span');
     116              :         }
     117              :         return;
     118              :       }
     119           38 :       _spanQueue.add(span);
     120              :     });
     121              :   }
     122              : 
     123           23 :   @override
     124              :   Future<void> onStart(Span span, Context? parentContext) async {
     125              :     // Nothing to do on start
     126              :   }
     127              : 
     128            0 :   @override
     129              :   Future<void> onNameUpdate(Span span, String newName) async {
     130              :     // Nothing to do on name update
     131              :   }
     132              : 
     133              :   /// Exports a batch of spans from the queue to the configured exporter.
     134              :   ///
     135              :   /// This method acquires a lock on the queue, extracts spans up to the maximum batch size,
     136              :   /// and then sends them to the exporter. If an error occurs during export, it is logged
     137              :   /// but not propagated (no retry mechanism is implemented by default).
     138              :   ///
     139              :   /// @return A future that completes when the export is finished
     140           68 :   Future<void> _exportBatch() async {
     141           68 :     if (_isShutdown) {
     142              :       return;
     143              :     }
     144              : 
     145           68 :     final List<Span> spansToExport = [];
     146              : 
     147          204 :     await _lock.synchronized(() {
     148          340 :       final batchSize = _spanQueue.length > _config.maxExportBatchSize
     149            2 :           ? _config.maxExportBatchSize
     150          136 :           : _spanQueue.length;
     151              : 
     152           83 :       for (var i = 0; i < batchSize; i++) {
     153           30 :         if (_spanQueue.isEmpty) break;
     154           45 :         spansToExport.add(_spanQueue.removeFirst());
     155              :       }
     156              :     });
     157              : 
     158           68 :     if (spansToExport.isEmpty) {
     159              :       return;
     160              :     }
     161              : 
     162              :     try {
     163           30 :       await exporter.export(spansToExport);
     164              :     } catch (e) {
     165           15 :       if (OTelLog.isError()) {
     166           30 :         OTelLog.error('Error exporting batch of spans: $e');
     167              :       }
     168              :       // Consider implementing retry logic here
     169              :     }
     170              :   }
     171              : 
     172           71 :   @override
     173              :   Future<void> forceFlush() async {
     174           71 :     if (_isShutdown) {
     175              :       return;
     176              :     }
     177              : 
     178           67 :     await _exportBatch();
     179              :   }
     180              : 
     181           71 :   @override
     182              :   Future<void> shutdown() async {
     183           71 :     if (_isShutdown) {
     184              :       return;
     185              :     }
     186              : 
     187           71 :     _isShutdown = true;
     188          142 :     _timer?.cancel();
     189              : 
     190              :     // Export any remaining spans
     191           71 :     await forceFlush();
     192              : 
     193              :     // Shutdown the exporter
     194          142 :     await exporter.shutdown();
     195              :   }
     196              : }
        

Generated by: LCOV version 2.0-1