GCC Code Coverage Report


Directory: ./
File: src/calib/manager.cc
Date: 2025-09-01 06:19:01
Exec Total Coverage
Lines: 6 195 3.1%
Functions: 4 17 23.5%
Branches: 3 47 6.4%

Line Branch Exec Source
1 #include "na64calib/manager.hh"
2 #include "na64util/str-fmt.hh"
3 #include "na64util/tsort.hh"
4
5 #include "yaml-cpp/yaml.h"
6
7 #if defined(jsoncpp_FOUND) && jsoncpp_FOUND
8 # include <json/writer.h>
9 #endif
10
11 // create_calibration_handle() shall know about all the existing handle
12 // subclasses. TODO: refactor it for VCtr
13 //#include "na64calib/YAMLCalibHandle.hh"
14
15 #include <fstream>
16 #include <unordered_set>
17
18 namespace na64dp {
19 namespace calib {
20
21 CIDataAliases * CIDataAliases::_self = nullptr;
22
23 1 CIDataAliases::CIDataAliases() : _logPtr(nullptr), _depsCacheValid(false) {}
24
25 CIDataAliases &
26 71 CIDataAliases::self() {
27
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 70 times.
✓ Branch 4 taken 1 times.
✗ Branch 7 not taken.
71 if(!_self) _self = new CIDataAliases();
28 71 return *_self;
29 }
30
31 void
32 CIDataAliases::add_alias( const std::string & name
33 , Dispatcher::CIDataID dataID ) {
34 _depsCacheValid = false;
35 {
36 auto ir = _nameToTypeID.emplace(name, dataID);
37 auto it = ir.first;
38 if( (!ir.second) && dataID != it->second ) {
39 NA64DP_RUNTIME_ERROR( "Calibration data type alias ambiguity:"
40 " name \"%s\" is already reserved for %s, while"
41 " another type (%s) claims same name now."
42 , name.c_str()
43 , util::calib_id_to_str(it->second, true).c_str()
44 , util::calib_id_to_str(dataID, true).c_str()
45 );
46 }
47 }
48 {
49 auto ir = _typeIDToName.emplace(dataID, name);
50 auto it = ir.first;
51 if( _typeIDToName.end() == it && name != it->second ) {
52 NA64DP_RUNTIME_ERROR( "Calibration data type alias ambiguity:"
53 " type %s is already aliased by name \"%s\", while"
54 " it is referenced again by name alias \"%s\"."
55 , util::calib_id_to_str(dataID, true).c_str()
56 , it->second.c_str()
57 , name.c_str()
58 );
59 }
60 }
61 if( _logPtr )
62 _logPtr->debug(
63 "Alias \"%s\" from now refers to data type %s."
64 , name.c_str()
65 , util::calib_id_to_str(dataID, true).c_str()
66 );
67 }
68
69 void
70 CIDataAliases::init_logging() {
71 if( _logPtr ) {
72 _logPtr->debug( "Skipping repeated CIDataAliases logging initialization." );
73 return;
74 }
75 _logPtr = &(log4cpp::Category::getInstance( "calib.aliases" ));
76 std::ostringstream oss;
77 oss << "Existing type aliases: ";
78 bool isFirst = true;
79 for( const auto & p : _nameToTypeID ) {
80 if(!isFirst) oss << ", "; else isFirst = false;
81 oss << "\"" << p.first << "\":" << util::calib_id_to_str(p.second, true);
82 }
83 _logPtr->debug( oss.str() );
84 #if 0
85 oss.clear();
86 if( !_depends.empty() ) {
87 oss << "Type dependencies: ";
88 for() // ... TODO
89 }
90 #endif
91 }
92
93 void
94 CIDataAliases::add_dependency( const std::string & product
95 , const std::string & requirement
96 ) {
97 _depsCacheValid = false;
98 auto itProd = _nameToTypeID.find(product);
99 if( _nameToTypeID.end() == itProd ) {
100 NA64DP_RUNTIME_ERROR( "Unknown dependee type alias \"%s\""
101 , product.c_str() );
102 }
103 auto er = _depends.equal_range(itProd->second);
104 // if we already know this dependency, just skip it
105 for( auto it = er.first; it != er.second; ++it ) {
106 if(it->second != requirement) continue;
107 if(_logPtr)
108 _logPtr->debug(
109 "Calib data type dependency already known: \"%s\" depends"
110 " on \"%s\"."
111 , product.c_str()
112 , requirement.c_str()
113 );
114 return; // known
115 }
116
117 _depends.emplace(itProd->second, requirement);
118 if( _logPtr )
119 _logPtr->debug(
120 "New calib data type dependency registered: \"%s\""
121 " depends on \"%s\"."
122 , product.c_str()
123 , requirement.c_str()
124 );
125 }
126
127 const std::unordered_multimap< Dispatcher::CIDataID
128 , Dispatcher::CIDataID
129 , util::PairHash > &
130 CIDataAliases::dependency_map() const {
131 if( _depsCacheValid ) return _depsCache;
132 _depsCache.clear();
133
134 for( const auto & p : _depends ) {
135 auto it = type_id_by_name().find(p.second);
136 if( type_id_by_name().end() == it ) {
137 NA64DP_RUNTIME_ERROR("Unknown type alias \"%s\" provided as a"
138 " requirement for %s."
139 , p.second.c_str()
140 , util::calib_id_to_str(p.first).c_str()
141 );
142 }
143 _depsCache.emplace(p.first, it->second);
144 }
145
146 _depsCacheValid = true;
147 return _depsCache;
148 }
149
150 //
151 // Manager
152 /////////
153
154 1 Manager::Manager(log4cpp::Category & L) : Dispatcher(L) {}
155 4 Manager::~Manager() {}
156
157 void Manager::_collect_updates( Updates & upds
158 , EventID eventID
159 , const std::pair<time_t, uint32_t> & eventTime
160 ) const {
161 for(auto iPtr : _indeces) {
162 iPtr->append_updates( _cEventID, eventID
163 , _cDatetime, eventTime
164 , upds );
165 }
166 }
167
168 void
169 Manager::_load_updates( const Updates & updates ) {
170 size_t nLoaded = 0;
171 // load updates if they are relevant (at least one observable exists)
172 for( const iUpdate * updateEntry : updates ) {
173 if(!observable_exists(updateEntry->subject_data_type())) {
174 _log.debug( "Update of type %s is not in demand."
175 , util::calib_id_to_str(updateEntry->subject_data_type(), true).c_str()
176 );
177 continue;
178 }
179 if(updateEntry->recommended_loader()) {
180 #ifndef NDEBUG
181 // returned is const ptr, we shall find non-const ptr within
182 // a mgr to assure it is owned by mgr. TODO: is it needed?
183 // For instance, registering a loader in the mgr will guarantee
184 // correct accounting of the deps prior to _sort_updates()
185 auto it = std::find_if( _loaders.begin(), _loaders.end()
186 , [&]( const std::pair<std::string, std::shared_ptr<iLoader>> & p ){
187 return updateEntry->recommended_loader() == p.second.get();
188 });
189 if( _loaders.end() == it ) {
190 // update provided a recommended loader, but it is not
191 // known to the mgr. Currently unclear that it is really
192 // necessary...
193 NA64DP_RUNTIME_ERROR("recommended loader is not owned by mgr"); // TODO
194 }
195 #endif
196 updateEntry->recommended_loader()->load(*updateEntry, *this);
197 ++nLoaded;
198 continue;
199 }
200 // no recommended loader -- try to handle with any existing
201 bool loaded = false;
202 for( auto lp : _loaders ) {
203 if( ! lp.second->can_handle(*updateEntry) ) continue;
204 lp.second->load(*updateEntry, *this);
205 loaded = true;
206 ++nLoaded;
207 break;
208 }
209 if( ! loaded ) {
210 // means that this update was enqueued with no recommended
211 // loader and none of the enabled loaders were capable to
212 // load this update
213 NA64DP_RUNTIME_ERROR("can't load an update"); // TODO details
214 }
215 }
216 _log.debug("%zu calibration updates have been applied.", nLoaded );
217 }
218
219 void
220 Manager::event_id( EventID newEventID
221 , const std::pair<time_t, uint32_t> & newDateTime
222 ) {
223 Updates upds;
224 // collect updates for the event
225 for( auto p : _indeces ) {
226 // msg_debug( "Loading updates of type ...", p.first.name() )
227 p->append_updates( _cEventID, newEventID
228 , _cDatetime, newDateTime
229 , upds
230 );
231 }
232 if( upds.empty() ) {
233 _log.debug( "No calibration data updates for event %s -> %s"
234 , _cEventID.to_str().c_str() // datetime?
235 , newEventID.to_str().c_str()
236 );
237 } else {
238 std::string msg;
239 #if 0 // TODO; temporary disabled as periods aliasing moved to extension lib
240 { // for better verbosity, attempt to get description of runs
241 // transition using aliases. Get aliases by time and run number and:
242 // - on failure, just print warning on failure to notify user on
243 // possible problems (yet, that's normal for runs outside
244 // official data taking).
245 // - if succeed, check that alias is the same (otherwise run and
246 // time identification diverged in calibrations) and add
247 // period description to message
248 try {
249 // get aliases independently for run number and run time, if
250 // possible.
251 auto aTo1 = Periods::self()[p348reco::RunNo_t{newEventID.run_no()}]
252 , aTo2 = Periods::self()[p348reco::Datetime_t{newDateTime.first}]
253 ;
254 if( aTo1.alias != aTo2.alias ) {
255 _log.warn( "Erroneous period aliasing for (current) run"
256 " #%d; by time -- \"%s\", by run number -- \"%s\"."
257 , (int) newEventID.run_no()
258 , aTo2.name.c_str()
259 , aTo1.name.c_str()
260 );
261 } else {
262 msg = util::format( "(\"%s\" -- %s)"
263 , aTo1.name.c_str()
264 , aTo2.comment.c_str() );
265 }
266 } catch( na64dp::errors::NoAliasDefined & e ) {
267 _log.debug( "Couldn't find alias for run causing calibration"
268 " update: %s (should be ok for non-data taking"
269 " runs).", e.what() );
270 msg.clear();
271 }
272 }
273 #endif
274 _sort_updates(upds);
275 _log.debug( "Loading %zu calibration data updates for event %s -> %s %s."
276 , upds.size()
277 , _cEventID.to_str().c_str() // datetime?
278 , newEventID.to_str().c_str()
279 , msg.empty() ? "" : msg.c_str()
280 );
281 _load_updates(upds);
282 _log.info( "%zu calibration data updates have been loaded for event %s -> %s %s."
283 , upds.size()
284 , _cEventID.to_str().c_str() // datetime?
285 , newEventID.to_str().c_str()
286 , msg.empty() ? "" : msg.c_str()
287 );
288 }
289 _cEventID = newEventID;
290 _cDatetime = newDateTime;
291
292 // let indeces to cleanup temporary objects
293 for( auto p : _indeces ) {
294 p->clear_tmp_update_objects();
295 }
296 }
297
298 void
299 Manager::_sort_updates( Updates & upds ) {
300 if( upds.size() < 2 ) return; // nothing to sort
301 // retrieve types
302 //std::unordered_multimap<Dispatcher::CIDataID, iUpdate>
303 std::unordered_map< Dispatcher::CIDataID
304 , std::vector<typename Updates::value_type>
305 , util::PairHash> byTypes;
306 for(auto upd : upds) {
307 auto ir = byTypes.emplace( upd->subject_data_type()
308 , decltype(byTypes)::mapped_type()
309 );
310 ir.first->second.push_back(upd);
311 }
312
313 const auto & depMap = CIDataAliases::self().dependency_map();
314 // Updates list not necessarily contains all the dependencies of depndee.
315 // They must be loaded beforehead (in previous invokations).
316 std::vector<Dispatcher::CIDataID> order;
317 {
318 std::unordered_set<Dispatcher::CIDataID, util::PairHash> unaccountedTypes;
319 std::transform( byTypes.begin(), byTypes.end()
320 , std::inserter(unaccountedTypes, unaccountedTypes.end())
321 , [](const decltype(byTypes)::value_type & e){
322 return e.first;
323 }
324 );
325 util::DAG< Dispatcher::CIDataID
326 , util::PairHash
327 , std::equal_to<Dispatcher::CIDataID>
328 , CIDataIDCompare
329 > g;
330 for( const auto & tp : byTypes ) {
331 const auto & requestedType = tp.first;
332 // get all deps for this type
333 auto allRequirements = depMap.equal_range(requestedType);
334 // add any deps with type existing in current updates list to graph
335 for( auto it = allRequirements.first
336 ; it != allRequirements.second
337 ; ++it ) {
338 // required type that may or may not be present in updates list
339 const auto & typeRequiredByRequested = it->second;
340 if( byTypes.end() != byTypes.find(typeRequiredByRequested) ) {
341 // push node in graph
342 g.add( it->second, requestedType );
343 unaccountedTypes.erase(it->second);
344 unaccountedTypes.erase(requestedType);
345 _log << log4cpp::Priority::DEBUG
346 << "Calibration data type "
347 << util::calib_id_to_str(requestedType)
348 << " depends on "
349 << util::calib_id_to_str(typeRequiredByRequested)
350 << " (added to DAG for tsort)"
351 ;
352 }
353 }
354 }
355 #if 0
356 for(const auto & tp1 : byTypes) {
357 for(const auto & tp2 : byTypes) {
358 if(std::equal_to<Dispatcher::CIDataID>()(tp1.first, tp2.first)) {
359 std::cout << util::calib_id_to_str(tp1.first)
360 << " == "
361 << util::calib_id_to_str(tp2.first)
362 << std::endl;
363 ;
364 } else {
365 std::cout << util::calib_id_to_str(tp1.first)
366 << " != "
367 << util::calib_id_to_str(tp2.first)
368 << std::endl;
369 ;
370 }
371 }
372 }
373 #endif
374 if(!unaccountedTypes.empty()) {
375 order.insert( order.end()
376 , unaccountedTypes.begin()
377 , unaccountedTypes.end()
378 );
379 }
380 const auto sorted = g.sorted();
381 if(!g.empty()) {
382 const auto edges = g.edges();
383 _log << log4cpp::Priority::ERROR
384 << "Edges remained after Kahn's algorithm for topological"
385 " sort (DFS): "
386 << util::str_join<decltype(edges)::const_iterator>(edges.begin(), edges.end(), ", "
387 , [](decltype(edges)::const_iterator it) {
388 return "(" + util::calib_id_to_str(it->first)
389 + "<-"
390 + util::calib_id_to_str(it->second) + ")";
391 } )
392 << "; retrieved: "
393 << util::str_join<decltype(order)::const_iterator>(
394 sorted.begin(), sorted.end(), ", "
395 , [](decltype(order)::const_iterator it){
396 return util::calib_id_to_str(*it);
397 })
398 ;
399 NA64DP_RUNTIME_ERROR("Cyclic dependency revealed in the calibration"
400 " data."); // TODO: how to get info on loops in Kahns' algo?
401 }
402 // concat free ones with sorted
403 order.insert(order.end(), sorted.begin(), sorted.end() );
404 }
405 // Iterate over the sorted types vector to put the updates in a proper
406 // order
407 Updates r;
408 std::ostringstream oss;
409 oss << "Calibration data types enqueued for update: ";
410 bool isFirst = true;
411 for( auto tpe : order ) {
412 const auto & upds = byTypes[tpe];
413 r.insert(r.end(), upds.begin(), upds.end());
414 if(isFirst) isFirst = false; else oss << ", ";
415 oss << util::calib_id_to_str(tpe) << "(" << upds.size() << ")";
416 }
417 _log.info(oss.str());
418 assert(r.size() == upds.size());
419 upds = r;
420 }
421
422 void
423 Manager::to_yaml(YAML::Node & rootNode) const {
424 { // state
425 YAML::Node cStateNode;
426 cStateNode["eventID"] = _cEventID.to_str();
427 YAML::Node timeNode;
428 {
429 std::ostringstream oss;
430 oss << _cDatetime.first << "." << _cDatetime.second;
431 cStateNode["time"] = oss.str();
432 }
433 rootNode["currentState"] = cStateNode;
434 }
435 { // loaders
436 YAML::Node loadersNode;
437 size_t nLoader = 0;
438 for( auto loaderNamedPair : _loaders ) {
439 YAML::Node loaderNode;
440 loaderNamedPair.second->to_yaml_as_loader(loaderNode);
441 loaderNode["_name"] = loaderNamedPair.first;
442 loadersNode[nLoader++] = loaderNode;
443 }
444 rootNode["loaders"] = loadersNode;
445 }
446 { // indices
447 YAML::Node idxsNode;
448 size_t nIdx = 0;
449 for( auto idxPtr : _indeces ) {
450 YAML::Node idxNode;
451 idxPtr->to_yaml(idxNode);
452 idxsNode[nIdx++] = idxNode;
453 }
454 rootNode["indeces"] = idxsNode;
455 }
456 }
457
458 } // namespace ::na64dp::calib
459 } // namespace na64dp
460
461