xrootd
Loading...
Searching...
No Matches
XrdSysKernelBuffer.hh
Go to the documentation of this file.
1//-----------------------------------------------------------------------------
2// Copyright (c) 2013 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//-----------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//-----------------------------------------------------------------------------
24
25#ifndef SRC_XRDCL_XRDCLKERNELBUFFER_HH_
26#define SRC_XRDCL_XRDCLKERNELBUFFER_HH_
27
#include <fcntl.h>
#include <sys/uio.h>
#include <unistd.h>

#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <cstring>

#include <array>
#include <tuple>
#include <vector>
37
38namespace XrdSys
39{
40 //---------------------------------------------------------------------------
44 //---------------------------------------------------------------------------
46 {
47 friend ssize_t Read( int, KernelBuffer&, uint32_t, int64_t );
48
49 friend ssize_t Read( int, KernelBuffer&, uint32_t );
50
51 friend ssize_t Write( int, KernelBuffer&, int64_t );
52
53 friend ssize_t Send( int, KernelBuffer& );
54
55 friend ssize_t Move( KernelBuffer&, char*& );
56
57 friend ssize_t Move( char*&, KernelBuffer&, size_t );
58
59 public:
60
61 //-----------------------------------------------------------------------
63 //-----------------------------------------------------------------------
64 KernelBuffer() : capacity( 0 ), size( 0 )
65 {
66 }
67
68 //-----------------------------------------------------------------------
70 //-----------------------------------------------------------------------
71 KernelBuffer( const KernelBuffer& ) = delete;
72
73 //-----------------------------------------------------------------------
74 // Move constructor
75 //-----------------------------------------------------------------------
77 size( kbuff.size ),
78 pipes( std::move( kbuff.pipes ) )
79 {
80 capacity = 0;
81 size = 0;
82 }
83
84 //-----------------------------------------------------------------------
86 //-----------------------------------------------------------------------
87 KernelBuffer& operator=( const KernelBuffer& ) = delete;
88
89 //-----------------------------------------------------------------------
91 //-----------------------------------------------------------------------
93 {
94 capacity = kbuff.capacity;
95 size = kbuff.size;
96 pipes = std::move( kbuff.pipes );
97 return *this;
98 }
99
100 //-----------------------------------------------------------------------
102 //-----------------------------------------------------------------------
104 {
105 if( capacity > 0 ) Free();
106 }
107
108 //------------------------------------------------------------------------
110 //------------------------------------------------------------------------
111 inline bool Empty() const
112 {
113 return size == 0;
114 }
115
116 //-----------------------------------------------------------------------
122 //-----------------------------------------------------------------------
123 inline static bool IsPageAligned( const void *ptr )
124 {
125 return ( ( uintptr_t ( ptr ) ) % PAGE_SZ ) == 0 ;
126 }
127
128 private:
129
130 //-----------------------------------------------------------------------
132 //-----------------------------------------------------------------------
133 inline void Free()
134 {
135 auto itr = pipes.begin();
136 for( ; itr != pipes.end() ; ++itr )
137 {
138 std::array<int, 2> &p = std::get<0>( *itr );
139 close( p[1] );
140 close( p[0] );
141 }
142
143 pipes.clear();
144 capacity = 0;
145 size = 0;
146 }
147
148 //-----------------------------------------------------------------------
150 //-----------------------------------------------------------------------
151 inline ssize_t Alloc( size_t size )
152 {
153#ifndef F_SETPIPE_SZ
154 return -ENOTSUP;
155#else
156 ssize_t ret = 0;
157
158 std::array<int, 2> pipe_fd;
159 ret = pipe( pipe_fd.data() );
160 if( ret < 0 ) return ret;
161
163 ret = fcntl( pipe_fd[0], F_SETPIPE_SZ, size );
164 if( ret < 0 ) return ret;
165
166 capacity += ret;
167 pipes.emplace_back( pipe_fd, 0 );
168
169 return ret;
170#endif
171 }
172
173 //-----------------------------------------------------------------------
182 //-----------------------------------------------------------------------
183#ifndef SPLICE_F_MOVE
184 inline ssize_t ReadFromFD( int fd, uint32_t length, int64_t *offset )
185 {
186 return -ENOTSUP;
187 }
188#else
189 inline ssize_t ReadFromFD( int fd, uint32_t length, loff_t *offset )
190 {
191 if( capacity > 0 ) Free();
192
193 while( length > 0 )
194 {
195 ssize_t ret = Alloc( length );
196 if( ret < 0 ) return ret;
197 if( size_t( ret ) > length ) ret = length;
198 std::array<int, 2> &pipe_fd = std::get<0>( pipes.back() );
199 size_t &pipedata = std::get<1>( pipes.back() );
200 ret = splice( fd, offset, pipe_fd[1], NULL, ret, SPLICE_F_MOVE | SPLICE_F_MORE );
201 if( ret == 0 ) break; // we reached the end of the file
202 if( ret < 0 ) return -1;
203 pipedata += ret;
204
205 length -= ret;
206 size += ret;
207 }
208 pipes_cursor = pipes.begin();
209
210 return size;
211 }
212#endif
213
214 //-----------------------------------------------------------------------
222 //-----------------------------------------------------------------------
223#ifndef SPLICE_F_MOVE
224 inline ssize_t WriteToFD( int fd, int64_t *offset )
225 {
226 return -ENOTSUP;
227 }
228#else
229 inline ssize_t WriteToFD( int fd, loff_t *offset )
230 {
231 if( size == 0 ) return 0;
232
233 ssize_t result = 0;
234
235 auto itr = pipes_cursor;
236 while( itr != pipes.end() )
237 {
238 std::array<int, 2> &pipe_fd = std::get<0>( *itr );
239 size_t &pipedata = std::get<1>( *itr );
240
241 int ret = splice( pipe_fd[0], NULL, fd, offset, size, SPLICE_F_MOVE | SPLICE_F_MORE );
242 if( ret == 0 ) break; // we reached the end of the file
243 if( ret < 0 ) return -1;
244
245 size -= ret;
246 result += ret;
247 pipedata -= ret;
248
249
250 if( pipedata > 0 ) continue;
251
252 ++pipes_cursor;
253 ++itr;
254 }
255
256 Free();
257
258 return result;
259 }
260#endif
261
262 //-----------------------------------------------------------------------
277 //-----------------------------------------------------------------------
278 inline ssize_t ToUser( char *&buffer )
279 {
280#ifndef SPLICE_F_MOVE
281 return -ENOTSUP;
282#else
283 if( size == 0 ) return 0;
284
285 ssize_t result = 0;
286
287 void *void_ptr = 0;
288 int ret = posix_memalign( &void_ptr, PAGE_SZ, size );
289 if( ret )
290 {
291 errno = ret;
292 return -1;
293 }
294 char *ptr = reinterpret_cast<char*>( void_ptr );
295
296 auto itr = pipes_cursor;
297 while( itr != pipes.end() )
298 {
299 iovec iov[1];
300 size_t len = size > MAX_PIPE_SIZE ? MAX_PIPE_SIZE : size;
301 iov->iov_len = len;
302 iov->iov_base = ptr;
303
304 std::array<int, 2> &pipe_fd = std::get<0>( *itr );
305 size_t &pipedata = std::get<1>( *itr );
306 int ret = vmsplice( pipe_fd[0], iov, 1, 0 ); // vmsplice man NOTE:
307 // vmsplice() really supports true splicing only from user memory to a
308 // pipe. In the opposite direction, it actually just copies the data to
309 // userspace. But this makes the interface nice and symmetric and
310 // enables people to build on vmsplice() with room for future
311 // improvement in performance.
312 if( ret < 0 ) // an error
313 {
314 delete[] buffer;
315 buffer = 0;
316 return ret;
317 }
318
319 size -= ret;
320 ptr += ret;
321 result += ret;
322 pipedata -= ret;
323
324 if( pipedata > 0 ) continue;
325
326 ++itr;
327 ++pipes_cursor;
328 }
329
330 Free();
331
332 buffer = reinterpret_cast<char*>( void_ptr );
333 return result;
334#endif
335 }
336
337 //-----------------------------------------------------------------------
354 //-----------------------------------------------------------------------
355 inline ssize_t FromUser( char *&buffer, size_t length )
356 {
357#ifndef SPLICE_F_MOVE
358 return -ENOTSUP;
359#else
360 if( !IsPageAligned( buffer ) )
361 {
362 errno = EINVAL;
363 return -1;
364 }
365
366 if( capacity > 0 ) Free();
367
368 char *buff = buffer;
369 while( length > 0 )
370 {
371 ssize_t ret = Alloc( length );
372 if( ret < 0 ) return ret;
373 std::array<int, 2> &pipe_fd = std::get<0>( pipes.back() );
374 size_t &pipedata = std::get<1>( pipes.back() );
375
376 iovec iov[1];
377 iov->iov_len = size_t( ret ) < length ? ret : length;
378 iov->iov_base = buff;
379 ret = vmsplice( pipe_fd[1], iov, 1, SPLICE_F_GIFT );
380
381 if( ret < 0 ) return -1;
382 length -= ret;
383 size += ret;
384 buff += ret;
385 pipedata += ret;
386 }
387
388 pipes_cursor = pipes.begin();
389 free( buffer );
390 buffer = 0;
391 return size;
392#endif
393 }
394
static const size_t PAGE_SZ       = 4 * 1024;    //< page size
static const size_t MAX_PIPE_SIZE = 1024 * 1024; //< maximum pipe size

//! one entry per pipe: { read end, write end } and the bytes it holds
using PipeList = std::vector<std::tuple<std::array<int,2>, size_t>>;

size_t             capacity;     //< the total capacity of all underlying pipes
size_t             size;         //< size of the data stored in this kernel buffer
PipeList           pipes;        //< the underlying pipes
PipeList::iterator pipes_cursor; //< the pipe currently being drained
402 };
403
404 //---------------------------------------------------------------------------
409 //---------------------------------------------------------------------------
410 inline ssize_t Read( int fd, KernelBuffer &buffer, uint32_t length, int64_t offset )
411 {
412 return buffer.ReadFromFD( fd, length, &offset );
413 }
414
415 //---------------------------------------------------------------------------
420 //---------------------------------------------------------------------------
421 inline ssize_t Read( int fd, KernelBuffer &buffer, uint32_t length )
422 {
423 return buffer.ReadFromFD( fd, length, NULL );
424 }
425
426 //---------------------------------------------------------------------------
431 //---------------------------------------------------------------------------
432 inline ssize_t Write( int fd, KernelBuffer &buffer, int64_t offset )
433 {
434 return buffer.WriteToFD( fd, &offset );
435 }
436
437 //---------------------------------------------------------------------------
441 //---------------------------------------------------------------------------
442 inline ssize_t Send( int fd, KernelBuffer &buffer )
443 {
444 return buffer.WriteToFD( fd, NULL );
445 }
446
447 //---------------------------------------------------------------------------
451 //---------------------------------------------------------------------------
452 inline ssize_t Move( KernelBuffer &kbuff, char *&ubuff )
453 {
454 return kbuff.ToUser( ubuff );
455 }
456
457 //---------------------------------------------------------------------------
461 //---------------------------------------------------------------------------
462 inline ssize_t Move( char *&ubuff, KernelBuffer &kbuff, size_t length )
463 {
464 return kbuff.FromUser( ubuff, length );
465 }
466
467}
468
469
470#endif /* SRC_XRDCL_XRDCLKERNELBUFFER_HH_ */
#define close(a)
Definition XrdPosix.hh:43
Definition XrdSysKernelBuffer.hh:46
KernelBuffer()
Default constructor.
Definition XrdSysKernelBuffer.hh:64
static const size_t MAX_PIPE_SIZE
Definition XrdSysKernelBuffer.hh:396
friend ssize_t Move(KernelBuffer &, char *&)
Definition XrdSysKernelBuffer.hh:452
bool Empty() const
Definition XrdSysKernelBuffer.hh:111
ssize_t ReadFromFD(int fd, uint32_t length, int64_t *offset)
Definition XrdSysKernelBuffer.hh:184
size_t capacity
Definition XrdSysKernelBuffer.hh:398
static bool IsPageAligned(const void *ptr)
Definition XrdSysKernelBuffer.hh:123
KernelBuffer & operator=(const KernelBuffer &)=delete
Copy assignment operator - deleted.
ssize_t FromUser(char *&buffer, size_t length)
Definition XrdSysKernelBuffer.hh:355
friend ssize_t Read(int, KernelBuffer &, uint32_t, int64_t)
Definition XrdSysKernelBuffer.hh:410
friend ssize_t Send(int, KernelBuffer &)
Definition XrdSysKernelBuffer.hh:442
KernelBuffer(KernelBuffer &&kbuff)
Definition XrdSysKernelBuffer.hh:76
KernelBuffer & operator=(KernelBuffer &&kbuff)
Move assignment operator.
Definition XrdSysKernelBuffer.hh:92
ssize_t Alloc(size_t size)
Allocates another pipe (kernel buffer) of size up to 1MB.
Definition XrdSysKernelBuffer.hh:151
std::vector< std::tuple< std::array< int, 2 >, size_t > >::iterator pipes_cursor
Definition XrdSysKernelBuffer.hh:401
KernelBuffer(const KernelBuffer &)=delete
Copy constructor - deleted.
void Free()
Closes the underlying pipes (kernel buffers)
Definition XrdSysKernelBuffer.hh:133
ssize_t WriteToFD(int fd, int64_t *offset)
Definition XrdSysKernelBuffer.hh:224
static const size_t PAGE_SZ
Definition XrdSysKernelBuffer.hh:395
ssize_t ToUser(char *&buffer)
Definition XrdSysKernelBuffer.hh:278
~KernelBuffer()
Destructor.
Definition XrdSysKernelBuffer.hh:103
friend ssize_t Write(int, KernelBuffer &, int64_t)
Definition XrdSysKernelBuffer.hh:432
size_t size
Definition XrdSysKernelBuffer.hh:399
std::vector< std::tuple< std::array< int, 2 >, size_t > > pipes
Definition XrdSysKernelBuffer.hh:400
Definition XrdClPollerBuiltIn.hh:28
ssize_t Write(int fd, KernelBuffer &buffer, int64_t offset)
Definition XrdSysKernelBuffer.hh:432
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
Definition XrdSysKernelBuffer.hh:410
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Definition XrdSysKernelBuffer.hh:452
ssize_t Send(int fd, KernelBuffer &buffer)
Definition XrdSysKernelBuffer.hh:442
Definition XrdOucJson.hh:4517
Definition XrdOucIOVec.hh:65