xrootd
Loading...
Searching...
No Matches
XrdEcUtilities.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDEC_XRDECUTILITIES_HH_
26#define SRC_XRDEC_XRDECUTILITIES_HH_
27
28#include "XrdEc/XrdEcObjCfg.hh"
31#include "XrdCl/XrdClUtils.hh"
32
33#include <exception>
34#include <memory>
35#include <random>
36#include <queue>
37#include <mutex>
38#include <condition_variable>
39
40namespace XrdEc
41{
//---------------------------------------------------------------------------
//! A buffer with stripe data and info on validity
//---------------------------------------------------------------------------
struct stripe_t
{
  //-------------------------------------------------------------------------
  //! Constructor
  //!
  //! @param buffer : pointer to the stripe data; it is stored as-is (the
  //!                 data are not copied and this struct defines no
  //!                 destructor, so it never releases the pointer)
  //! @param valid  : initial value of the validity flag
  //-------------------------------------------------------------------------
  stripe_t( char *buffer, bool valid ) : buffer( buffer ), valid( valid )
  {
  }

  char *buffer; //!< buffer with stripe data
  bool  valid;  //!< true if data are valid, otherwise false
};
60
61 //---------------------------------------------------------------------------
63 //---------------------------------------------------------------------------
64 typedef std::vector<stripe_t> stripes_t;
65
//----------------------------------------------------------------------------
//! A buffer type
//----------------------------------------------------------------------------
using buffer_t = std::vector<char>;
70
71 //----------------------------------------------------------------------------
73 //----------------------------------------------------------------------------
74 class IOError : public std::exception
75 {
76 public:
77
78 //------------------------------------------------------------------------
82 //------------------------------------------------------------------------
83 IOError( const XrdCl::XRootDStatus &st ) noexcept : st( st ), msg( st.ToString() )
84 {
85 }
86
87 //------------------------------------------------------------------------
89 //------------------------------------------------------------------------
90 IOError( const IOError &err ) noexcept : st( err.st ), msg( err.st.ToString() )
91 {
92 }
93
94 //------------------------------------------------------------------------
96 //------------------------------------------------------------------------
97 IOError& operator=( const IOError &err ) noexcept
98 {
99 st = err.st;
100 msg = err.st.ToString();
101 return *this;
102 }
103
104 //------------------------------------------------------------------------
106 //------------------------------------------------------------------------
107 virtual ~IOError()
108 {
109 }
110
111 //------------------------------------------------------------------------
113 //------------------------------------------------------------------------
114 virtual const char* what() const noexcept
115 {
116 return msg.c_str();
117 }
118
119 //------------------------------------------------------------------------
121 //------------------------------------------------------------------------
123 {
124 return st;
125 }
126
127 enum
128 {
130 };
131
132 private:
133
134 //------------------------------------------------------------------------
136 //------------------------------------------------------------------------
138
139 //------------------------------------------------------------------------
141 //------------------------------------------------------------------------
142 std::string msg;
143 };
144
145 //---------------------------------------------------------------------------
152 //---------------------------------------------------------------------------
153 void ScheduleHandler( uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler );
154
155 //---------------------------------------------------------------------------
160 //---------------------------------------------------------------------------
162
163
//---------------------------------------------------------------------------
// A class implementing synchronous queue
//
// All operations on the underlying std::queue are performed with `mtx`
// held. Once `interrupt` has been called the flag is never reset: every
// blocking `dequeue` that finds the queue empty throws `wait_interrupted`.
//---------------------------------------------------------------------------
template<typename Element>
struct sync_queue
{
  //-------------------------------------------------------------------------
  // An internal exception used for interrupting the `dequeue` method
  //-------------------------------------------------------------------------
  struct wait_interrupted{ };

  //-------------------------------------------------------------------------
  // Default constructor
  //-------------------------------------------------------------------------
  sync_queue() : interrupted( false )
  {
  }

  //-------------------------------------------------------------------------
  // Enqueue new element into the queue
  //
  // The element is moved into the queue and all waiting `dequeue` calls are
  // notified.
  //-------------------------------------------------------------------------
  inline void enqueue( Element && element )
  {
    std::unique_lock<std::mutex> lck( mtx );
    elements.push( std::move( element ) );
    cv.notify_all();
  }

  //-------------------------------------------------------------------------
  // Dequeue an element from the front of the queue
  // Note: if the queue is empty blocks until a new element is enqueued
  //
  // Throws `wait_interrupted` if the queue is empty and `interrupt` has been
  // called, or if `interrupt` has been called by the time the wait ends.
  //-------------------------------------------------------------------------
  inline Element dequeue()
  {
    std::unique_lock<std::mutex> lck( mtx );
    while( elements.empty() )
    {
      // check before blocking: an interrupt issued before this call would
      // otherwise never wake us up
      if( interrupted ) throw wait_interrupted();
      cv.wait( lck );
      if( interrupted ) throw wait_interrupted();
    }
    Element element = std::move( elements.front() );
    elements.pop();
    return element;
  }

  //-------------------------------------------------------------------------
  // Dequeue an element from the front of the queue
  // Note: if the queue is empty returns false, true otherwise
  //
  // This overload never blocks and does not look at the interrupt flag; on
  // success the front element is move-assigned into `e`.
  //-------------------------------------------------------------------------
  inline bool dequeue( Element &e )
  {
    std::unique_lock<std::mutex> lck( mtx );
    if( elements.empty() ) return false;
    e = std::move( elements.front() );
    elements.pop();
    return true;
  }

  //-------------------------------------------------------------------------
  // Checks if the queue is empty
  //-------------------------------------------------------------------------
  bool empty()
  {
    std::unique_lock<std::mutex> lck( mtx );
    return elements.empty();
  }

  //-------------------------------------------------------------------------
  // Interrupt all waiting `dequeue` routines
  //
  // The flag is set with the mutex held so that the change cannot slip in
  // between a waiter's emptiness check and its call to `cv.wait` (which
  // would lose the notification and leave the waiter blocked).
  //-------------------------------------------------------------------------
  inline void interrupt()
  {
    std::unique_lock<std::mutex> lck( mtx );
    interrupted = true;
    cv.notify_all();
  }

  private:
    std::queue<Element>     elements;    //!< the queue itself
    std::mutex              mtx;         //!< mutex guarding the queue
    std::condition_variable cv;          //!< signals new elements and interrupts
    std::atomic<bool>       interrupted; //!< a flag, true if all `dequeue` routines
                                         //!< should be interrupted
};
247
//---------------------------------------------------------------------------
// Extract the block ID from the chunk file name
//
// The block ID is the field between the last two dots of the name, e.g.
// "obj.12.3" yields 12. If the name contains a single dot the text before it
// is used, and if it contains no dot at all the whole name is parsed.
//
// @param fn : the chunk file name
// @return   : the block ID parsed with std::stoul
// @throws   : whatever std::stoul throws (std::invalid_argument if the field
//             does not start with a number, std::out_of_range if the value
//             does not fit)
//---------------------------------------------------------------------------
inline static size_t fntoblk( const std::string &fn )
{
  const size_t end = fn.rfind( '.' );
  // no dot at all: the whole name is the number
  if( end == std::string::npos ) return std::stoul( fn );

  // find where the field in front of the last dot starts; handled explicitly
  // instead of relying on `npos` wrapping around in `end - 1` / `npos + 1`
  size_t begin = 0;
  if( end > 0 )
  {
    const size_t prev = fn.rfind( '.', end - 1 );
    if( prev != std::string::npos ) begin = prev + 1;
  }
  return std::stoul( fn.substr( begin, end - begin ) );
}
258}
259
260#endif /* SRC_XRDEC_XRDECUTILITIES_HH_ */
Handle an async response.
Definition XrdClXRootDResponses.hh:1126
Request status.
Definition XrdClXRootDResponses.hh:219
Generic I/O exception, wraps up XrdCl::XRootDStatus.
Definition XrdEcUtilities.hh:75
IOError(const IOError &err) noexcept
Copy constructor.
Definition XrdEcUtilities.hh:90
XrdCl::XRootDStatus st
The status object.
Definition XrdEcUtilities.hh:137
const XrdCl::XRootDStatus & Status() const
Definition XrdEcUtilities.hh:122
IOError(const XrdCl::XRootDStatus &st) noexcept
Definition XrdEcUtilities.hh:83
virtual ~IOError()
Destructor.
Definition XrdEcUtilities.hh:107
@ ioTooManyErrors
Definition XrdEcUtilities.hh:129
virtual const char * what() const noexcept
overloaded
Definition XrdEcUtilities.hh:114
IOError & operator=(const IOError &err) noexcept
Assignment operator.
Definition XrdEcUtilities.hh:97
std::string msg
The error message.
Definition XrdEcUtilities.hh:142
Definition XrdClZipArchive.hh:45
std::vector< stripe_t > stripes_t
All stripes in a block.
Definition XrdEcUtilities.hh:64
static size_t fntoblk(const std::string &fn)
Definition XrdEcUtilities.hh:251
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
std::vector< char > buffer_t
a buffer type
Definition XrdEcReader.hh:49
std::string ToString() const
Create a string representation.
A buffer with stripe data and info on validity.
Definition XrdEcUtilities.hh:46
char * buffer
Definition XrdEcUtilities.hh:57
bool valid
Definition XrdEcUtilities.hh:58
stripe_t(char *buffer, bool valid)
Definition XrdEcUtilities.hh:53
Definition XrdEcUtilities.hh:173
Definition XrdEcUtilities.hh:169
std::condition_variable cv
Definition XrdEcUtilities.hh:243
bool dequeue(Element &e)
Definition XrdEcUtilities.hh:213
bool empty()
Definition XrdEcUtilities.hh:225
void enqueue(Element &&element)
Definition XrdEcUtilities.hh:185
std::queue< Element > elements
Definition XrdEcUtilities.hh:241
sync_queue()
Definition XrdEcUtilities.hh:178
Element dequeue()
Definition XrdEcUtilities.hh:196
std::mutex mtx
Definition XrdEcUtilities.hh:242
void interrupt()
Definition XrdEcUtilities.hh:234
std::atomic< bool > interrupted
Definition XrdEcUtilities.hh:244