xrootd
Loading...
Searching...
No Matches
XrdOssCsiFileAio.hh
Go to the documentation of this file.
1#ifndef _XRDOSSCSIFILEAIO_H
2#define _XRDOSSCSIFILEAIO_H
3/******************************************************************************/
4/* */
5/* X r d O s s C s i F i l e A i o . h h */
6/* */
7/* (C) Copyright 2021 CERN. */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* In applying this licence, CERN does not waive the privileges and */
17/* immunities granted to it by virtue of its status as an Intergovernmental */
18/* Organization or submit itself to any jurisdiction. */
19/* */
20/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
21/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
22/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
23/* License for more details. */
24/* */
25/* You should have received a copy of the GNU Lesser General Public License */
26/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
27/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
28/* */
29/* The copyright holder's institutional names and contributor's names may not */
30/* be used to endorse or promote products derived from this software without */
31/* specific prior written permission of the institution or contributor. */
32/******************************************************************************/
33
34#include "Xrd/XrdScheduler.hh"
35#include "XrdSfs/XrdSfsAio.hh"
36#include "XrdOssCsi.hh"
38
39#include <mutex>
40#include <thread>
41
43{
44public:
45
48
// Binds this job to one aio request. Stores the owning file (fp), the
// XrdOssCsiFileAio wrapper (nio), the caller's original request (aiop), and
// the page-mode / read-vs-write flags in the corresponding members.
// NOTE(review): listing line 56 is absent from this extract (numbering jumps
// 55 -> 57), so any further initialisation done there is not shown.
49 void Init(XrdOssCsiFile *fp, XrdOssCsiFileAio *nio, XrdSfsAio *aiop, bool isPg, bool read)
50 {
51 fp_ = fp;
52 nio_ = nio;
53 aiop_ = aiop;
54 pg_ = isPg;
55 read_ = read;
57 }
58
60 {
62 }
63
65 {
67 }
68
// Job entry point: dispatches on jobtype_ to the handler for the current
// step of the two-step read or write sequence.
// There is no default case, so a jobtype_ value other than the four listed
// here results in no action.
// NOTE(review): the commented-out `override` suggests this overrides a base
// class DoIt(); the class header is not visible in this extract - confirm.
69 void DoIt() /* override */
70 {
71 switch(jobtype_)
72 {
73 case JobReadStep1:
74 // take rangelock, then submit aio read
75 DoItRead1();
76 break;
77
78 case JobReadStep2:
79 // fetch any extra bytes then verify/fetch csvec
80 DoItRead2();
81 break;
82
83 case JobWriteStep1:
84 // lock byte range, update/store csvec and queue aio write
85 DoItWrite1();
86 break;
87
88 case JobWriteStep2:
89 // check return from aio write, write any extra
90 DoItWrite2();
91 break;
92 }
93 }
94
95 void DoItRead1();
96 void DoItRead2();
97 void DoItWrite1();
98 void DoItWrite2();
99
100private:
104 bool pg_;
105 bool read_;
107};
108
110{
112public:
113
115 uint64_t pgOpts_;
116
// Read-completion callback for this wrapper request: propagates this
// request's Result to the parent (caller's) request.
// NOTE(review): listing line 121 is absent from this extract; per the
// comment below it is where the follow-up check/verify step gets scheduled.
117 virtual void doneRead() /* override */
118 {
119 parentaio_->Result = this->Result;
120 // schedule the result check and verify/fetchrange
122 }
123
// Write-completion callback for this wrapper request: propagates this
// request's Result to the parent (caller's) request.
// NOTE(review): listing line 128 is absent from this extract; per the
// comment below it is where the follow-up check/write-extra step gets
// scheduled.
124 virtual void doneWrite() /* override */
125 {
126 parentaio_->Result = this->Result;
127 // schedule the result check and write any extra
129 }
130
// Retires this request object: calls rg_.ReleaseAll(), clears the parent and
// file pointers, then either pushes the object onto store_'s free list or,
// when there is no store, deletes it. Finally calls aioDec() on the file the
// request was attached to, if any.
131 virtual void Recycle()
132 {
133 rg_.ReleaseAll();
134 parentaio_ = NULL;
 // keep the file pointer in a local: `this` may be deleted (or handed back
 // to the store) below, so members must not be touched afterwards
135 XrdOssCsiFile *f = file_;
136 file_ = NULL;
137 if (store_)
138 {
 // the lock covers only the push onto the singly linked free list
139 std::lock_guard<std::mutex> guard(store_->mtx_);
140 next_ = store_->list_;
141 store_->list_ = this;
142 }
143 else
144 {
145 delete this;
146 }
 // uses only the local copy taken above; no member access past this point
147 if (f)
148 {
149 f->aioDec();
150 }
151 }
152
// Prepares this wrapper request to shadow the caller's request `aiop`:
// records aiop as the parent, copies its aiocb fields (descriptor, buffer,
// length, offset, priority), checksum vector pointer and TIdent, stores the
// file and page-mode options, initialises the embedded job, and calls
// aioInc() on the file.
// NOTE(review): listing line 166 is absent from this extract (numbering
// jumps 165 -> 167), so any assignment made there is not shown.
153 void Init(XrdSfsAio *aiop, XrdOssCsiFile *file, bool isPgOp, uint64_t opts, bool isread)
154 {
155 parentaio_ = aiop;
156 this->sfsAio.aio_fildes = aiop->sfsAio.aio_fildes;
157 this->sfsAio.aio_buf = aiop->sfsAio.aio_buf;
158 this->sfsAio.aio_nbytes = aiop->sfsAio.aio_nbytes;
159 this->sfsAio.aio_offset = aiop->sfsAio.aio_offset;
160 this->sfsAio.aio_reqprio = aiop->sfsAio.aio_reqprio;
161 this->cksVec = aiop->cksVec;
162 this->TIdent = aiop->TIdent;
163 file_ = file;
164 isPgOp_ = isPgOp;
165 pgOpts_ = opts;
167 job_.Init(file, this, aiop, isPgOp, isread);
 // paired with the aioDec() call made in Recycle()
168 file_->aioInc();
169 }
170
172 {
 // Body of the allocator (its header, listing line 171, is absent from this
 // extract). Reuses an object from store->list_ when one is available,
 // otherwise - or when store is null - allocates a new one bound to store.
173 XrdOssCsiFileAio *p=NULL;
174 if (store)
175 {
 // pop the head of the free list under the store mutex
176 std::lock_guard<std::mutex> guard(store->mtx_);
177 if ((p = store->list_)) store->list_ = p->next_;
178 }
179 if (!p) p = new XrdOssCsiFileAio(store);
180 return p;
181 }
182
184 {
187 }
188
190 {
192 }
193
195 {
198 }
199
201 {
203 }
204
207
208private:
216};
217
219{
 // Second step of an aio read; runs after the asynchronous read completed.
 // Tops up a short page-mode read synchronously, then fetches (page mode)
 // or verifies (plain mode) the checksums for the bytes actually read,
 // reports the outcome through aiop_->doneRead() and recycles nio_.
220 // this job runs after async Read
221 // range was already locked read-only before the read
222
 // failed read or zero-length request: nothing to check, report as-is
223 if (aiop_->Result<0 || nio_->sfsAio.aio_nbytes==0)
224 {
225 aiop_->doneRead();
226 nio_->Recycle();
227 return;
228 }
229
230 // if this is a pg operation and this was a short read, try to complete,
231 // otherwise caller will have to deal with joining csvec values from repeated reads
232
233 ssize_t toread = nio_->sfsAio.aio_nbytes - nio_->Result;
234 ssize_t nread = nio_->Result;
235
236 if (!pg_)
237 {
238 // not a pg operation, no need to read more
239 toread = 0;
240 }
241 char *p = (char*)nio_->sfsAio.aio_buf;
242 while(toread>0)
243 {
244 const ssize_t rret = fp_->successor_->Read(&p[nread], nio_->sfsAio.aio_offset+nread, toread);
 // a zero return ends the top-up; whatever was read so far is reported
245 if (rret == 0) break;
246 if (rret<0)
247 {
248 aiop_->Result = rret;
249 aiop_->doneRead();
250 nio_->Recycle();
251 return;
252 }
253 toread -= rret;
254 nread += rret;
255 }
256 aiop_->Result = nread;
257
 // The checksum calls below must cover nread bytes, not nio_->Result:
 // nio_->Result only holds the byte count of the initial async read, so
 // after a short read has been topped up it would leave the tail of the
 // buffer without checksums although aiop_->Result reports it as read.
 // In plain mode toread is forced to 0, hence nread == nio_->Result there.
258 ssize_t puret;
259 if (pg_)
260 {
261 puret = fp_->Pages()->FetchRange(fp_->successor_,
262 (void *)nio_->sfsAio.aio_buf,
263 (off_t)nio_->sfsAio.aio_offset,
264 (size_t)nread,
265 (uint32_t*)nio_->cksVec,
266 nio_->pgOpts_,
267 nio_->rg_);
268 }
269 else
270 {
271 puret = fp_->Pages()->VerifyRange(fp_->successor_,
272 (void *)nio_->sfsAio.aio_buf,
273 (off_t)nio_->sfsAio.aio_offset,
274 (size_t)nread,
275 nio_->rg_);
276 }
 // a checksum failure replaces the byte count as the reported result
277 if (puret<0)
278 {
279 aiop_->Result = puret;
280 }
281 aiop_->doneRead();
282 nio_->Recycle();
283}
284
286{
 // First step of an aio read (its header, listing line 285, is absent from
 // this extract). On success nothing is reported here: completion is
 // expected to arrive later through the queued request - presumably via
 // nio_->doneRead(); confirm against the scheduler wiring.
287 // this job takes rangelock and then queues aio read
288
 // NOTE(review): listing line 290, the start of the lock call whose
 // argument list continues on line 291, is absent from this extract.
289 // lock range
291 (off_t)(aiop_->sfsAio.aio_offset+aiop_->sfsAio.aio_nbytes), true);
292
293 const int ret = fp_->successor_->Read(nio_);
 // queueing failed: report the error to the caller and retire nio_ now
294 if (ret<0)
295 {
296 aiop_->Result = ret;
297 aiop_->doneRead();
298 nio_->Recycle();
299 return;
300 }
301}
302
304{
 // First step of an aio write (its header, listing line 303, is absent
 // from this extract). Stores (page mode) or updates (plain mode) the
 // checksums for the caller's buffer, then queues the asynchronous write.
 // On either failure the sizes are resynced, the error is reported through
 // aiop_->doneWrite() and nio_ is recycled.
305 // this job runs before async Write
306
 // NOTE(review): listing line 308, the start of the lock call whose
 // argument list continues on line 309, is absent from this extract.
307 // lock range
309 (off_t)(aiop_->sfsAio.aio_offset+aiop_->sfsAio.aio_nbytes), false);
310 int puret;
311 if (pg_) {
312 puret = fp_->Pages()->StoreRange(fp_->successor_,
313 (const void *)aiop_->sfsAio.aio_buf, (off_t)aiop_->sfsAio.aio_offset,
314 (size_t)aiop_->sfsAio.aio_nbytes, (uint32_t*)aiop_->cksVec, nio_->pgOpts_, nio_->rg_);
315
316 }
317 else
318 {
319 puret = fp_->Pages()->UpdateRange(fp_->successor_,
320 (const void *)aiop_->sfsAio.aio_buf, (off_t)aiop_->sfsAio.aio_offset,
321 (size_t)aiop_->sfsAio.aio_nbytes, nio_->rg_);
322 }
323 if (puret<0)
324 {
 // NOTE(review): listing line 325 is absent from this extract
326 fp_->resyncSizes();
327 aiop_->Result = puret;
328 aiop_->doneWrite();
329 nio_->Recycle();
330 return;
331 }
332
333 const int ret = fp_->successor_->Write(nio_);
334 if (ret<0)
335 {
 // NOTE(review): listing line 336 is absent from this extract
337 fp_->resyncSizes();
338 aiop_->Result = ret;
339 aiop_->doneWrite();
340 nio_->Recycle();
341 return;
342 }
343}
344
346{
 // Second step of an aio write (its header, listing line 345, is absent
 // from this extract). Checks the async result, completes a short write
 // synchronously, then reports the total through aiop_->doneWrite() and
 // recycles nio_.
347 // this job runs after the async Write
348
349 if (aiop_->Result<0)
350 {
 // NOTE(review): listing line 351 is absent from this extract
352 fp_->resyncSizes();
353 aiop_->doneWrite();
354 nio_->Recycle();
355 return;
356 }
357
358 // in case there was a short write during the async write, finish
359 // writing the data now, otherwise the crc values will be inconsistent
360 ssize_t towrite = nio_->sfsAio.aio_nbytes - nio_->Result;
361 ssize_t nwritten = nio_->Result;
362 const char *p = (const char*)nio_->sfsAio.aio_buf;
 // NOTE(review): unlike the read top-up loop there is no check for a zero
 // return here; if Write() ever returned 0 with towrite>0, neither counter
 // would change and the loop would not terminate - confirm Write() cannot
 // return 0 for a non-zero length.
363 while(towrite>0)
364 {
365 const ssize_t wret = fp_->successor_->Write(&p[nwritten], nio_->sfsAio.aio_offset+nwritten, towrite);
366 if (wret<0)
367 {
368 aiop_->Result = wret;
 // NOTE(review): listing line 369 is absent from this extract
370 fp_->resyncSizes();
371 aiop_->doneWrite();
372 nio_->Recycle();
373 return;
374 }
375 towrite -= wret;
376 nwritten += wret;
377 }
378 aiop_->Result = nwritten;
379 aiop_->doneWrite();
380 nio_->Recycle();
381}
382
383#endif
#define read(a, b, c)
Definition XrdPosix.hh:77
Definition XrdJob.hh:43
Definition XrdOssCsiFileAio.hh:43
virtual ~XrdOssCsiFileAioJob()
Definition XrdOssCsiFileAio.hh:47
void DoIt()
Definition XrdOssCsiFileAio.hh:69
void Init(XrdOssCsiFile *fp, XrdOssCsiFileAio *nio, XrdSfsAio *aiop, bool isPg, bool read)
Definition XrdOssCsiFileAio.hh:49
@ JobReadStep2
Definition XrdOssCsiFileAio.hh:106
@ JobWriteStep1
Definition XrdOssCsiFileAio.hh:106
@ JobWriteStep2
Definition XrdOssCsiFileAio.hh:106
@ JobReadStep1
Definition XrdOssCsiFileAio.hh:106
XrdOssCsiFileAioJob()
Definition XrdOssCsiFileAio.hh:46
enum XrdOssCsiFileAioJob::@101 jobtype_
XrdOssCsiFile * fp_
Definition XrdOssCsiFileAio.hh:101
bool read_
Definition XrdOssCsiFileAio.hh:105
XrdOssCsiFileAio * nio_
Definition XrdOssCsiFileAio.hh:102
bool pg_
Definition XrdOssCsiFileAio.hh:104
void PrepareWrite2()
Definition XrdOssCsiFileAio.hh:59
void DoItWrite1()
Definition XrdOssCsiFileAio.hh:303
void PrepareRead2()
Definition XrdOssCsiFileAio.hh:64
XrdSfsAio * aiop_
Definition XrdOssCsiFileAio.hh:103
void DoItRead2()
Definition XrdOssCsiFileAio.hh:218
void DoItWrite2()
Definition XrdOssCsiFileAio.hh:345
void DoItRead1()
Definition XrdOssCsiFileAio.hh:285
Definition XrdOssCsi.hh:47
XrdOssCsiFileAio * list_
Definition XrdOssCsi.hh:53
std::mutex mtx_
Definition XrdOssCsi.hh:52
Definition XrdOssCsiFileAio.hh:110
~XrdOssCsiFileAio()
Definition XrdOssCsiFileAio.hh:206
void SchedReadJob()
Definition XrdOssCsiFileAio.hh:200
XrdScheduler * Sched_
Definition XrdOssCsiFileAio.hh:214
XrdOssCsiFileAio(XrdOssCsiFileAioStore *store)
Definition XrdOssCsiFileAio.hh:205
virtual void doneRead()
Definition XrdOssCsiFileAio.hh:117
XrdOssCsiFileAioStore * store_
Definition XrdOssCsiFileAio.hh:209
void SchedWriteJob2()
Definition XrdOssCsiFileAio.hh:183
XrdOssCsiFileAioJob job_
Definition XrdOssCsiFileAio.hh:213
void SchedWriteJob()
Definition XrdOssCsiFileAio.hh:189
XrdOssCsiRangeGuard rg_
Definition XrdOssCsiFileAio.hh:114
virtual void Recycle()
Definition XrdOssCsiFileAio.hh:131
XrdOssCsiFile * file_
Definition XrdOssCsiFileAio.hh:211
uint64_t pgOpts_
Definition XrdOssCsiFileAio.hh:115
XrdSfsAio * parentaio_
Definition XrdOssCsiFileAio.hh:210
void SchedReadJob2()
Definition XrdOssCsiFileAio.hh:194
virtual void doneWrite()
Definition XrdOssCsiFileAio.hh:124
XrdOssCsiFileAio * next_
Definition XrdOssCsiFileAio.hh:215
static XrdOssCsiFileAio * Alloc(XrdOssCsiFileAioStore *store)
Definition XrdOssCsiFileAio.hh:171
void Init(XrdSfsAio *aiop, XrdOssCsiFile *file, bool isPgOp, uint64_t opts, bool isread)
Definition XrdOssCsiFileAio.hh:153
bool isPgOp_
Definition XrdOssCsiFileAio.hh:212
Definition XrdOssCsi.hh:74
XrdOssCsiPages * Pages()
Definition XrdOssCsi.hh:140
void aioInc()
Definition XrdOssCsi.hh:111
void aioDec()
Definition XrdOssCsi.hh:120
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
Definition XrdOssCsiRanges.hh:58
static XrdScheduler * Sched_
Definition XrdOssCsi.hh:216
XrdOssDF * successor_
Definition XrdOssHandler.hh:81
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:281
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
Definition XrdScheduler.hh:46
void Schedule(XrdJob *jp)
Definition XrdSfsAio.hh:59
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
const char * TIdent
Definition XrdSfsAio.hh:67
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
int aio_reqprio
Definition XrdSfsAio.hh:50
int aio_fildes
Definition XrdSfsAio.hh:46
void * aio_buf
Definition XrdSfsAio.hh:47