26#ifndef __XRD_CL_PARALLELOPERATION_HH__
27#define __XRD_CL_PARALLELOPERATION_HH__
36#include <condition_variable>
78 template<
bool HasHndl>
103 template<
class Container>
106 static_assert( !HasHndl,
"Constructor is available only operation without handler");
109 auto begin = std::make_move_iterator( container.begin() );
110 auto end = std::make_move_iterator( container.end() );
111 std::copy( begin, end, std::back_inserter(
pipelines ) );
124 std::ostringstream oss;
126 for(
size_t i = 0; i <
pipelines.size(); i++ )
147 return std::move( *
this );
159 return std::move( *
this );
171 return std::move( *
this );
184 return std::move( *
this );
201 if( status.
IsOK() )
return false;
231 size_t nb =
cnt.fetch_sub( 1, std::memory_order_relaxed );
233 if( status.
IsOK() )
return true;
235 if( nb == 1 )
return true;
269 size_t s =
succeeded.fetch_add( 1, std::memory_order_relaxed );
274 size_t f =
failed.fetch_add( 1, std::memory_order_relaxed );
312 size_t pending =
pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
314 if( status.
IsOK() )
return ( pending == 0 );
315 size_t nb =
failed_cnt.fetch_add( 1, std::memory_order_relaxed );
319 return ( pending == 0 );
343 std::unique_lock<std::mutex> lck(
mtx );
344 if(
on )
cv.wait( lck );
349 std::unique_lock<std::mutex> lck(
mtx );
355 std::condition_variable
cv;
394 if(
policy->Examine( st ) )
481 std::shared_ptr<Ctx> ctx =
484 uint16_t
timeout = pipelineTimeout < this->timeout ?
485 pipelineTimeout : this->
timeout;
487 for(
size_t i = 0; i <
pipelines.size(); ++i )
505 template<
class Container>
523 template<
typename ... Others>
524 inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
527 template<
typename ... Others>
528 inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
531 template<
typename ... Others>
532 inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
538 template<
typename ... Others>
542 v.emplace_back( operation );
546 template<
typename ... Others>
550 v.emplace_back( operation );
554 template<
typename ... Others>
558 v.emplace_back( std::move( pipeline ) );
568 template<
typename ... Operations>
571 constexpr size_t size =
sizeof...( operations );
572 std::vector<Pipeline> v;
Definition XrdClOperations.hh:552
uint16_t timeout
Operation timeout.
Definition XrdClOperations.hh:776
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
Definition XrdClJobManager.hh:51
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition XrdClJobManager.hh:92
Interface for a job to be run by the job manager.
Definition XrdClJobManager.hh:34
Definition XrdClOperations.hh:188
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition XrdClOperations.hh:309
Definition XrdClParallelOperation.hh:80
ParallelOperation< HasHndl > Some(size_t threshold)
Definition XrdClParallelOperation.hh:168
std::vector< Pipeline > pipelines
Definition XrdClParallelOperation.hh:498
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in a different state.
Definition XrdClParallelOperation.hh:89
ParallelOperation(Container &&container)
Definition XrdClParallelOperation.hh:104
static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition XrdClParallelOperation.hh:462
std::unique_ptr< PolicyExecutor > policy
Definition XrdClParallelOperation.hh:499
std::string ToString()
Definition XrdClParallelOperation.hh:122
ParallelOperation< HasHndl > All()
Definition XrdClParallelOperation.hh:144
ParallelOperation< HasHndl > Any()
Definition XrdClParallelOperation.hh:156
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition XrdClParallelOperation.hh:476
~ParallelOperation()
Definition XrdClParallelOperation.hh:115
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition XrdClParallelOperation.hh:181
Definition XrdClOperations.hh:64
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
Definition XrdClOperations.hh:325
JobManager * GetJobManager()
Get the job manager object used by the post master.
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition XrdClParallelOperation.hh:514
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition XrdClParallelOperation.hh:506
Definition XrdOucJson.hh:4517
Definition XrdClParallelOperation.hh:196
XRootDStatus Result()
Definition XrdClParallelOperation.hh:206
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:197
XRootDStatus res
Definition XrdClParallelOperation.hh:211
Definition XrdClParallelOperation.hh:221
XRootDStatus Result()
Definition XrdClParallelOperation.hh:240
std::atomic< size_t > cnt
Definition XrdClParallelOperation.hh:246
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:226
XRootDStatus res
Definition XrdClParallelOperation.hh:247
AnyPolicy(size_t size)
Definition XrdClParallelOperation.hh:222
Definition XrdClParallelOperation.hh:302
std::atomic< size_t > failed_cnt
Definition XrdClParallelOperation.hh:329
std::atomic< size_t > pending_cnt
Definition XrdClParallelOperation.hh:328
const size_t failed_threshold
Definition XrdClParallelOperation.hh:330
XRootDStatus res
Definition XrdClParallelOperation.hh:331
AtLeastPolicy(size_t size, size_t threshold)
Definition XrdClParallelOperation.hh:303
XRootDStatus Result()
Definition XrdClParallelOperation.hh:322
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:309
Definition XrdClParallelOperation.hh:367
barrier_t barrier
Definition XrdClParallelOperation.hh:428
void Handle(const XRootDStatus &st)
Definition XrdClParallelOperation.hh:404
void Examine(const XRootDStatus &st)
Definition XrdClParallelOperation.hh:392
~Ctx()
Destructor.
Definition XrdClParallelOperation.hh:381
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition XrdClParallelOperation.hh:422
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition XrdClParallelOperation.hh:417
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition XrdClParallelOperation.hh:373
The thread-pool job for scheduling Ctx::Examine.
Definition XrdClParallelOperation.hh:435
std::shared_ptr< Ctx > ctx
Definition XrdClParallelOperation.hh:454
XrdCl::XRootDStatus st
Definition XrdClParallelOperation.hh:455
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition XrdClParallelOperation.hh:439
void Run(void *)
The job logic.
Definition XrdClParallelOperation.hh:447
Definition XrdClParallelOperation.hh:257
SomePolicy(size_t size, size_t threshold)
Definition XrdClParallelOperation.hh:258
XRootDStatus res
Definition XrdClParallelOperation.hh:291
const size_t size
Definition XrdClParallelOperation.hh:290
bool Examine(const XrdCl::XRootDStatus &status)
Definition XrdClParallelOperation.hh:263
std::atomic< size_t > failed
Definition XrdClParallelOperation.hh:287
const size_t threshold
Definition XrdClParallelOperation.hh:289
std::atomic< size_t > succeeded
Definition XrdClParallelOperation.hh:288
XRootDStatus Result()
Definition XrdClParallelOperation.hh:281
A wait barrier helper class.
Definition XrdClParallelOperation.hh:338
void lift()
Definition XrdClParallelOperation.hh:347
bool on
Definition XrdClParallelOperation.hh:357
void wait()
Definition XrdClParallelOperation.hh:341
std::condition_variable cv
Definition XrdClParallelOperation.hh:355
barrier_t()
Definition XrdClParallelOperation.hh:339
std::mutex mtx
Definition XrdClParallelOperation.hh:356
Definition XrdClParallelOperation.hh:61
virtual ~PolicyExecutor()
Definition XrdClParallelOperation.hh:62
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
Definition XrdClOperationHandlers.hh:662
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124