xrootd
Loading...
Searching...
No Matches
XrdClParallelOperation.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_PARALLELOPERATION_HH__
27#define __XRD_CL_PARALLELOPERATION_HH__
28
34
35#include <atomic>
36#include <condition_variable>
37#include <mutex>
38
39namespace XrdCl
40{
41
42 //----------------------------------------------------------------------------
43 // Interface for different execution policies:
44 // - all : all operations need to succeed in order for the parallel
45 // operation to be successful
46 // - any : just one of the operations needs to succeed in order for
47 // the parallel operation to be successful
48 // - some : n (user defined) operations need to succeed in order for
49 // the parallel operation to be successful
50 // - at least : at least n (user defined) operations need to succeed in
51 // order for the parallel operation to be successful (the
52 // user handler will be called only when all operations are
53 // resolved)
54 //
55 // @param status : status returned by one of the aggregated operations
56 //
57 // @return : true if the status should be passed to the user handler,
58 // false otherwise.
59 //----------------------------------------------------------------------------
61 {
63 {
64 }
65
66 virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67
68 virtual XRootDStatus Result() = 0;
69 };
70
71 //----------------------------------------------------------------------------
77 //----------------------------------------------------------------------------
78 template<bool HasHndl>
79 class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80 {
81 template<bool> friend class ParallelOperation;
82
83 public:
84
85 //------------------------------------------------------------------------
87 //------------------------------------------------------------------------
88 template<bool from>
90 ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91 pipelines( std::move( obj.pipelines ) ),
92 policy( std::move( obj.policy ) )
93 {
94 }
95
96 //------------------------------------------------------------------------
102 //------------------------------------------------------------------------
103 template<class Container>
104 ParallelOperation( Container &&container )
105 {
106 static_assert( !HasHndl, "Constructor is available only operation without handler");
107
108 pipelines.reserve( container.size() );
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter( pipelines ) );
112 container.clear(); // there's junk inside so we clear it
113 }
114
116 {
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
122 std::string ToString()
123 {
124 std::ostringstream oss;
125 oss << "Parallel(";
126 for( size_t i = 0; i < pipelines.size(); i++ )
127 {
128 oss << pipelines[i]->ToString();
129 if( i + 1 != pipelines.size() )
130 {
131 oss << " && ";
132 }
133 }
134 oss << ")";
135 return oss.str();
136 }
137
138 //------------------------------------------------------------------------
143 //------------------------------------------------------------------------
145 {
146 policy.reset( new AllPolicy() );
147 return std::move( *this );
148 }
149
150 //------------------------------------------------------------------------
155 //------------------------------------------------------------------------
157 {
158 policy.reset( new AnyPolicy( pipelines.size() ) );
159 return std::move( *this );
160 }
161
162 //------------------------------------------------------------------------
163 // Set policy to `Some`
167 //------------------------------------------------------------------------
169 {
170 policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171 return std::move( *this );
172 }
173
174 //------------------------------------------------------------------------
180 //------------------------------------------------------------------------
182 {
183 policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184 return std::move( *this );
185 }
186
187 private:
188
189 //------------------------------------------------------------------------
194 //------------------------------------------------------------------------
195 struct AllPolicy : public PolicyExecutor
196 {
197 bool Examine( const XrdCl::XRootDStatus &status )
198 {
199 // keep the status in case this is the final result
200 res = status;
201 if( status.IsOK() ) return false;
202 // we require all request to succeed
203 return true;
204 }
205
207 {
208 return res;
209 }
210
212 };
213
214 //------------------------------------------------------------------------
219 //------------------------------------------------------------------------
220 struct AnyPolicy : public PolicyExecutor
221 {
222 AnyPolicy( size_t size) : cnt( size )
223 {
224 }
225
226 bool Examine( const XrdCl::XRootDStatus &status )
227 {
228 // keep the status in case this is the final result
229 res = status;
230 // decrement the counter
231 size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
232 // we require just one operation to be successful
233 if( status.IsOK() ) return true;
234 // lets see if this is the last one?
235 if( nb == 1 ) return true;
236 // we still have a chance there will be one that is successful
237 return false;
238 }
239
241 {
242 return res;
243 }
244
245 private:
246 std::atomic<size_t> cnt;
248 };
249
250 //------------------------------------------------------------------------
255 //------------------------------------------------------------------------
257 {
258 SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
260 {
261 }
262
263 bool Examine( const XrdCl::XRootDStatus &status )
264 {
265 // keep the status in case this is the final result
266 res = status;
267 if( status.IsOK() )
268 {
269 size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270 if( s + 1 == threshold ) return true; // we reached the threshold
271 // we are not yet there
272 return false;
273 }
274 size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
275 // did we drop below the threshold
276 if( f == size - threshold ) return true;
277 // we still have a chance there will be enough of successful operations
278 return false;
279 }
280
282 {
283 return res;
284 }
285
286 private:
287 std::atomic<size_t> failed;
288 std::atomic<size_t> succeeded;
289 const size_t threshold;
290 const size_t size;
292 };
293
294 //------------------------------------------------------------------------
300 //------------------------------------------------------------------------
302 {
303 AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
304 failed_cnt( 0 ),
305 failed_threshold( size - threshold )
306 {
307 }
308
309 bool Examine( const XrdCl::XRootDStatus &status )
310 {
311 // update number of pending operations
312 size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
313 // although we might have the minimum to succeed we wait for the rest
314 if( status.IsOK() ) return ( pending == 0 );
315 size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
316 if( nb == failed_threshold ) res = status; // we dropped below the threshold
317 // if we still have to wait for pending operations return false,
318 // otherwise all is done, return true
319 return ( pending == 0 );
320 }
321
323 {
324 return res;
325 }
326
327 private:
328 std::atomic<size_t> pending_cnt;
329 std::atomic<size_t> failed_cnt;
330 const size_t failed_threshold;
332 };
333
334 //------------------------------------------------------------------------
336 //------------------------------------------------------------------------
338 {
339 barrier_t() : on( true ) { }
340
341 void wait()
342 {
343 std::unique_lock<std::mutex> lck( mtx );
344 if( on ) cv.wait( lck );
345 }
346
347 void lift()
348 {
349 std::unique_lock<std::mutex> lck( mtx );
350 on = false;
351 cv.notify_all();
352 }
353
354 private:
355 std::condition_variable cv;
356 std::mutex mtx;
357 bool on;
358 };
359
360 //------------------------------------------------------------------------
365 //------------------------------------------------------------------------
366 struct Ctx
367 {
368 //----------------------------------------------------------------------
372 //----------------------------------------------------------------------
377
378 //----------------------------------------------------------------------
380 //----------------------------------------------------------------------
382 {
383 Handle( XRootDStatus() );
384 }
385
386 //----------------------------------------------------------------------
391 //----------------------------------------------------------------------
392 inline void Examine( const XRootDStatus &st )
393 {
394 if( policy->Examine( st ) )
395 Handle( policy->Result() );
396 }
397
398 //----------------------------------------------------------------------
403 //---------------------------------------------------------------------
404 inline void Handle( const XRootDStatus &st )
405 {
406 PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
407 if( hdlr )
408 {
409 barrier.wait();
410 hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
411 }
412 }
413
414 //----------------------------------------------------------------------
416 //----------------------------------------------------------------------
417 std::atomic<PipelineHandler*> handler;
418
419 //----------------------------------------------------------------------
421 //----------------------------------------------------------------------
422 std::unique_ptr<PolicyExecutor> policy;
423
424 //----------------------------------------------------------------------
427 //----------------------------------------------------------------------
429 };
430
431 //------------------------------------------------------------------------
433 //------------------------------------------------------------------------
434 struct PipelineEnd : public Job
435 {
436 //----------------------------------------------------------------------
437 // Constructor
438 //----------------------------------------------------------------------
439 PipelineEnd( std::shared_ptr<Ctx> &ctx,
440 const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
441 {
442 }
443
444 //----------------------------------------------------------------------
445 // Run Ctx::Examine in the thread-pool
446 //----------------------------------------------------------------------
447 void Run( void* )
448 {
449 ctx->Examine( st );
450 delete this;
451 }
452
453 private:
454 std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
455 XrdCl::XRootDStatus st; //< final status of the ParallelOperation
456 };
457
458 //------------------------------------------------------------------------
460 //------------------------------------------------------------------------
461 inline static
462 void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
463 {
465 PipelineEnd *end = new PipelineEnd( ctx, st );
466 mgr->QueueJob( end, nullptr );
467 }
468
469 //------------------------------------------------------------------------
475 //------------------------------------------------------------------------
476 XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
477 {
478 // make sure we have a valid policy for the parallel operation
479 if( !policy ) policy.reset( new AllPolicy() );
480
481 std::shared_ptr<Ctx> ctx =
482 std::make_shared<Ctx>( handler, policy.release() );
483
484 uint16_t timeout = pipelineTimeout < this->timeout ?
485 pipelineTimeout : this->timeout;
486
487 for( size_t i = 0; i < pipelines.size(); ++i )
488 {
489 if( !pipelines[i] ) continue;
490 pipelines[i].Run( timeout,
491 [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
492 }
493
494 ctx->barrier.lift();
495 return XRootDStatus();
496 }
497
498 std::vector<Pipeline> pipelines;
499 std::unique_ptr<PolicyExecutor> policy;
500 };
501
502 //----------------------------------------------------------------------------
504 //----------------------------------------------------------------------------
505 template<class Container>
506 inline ParallelOperation<false> Parallel( Container &&container )
507 {
508 return ParallelOperation<false>( container );
509 }
510
511 //----------------------------------------------------------------------------
513 //----------------------------------------------------------------------------
514 inline void PipesToVec( std::vector<Pipeline>& )
515 {
516 // base case
517 }
518
519 //----------------------------------------------------------------------------
520 // Declare PipesToVec (we need to declare those functions ahead of their
521 // definitions, as they may call each other).
522 //----------------------------------------------------------------------------
523 template<typename ... Others>
524 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
525 Others&... others );
526
527 template<typename ... Others>
528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
529 Others&... others );
530
531 template<typename ... Others>
532 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
533 Others&... others );
534
535 //----------------------------------------------------------------------------
536 // Define PipesToVec
537 //----------------------------------------------------------------------------
538 template<typename ... Others>
539 void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
540 Others&... others )
541 {
542 v.emplace_back( operation );
543 PipesToVec( v, others... );
544 }
545
546 template<typename ... Others>
547 void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
548 Others&... others )
549 {
550 v.emplace_back( operation );
551 PipesToVec( v, others... );
552 }
553
554 template<typename ... Others>
555 void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
556 Others&... others )
557 {
558 v.emplace_back( std::move( pipeline ) );
559 PipesToVec( v, others... );
560 }
561
562 //----------------------------------------------------------------------------
567 //----------------------------------------------------------------------------
568 template<typename ... Operations>
569 inline ParallelOperation<false> Parallel( Operations&& ... operations )
570 {
571 constexpr size_t size = sizeof...( operations );
572 std::vector<Pipeline> v;
573 v.reserve( size );
574 PipesToVec( v, operations... );
575 return Parallel( v );
576 }
577}
578
579#endif // __XRD_CL_PARALLELOPERATION_HH__
Definition XrdClOperations.hh:552
uint16_t timeout
Operation timeout.
Definition XrdClOperations.hh:776
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
Definition XrdClJobManager.hh:51
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition XrdClJobManager.hh:92
Interface for a job to be run by the job manager.
Definition XrdClJobManager.hh:34
Definition XrdClOperations.hh:188
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition XrdClOperations.hh:309
Definition XrdClParallelOperation.hh:80
ParallelOperation< HasHndl > Some(size_t threshold)
Definition XrdClParallelOperation.hh:168
std::vector< Pipeline > pipelines
Definition XrdClParallelOperation.hh:498
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition XrdClParallelOperation.hh:89
ParallelOperation(Container &&container)
Definition XrdClParallelOperation.hh:104
static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition XrdClParallelOperation.hh:462
std::unique_ptr< PolicyExecutor > policy
Definition XrdClParallelOperation.hh:499
std::string ToString()
Definition XrdClParallelOperation.hh:122
ParallelOperation< HasHndl > All()
Definition XrdClParallelOperation.hh:144
ParallelOperation< HasHndl > Any()
Definition XrdClParallelOperation.hh:156
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition XrdClParallelOperation.hh:476
~ParallelOperation()
Definition XrdClParallelOperation.hh:115
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition XrdClParallelOperation.hh:181
Definition XrdClOperations.hh:64
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
Definition XrdClOperations.hh:325
JobManager * GetJobManager()
Get the job manager object used by the post master.
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition XrdClParallelOperation.hh:514
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition XrdClParallelOperation.hh:506
Definition XrdOucJson.hh:4517
Definition XrdClParallelOperation.hh:196
XRootDStatus Result()
Definition XrdClParallelOperation.hh:206
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:197
XRootDStatus res
Definition XrdClParallelOperation.hh:211
Definition XrdClParallelOperation.hh:221
XRootDStatus Result()
Definition XrdClParallelOperation.hh:240
std::atomic< size_t > cnt
Definition XrdClParallelOperation.hh:246
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:226
XRootDStatus res
Definition XrdClParallelOperation.hh:247
AnyPolicy(size_t size)
Definition XrdClParallelOperation.hh:222
Definition XrdClParallelOperation.hh:302
std::atomic< size_t > failed_cnt
Definition XrdClParallelOperation.hh:329
std::atomic< size_t > pending_cnt
Definition XrdClParallelOperation.hh:328
const size_t failed_threshold
Definition XrdClParallelOperation.hh:330
XRootDStatus res
Definition XrdClParallelOperation.hh:331
AtLeastPolicy(size_t size, size_t threshold)
Definition XrdClParallelOperation.hh:303
XRootDStatus Result()
Definition XrdClParallelOperation.hh:322
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:309
Definition XrdClParallelOperation.hh:367
barrier_t barrier
Definition XrdClParallelOperation.hh:428
void Handle(const XRootDStatus &st)
Definition XrdClParallelOperation.hh:404
void Examine(const XRootDStatus &st)
Definition XrdClParallelOperation.hh:392
~Ctx()
Destructor.
Definition XrdClParallelOperation.hh:381
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition XrdClParallelOperation.hh:422
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition XrdClParallelOperation.hh:417
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition XrdClParallelOperation.hh:373
The thread-pool job for scheduling Ctx::Examine.
Definition XrdClParallelOperation.hh:435
std::shared_ptr< Ctx > ctx
Definition XrdClParallelOperation.hh:454
XrdCl::XRootDStatus st
Definition XrdClParallelOperation.hh:455
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition XrdClParallelOperation.hh:439
void Run(void *)
The job logic.
Definition XrdClParallelOperation.hh:447
Definition XrdClParallelOperation.hh:257
SomePolicy(size_t size, size_t threshold)
Definition XrdClParallelOperation.hh:258
XRootDStatus res
Definition XrdClParallelOperation.hh:291
const size_t size
Definition XrdClParallelOperation.hh:290
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:263
std::atomic< size_t > failed
Definition XrdClParallelOperation.hh:287
const size_t threshold
Definition XrdClParallelOperation.hh:289
std::atomic< size_t > succeeded
Definition XrdClParallelOperation.hh:288
XRootDStatus Result()
Definition XrdClParallelOperation.hh:281
A wait barrier helper class.
Definition XrdClParallelOperation.hh:338
void lift()
Definition XrdClParallelOperation.hh:347
bool on
Definition XrdClParallelOperation.hh:357
void wait()
Definition XrdClParallelOperation.hh:341
std::condition_variable cv
Definition XrdClParallelOperation.hh:355
barrier_t()
Definition XrdClParallelOperation.hh:339
std::mutex mtx
Definition XrdClParallelOperation.hh:356
Definition XrdClParallelOperation.hh:61
virtual ~PolicyExecutor()
Definition XrdClParallelOperation.hh:62
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
Definition XrdClOperationHandlers.hh:662
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124