xrootd
Loading...
Searching...
No Matches
XrdEcWrtBuff.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDEC_XRDECWRTBUFF_HH_
26#define SRC_XRDEC_XRDECWRTBUFF_HH_
27
29#include "XrdEc/XrdEcObjCfg.hh"
30#include "XrdEc/XrdEcConfig.hh"
32
33#include "XrdCl/XrdClBuffer.hh"
35
37
38#include <vector>
39#include <condition_variable>
40#include <mutex>
41#include <future>
42
43namespace XrdEc
44{
45 //---------------------------------------------------------------------------
47 //---------------------------------------------------------------------------
49 {
50 public:
51
52 //-----------------------------------------------------------------------
54 //-----------------------------------------------------------------------
56 {
57 static BufferPool instance;
58 return instance;
59 }
60
61 //-----------------------------------------------------------------------
63 //-----------------------------------------------------------------------
// Hand out a buffer: a pooled one is reused when available; otherwise a new
// XrdCl::Buffer of objcfg.blksize bytes may be built; failing that the call
// blocks on the condition variable until a buffer shows up in the pool.
// The mutex is held for the whole call (cv.wait releases it while waiting).
// NOTE(review): a pooled buffer is returned as-is; nothing visible here
// checks that its size matches objcfg.blksize - confirm this is intended.
64 XrdCl::Buffer Create( const ObjCfg &objcfg )
65 {
66 std::unique_lock<std::mutex> lck( mtx );
67 //---------------------------------------------------------------------
68 // If pool is not empty, recycle existing buffer
69 //---------------------------------------------------------------------
70 if( !pool.empty() )
71 {
72 XrdCl::Buffer buffer( std::move( pool.front() ) );
73 pool.pop();
74 return buffer;
75 }
76 //---------------------------------------------------------------------
77 // Check if we can create a new buffer object without exceeding
78 // the maximum size of the pool
79 //---------------------------------------------------------------------
// NOTE(review): original line 80 is absent from this listing; presumably
// the condition comparing the current pool size with its maximum - confirm
// against the real header.
81 {
82 XrdCl::Buffer buffer( objcfg.blksize );
// NOTE(review): original line 83 is absent from this listing as well;
// presumably the bookkeeping of the current pool size - confirm.
84 return buffer;
85 }
86 //---------------------------------------------------------------------
87 // If not, we have to wait until there is a buffer we can recycle
88 //---------------------------------------------------------------------
// The loop re-checks the pool after every wake-up before taking a buffer
89 while( pool.empty() ) cv.wait( lck );
90 XrdCl::Buffer buffer( std::move( pool.front() ) );
91 pool.pop();
92 return buffer;
93 }
94
95 //-----------------------------------------------------------------------
97 //-----------------------------------------------------------------------
98 void Recycle( XrdCl::Buffer && buffer )
99 {
100 if( !buffer.GetBuffer() ) return;
101 std::unique_lock<std::mutex> lck( mtx );
102 buffer.SetCursor( 0 );
103 pool.emplace( std::move( buffer ) );
104 cv.notify_all();
105 }
106
107 private:
108
109 //-----------------------------------------------------------------------
110 // Default constructor
111 //-----------------------------------------------------------------------
113 {
114 }
115
// The pool can be neither copied nor moved
116 BufferPool( const BufferPool& ) = delete; //< Copy constructor
117 BufferPool( BufferPool&& ) = delete; //< Move constructor
118 BufferPool& operator=( const BufferPool& ) = delete; //< Copy assignment operator
119 BufferPool& operator=( BufferPool&& ) = delete; //< Move assignment operator
120
121 const size_t totalsize; //< maximum size of the pool
122 size_t currentsize; //< current size of the pool
123 std::condition_variable cv; //< waited on in Create, notified in Recycle
124 std::mutex mtx; //< locked by both Create and Recycle
125 std::queue<XrdCl::Buffer> pool; //< the pool itself
126 };
127
128 //---------------------------------------------------------------------------
131 //---------------------------------------------------------------------------
133 {
134 public:
135 //-----------------------------------------------------------------------
139 //-----------------------------------------------------------------------
141 wrtbuff( BufferPool::Instance().Create( objcfg ) )
142 {
143 stripes.reserve( objcfg.nbchunks );
144 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
145 }
146 //-----------------------------------------------------------------------
148 //-----------------------------------------------------------------------
150 wrtbuff( std::move( wrtbuff.wrtbuff ) ),
151 stripes( std::move( wrtbuff.stripes ) ),
152 cksums( std::move( wrtbuff.cksums ) )
153 {
154 }
155 //-----------------------------------------------------------------------
156 // Destructor
157 //-----------------------------------------------------------------------
159 {
160 BufferPool::Instance().Recycle( std::move( wrtbuff ) );
161 }
162 //-----------------------------------------------------------------------
168 //-----------------------------------------------------------------------
169 uint32_t Write( uint32_t size, const char *buffer )
170 {
171 uint64_t bytesAccepted = size; // bytes accepted by the buffer
172 if( wrtbuff.GetCursor() + bytesAccepted > objcfg.datasize )
173 bytesAccepted = objcfg.datasize - wrtbuff.GetCursor();
174 memcpy( wrtbuff.GetBufferAtCursor(), buffer, bytesAccepted );
175 wrtbuff.AdvanceCursor( bytesAccepted );
176 return bytesAccepted;
177 }
178 //-----------------------------------------------------------------------
182 //-----------------------------------------------------------------------
// Account for `size` bytes in the block without copying any data: the
// cursor is advanced by `size` when the buffer already has a size, and set
// to `size` after zero-filling otherwise.
183 void Pad( uint32_t size )
184 {
185 // if the buffer exists we only need to move the cursor
186 if( wrtbuff.GetSize() != 0 )
187 {
188 wrtbuff.AdvanceCursor( size );
189 return;
190 }
191 // otherwise we allocate the buffer and set the cursor
// NOTE(review): original line 192 is absent from this listing; presumably
// the allocation the comment above refers to - confirm against the real
// header. As listed, the memset below would see a zero-sized buffer.
193 memset( wrtbuff.GetBuffer(), 0, wrtbuff.GetSize() );
194 wrtbuff.SetCursor( size );
195 return;
196 }
197 //-----------------------------------------------------------------------
201 //-----------------------------------------------------------------------
202 inline char* GetStrpBuff( uint8_t strpnb )
203 {
204 return stripes[strpnb].buffer;
205 }
206 //-----------------------------------------------------------------------
210 //-----------------------------------------------------------------------
211 uint32_t GetStrpSize( uint8_t strp )
212 {
213 // Check if it is a data chunk?
214 if( strp < objcfg.nbdata )
215 {
216 // If the cursor is at least at the expected size
217 // it means we have the full chunk.
218 uint64_t expsize = ( strp + 1) * objcfg.chunksize;
219 if( expsize <= wrtbuff.GetCursor() )
220 return objcfg.chunksize;
221 // If the cursor is of by less than the chunk size
222 // it means we have a partial chunk
223 uint64_t delta = expsize - wrtbuff.GetCursor();
224 if( delta < objcfg.chunksize )
225 return objcfg.chunksize - delta;
226 // otherwise we are handling an empty chunk
227 return 0;
228 }
229 // It is a parity chunk so its size has to be equal
230 // to the size of the first chunk
231 return GetStrpSize( 0 );
232 }
233 //-----------------------------------------------------------------------
235 //-----------------------------------------------------------------------
236 inline uint32_t GetBlkSize()
237 {
238 return wrtbuff.GetCursor();
239 }
240 //-----------------------------------------------------------------------
242 //-----------------------------------------------------------------------
243 inline bool Complete()
244 {
245 return wrtbuff.GetCursor() == objcfg.datasize;
246 }
247 //-----------------------------------------------------------------------
249 //-----------------------------------------------------------------------
250 inline bool Empty()
251 {
252 return ( wrtbuff.GetSize() == 0 || wrtbuff.GetCursor() == 0 );
253 }
254 //-----------------------------------------------------------------------
256 //-----------------------------------------------------------------------
// Describe the block as a list of stripes and schedule the checksum
// calculation for every stripe.
// NOTE(review): nothing here clears `stripes` or `cksums`, so a second call
// would append duplicate entries - presumably this is called once per
// buffer; confirm against the callers.
257 inline void Encode()
258 {
259 // first calculate the parity
// one stripe per chunk: chunk i starts at offset i * chunksize in wrtbuff,
// and the second argument is true for data chunks (i < nbdata)
260 uint8_t i ;
261 for( i = 0; i < objcfg.nbchunks; ++i )
262 stripes.emplace_back( wrtbuff.GetBuffer( i * objcfg.chunksize ), i < objcfg.nbdata );
263 Config &cfg = Config::Instance();
// NOTE(review): original line 264 is absent from this listing; presumably
// the call that uses `cfg` to compute the parity over `stripes` - confirm
// against the real header.
265 // then calculate the checksums
266 cksums.reserve( objcfg.nbchunks );
267 for( uint8_t strpnb = 0; strpnb < objcfg.nbchunks; ++strpnb )
268 {
269 size_t chunksize = GetStrpSize( strpnb );
// hand objcfg.digest( 0, stripe buffer, chunksize ) to the thread pool and
// keep the returned future; GetCrc32c() collects the result later
270 std::future<uint32_t> ftr = ThreadPool::Instance().Execute( objcfg.digest, 0, stripes[strpnb].buffer, chunksize );
271 cksums.emplace_back( std::move( ftr ) );
272 }
273 }
274 //-----------------------------------------------------------------------
279 //-----------------------------------------------------------------------
280 inline uint32_t GetCrc32c( size_t strpnb )
281 {
282 return cksums[strpnb].get();
283 }
284
285 private:
286
// objcfg supplies datasize, chunksize, nbdata, nbchunks and digest to the
// methods of this class
287 ObjCfg objcfg; //< configuration for the data object
// obtained from BufferPool::Instance().Create() and handed back through
// BufferPool::Instance().Recycle()
288 XrdCl::Buffer wrtbuff; //< the buffer for the data
// filled by Encode(), one entry per chunk
289 stripes_t stripes; //< data stripes
// filled by Encode(), consumed by GetCrc32c()
290 std::vector<std::future<uint32_t>> cksums; //< crc32cs for the data stripes
291 };
292
293
294} /* namespace XrdEc */
295
296#endif /* SRC_XRDEC_XRDECWRTBUFF_HH_ */
Binary blob representation.
Definition XrdClBuffer.hh:34
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition XrdClBuffer.hh:156
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition XrdClBuffer.hh:189
void Allocate(uint32_t size)
Allocate the buffer.
Definition XrdClBuffer.hh:110
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition XrdClBuffer.hh:140
uint32_t GetSize() const
Get the size of the message.
Definition XrdClBuffer.hh:132
Pool of buffer for caching writes.
Definition XrdEcWrtBuff.hh:49
std::mutex mtx
Definition XrdEcWrtBuff.hh:124
BufferPool(const BufferPool &)=delete
void Recycle(XrdCl::Buffer &&buffer)
Give back a buffer to the pool.
Definition XrdEcWrtBuff.hh:98
std::queue< XrdCl::Buffer > pool
Definition XrdEcWrtBuff.hh:125
const size_t totalsize
Definition XrdEcWrtBuff.hh:121
BufferPool()
Definition XrdEcWrtBuff.hh:112
static BufferPool & Instance()
Singleton access to the object.
Definition XrdEcWrtBuff.hh:55
BufferPool & operator=(BufferPool &&)=delete
BufferPool(BufferPool &&)=delete
std::condition_variable cv
Definition XrdEcWrtBuff.hh:123
BufferPool & operator=(const BufferPool &)=delete
XrdCl::Buffer Create(const ObjCfg &objcfg)
Create new buffer (or recycle an existing one)
Definition XrdEcWrtBuff.hh:64
size_t currentsize
Definition XrdEcWrtBuff.hh:122
Global configuration for the EC module.
Definition XrdEcConfig.hh:40
static Config & Instance()
Singleton access.
Definition XrdEcConfig.hh:46
RedundancyProvider & GetRedundancy(const ObjCfg &objcfg)
Get redundancy provider for given data object configuration.
Definition XrdEcConfig.hh:55
void compute(stripes_t &stripes)
std::future< typename std::result_of< FUNC(ARGs...)>::type > Execute(FUNC func, ARGs... args)
Schedule a functional (together with its arguments) for execution.
Definition XrdEcThreadPool.hh:161
static ThreadPool & Instance()
Singleton access.
Definition XrdEcThreadPool.hh:150
Definition XrdEcWrtBuff.hh:133
ObjCfg objcfg
Definition XrdEcWrtBuff.hh:287
XrdCl::Buffer wrtbuff
Definition XrdEcWrtBuff.hh:288
uint32_t Write(uint32_t size, const char *buffer)
Definition XrdEcWrtBuff.hh:169
WrtBuff(const ObjCfg &objcfg)
Definition XrdEcWrtBuff.hh:140
uint32_t GetBlkSize()
Get size of the data in the buffer.
Definition XrdEcWrtBuff.hh:236
uint32_t GetStrpSize(uint8_t strp)
Definition XrdEcWrtBuff.hh:211
void Pad(uint32_t size)
Definition XrdEcWrtBuff.hh:183
uint32_t GetCrc32c(size_t strpnb)
Definition XrdEcWrtBuff.hh:280
void Encode()
Calculate the parity for the data stripes and the crc32cs.
Definition XrdEcWrtBuff.hh:257
bool Empty()
True if there are no data in the buffer, false otherwise.
Definition XrdEcWrtBuff.hh:250
bool Complete()
True if the buffer is full, false otherwise.
Definition XrdEcWrtBuff.hh:243
stripes_t stripes
Definition XrdEcWrtBuff.hh:289
char * GetStrpBuff(uint8_t strpnb)
Definition XrdEcWrtBuff.hh:202
WrtBuff(WrtBuff &&wrtbuff)
Move constructor.
Definition XrdEcWrtBuff.hh:149
std::vector< std::future< uint32_t > > cksums
Definition XrdEcWrtBuff.hh:290
~WrtBuff()
Definition XrdEcWrtBuff.hh:158
Definition XrdClZipArchive.hh:45
std::vector< stripe_t > stripes_t
All stripes in a block.
Definition XrdEcUtilities.hh:64
Definition XrdOucJson.hh:4517
Definition XrdEcObjCfg.hh:34
const uint64_t blksize
Definition XrdEcObjCfg.hh:91
const uint8_t nbdata
Definition XrdEcObjCfg.hh:87
const uint8_t nbchunks
Definition XrdEcObjCfg.hh:85
uint32_t(* digest)(uint32_t, void const *, size_t)
Definition XrdEcObjCfg.hh:96
const uint64_t chunksize
Definition XrdEcObjCfg.hh:89
const uint64_t datasize
Definition XrdEcObjCfg.hh:88