| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | # include "na64dp/pipeline.hh" | ||
| 2 | # include "na64dp/processingInfo.hh" | ||
| 3 | # include "na64util/runtimeDirs.hh" | ||
| 4 | #include <exception> | ||
| 5 | |||
| 6 | namespace na64dp { | ||
| 7 | |||
| 8 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 4 taken 1 times.
|
1 | Pipeline::Pipeline(iEvProcInfo * pi) : _log(log4cpp::Category::getInstance("pipeline")) |
| 9 | 1 | , _procInfoPtr(pi) { | |
| 10 | 1 | msg_debug( _log, "Empty pipeline object %p instantiated." ); | |
| 11 | 1 | } | |
| 12 | |||
| 13 | ✗ | Pipeline::Pipeline( YAML::Node cfg | |
| 14 | , calib::Manager & dspPtr | ||
| 15 | ✗ | , iEvProcInfo * pi ) : _log(log4cpp::Category::getInstance("pipeline")) | |
| 16 | ✗ | , _procInfoPtr(pi) { | |
| 17 | ✗ | if( ! cfg.IsSequence() ) { | |
| 18 | ✗ | NA64DP_RUNTIME_ERROR( "Pipeline description is not a sequence." ); | |
| 19 | } | ||
| 20 | ✗ | size_t nHandler = 0; | |
| 21 | ✗ | for( auto cHandlerNode : cfg ) { | |
| 22 | AbstractHandler * hPtr; | ||
| 23 | ✗ | if( ! cHandlerNode.IsMap() ) { | |
| 24 | ✗ | NA64DP_RUNTIME_ERROR( "Handler description #%zu is not" | |
| 25 | " a map.", nHandler ); | ||
| 26 | } | ||
| 27 | ✗ | std::string handlerTypeStr; | |
| 28 | ✗ | if( ! cHandlerNode["_type"] ) { | |
| 29 | ✗ | NA64DP_RUNTIME_ERROR( "Handler description #%zu does not" | |
| 30 | " provide \"_type\" property.", nHandler ); | ||
| 31 | } | ||
| 32 | ✗ | handlerTypeStr = cHandlerNode["_type"].as<std::string>(); | |
| 33 | #if 0 // TODO: experimental feature -- conditional hanlers | ||
| 34 | // check enableIf | ||
| 35 | if( cHandlerNode["_enableIf"] ) { | ||
| 36 | std::list<std::string> defs; | ||
| 37 | if( cHandlerNode["_enableIf"].Type() == YAML::NodeType::Scalar ) { | ||
| 38 | defs.push_back( cHandlerNode["_enableIf"].as<std::string>() ); | ||
| 39 | } else if( cHandlerNode["_enableIf"].Type() == YAML::NodeType::Sequence ) { | ||
| 40 | defs = cHandlerNode["_enableIf"].as<std::list<std::string> >(); | ||
| 41 | } else { | ||
| 42 | NA64DP_RUNTIME_ERROR( "\"_enableIf\" is expected to be a" | ||
| 43 | " string or list of strings." ); | ||
| 44 | } | ||
| 45 | bool disable = false; | ||
| 46 | for( const std::string & def : defs ) { | ||
| 47 | if( (!cHandlerNode["__definitions"][def]) | ||
| 48 | || (!cHandlerNode["__definitions"][def].as<bool>()) ) | ||
| 49 | disable = true; | ||
| 50 | } | ||
| 51 | if(disable) continue; | ||
| 52 | } | ||
| 53 | #endif | ||
| 54 | try { | ||
| 55 | ✗ | hPtr = VCtr::self().make<AbstractHandler>( handlerTypeStr | |
| 56 | , dspPtr | ||
| 57 | , cHandlerNode ); | ||
| 58 | ✗ | } catch(const std::exception & e) { | |
| 59 | ✗ | _log.error( "An exception has been thrown during" | |
| 60 | ✗ | " instantiation of handler %zu: %s", nHandler, e.what() ); | |
| 61 | { // TODO: mark with padding or something... | ||
| 62 | ✗ | std::ostringstream yos; | |
| 63 | ✗ | yos << cHandlerNode; | |
| 64 | ✗ | _log.error( "Configuration of the problematic piece: \n%s", yos.str().c_str() ); | |
| 65 | } | ||
| 66 | ✗ | throw; | |
| 67 | } | ||
| 68 | ✗ | push_back( hPtr ); | |
| 69 | ✗ | if( _procInfoPtr ) { | |
| 70 | char epiLabelStr[256]; | ||
| 71 | ✗ | if( cHandlerNode["_label"] && !cHandlerNode["_label"].as<std::string>().empty() ) { | |
| 72 | ✗ | snprintf( epiLabelStr, sizeof(epiLabelStr) | |
| 73 | ✗ | , "%s : %s", cHandlerNode["_label"].as<std::string>().c_str(), handlerTypeStr.c_str() ); | |
| 74 | } else { | ||
| 75 | ✗ | snprintf( epiLabelStr, sizeof(epiLabelStr) | |
| 76 | , "%p : %s", hPtr, handlerTypeStr.c_str() ); | ||
| 77 | } | ||
| 78 | ✗ | _procInfoPtr->register_handler( hPtr, epiLabelStr ); | |
| 79 | } | ||
| 80 | ✗ | ++nHandler; | |
| 81 | } | ||
| 82 | ✗ | msg_debug( _log | |
| 83 | , "Pipeline object %p with %zu handlers instantiated." | ||
| 84 | , this, this->size() ); | ||
| 85 | } | ||
| 86 | |||
| 87 | 1 | Pipeline::~Pipeline() { | |
| 88 |
2/2✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
|
3 | for( auto & handlerPtr : *this ) { |
| 89 | 2 | handlerPtr->finalize(); | |
| 90 | } | ||
| 91 |
2/2✓ Branch 5 taken 2 times.
✓ Branch 6 taken 1 times.
|
3 | for( auto & handlerPtr : *this ) { |
| 92 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | delete handlerPtr; |
| 93 | } | ||
| 94 | 1 | } | |
| 95 | |||
| 96 | std::pair<bool, bool> | ||
| 97 | 18 | Pipeline::process( event::Event & event, LocalMemory & lmem ) { | |
| 98 | 18 | bool processIngStoppedByHandler = false | |
| 99 | 18 | , eventDiscriminated = false; | |
| 100 | 18 | size_t nHandler = 0; | |
| 101 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
|
18 | if( _procInfoPtr ) { |
| 102 | // Note: the notify_event_read() returns `false' when events counter | ||
| 103 | // exceeds maximum number and probably may return it as feedback | ||
| 104 | // mechanism for rudimentary remote control over running processing. | ||
| 105 | // Nevertheless we do not use this result now. | ||
| 106 | ✗ | _procInfoPtr->notify_event_read(); | |
| 107 | ✗ | for( auto & handlerPtr : *this ) { | |
| 108 | ✗ | ++nHandler; | |
| 109 | ✗ | _procInfoPtr->notify_handler_starts( handlerPtr ); | |
| 110 | |||
| 111 | AbstractHandler::ProcRes lpr; | ||
| 112 | ✗ | handlerPtr->set_local_memory(lmem); | |
| 113 | //try { | ||
| 114 | ✗ | lpr = handlerPtr->process_event( event ); | |
| 115 | //} catch(std::exception & e) { | ||
| 116 | // _log << log4cpp::Priority::ERROR | ||
| 117 | // << "Handler #" << nHandler | ||
| 118 | // << " in pipeline " << ((void*) this) | ||
| 119 | // << " emitted error while processing event: " << e.what(); | ||
| 120 | // throw; | ||
| 121 | //} | ||
| 122 | ✗ | handlerPtr->reset_local_memory(); // TODO RAI | |
| 123 | |||
| 124 | ✗ | processIngStoppedByHandler |= (lpr & AbstractHandler::kStopProcessing); | |
| 125 | ✗ | if( lpr & AbstractHandler::kDiscriminateEvent ) { | |
| 126 | ✗ | _procInfoPtr->notify_event_discriminated( handlerPtr ); | |
| 127 | ✗ | eventDiscriminated = true; | |
| 128 | ✗ | break; | |
| 129 | } else { | ||
| 130 | ✗ | _procInfoPtr->notify_handler_done( handlerPtr ); | |
| 131 | } | ||
| 132 | } | ||
| 133 | } else { | ||
| 134 |
2/2✓ Branch 5 taken 27 times.
✓ Branch 6 taken 6 times.
|
33 | for( auto & handlerPtr : *this ) { |
| 135 | 27 | ++nHandler; | |
| 136 | AbstractHandler::ProcRes lpr; | ||
| 137 |
1/1✓ Branch 1 taken 27 times.
|
27 | handlerPtr->set_local_memory(lmem); |
| 138 | //try { | ||
| 139 |
1/1✓ Branch 1 taken 27 times.
|
27 | lpr = handlerPtr->process_event( event ); |
| 140 | //} catch(std::exception & e) { | ||
| 141 | // _log << log4cpp::Priority::ERROR | ||
| 142 | // << "Handler #" << nHandler | ||
| 143 | // << " in pipeline " << ((void*) this) | ||
| 144 | // << " emitted error while processing event: " << e.what(); | ||
| 145 | // throw; | ||
| 146 | //} | ||
| 147 |
1/1✓ Branch 1 taken 27 times.
|
27 | handlerPtr->reset_local_memory(); // TODO RAI |
| 148 | |||
| 149 | 27 | processIngStoppedByHandler |= (lpr & AbstractHandler::kStopProcessing); | |
| 150 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 15 times.
|
27 | if( lpr & AbstractHandler::kDiscriminateEvent ) { |
| 151 | 12 | eventDiscriminated = true; | |
| 152 | 12 | break; | |
| 153 | } | ||
| 154 | } | ||
| 155 | } | ||
| 156 | 18 | return {processIngStoppedByHandler, eventDiscriminated}; | |
| 157 | } | ||
| 158 | |||
| 159 | |||
| 160 | ✗ | RunCfg::RunCfg( const std::string & name | |
| 161 | , const std::string & paths | ||
| 162 | ✗ | ) : _paths(paths) | |
| 163 | ✗ | , _filename(_find_rcfg_file(name)) | |
| 164 | ✗ | {} | |
| 165 | |||
| 166 | ✗ | RunCfg::RunCfg( YAML::Node & root | |
| 167 | , const std::string & paths | ||
| 168 | ✗ | ) : _paths(paths) | |
| 169 | ✗ | , _root(root) | |
| 170 | ✗ | {} | |
| 171 | |||
| 172 | std::string | ||
| 173 | ✗ | RunCfg::_find_rcfg_file(const std::string & fname) { | |
| 174 | ✗ | util::RuntimeDirs rd(_paths.c_str()); | |
| 175 | ✗ | auto files = rd.locate_files(fname); | |
| 176 | ✗ | if( files.size() > 1 ) { | |
| 177 | ✗ | std::ostringstream oss; | |
| 178 | ✗ | bool isFirst = true; | |
| 179 | ✗ | for( const auto & occ : files ) { | |
| 180 | ✗ | if(isFirst) isFirst = false; else oss << ", "; | |
| 181 | ✗ | oss << "\"" << occ << "\""; | |
| 182 | } | ||
| 183 | ✗ | log4cpp::Category::getInstance("pipeline.config") | |
| 184 | ✗ | .warn( "Got %zu matches for runtime config matching pattern \"%s\": %s" | |
| 185 | " Using first occurence (\"%s\")" | ||
| 186 | , files.size() | ||
| 187 | , fname.c_str() | ||
| 188 | ✗ | , oss.str().c_str() | |
| 189 | ✗ | , files[0].c_str() | |
| 190 | ); | ||
| 191 | ✗ | } else if( files.empty() ) { | |
| 192 | ✗ | log4cpp::Category::getInstance("pipeline.config") | |
| 193 | ✗ | .debug(rd.dir_lookup_log().c_str()); | |
| 194 | ✗ | NA64DP_RUNTIME_ERROR( "No runtime config named \"%s\" found. Lookup" | |
| 195 | " locations: \"%s\".", fname.c_str(), _paths.c_str() ); | ||
| 196 | } | ||
| 197 | ✗ | return files[0]; | |
| 198 | } | ||
| 199 | |||
| 200 | YAML::Node | ||
| 201 | ✗ | RunCfg::get() { | |
| 202 | ✗ | if(_filename.empty()) { | |
| 203 | ✗ | log4cpp::Category::getInstance("pipeline.config") | |
| 204 | ✗ | << log4cpp::Priority::INFO | |
| 205 | ✗ | << "Using cached pipeline config node."; | |
| 206 | ✗ | return _root; | |
| 207 | } | ||
| 208 | ✗ | log4cpp::Category::getInstance("pipeline.config") | |
| 209 | ✗ | << log4cpp::Priority::INFO | |
| 210 | ✗ | << "Using pipeline configuration from \"" << _filename << "\"."; | |
| 211 | |||
| 212 | // TODO: ... one can place some cfg-file preprocessing here | ||
| 213 | |||
| 214 | try { | ||
| 215 | ✗ | _root = YAML::LoadFile(_filename); | |
| 216 | ✗ | } catch( std::exception & e ) { | |
| 217 | ✗ | log4cpp::Category::getInstance("pipeline.config") | |
| 218 | ✗ | .error( "An error occured while accessing, reading, or parsing" | |
| 219 | " file \"%s\": %s" | ||
| 220 | , _filename.c_str() | ||
| 221 | ✗ | , e.what() | |
| 222 | ); | ||
| 223 | ✗ | throw e; | |
| 224 | } | ||
| 225 | ✗ | return _root; | |
| 226 | } | ||
| 227 | |||
| 228 | } // namespace na64dp | ||
| 229 | |||
| 230 |