GCC Code Coverage Report


Directory: ./
File: src/dp/pipeline.cc
Date: 2025-09-01 06:19:01
Exec Total Coverage
Lines: 25 110 22.7%
Functions: 3 8 37.5%
Branches: 15 40 37.5%

Line Branch Exec Source
1 # include "na64dp/pipeline.hh"
2 # include "na64dp/processingInfo.hh"
3 # include "na64util/runtimeDirs.hh"
4 #include <exception>
5
6 namespace na64dp {
7
8
2/2
✓ Branch 1 taken 1 times.
✓ Branch 4 taken 1 times.
1 Pipeline::Pipeline(iEvProcInfo * pi) : _log(log4cpp::Category::getInstance("pipeline"))
9 1 , _procInfoPtr(pi) {
10 1 msg_debug( _log, "Empty pipeline object %p instantiated." );
11 1 }
12
13 Pipeline::Pipeline( YAML::Node cfg
14 , calib::Manager & dspPtr
15 , iEvProcInfo * pi ) : _log(log4cpp::Category::getInstance("pipeline"))
16 , _procInfoPtr(pi) {
17 if( ! cfg.IsSequence() ) {
18 NA64DP_RUNTIME_ERROR( "Pipeline description is not a sequence." );
19 }
20 size_t nHandler = 0;
21 for( auto cHandlerNode : cfg ) {
22 AbstractHandler * hPtr;
23 if( ! cHandlerNode.IsMap() ) {
24 NA64DP_RUNTIME_ERROR( "Handler description #%zu is not"
25 " a map.", nHandler );
26 }
27 std::string handlerTypeStr;
28 if( ! cHandlerNode["_type"] ) {
29 NA64DP_RUNTIME_ERROR( "Handler description #%zu does not"
30 " provide \"_type\" property.", nHandler );
31 }
32 handlerTypeStr = cHandlerNode["_type"].as<std::string>();
33 #if 0 // TODO: experimental feature -- conditional hanlers
34 // check enableIf
35 if( cHandlerNode["_enableIf"] ) {
36 std::list<std::string> defs;
37 if( cHandlerNode["_enableIf"].Type() == YAML::NodeType::Scalar ) {
38 defs.push_back( cHandlerNode["_enableIf"].as<std::string>() );
39 } else if( cHandlerNode["_enableIf"].Type() == YAML::NodeType::Sequence ) {
40 defs = cHandlerNode["_enableIf"].as<std::list<std::string> >();
41 } else {
42 NA64DP_RUNTIME_ERROR( "\"_enableIf\" is expected to be a"
43 " string or list of strings." );
44 }
45 bool disable = false;
46 for( const std::string & def : defs ) {
47 if( (!cHandlerNode["__definitions"][def])
48 || (!cHandlerNode["__definitions"][def].as<bool>()) )
49 disable = true;
50 }
51 if(disable) continue;
52 }
53 #endif
54 try {
55 hPtr = VCtr::self().make<AbstractHandler>( handlerTypeStr
56 , dspPtr
57 , cHandlerNode );
58 } catch(const std::exception & e) {
59 _log.error( "An exception has been thrown during"
60 " instantiation of handler %zu: %s", nHandler, e.what() );
61 { // TODO: mark with padding or something...
62 std::ostringstream yos;
63 yos << cHandlerNode;
64 _log.error( "Configuration of the problematic piece: \n%s", yos.str().c_str() );
65 }
66 throw;
67 }
68 push_back( hPtr );
69 if( _procInfoPtr ) {
70 char epiLabelStr[256];
71 if( cHandlerNode["_label"] && !cHandlerNode["_label"].as<std::string>().empty() ) {
72 snprintf( epiLabelStr, sizeof(epiLabelStr)
73 , "%s : %s", cHandlerNode["_label"].as<std::string>().c_str(), handlerTypeStr.c_str() );
74 } else {
75 snprintf( epiLabelStr, sizeof(epiLabelStr)
76 , "%p : %s", hPtr, handlerTypeStr.c_str() );
77 }
78 _procInfoPtr->register_handler( hPtr, epiLabelStr );
79 }
80 ++nHandler;
81 }
82 msg_debug( _log
83 , "Pipeline object %p with %zu handlers instantiated."
84 , this, this->size() );
85 }
86
87 1 Pipeline::~Pipeline() {
88
2/2
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
3 for( auto & handlerPtr : *this ) {
89 2 handlerPtr->finalize();
90 }
91
2/2
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
3 for( auto & handlerPtr : *this ) {
92
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 delete handlerPtr;
93 }
94 1 }
95
96 std::pair<bool, bool>
97 18 Pipeline::process( event::Event & event, LocalMemory & lmem ) {
98 18 bool processIngStoppedByHandler = false
99 18 , eventDiscriminated = false;
100 18 size_t nHandler = 0;
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
18 if( _procInfoPtr ) {
102 // Note: the notify_event_read() returns `false' when events counter
103 // exceeds maximum number and probably may return it as feedback
104 // mechanism for rudimentary remote control over running processing.
105 // Nevertheless we do not use this result now.
106 _procInfoPtr->notify_event_read();
107 for( auto & handlerPtr : *this ) {
108 ++nHandler;
109 _procInfoPtr->notify_handler_starts( handlerPtr );
110
111 AbstractHandler::ProcRes lpr;
112 handlerPtr->set_local_memory(lmem);
113 //try {
114 lpr = handlerPtr->process_event( event );
115 //} catch(std::exception & e) {
116 // _log << log4cpp::Priority::ERROR
117 // << "Handler #" << nHandler
118 // << " in pipeline " << ((void*) this)
119 // << " emitted error while processing event: " << e.what();
120 // throw;
121 //}
122 handlerPtr->reset_local_memory(); // TODO RAI
123
124 processIngStoppedByHandler |= (lpr & AbstractHandler::kStopProcessing);
125 if( lpr & AbstractHandler::kDiscriminateEvent ) {
126 _procInfoPtr->notify_event_discriminated( handlerPtr );
127 eventDiscriminated = true;
128 break;
129 } else {
130 _procInfoPtr->notify_handler_done( handlerPtr );
131 }
132 }
133 } else {
134
2/2
✓ Branch 5 taken 27 times.
✓ Branch 6 taken 6 times.
33 for( auto & handlerPtr : *this ) {
135 27 ++nHandler;
136 AbstractHandler::ProcRes lpr;
137
1/1
✓ Branch 1 taken 27 times.
27 handlerPtr->set_local_memory(lmem);
138 //try {
139
1/1
✓ Branch 1 taken 27 times.
27 lpr = handlerPtr->process_event( event );
140 //} catch(std::exception & e) {
141 // _log << log4cpp::Priority::ERROR
142 // << "Handler #" << nHandler
143 // << " in pipeline " << ((void*) this)
144 // << " emitted error while processing event: " << e.what();
145 // throw;
146 //}
147
1/1
✓ Branch 1 taken 27 times.
27 handlerPtr->reset_local_memory(); // TODO RAI
148
149 27 processIngStoppedByHandler |= (lpr & AbstractHandler::kStopProcessing);
150
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 15 times.
27 if( lpr & AbstractHandler::kDiscriminateEvent ) {
151 12 eventDiscriminated = true;
152 12 break;
153 }
154 }
155 }
156 18 return {processIngStoppedByHandler, eventDiscriminated};
157 }
158
159
160 RunCfg::RunCfg( const std::string & name
161 , const std::string & paths
162 ) : _paths(paths)
163 , _filename(_find_rcfg_file(name))
164 {}
165
166 RunCfg::RunCfg( YAML::Node & root
167 , const std::string & paths
168 ) : _paths(paths)
169 , _root(root)
170 {}
171
172 std::string
173 RunCfg::_find_rcfg_file(const std::string & fname) {
174 util::RuntimeDirs rd(_paths.c_str());
175 auto files = rd.locate_files(fname);
176 if( files.size() > 1 ) {
177 std::ostringstream oss;
178 bool isFirst = true;
179 for( const auto & occ : files ) {
180 if(isFirst) isFirst = false; else oss << ", ";
181 oss << "\"" << occ << "\"";
182 }
183 log4cpp::Category::getInstance("pipeline.config")
184 .warn( "Got %zu matches for runtime config matching pattern \"%s\": %s"
185 " Using first occurence (\"%s\")"
186 , files.size()
187 , fname.c_str()
188 , oss.str().c_str()
189 , files[0].c_str()
190 );
191 } else if( files.empty() ) {
192 log4cpp::Category::getInstance("pipeline.config")
193 .debug(rd.dir_lookup_log().c_str());
194 NA64DP_RUNTIME_ERROR( "No runtime config named \"%s\" found. Lookup"
195 " locations: \"%s\".", fname.c_str(), _paths.c_str() );
196 }
197 return files[0];
198 }
199
200 YAML::Node
201 RunCfg::get() {
202 if(_filename.empty()) {
203 log4cpp::Category::getInstance("pipeline.config")
204 << log4cpp::Priority::INFO
205 << "Using cached pipeline config node.";
206 return _root;
207 }
208 log4cpp::Category::getInstance("pipeline.config")
209 << log4cpp::Priority::INFO
210 << "Using pipeline configuration from \"" << _filename << "\".";
211
212 // TODO: ... one can place some cfg-file preprocessing here
213
214 try {
215 _root = YAML::LoadFile(_filename);
216 } catch( std::exception & e ) {
217 log4cpp::Category::getInstance("pipeline.config")
218 .error( "An error occured while accessing, reading, or parsing"
219 " file \"%s\": %s"
220 , _filename.c_str()
221 , e.what()
222 );
223 throw e;
224 }
225 return _root;
226 }
227
228 } // namespace na64dp
229
230