xrootd
Loading...
Searching...
No Matches
XrdClAsyncMsgWriter.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28#include "XrdSys/XrdSysE2T.hh"
29
30#include <memory>
31
32namespace XrdCl
33{
34 //----------------------------------------------------------------------------
36 //----------------------------------------------------------------------------
38 {
39 public:
40 //------------------------------------------------------------------------
48 //------------------------------------------------------------------------
51 const std::string &strmname,
52 Stream &strm,
53 uint16_t substrmnb,
56 socket( socket ),
58 strm( strm ),
60 chdata( chdata ),
61 outmsg( nullptr ),
62 outmsgsize( 0 ),
63 outhandler( nullptr )
64 {
65 }
66
67 //------------------------------------------------------------------------
69 //------------------------------------------------------------------------
70 inline void Reset()
71 {
73 outmsg = nullptr;
74 outmsgsize = 0;;
75 outhandler = nullptr;
76 outsign.reset();
77 }
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85 while( true )
86 {
87 switch( writestage )
88 {
89 //------------------------------------------------------------------
90 // Pick up a message if we're not in process of writing something
91 //------------------------------------------------------------------
92 case WriteStart:
93 {
94 std::pair<Message *, MsgHandler *> toBeSent;
95 toBeSent = strm.OnReadyToWrite( substrmnb );
96 outmsg = toBeSent.first;
97 outhandler = toBeSent.second;
98 if( !outmsg ) return XRootDStatus( stOK, suAlreadyDone );
99
100 outmsg->SetCursor( 0 );
102
103 //----------------------------------------------------------------
104 // Secure the message if necessary
105 //----------------------------------------------------------------
106 Message *signature = nullptr;
108 if( !st.IsOK() ) return st;
109 outsign.reset( signature );
110
111 if( outsign )
112 outmsgsize += outsign->GetSize();
113
114 //----------------------------------------------------------------
115 // The next step is to write the signature
116 //----------------------------------------------------------------
118 continue;
119 }
120 //------------------------------------------------------------------
121 // First write the signature (if there is one)
122 //------------------------------------------------------------------
123 case WriteSign:
124 {
125 //----------------------------------------------------------------
126 // If there is a signature for the request send it over the socket
127 //----------------------------------------------------------------
128 if( outsign )
129 {
131 if( !st.IsOK() || st.code == suRetry ) return st;
132 }
133 //----------------------------------------------------------------
134 // The next step is to write the signature
135 //----------------------------------------------------------------
137 continue;
138 }
139 //------------------------------------------------------------------
140 // Then write the request itself
141 //------------------------------------------------------------------
142 case WriteRequest:
143 {
145 if( !st.IsOK() || st.code == suRetry ) return st;
146 //----------------------------------------------------------------
147 // The next step is to write the signature
148 //----------------------------------------------------------------
150 continue;
151 }
152 //------------------------------------------------------------------
153 // And then write the raw data (if any)
154 //------------------------------------------------------------------
155 case WriteRawData:
156 {
157 if( outhandler->IsRaw() )
158 {
159 uint32_t wrtcnt = 0;
161 if( !st.IsOK() || st.code == suRetry ) return st;
162 outmsgsize += wrtcnt;
163 log->Dump( AsyncSockMsg, "[%s] Wrote %d bytes of raw data of message"
164 "(0x%x) body.", strmname.c_str(), wrtcnt, outmsg );
165 }
166 //----------------------------------------------------------------
167 // The next step is to finalize the write operation
168 //----------------------------------------------------------------
170 continue;
171 }
172 //------------------------------------------------------------------
173 // Finally, finalize the write operation
174 //------------------------------------------------------------------
175 case WriteDone:
176 {
178 if( !st.IsOK() )
179 {
180 log->Error( AsyncSockMsg, "[%s] Unable to flash the socket: %s",
181 strmname.c_str(), XrdSysE2T( st.errNo ) );
182 return st;
183 }
184
185 log->Dump( AsyncSockMsg, "[%s] Successfully sent message: %s (0x%x).",
186 strmname.c_str(), outmsg->GetDescription().c_str(), outmsg );
187
189 return XRootDStatus();
190 }
191 }
192 // just in case ...
193 break;
194 }
195 //----------------------------------------------------------------------
196 // We are done
197 //----------------------------------------------------------------------
198 return XRootDStatus();
199 }
200
201 private:
202
203 //------------------------------------------------------------------------
205 //------------------------------------------------------------------------
206 enum Stage
207 {
208 WriteStart, //< the next step is to initialize the read
209 WriteSign, //< the next step is to write the signature
210 WriteRequest, //< the next step is to write the request
211 WriteRawData, //< the next step is to write the raw data
212 WriteDone //< the next step is to finalize the write
213 };
214
215 //------------------------------------------------------------------------
216 // Current read stage
217 //------------------------------------------------------------------------
219
220 //------------------------------------------------------------------------
221 // The context of the read operation
222 //------------------------------------------------------------------------
225 const std::string &strmname;
227 uint16_t substrmnb;
229
230 //------------------------------------------------------------------------
231 // The internal state of the the reader
232 //------------------------------------------------------------------------
233 Message *outmsg; //< we don't own the message
234 uint32_t outmsgsize;
236 std::unique_ptr<Message> outsign;
237 };
238
239}
240
241#endif /* SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_ */
const char * XrdSysE2T(int errcode)
Definition XrdClAnyObject.hh:33
Utility class encapsulating writing request logic.
Definition XrdClAsyncMsgWriter.hh:38
uint16_t substrmnb
Definition XrdClAsyncMsgWriter.hh:227
AnyObject & chdata
Definition XrdClAsyncMsgWriter.hh:228
std::unique_ptr< Message > outsign
Definition XrdClAsyncMsgWriter.hh:236
Stream & strm
Definition XrdClAsyncMsgWriter.hh:226
MsgHandler * outhandler
Definition XrdClAsyncMsgWriter.hh:235
Message * outmsg
Definition XrdClAsyncMsgWriter.hh:233
Stage writestage
Definition XrdClAsyncMsgWriter.hh:218
XRootDStatus Write()
Write the request into the socket.
Definition XrdClAsyncMsgWriter.hh:82
TransportHandler & xrdTransport
Definition XrdClAsyncMsgWriter.hh:223
Socket & socket
Definition XrdClAsyncMsgWriter.hh:224
const std::string & strmname
Definition XrdClAsyncMsgWriter.hh:225
Stage
Stages of reading out a response from the socket.
Definition XrdClAsyncMsgWriter.hh:207
@ WriteStart
Definition XrdClAsyncMsgWriter.hh:208
@ WriteRawData
Definition XrdClAsyncMsgWriter.hh:211
@ WriteSign
Definition XrdClAsyncMsgWriter.hh:209
@ WriteDone
Definition XrdClAsyncMsgWriter.hh:212
@ WriteRequest
Definition XrdClAsyncMsgWriter.hh:210
AsyncMsgWriter(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb, AnyObject &chdata)
Definition XrdClAsyncMsgWriter.hh:49
uint32_t outmsgsize
Definition XrdClAsyncMsgWriter.hh:234
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition XrdClAsyncMsgWriter.hh:70
void SetCursor(uint32_t cursor)
Set the cursor.
Definition XrdClBuffer.hh:148
uint32_t GetSize() const
Get the size of the message.
Definition XrdClBuffer.hh:132
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
The message representation used throughout the system.
Definition XrdClMessage.hh:30
const std::string & GetDescription() const
Get the description of the message.
Definition XrdClMessage.hh:95
Message handler.
Definition XrdClPostMasterInterfaces.hh:51
virtual XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
Definition XrdClPostMasterInterfaces.hh:196
virtual bool IsRaw() const
Definition XrdClPostMasterInterfaces.hh:184
A network socket.
Definition XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
XRootDStatus Flash()
Stream.
Definition XrdClStream.hh:52
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Perform the handshake and the authentication for each physical stream.
Definition XrdClPostMasterInterfaces.hh:310
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)=0
Get signature for given message.
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
const uint16_t suRetry
Definition XrdClStatus.hh:40
const uint16_t stOK
Everything went OK.
Definition XrdClStatus.hh:31
const uint64_t AsyncSockMsg
Definition XrdClConstants.hh:41
const uint16_t suAlreadyDone
Definition XrdClStatus.hh:42
uint16_t code
Error type, or additional hints on what to do.
Definition XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124
uint32_t errNo
Errno, if any.
Definition XrdClStatus.hh:148