xrootd
Loading...
Searching...
No Matches
XrdEcStrmWriter.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDEC_XRDECSTRMWRITER_HH_
26#define SRC_XRDEC_XRDECSTRMWRITER_HH_
27
28#include "XrdEc/XrdEcWrtBuff.hh"
30
34
35#include <random>
36#include <chrono>
37#include <future>
38#include <atomic>
39#include <memory>
40#include <vector>
41#include <thread>
42#include <iterator>
43
44#include <sys/stat.h>
45
46namespace XrdEc
47{
48 //---------------------------------------------------------------------------
51 //---------------------------------------------------------------------------
53 {
54 //-------------------------------------------------------------------------
55 // Type for queue of buffers to be written
56 //-------------------------------------------------------------------------
58
59 public:
60
61 //-----------------------------------------------------------------------
63 //-----------------------------------------------------------------------
65 writer_thread_stop( false ),
67 next_blknb( 0 ),
68 global_status( this )
69 {
70 }
71
72 //-----------------------------------------------------------------------
74 //-----------------------------------------------------------------------
75 virtual ~StrmWriter()
76 {
77 writer_thread_stop = true;
79 writer_thread.join();
80 }
81
82 //-----------------------------------------------------------------------
86 //-----------------------------------------------------------------------
87 void Open( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
88
89 //-----------------------------------------------------------------------
95 //-----------------------------------------------------------------------
96 void Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler );
97
98 //-----------------------------------------------------------------------
102 //-----------------------------------------------------------------------
103 void Close( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
104
105 //-----------------------------------------------------------------------
107 //-----------------------------------------------------------------------
108 uint64_t GetSize()
109 {
111 }
112
113 private:
114
115 //-----------------------------------------------------------------------
116 // Global status of the StrmWriter
117 //-----------------------------------------------------------------------
119 {
120 //---------------------------------------------------------------------
121 // Constructor
122 //---------------------------------------------------------------------
124 btsleft( 0 ),
125 btswritten( 0 ),
126 stopped_writing( false ),
127 closeHandler( 0 )
128 {
129 }
130
131 //---------------------------------------------------------------------
132 // Report status of write operation
133 //---------------------------------------------------------------------
134 void report_wrt( const XrdCl::XRootDStatus &st, uint64_t wrtsize )
135 {
136 std::unique_lock<std::recursive_mutex> lck( mtx );
137 //-------------------------------------------------------------------
138 // Update the global status
139 //-------------------------------------------------------------------
140 btsleft -= wrtsize;
141 if( !st.IsOK() ) status = st;
142 else btswritten += wrtsize;
143
144 //-------------------------------------------------------------------
145 // check if we are done, and if yes call the close implementation
146 //-------------------------------------------------------------------
147 if( btsleft == 0 && stopped_writing )
148 {
149 lck.unlock();
151 }
152 }
153
154 //---------------------------------------------------------------------
155 // Report status of open operation
156 //---------------------------------------------------------------------
157 inline void report_open( const XrdCl::XRootDStatus &st )
158 {
159 report_wrt( st, 0 );
160 }
161
162 //---------------------------------------------------------------------
163 // Indicate that the user issued close
164 //---------------------------------------------------------------------
165 void issue_close( XrdCl::ResponseHandler *handler, uint16_t timeout )
166 {
167 std::unique_lock<std::recursive_mutex> lck( mtx );
168 //-------------------------------------------------------------------
169 // There will be no more new write requests
170 //-------------------------------------------------------------------
171 stopped_writing = true;
172 //-------------------------------------------------------------------
173 // If there are no outstanding writes, we can simply call the close
174 // routine
175 //-------------------------------------------------------------------
176 if( btsleft == 0 ) return writer->CloseImpl( handler, timeout );
177 //-------------------------------------------------------------------
178 // Otherwise we save the handler for later
179 //-------------------------------------------------------------------
180 closeHandler = handler;
181 }
182
183 //---------------------------------------------------------------------
184 // get the global status value
185 //---------------------------------------------------------------------
186 inline const XrdCl::XRootDStatus& get() const
187 {
188 std::unique_lock<std::recursive_mutex> lck( mtx );
189 return status;
190 }
191
192 inline void issue_write( uint64_t wrtsize )
193 {
194 std::unique_lock<std::recursive_mutex> lck( mtx );
195 btsleft += wrtsize;
196 }
197
198 inline uint64_t get_btswritten()
199 {
200 return btswritten;
201 }
202
203 private:
204 mutable std::recursive_mutex mtx;
205 StrmWriter *writer; //> pointer to the StrmWriter
206 uint64_t btsleft; //> bytes left to be written
207 uint64_t btswritten; //> total number of bytes written
208 bool stopped_writing; //> true, if user called close
209 XrdCl::XRootDStatus status; //> the global status
210 XrdCl::ResponseHandler *closeHandler; //> user close handler
211 };
212
213 //-----------------------------------------------------------------------
217 //-----------------------------------------------------------------------
218 inline void EnqueueBuff( std::unique_ptr<WrtBuff> wrtbuff )
219 {
220 // the routine to be called in the thread-pool
221 // - does erasure coding
222 // - calculates crc32cs
223 static auto prepare_buff = []( WrtBuff *wrtbuff )
224 {
225 std::unique_ptr<WrtBuff> ptr( wrtbuff );
226 ptr->Encode();
227 return ptr.release();
228 };
229 buffers.enqueue( ThreadPool::Instance().Execute( prepare_buff, wrtbuff.release() ) );
230 }
231
232 //-----------------------------------------------------------------------
236 //-----------------------------------------------------------------------
237 inline std::unique_ptr<WrtBuff> DequeueBuff()
238 {
239 std::future<WrtBuff*> ftr = buffers.dequeue();
240 std::unique_ptr<WrtBuff> result( ftr.get() );
241 return result;
242 }
243
244 //-----------------------------------------------------------------------
248 //-----------------------------------------------------------------------
249 static void writer_routine( StrmWriter *me )
250 {
251 try
252 {
253 while( !me->writer_thread_stop )
254 {
255 std::unique_ptr<WrtBuff> wrtbuff( me->DequeueBuff() );
256 if( !wrtbuff ) continue;
257 me->WriteBuff( std::move( wrtbuff ) );
258 }
259 }
260 catch( const buff_queue::wait_interrupted& ){ }
261 }
262
263 //-----------------------------------------------------------------------
267 //-----------------------------------------------------------------------
268 void WriteBuff( std::unique_ptr<WrtBuff> buff );
269
270 //-----------------------------------------------------------------------
274 //-----------------------------------------------------------------------
275 std::vector<char> GetMetadataBuffer();
276
277 //-----------------------------------------------------------------------
281 //-----------------------------------------------------------------------
282 void CloseImpl( XrdCl::ResponseHandler *handler, uint16_t timeout = 0 );
283
285 std::unique_ptr<WrtBuff> wrtbuff; //< current write buffer
286 std::vector<std::shared_ptr<XrdCl::ZipArchive>> dataarchs; //< ZIP archives with data
287 std::vector<std::shared_ptr<XrdCl::File>> metadataarchs; //< ZIP archives with metadata
288 std::vector<std::vector<char>> cdbuffs; //< buffers with CDs
289 buff_queue buffers; //< queue of buffer for writing
290 //< (waiting to be erasure coded)
291 std::atomic<bool> writer_thread_stop; //< true if the writer thread should be stopped,
292 //< flase otherwise
293 std::thread writer_thread; //< handle to the writer thread
294 size_t next_blknb; //< number of the next block to be created
295 global_status_t global_status; //< global status of the writer
296 };
297
298}
299
300#endif /* SRC_XRDEC_XRDECSTRMWRITER_HH_ */
Handle an async response.
Definition XrdClXRootDResponses.hh:1126
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdEcStrmWriter.hh:53
std::vector< std::vector< char > > cdbuffs
Definition XrdEcStrmWriter.hh:288
static void writer_routine(StrmWriter *me)
Definition XrdEcStrmWriter.hh:249
StrmWriter(const ObjCfg &objcfg)
Constructor.
Definition XrdEcStrmWriter.hh:64
void Open(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
buff_queue buffers
Definition XrdEcStrmWriter.hh:289
void WriteBuff(std::unique_ptr< WrtBuff > buff)
std::atomic< bool > writer_thread_stop
Definition XrdEcStrmWriter.hh:291
virtual ~StrmWriter()
Destructor.
Definition XrdEcStrmWriter.hh:75
void EnqueueBuff(std::unique_ptr< WrtBuff > wrtbuff)
Definition XrdEcStrmWriter.hh:218
sync_queue< std::future< WrtBuff * > > buff_queue
Definition XrdEcStrmWriter.hh:57
void Write(uint32_t size, const void *buff, XrdCl::ResponseHandler *handler)
uint64_t GetSize()
Definition XrdEcStrmWriter.hh:108
size_t next_blknb
Definition XrdEcStrmWriter.hh:294
std::unique_ptr< WrtBuff > DequeueBuff()
Definition XrdEcStrmWriter.hh:237
const ObjCfg & objcfg
Definition XrdEcStrmWriter.hh:284
std::vector< std::shared_ptr< XrdCl::ZipArchive > > dataarchs
Definition XrdEcStrmWriter.hh:286
std::vector< std::shared_ptr< XrdCl::File > > metadataarchs
Definition XrdEcStrmWriter.hh:287
void Close(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
global_status_t global_status
Definition XrdEcStrmWriter.hh:295
std::vector< char > GetMetadataBuffer()
void CloseImpl(XrdCl::ResponseHandler *handler, uint16_t timeout=0)
std::thread writer_thread
Definition XrdEcStrmWriter.hh:293
std::unique_ptr< WrtBuff > wrtbuff
Definition XrdEcStrmWriter.hh:285
static ThreadPool & Instance()
Singleton access.
Definition XrdEcThreadPool.hh:150
Definition XrdEcWrtBuff.hh:133
Definition XrdClZipArchive.hh:45
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124
Definition XrdEcObjCfg.hh:34
Definition XrdEcStrmWriter.hh:119
uint64_t btsleft
Definition XrdEcStrmWriter.hh:206
void issue_write(uint64_t wrtsize)
Definition XrdEcStrmWriter.hh:192
XrdCl::XRootDStatus status
Definition XrdEcStrmWriter.hh:209
std::recursive_mutex mtx
Definition XrdEcStrmWriter.hh:204
global_status_t(StrmWriter *writer)
Definition XrdEcStrmWriter.hh:123
void report_wrt(const XrdCl::XRootDStatus &st, uint64_t wrtsize)
Definition XrdEcStrmWriter.hh:134
void issue_close(XrdCl::ResponseHandler *handler, uint16_t timeout)
Definition XrdEcStrmWriter.hh:165
uint64_t btswritten
Definition XrdEcStrmWriter.hh:207
void report_open(const XrdCl::XRootDStatus &st)
Definition XrdEcStrmWriter.hh:157
uint64_t get_btswritten()
Definition XrdEcStrmWriter.hh:198
XrdCl::ResponseHandler * closeHandler
Definition XrdEcStrmWriter.hh:210
StrmWriter * writer
Definition XrdEcStrmWriter.hh:205
const XrdCl::XRootDStatus & get() const
Definition XrdEcStrmWriter.hh:186
bool stopped_writing
Definition XrdEcStrmWriter.hh:208
Definition XrdEcUtilities.hh:169
void enqueue(Element &&element)
Definition XrdEcUtilities.hh:185
Element dequeue()
Definition XrdEcUtilities.hh:196
void interrupt()
Definition XrdEcUtilities.hh:234