xrootd
Loading...
Searching...
No Matches
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28
29#include <memory>
30
31namespace XrdCl
32{
33 //----------------------------------------------------------------------------
35 //----------------------------------------------------------------------------
37 {
38 public:
39 //------------------------------------------------------------------------
47 //------------------------------------------------------------------------
50 const std::string &strmname,
51 Stream &strm,
52 uint16_t substrmnb) : readstage( ReadStart ),
54 socket( socket ),
56 strm( strm ),
58 inmsgsize( 0 ),
59 inhandler( nullptr )
60 {
61 }
62
63 //------------------------------------------------------------------------
65 //------------------------------------------------------------------------
66 virtual ~AsyncMsgReader(){ }
67
68 //------------------------------------------------------------------------
70 //------------------------------------------------------------------------
71 inline void Reset()
72 {
74 inmsg.reset();
75 inmsgsize = 0;
76 inhandler = nullptr;
77 }
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85
86 while( true )
87 {
88 switch( readstage )
89 {
90 //------------------------------------------------------------------
91 // There is no incoming message currently being processed so we
92 // create a new one
93 //------------------------------------------------------------------
94 case ReadStart:
95 {
96 inmsg = std::make_shared<Message>();
97 //----------------------------------------------------------------
98 // The next step is to read the header
99 //----------------------------------------------------------------
101 continue;
102 }
103 //------------------------------------------------------------------
104 // We need to read the header
105 //------------------------------------------------------------------
106 case ReadHeader:
107 {
109 if( !st.IsOK() || st.code == suRetry )
110 return st;
111
112 log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113 strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114
115 ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116 if( rsp->hdr.status == kXR_attn )
117 {
118 log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119 "of message 0x%x", strmname.c_str(), inmsg.get() );
120 inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
122 continue;
123 }
124
125 inmsgsize = inmsg->GetCursor();
127
128 if( inhandler )
129 {
130 log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131 "of message 0x%x", strmname.c_str(), inmsg.get() );
132 //--------------------------------------------------------------
133 // The next step is to read raw data
134 //--------------------------------------------------------------
136 continue;
137 }
138
139 //----------------------------------------------------------------
140 // The next step is to read the message body
141 //----------------------------------------------------------------
143 continue;
144 }
145 //------------------------------------------------------------------
146 // Before proceeding we need to figure out the attn action code
147 //------------------------------------------------------------------
148 case ReadAttn:
149 {
151 if( !st.IsOK() || st.code == suRetry )
152 return st;
153
154 //----------------------------------------------------------------
155 // There is an embedded response, overwrite the message with that
156 //----------------------------------------------------------------
157 if( HasEmbeddedRsp() )
158 {
159 inmsg->Free();
161 continue;
162 }
163
164 //----------------------------------------------------------------
165 // Readout the rest of the body
166 //----------------------------------------------------------------
167 inmsgsize = inmsg->GetCursor();
169 continue;
170 }
171 //------------------------------------------------------------------
172 // kXR_status is special as it can have both body and raw data,
173 // handle it separately
174 //------------------------------------------------------------------
175 case ReadMore:
176 {
178 if( !st.IsOK() || st.code == suRetry )
179 return st;
180 inmsgsize = inmsg->GetCursor();
181
182 //----------------------------------------------------------------
183 // The next step is to finalize the read
184 //----------------------------------------------------------------
186 continue;
187 }
188 //------------------------------------------------------------------
189 // We need to call a raw message handler to get the data from the
190 // socket
191 //------------------------------------------------------------------
192 case ReadRawData:
193 {
194 uint32_t bytesRead = 0;
195 XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
196 if( !st.IsOK() )
197 return st;
198 inmsgsize += bytesRead;
199 if( st.code == suRetry )
200 return st;
201 //----------------------------------------------------------------
202 // The next step is to finalize the read
203 //----------------------------------------------------------------
205 continue;
206 }
207 //------------------------------------------------------------------
208 // No raw handler, so we read the message to the buffer
209 //------------------------------------------------------------------
210 case ReadMsgBody:
211 {
213 if( !st.IsOK() || st.code == suRetry )
214 return st;
215 inmsgsize = inmsg->GetCursor();
216
217
218 //----------------------------------------------------------------
219 // kXR_status response needs special handling as it can have
220 // either (body + raw data) or (body + additional body data)
221 //----------------------------------------------------------------
222 if( IsStatusRsp() )
223 {
224 uint16_t action = strm.InspectStatusRsp( substrmnb,
225 inhandler );
226
227 if( action & MsgHandler::Corrupted )
229
230 if( action & MsgHandler::Raw )
231 {
232 //--------------------------------------------------------------
233 // The next step is to read the raw data
234 //--------------------------------------------------------------
236 continue;
237 }
238
239 if( action & MsgHandler::More )
240 {
241
242 //--------------------------------------------------------------
243 // The next step is to read the additional data in the message
244 // body
245 //--------------------------------------------------------------
247 continue;
248 }
249 }
250
251 //----------------------------------------------------------------
252 // The next step is to finalize the read
253 //----------------------------------------------------------------
255 continue;
256 }
257
258 case ReadDone:
259 {
260 //----------------------------------------------------------------
261 // Report the incoming message
262 //----------------------------------------------------------------
263 log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
264 strmname.c_str(), inmsg.get(), inmsgsize );
265
266 strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
267 }
268 }
269 // just in case
270 break;
271 }
272
273 //----------------------------------------------------------------------
274 // We are done
275 //----------------------------------------------------------------------
276 return XRootDStatus();
277 }
278
279 private:
280
282 {
283 //----------------------------------------------------------------------
284 // Readout the action code from the socket. We are reading out 8 bytes
285 // into the message, the 8 byte header is already there.
286 //----------------------------------------------------------------------
287 size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
288 while( btsleft > 0 )
289 {
290 int btsrd = 0;
291 XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
292 if( !st.IsOK() || st.code == suRetry )
293 return st;
294 btsleft -= btsrd;
295 inmsg->AdvanceCursor( btsrd );
296 }
297
298 //----------------------------------------------------------------------
299 // Marshal the action code
300 //----------------------------------------------------------------------
301 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
302 attn->actnum = ntohl( attn->actnum );
303
304 return XRootDStatus();
305 }
306
307 inline bool IsStatusRsp()
308 {
309 ServerResponseHeader *hdr = (ServerResponseHeader*)inmsg->GetBuffer();
310 return ( hdr->status == kXR_status );
311 }
312
313 inline bool HasEmbeddedRsp()
314 {
315 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
316 return ( attn->actnum == kXR_asynresp );
317 }
318
319 //------------------------------------------------------------------------
321 //------------------------------------------------------------------------
322 enum Stage
323 {
324 ReadStart, //< the next step is to initialize the read
325 ReadHeader, //< the next step is to read the header
326 ReadAttn, //< the next step is to read attn action code
327 ReadMore, //< the next step is to read more status body
328 ReadMsgBody, //< the next step is to read the body
329 ReadRawData, //< the next step is to read the raw data
330 ReadDone //< the next step is to finalize the read
331 };
332
333 //------------------------------------------------------------------------
334 // Current read stage
335 //------------------------------------------------------------------------
337
338 //------------------------------------------------------------------------
339 // The context of the read operation
340 //------------------------------------------------------------------------
343 const std::string &strmname;
345 uint16_t substrmnb;
346
347
348 //------------------------------------------------------------------------
349 // The internal state of the the reader
350 //------------------------------------------------------------------------
351 std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
352 uint32_t inmsgsize;
354
355 };
356
357} /* namespace XrdCl */
358
359#endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition XProtocol.hh:936
@ kXR_status
Definition XProtocol.hh:905
@ kXR_attn
Definition XProtocol.hh:899
Utility class encapsulating reading response message logic.
Definition XrdClAsyncMsgReader.hh:37
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition XrdClAsyncMsgReader.hh:71
Socket & socket
Definition XrdClAsyncMsgReader.hh:342
TransportHandler & xrdTransport
Definition XrdClAsyncMsgReader.hh:341
const std::string & strmname
Definition XrdClAsyncMsgReader.hh:343
std::shared_ptr< Message > inmsg
Definition XrdClAsyncMsgReader.hh:351
uint32_t inmsgsize
Definition XrdClAsyncMsgReader.hh:352
Stage readstage
Definition XrdClAsyncMsgReader.hh:336
Stage
Stages of reading out a response from the socket.
Definition XrdClAsyncMsgReader.hh:323
@ ReadMore
Definition XrdClAsyncMsgReader.hh:327
@ ReadAttn
Definition XrdClAsyncMsgReader.hh:326
@ ReadRawData
Definition XrdClAsyncMsgReader.hh:329
@ ReadMsgBody
Definition XrdClAsyncMsgReader.hh:328
@ ReadHeader
Definition XrdClAsyncMsgReader.hh:325
@ ReadStart
Definition XrdClAsyncMsgReader.hh:324
@ ReadDone
Definition XrdClAsyncMsgReader.hh:330
bool HasEmbeddedRsp()
Definition XrdClAsyncMsgReader.hh:313
uint16_t substrmnb
Definition XrdClAsyncMsgReader.hh:345
XRootDStatus Read()
Read out the response from the socket.
Definition XrdClAsyncMsgReader.hh:82
bool IsStatusRsp()
Definition XrdClAsyncMsgReader.hh:307
virtual ~AsyncMsgReader()
Destructor.
Definition XrdClAsyncMsgReader.hh:66
MsgHandler * inhandler
Definition XrdClAsyncMsgReader.hh:353
XRootDStatus ReadAttnActnum()
Definition XrdClAsyncMsgReader.hh:281
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition XrdClAsyncMsgReader.hh:48
Stream & strm
Definition XrdClAsyncMsgReader.hh:344
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Message handler.
Definition XrdClPostMasterInterfaces.hh:51
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition XrdClPostMasterInterfaces.hh:138
@ Raw
Definition XrdClPostMasterInterfaces.hh:63
@ More
there are more (non-raw) data to be read
Definition XrdClPostMasterInterfaces.hh:72
@ Corrupted
Definition XrdClPostMasterInterfaces.hh:69
A network socket.
Definition XrdClSocket.hh:43
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Stream.
Definition XrdClStream.hh:52
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
Perform the handshake and the authentication for each physical stream.
Definition XrdClPostMasterInterfaces.hh:310
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
virtual XRootDStatus GetMore(Message &message, Socket *socket)=0
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
const uint16_t suRetry
Definition XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition XrdClStatus.hh:32
const uint64_t AsyncSockMsg
Definition XrdClConstants.hh:41
const uint16_t errCorruptedHeader
Definition XrdClStatus.hh:103
Definition XProtocol.hh:939
kXR_int32 actnum
Definition XProtocol.hh:940
Definition XProtocol.hh:911
kXR_unt16 status
Definition XProtocol.hh:913
Definition XProtocol.hh:1282
ServerResponseHeader hdr
Definition XProtocol.hh:1283
uint16_t code
Error type, or additional hints on what to do.
Definition XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124