xrootd
Loading...
Searching...
No Matches
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
21
23#include "XrdCl/XrdClSocket.hh"
26
27#include <sys/uio.h>
28#include <memory>
29#include <arpa/inet.h>
30
31namespace XrdCl
32{
33
34//------------------------------------------------------------------------------
36//------------------------------------------------------------------------------
38{
39 public:
40
41 //--------------------------------------------------------------------------
46 //--------------------------------------------------------------------------
48 std::vector<uint32_t> &digests ) :
49 chunks( chunks ),
51 dlen( 0 ),
52 rspoff( 0 ),
53 chindex( 0 ),
54 choff( 0 ),
55 dgindex( 0 ),
56 dgoff( 0 ),
57 iovcnt( 0 ),
58 iovindex( 0 )
59 {
60 uint64_t rdoff = chunks.front().offset;
61 uint32_t rdlen = 0;
62 for( auto &ch : chunks )
63 rdlen += ch.length;
64 int fpglen, lpglen;
65 int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
66 digests.resize( pgcnt );
67 }
68
69 //--------------------------------------------------------------------------
71 //--------------------------------------------------------------------------
73 {
74 }
75
76 //--------------------------------------------------------------------------
78 //--------------------------------------------------------------------------
80 {
81 dlen = rsp->status.bdy.dlen;
82 rspoff = rsp->info.pgread.offset;
83
84 uint64_t bufoff = rspoff - chunks[0].offset;
85 chindex = 0;
86
87 for( chindex = 0; chindex < chunks.size(); ++chindex )
88 {
89 if( chunks[chindex].length < bufoff )
90 {
91 bufoff -= chunks[chindex].length;
92 continue;
93 }
94 break;
95 }
96 choff = bufoff;
97 }
98
99 //--------------------------------------------------------------------------
104 //--------------------------------------------------------------------------
105 XRootDStatus Read( Socket &socket, uint32_t &btsread )
106 {
107 if( dlen == 0 || chindex >= chunks.size() )
108 return XRootDStatus();
109 btsread = 0;
110 int nbbts = 0;
111 do
112 {
113 // Prepare the IO vector for receiving the data
114 if( iov.empty() )
115 InitIOV();
116 // read the data into the buffer
117 nbbts = 0;
118 auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
119 if( !st.IsOK() )
120 return st;
121 btsread += nbbts;
122 dlen -= nbbts;
123 ShiftIOV( nbbts );
124 if( st.code == suRetry )
125 return st;
126 }
127 while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
128
129 return XRootDStatus();
130 }
131
132 private:
133
134 //--------------------------------------------------------------------------
136 //--------------------------------------------------------------------------
137 struct iovmax_t
138 {
140 {
141#ifdef _SC_IOV_MAX
142 value = sysconf(_SC_IOV_MAX);
143 if (value == -1)
144#endif
145#ifdef IOV_MAX
146 value = IOV_MAX;
147#else
148 value = 1024;
149#endif
150 value &= ~uint32_t( 1 ); // make sure it is an even number
151 }
152 int32_t value;
153 };
154
155 //--------------------------------------------------------------------------
157 //--------------------------------------------------------------------------
158 inline static int max_iovcnt()
159 {
160 static iovmax_t iovmax;
161 return iovmax.value;
162 }
163
164 //--------------------------------------------------------------------------
166 //--------------------------------------------------------------------------
167 inline void addiov( char *&buf, size_t len )
168 {
169 iov.emplace_back();
170 iov.back().iov_base = buf;
171 iov.back().iov_len = len;
172 buf += len;
173 ++iovcnt;
174 }
175
176 //--------------------------------------------------------------------------
178 //--------------------------------------------------------------------------
179 inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
180 {
181 if( len > dleft ) len = dleft;
182 addiov( buf, len );
183 dleft -= len;
184 }
185
186 //--------------------------------------------------------------------------
189 //--------------------------------------------------------------------------
190 inline static uint32_t CalcIOVSize( uint32_t dleft )
191 {
192 uint32_t ret = ( dleft / PageWithDigest + 2 ) * 2;
193 return ( ret > uint32_t( max_iovcnt() ) ? max_iovcnt() : ret );
194 }
195
196 //--------------------------------------------------------------------------
198 //--------------------------------------------------------------------------
199 uint32_t CalcRdSize()
200 {
201 // data size in the server response (including digests)
202 uint32_t dleft = dlen;
203 // space in our page buffer
204 uint32_t pgspace = chunks[chindex].length - choff;
205 // space in our digest buffer
206 uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
207 if( dleft > pgspace + dgspace )
208 dleft = pgspace + dgspace;
209 return dleft;
210 }
211
212 //--------------------------------------------------------------------------
214 //--------------------------------------------------------------------------
215 void InitIOV()
216 {
217 iovindex = 0;
218 // figure out the number of data we can read in one go
219 uint32_t dleft = CalcRdSize();
220 // and reset the I/O vector
221 iov.clear();
222 iovcnt = 0;
223 iov.reserve( CalcIOVSize( dleft ) );
224 // now prepare the page and digest buffers
226 char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
227 uint64_t rdoff = ch.offset + choff;
228 char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
229 // handle the first digest
230 uint32_t fdglen = sizeof( uint32_t ) - dgoff;
231 addiov( dgbuf, fdglen, dleft );
232 if( dleft == 0 || iovcnt >= max_iovcnt() )
233 return;
234 // handle the first page
235 uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
236 addiov( pgbuf, fpglen, dleft );
237 if( dleft == 0 || iovcnt >= max_iovcnt() )
238 return;
239 // handle all the subsequent aligned pages
240 size_t fullpgs = dleft / PageWithDigest;
241 for( size_t i = 0; i < fullpgs; ++i )
242 {
243 addiov( dgbuf, sizeof( uint32_t ), dleft );
244 if( dleft == 0 || iovcnt >= max_iovcnt() )
245 return;
246 addiov( pgbuf, XrdSys::PageSize, dleft );
247 if( dleft == 0 || iovcnt >= max_iovcnt() )
248 return;
249 }
250 // handle the last digest
251 uint32_t ldglen = sizeof( uint32_t );
252 addiov( dgbuf, ldglen, dleft );
253 if( dleft == 0 || iovcnt >= max_iovcnt() )
254 return;
255 // handle the last page
256 addiov( pgbuf, dleft, dleft );
257 }
258
259 //--------------------------------------------------------------------------
261 //--------------------------------------------------------------------------
262 inline void shift( void *&buffer, size_t nbbts )
263 {
264 char *buf = static_cast<char*>( buffer );
265 buf += nbbts;
266 buffer = buf;
267 }
268
269 //--------------------------------------------------------------------------
273 //--------------------------------------------------------------------------
274 inline void shiftdgbuf( uint32_t &btsread )
275 {
276 if( iov[iovindex].iov_len > btsread )
277 {
278 iov[iovindex].iov_len -= btsread;
279 shift( iov[iovindex].iov_base, btsread );
280 dgoff += btsread;
281 btsread = 0;
282 return;
283 }
284
285 btsread -= iov[iovindex].iov_len;
286 iov[iovindex].iov_len = 0;
287 dgoff = 0;
288 digests[dgindex] = ntohl( digests[dgindex] );
289 ++dgindex;
290 ++iovindex;
291 --iovcnt;
292 }
293
294 //--------------------------------------------------------------------------
298 //--------------------------------------------------------------------------
299 inline void shiftpgbuf( uint32_t &btsread )
300 {
301 if( iov[iovindex].iov_len > btsread )
302 {
303 iov[iovindex].iov_len -= btsread;
304 shift( iov[iovindex].iov_base, btsread );
305 choff += btsread;
306 btsread = 0;
307 return;
308 }
309
310 btsread -= iov[iovindex].iov_len;
311 choff += iov[iovindex].iov_len;
312 iov[iovindex].iov_len = 0;
313 ++iovindex;
314 --iovcnt;
315 }
316
317 //--------------------------------------------------------------------------
319 //--------------------------------------------------------------------------
320 void ShiftIOV( uint32_t btsread )
321 {
322 // if iovindex is even it point to digest, otherwise it points to a page
323 if( iovindex % 2 == 0 )
324 shiftdgbuf( btsread );
325 // adjust as many I/O buffers as necessary
326 while( btsread > 0 )
327 {
328 // handle page
329 shiftpgbuf( btsread );
330 if( btsread == 0 ) break;
331 // handle digest
332 shiftdgbuf( btsread );
333 }
334 // if we filled the buffer, move to the next one
335 if( iovcnt == 0 )
336 iov.clear();
337 // do we need to move to the next chunk?
338 if( choff >= chunks[chindex].length )
339 {
340 ++chindex;
341 choff = 0;
342 }
343 }
344
345 ChunkList &chunks; //< list of data chunks to be filled with user data
346 std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
347 uint32_t dlen; //< size of the data in the message
348 uint64_t rspoff; //< response offset
349
350 size_t chindex; //< index of the current data buffer
351 size_t choff; //< offset within the current buffer
352 size_t dgindex; //< index of the current digest buffer
353 size_t dgoff; //< offset within the current digest buffer
354
355 std::vector<iovec> iov; //< I/O vector
356 int iovcnt; //< size of the I/O vector
357 size_t iovindex; //< index of the first valid element in the I/O vector
358
359 static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
360};
361
362} /* namespace XrdEc */
363
364#endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
Object for reading out data from the PgRead response.
Definition XrdClAsyncPageReader.hh:38
std::vector< uint32_t > & digests
Definition XrdClAsyncPageReader.hh:346
static int max_iovcnt()
Definition XrdClAsyncPageReader.hh:158
void SetRsp(ServerResponseV2 *rsp)
Sets message data size.
Definition XrdClAsyncPageReader.hh:79
size_t choff
Definition XrdClAsyncPageReader.hh:351
static uint32_t CalcIOVSize(uint32_t dleft)
Definition XrdClAsyncPageReader.hh:190
void shiftdgbuf(uint32_t &btsread)
Definition XrdClAsyncPageReader.hh:274
static const int PageWithDigest
Definition XrdClAsyncPageReader.hh:359
uint64_t rspoff
Definition XrdClAsyncPageReader.hh:348
size_t iovindex
Definition XrdClAsyncPageReader.hh:357
void addiov(char *&buf, size_t len)
Add I/O buffer to the vector.
Definition XrdClAsyncPageReader.hh:167
size_t dgindex
Definition XrdClAsyncPageReader.hh:352
virtual ~AsyncPageReader()
Destructor.
Definition XrdClAsyncPageReader.hh:72
std::vector< iovec > iov
Definition XrdClAsyncPageReader.hh:355
ChunkList & chunks
Definition XrdClAsyncPageReader.hh:345
size_t dgoff
Definition XrdClAsyncPageReader.hh:353
uint32_t CalcRdSize()
Calculate the size of the data to be read.
Definition XrdClAsyncPageReader.hh:199
void shift(void *&buffer, size_t nbbts)
Shift buffer by a number of bytes.
Definition XrdClAsyncPageReader.hh:262
void InitIOV()
Initialize the I/O vector.
Definition XrdClAsyncPageReader.hh:215
void ShiftIOV(uint32_t btsread)
shift the I/O vector by the number of bytes read
Definition XrdClAsyncPageReader.hh:320
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
Definition XrdClAsyncPageReader.hh:47
int iovcnt
Definition XrdClAsyncPageReader.hh:356
void addiov(char *&buf, uint32_t len, uint32_t &dleft)
Add I/O buffer to the vector and update number of bytes left to be read.
Definition XrdClAsyncPageReader.hh:179
XRootDStatus Read(Socket &socket, uint32_t &btsread)
Definition XrdClAsyncPageReader.hh:105
uint32_t dlen
Definition XrdClAsyncPageReader.hh:347
size_t chindex
Definition XrdClAsyncPageReader.hh:350
void shiftpgbuf(uint32_t &btsread)
Definition XrdClAsyncPageReader.hh:299
A network socket.
Definition XrdClSocket.hh:43
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
Request status.
Definition XrdClXRootDResponses.hh:219
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
Definition XrdClAction.hh:34
const uint16_t suRetry
Definition XrdClStatus.hh:40
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition XrdClXRootDResponses.hh:1055
static const int PageSize
Definition XrdSysPageSize.hh:36
kXR_int32 dlen
Definition XProtocol.hh:1236
kXR_int64 offset
Definition XProtocol.hh:1056
struct ServerResponseBody_Status bdy
Definition XProtocol.hh:1257
Definition XProtocol.hh:1304
ServerResponseStatus status
Definition XProtocol.hh:1305
union ServerResponseV2::@1 info
ServerResponseBody_pgRead pgread
Definition XProtocol.hh:1308
Helper class for retrieving the maximum size of the I/O vector.
Definition XrdClAsyncPageReader.hh:138
int32_t value
Definition XrdClAsyncPageReader.hh:152
iovmax_t()
Definition XrdClAsyncPageReader.hh:139
Describe a data chunk for vector read.
Definition XrdClXRootDResponses.hh:917
void * buffer
length of the chunk
Definition XrdClXRootDResponses.hh:950
uint64_t offset
Definition XrdClXRootDResponses.hh:948
char * data
Definition XrdOucIOVec.hh:45
Definition XrdOucIOVec.hh:65