xrootd
Loading...
Searching...
No Matches
XrdClAsyncVectorReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
21
23#include "XrdCl/XrdClSocket.hh"
25
26namespace XrdCl
27{
28
29 //----------------------------------------------------------------------------
31 //----------------------------------------------------------------------------
33 {
34 public:
35 //------------------------------------------------------------------------
39 //------------------------------------------------------------------------
42 rdlstoff( 0 ),
43 rdlstlen( 0 )
44 {
45 memset( &rdlst, 0, sizeof( readahead_list ) );
46 }
47
48 //------------------------------------------------------------------------
54 //------------------------------------------------------------------------
55 XRootDStatus Read( Socket &socket, uint32_t &btsret )
56 {
57 Log *log = DefaultEnv::GetLog();
58
59 while( true )
60 {
61 switch( readstage )
62 {
63 //------------------------------------------------------------------
64 // Prepare to readout a new response
65 //------------------------------------------------------------------
66 case ReadStart:
67 {
68 msgbtsrd = 0;
69 rdlstoff = 0;
70 rdlstlen = sizeof( readahead_list );
72 continue;
73 }
74
75 //------------------------------------------------------------------
76 // Readout the read_list
77 //------------------------------------------------------------------
78 case ReadRdLst:
79 {
80 //----------------------------------------------------------------
81 // We cannot afford to read the next header from the stream
82 // because we will cross the message boundary
83 //----------------------------------------------------------------
84 if( msgbtsrd + rdlstlen > dlen )
85 {
86 uint32_t btsleft = dlen - msgbtsrd;
87 log->Error( XRootDMsg, "[%s] VectorReader: No enough data to read "
88 "another chunk header. Discarding %d bytes.",
89 url.GetHostId().c_str(), btsleft );
91 continue;
92 }
93
94 //----------------------------------------------------------------
95 // Let's readout the read list record from the socket
96 //----------------------------------------------------------------
97 uint32_t btsrd = 0;
98 char *buff = reinterpret_cast<char*>( &rdlst );
99 Status st = ReadBytesAsync( socket, buff + rdlstoff, rdlstlen, btsrd );
100 rdlstoff += btsrd;
101 rdlstlen -= btsrd;
102 msgbtsrd += btsrd;
103 btsret += btsrd;
104
105 if( !st.IsOK() || st.code == suRetry )
106 return st;
107
108 //----------------------------------------------------------------
109 // We have a complete read list record, now we need to marshal it
110 //----------------------------------------------------------------
111 rdlst.rlen = ntohl( rdlst.rlen );
112 rdlst.offset = ntohll( rdlst.offset );
113 choff = 0;
114 chlen = rdlst.rlen;
115
116 //----------------------------------------------------------------
117 // Find the buffer corresponding to the chunk
118 //----------------------------------------------------------------
119 bool chfound = false;
120 for( size_t i = 0; i < chunks->size(); ++i )
121 {
122 if( ( *chunks )[i].offset == uint64_t( rdlst.offset ) &&
123 ( *chunks )[i].length == uint32_t( rdlst.rlen ) )
124 {
125 chfound = true;
126 chidx = i;
127 break;
128 }
129 }
130
131 //----------------------------------------------------------------
132 // If the chunk was not found this is a bogus response, switch
133 // to discard mode
134 //----------------------------------------------------------------
135 if( !chfound )
136 {
137 log->Error( XRootDMsg, "[%s] VectorReader: Impossible to find chunk "
138 "buffer corresponding to %d bytes at %ld",
139 url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
141 continue;
142 }
143
145 continue;
146 }
147
148 //------------------------------------------------------------------
149 // Readout the raw data
150 //------------------------------------------------------------------
151 case ReadRaw:
152 {
153 //----------------------------------------------------------------
154 // The chunk was found, but reading all the data will cross the
155 // message boundary
156 //----------------------------------------------------------------
157 if( msgbtsrd + chlen > dlen )
158 {
159 uint32_t btsleft = dlen - msgbtsrd;
160 log->Error( XRootDMsg, "[%s] VectorReader: Malformed chunk header: "
161 "reading %d bytes from message would cross the message "
162 "boundary, discarding %d bytes.", url.GetHostId().c_str(),
163 rdlst.rlen, btsleft );
164 chstatus[chidx].sizeerr = true;
166 continue;
167 }
168
169 //----------------------------------------------------------------
170 // Readout the raw data from the socket
171 //----------------------------------------------------------------
172 uint32_t btsrd = 0;
173 char *buff = static_cast<char*>( ( *chunks )[chidx].buffer );
174 Status st = ReadBytesAsync( socket, buff + choff, chlen, btsrd );
175 choff += btsrd;
176 chlen -= btsrd;
177 msgbtsrd += btsrd;
178 rawbtsrd += btsrd;
179 btsret += btsrd;
180
181 if( !st.IsOK() || st.code == suRetry )
182 return st;
183
184 log->Dump( XRootDMsg, "[%s] VectorReader: read buffer for chunk %d@%ld",
185 url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
186
187 //----------------------------------------------------------------
188 // Mark chunk as done
189 //----------------------------------------------------------------
190 chstatus[chidx].done = true;
191
192 //----------------------------------------------------------------
193 // There is still data to be read, we need to readout the next
194 // read list record.
195 //----------------------------------------------------------------
196 if( msgbtsrd < dlen )
197 {
198 rdlstoff = 0;
199 rdlstlen = sizeof( readahead_list );
201 continue;
202 }
203
205 continue;
206 }
207
208 //------------------------------------------------------------------
209 // We've had an error and we are in the discarding mode
210 //------------------------------------------------------------------
211 case ReadDiscard:
212 {
213 // Just drop the connection, we don't know if the stream is sane
214 // anymore. Recover with a reconnect.
216 }
217
218 //------------------------------------------------------------------
219 // Finalize the read
220 //------------------------------------------------------------------
221 case ReadDone:
222 {
223 chidx = 0;
224 choff = 0;
225 chlen = 0;
226 rdlstoff = 0;
227 rdlstlen = 0;
228 break;
229 }
230
231 //------------------------------------------------------------------
232 // Others should not happen
233 //------------------------------------------------------------------
234 default : return XRootDStatus( stError, errInternal );
235 }
236
237 // just in case
238 break;
239 }
240 //----------------------------------------------------------------------
241 // We are done
242 //----------------------------------------------------------------------
243 return XRootDStatus();
244 }
245
246 //------------------------------------------------------------------------
248 //------------------------------------------------------------------------
250 {
251 //--------------------------------------------------------------------------
252 // See if all the chunks are OK and put them in the response
253 //--------------------------------------------------------------------------
254 std::unique_ptr<VectorReadInfo> ptr( new VectorReadInfo() );
255 for( uint32_t i = 0; i < chunks->size(); ++i )
256 {
257 if( !chstatus[i].done )
259 ptr->GetChunks().emplace_back( ( *chunks )[i].offset,
260 ( *chunks )[i].length, ( *chunks )[i].buffer );
261 }
262 ptr->SetSize( rawbtsrd );
263 response = new AnyObject();
264 response->Set( ptr.release() );
265 return XRootDStatus();
266 }
267
268 private:
269
270 size_t rdlstoff; //< offset within the current read_list
271 readahead_list rdlst; //< the readahead list for the current chunk
272 size_t rdlstlen; //< bytes left to be readout into read list
273 };
274
275} /* namespace XrdCl */
276
277#endif /* SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_ */
Definition XrdClAnyObject.hh:33
void Set(Type object, bool own=true)
Definition XrdClAnyObject.hh:59
Base class for any message's body reader.
Definition XrdClAsyncRawReaderIntfc.hh:35
size_t chidx
Definition XrdClAsyncRawReaderIntfc.hh:163
size_t chlen
Definition XrdClAsyncRawReaderIntfc.hh:165
Stage readstage
Definition XrdClAsyncRawReaderIntfc.hh:149
std::vector< ChunkStatus > chstatus
Definition XrdClAsyncRawReaderIntfc.hh:158
XRootDStatus ReadBytesAsync(Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead)
Definition XrdClAsyncRawReaderIntfc.hh:98
const Message & request
Definition XrdClAsyncRawReaderIntfc.hh:155
size_t choff
Definition XrdClAsyncRawReaderIntfc.hh:164
const URL & url
Definition XrdClAsyncRawReaderIntfc.hh:154
uint32_t dlen
Definition XrdClAsyncRawReaderIntfc.hh:159
uint32_t rawbtsrd
Definition XrdClAsyncRawReaderIntfc.hh:161
uint32_t msgbtsrd
Definition XrdClAsyncRawReaderIntfc.hh:160
ChunkList * chunks
Definition XrdClAsyncRawReaderIntfc.hh:157
@ ReadDiscard
Definition XrdClAsyncRawReaderIntfc.hh:142
@ ReadStart
Definition XrdClAsyncRawReaderIntfc.hh:139
@ ReadDone
Definition XrdClAsyncRawReaderIntfc.hh:143
@ ReadRdLst
Definition XrdClAsyncRawReaderIntfc.hh:140
@ ReadRaw
Definition XrdClAsyncRawReaderIntfc.hh:141
Object for reading out data from the VectorRead response.
Definition XrdClAsyncVectorReader.hh:33
AsyncVectorReader(const URL &url, const Message &request)
Definition XrdClAsyncVectorReader.hh:40
readahead_list rdlst
Definition XrdClAsyncVectorReader.hh:271
XRootDStatus Read(Socket &socket, uint32_t &btsret)
Definition XrdClAsyncVectorReader.hh:55
size_t rdlstlen
Definition XrdClAsyncVectorReader.hh:272
XRootDStatus GetResponse(AnyObject *&response)
Get the response.
Definition XrdClAsyncVectorReader.hh:249
size_t rdlstoff
Definition XrdClAsyncVectorReader.hh:270
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
The message representation used throughout the system.
Definition XrdClMessage.hh:30
A network socket.
Definition XrdClSocket.hh:43
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
Vector read info.
Definition XrdClXRootDResponses.hh:1061
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
const uint16_t suRetry
Definition XrdClStatus.hh:40
const uint16_t stFatal
Fatal error, it's still an error.
Definition XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition XrdClStatus.hh:32
const uint64_t XRootDMsg
Definition XrdClConstants.hh:39
const uint16_t errInternal
Internal error.
Definition XrdClStatus.hh:56
const uint16_t errInvalidResponse
Definition XrdClStatus.hh:99
const uint16_t errCorruptedHeader
Definition XrdClStatus.hh:103
Procedure execution status.
Definition XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124
Definition XProtocol.hh:658
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_int64 offset
Definition XProtocol.hh:661