xrootd
Loading...
Searching...
No Matches
XrdClAsyncRawReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCRAWREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCRAWREADER_HH_
21
22
24#include "XrdCl/XrdClSocket.hh"
25#include "XrdCl/XrdClStream.hh"
27
28namespace XrdCl
29{
30
31 //----------------------------------------------------------------------------
33 //----------------------------------------------------------------------------
35 {
36 public:
37 //------------------------------------------------------------------------
42 //------------------------------------------------------------------------
43 AsyncRawReader( const URL &url, const Message &request ) :
45 {
46 }
47
48 //------------------------------------------------------------------------
54 //------------------------------------------------------------------------
55 XRootDStatus Read( Socket &socket, uint32_t &btsret )
56 {
57 while( true )
58 {
59 switch( readstage )
60 {
61 //------------------------------------------------------------------
62 // Prepare to read out a new response
63 //------------------------------------------------------------------
64 case ReadStart:
65 {
66 msgbtsrd = 0;
67 chlen = ( *chunks )[0].length;
69 continue;
70 }
71
72 //------------------------------------------------------------------
73 // Read out the raw data
74 //------------------------------------------------------------------
75 case ReadRaw:
76 {
77 //----------------------------------------------------------------
78 // Make sure we are not reading past the end of the read response
79 //----------------------------------------------------------------
80 if( msgbtsrd + chlen > dlen )
82
83 //----------------------------------------------------------------
84 // Read out the raw data from the socket
85 //----------------------------------------------------------------
86 uint32_t btsrd = 0;
87 char *buff = static_cast<char*>( ( *chunks )[chidx].buffer );
88 Status st = ReadBytesAsync( socket, buff + choff, chlen, btsrd );
89 choff += btsrd;
90 chlen -= btsrd;
91 msgbtsrd += btsrd;
92 rawbtsrd += btsrd;
93 btsret += btsrd;
94
95 if( !st.IsOK() || st.code == suRetry )
96 return st;
97
98 //----------------------------------------------------------------
99 // If the chunk is full, move to the next buffer
100 //----------------------------------------------------------------
101 if( choff == ( *chunks )[chidx].length )
102 {
103 ++chidx;
104 choff = 0;
105 chlen = ( chidx < chunks->size() ? ( *chunks )[chidx].length : 0 );
106 }
107 //----------------------------------------------------------------
108 // Check whether there is any data left in the response to be read out
109 // from the socket.
110 //----------------------------------------------------------------
111 if( msgbtsrd < dlen )
112 {
113 //--------------------------------------------------------------
114 // We ran out of space: the server has sent too much data
115 //--------------------------------------------------------------
116 if( chidx >= chunks->size() )
117 {
119 continue;
120 }
122 continue;
123 }
124 //----------------------------------------------------------------
125 // We are done
126 //----------------------------------------------------------------
128 continue;
129 }
130
131 //------------------------------------------------------------------
132 // We've had an error and we are in the discarding mode
133 //------------------------------------------------------------------
134 case ReadDiscard:
135 {
136 DefaultEnv::GetLog()->Error( XRootDMsg, "[%s] RawReader: Handling "
137 "response to %s: user supplied buffer is "
138 "too small for the received data.",
139 url.GetHostId().c_str(),
140 request.GetDescription().c_str() );
141 // Just drop the connection, we don't know if the stream is sane
142 // anymore. Recover with a reconnect.
144 }
145
146 //------------------------------------------------------------------
147 // Finalize the read
148 //------------------------------------------------------------------
149 case ReadDone:
150 {
151 break;
152 }
153
154 //------------------------------------------------------------------
155 // Others should not happen
156 //------------------------------------------------------------------
157 default : return XRootDStatus( stError, errInternal );
158 }
159
160 // just in case
161 break;
162 }
163 //----------------------------------------------------------------------
164 // We are done
165 //----------------------------------------------------------------------
166 return XRootDStatus();
167 }
168
169 //------------------------------------------------------------------------
171 //------------------------------------------------------------------------
173 {
174 if( dataerr )
176 std::unique_ptr<AnyObject> rsp( new AnyObject() );
178 rsp->Set( GetVectorReadInfo() );
179 else
180 rsp->Set( GetChunkInfo() );
181 response = rsp.release();
182 return XRootDStatus();
183 }
184
185 private:
186
188 {
189 ChunkInfo *info = new ChunkInfo( chunks->front() );
190 info->length = rawbtsrd;
191 return info;
192 }
193
195 {
196 VectorReadInfo *info = new VectorReadInfo();
197 info->SetSize( rawbtsrd );
198 int btsleft = rawbtsrd;
199 for( auto &chunk : *chunks )
200 {
201 int length = uint32_t( btsleft ) >= chunk.length ? chunk.length : btsleft;
202 info->GetChunks().emplace_back( chunk.offset, length, chunk.buffer );
203 btsleft -= length;
204 }
205 return info;
206 }
207 };
208
209} /* namespace XrdCl */
210
211#endif /* SRC_XRDCL_XRDCLASYNCRAWREADER_HH_ */
@ kXR_virtReadv
Definition XProtocol.hh:150
Definition XrdClAnyObject.hh:33
Base class for any message's body reader.
Definition XrdClAsyncRawReaderIntfc.hh:35
size_t chidx
Definition XrdClAsyncRawReaderIntfc.hh:163
bool dataerr
Definition XrdClAsyncRawReaderIntfc.hh:168
size_t chlen
Definition XrdClAsyncRawReaderIntfc.hh:165
Stage readstage
Definition XrdClAsyncRawReaderIntfc.hh:149
XRootDStatus ReadBytesAsync(Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead)
Definition XrdClAsyncRawReaderIntfc.hh:98
const Message & request
Definition XrdClAsyncRawReaderIntfc.hh:155
size_t choff
Definition XrdClAsyncRawReaderIntfc.hh:164
const URL & url
Definition XrdClAsyncRawReaderIntfc.hh:154
uint32_t dlen
Definition XrdClAsyncRawReaderIntfc.hh:159
uint32_t rawbtsrd
Definition XrdClAsyncRawReaderIntfc.hh:161
uint32_t msgbtsrd
Definition XrdClAsyncRawReaderIntfc.hh:160
ChunkList * chunks
Definition XrdClAsyncRawReaderIntfc.hh:157
@ ReadDiscard
Definition XrdClAsyncRawReaderIntfc.hh:142
@ ReadStart
Definition XrdClAsyncRawReaderIntfc.hh:139
@ ReadDone
Definition XrdClAsyncRawReaderIntfc.hh:143
@ ReadRaw
Definition XrdClAsyncRawReaderIntfc.hh:141
Object for reading out data from the kXR_read response.
Definition XrdClAsyncRawReader.hh:35
VectorReadInfo * GetVectorReadInfo()
Definition XrdClAsyncRawReader.hh:194
XRootDStatus Read(Socket &socket, uint32_t &btsret)
Definition XrdClAsyncRawReader.hh:55
XRootDStatus GetResponse(AnyObject *&response)
Get the response.
Definition XrdClAsyncRawReader.hh:172
ChunkInfo * GetChunkInfo()
Definition XrdClAsyncRawReader.hh:187
AsyncRawReader(const URL &url, const Message &request)
Definition XrdClAsyncRawReader.hh:43
static Log * GetLog()
Get default log.
void Error(uint64_t topic, const char *format,...)
Report an error.
The message representation used throughout the system.
Definition XrdClMessage.hh:30
const std::string & GetDescription() const
Get the description of the message.
Definition XrdClMessage.hh:95
uint16_t GetVirtReqID() const
Get virtual request ID for the message.
Definition XrdClMessage.hh:127
A network socket.
Definition XrdClSocket.hh:43
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
Vector read info.
Definition XrdClXRootDResponses.hh:1061
ChunkList & GetChunks()
Get chunks.
Definition XrdClXRootDResponses.hh:1087
void SetSize(uint32_t size)
Set size.
Definition XrdClXRootDResponses.hh:1079
Request status.
Definition XrdClXRootDResponses.hh:219
Definition XrdClAction.hh:34
const uint16_t suRetry
Definition XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition XrdClStatus.hh:32
const uint64_t XRootDMsg
Definition XrdClConstants.hh:39
const uint16_t errInternal
Internal error.
Definition XrdClStatus.hh:56
const uint16_t errInvalidResponse
Definition XrdClStatus.hh:99
const uint16_t errCorruptedHeader
Definition XrdClStatus.hh:103
Describe a data chunk for vector read.
Definition XrdClXRootDResponses.hh:917
uint32_t length
length of the chunk (the rendered text "offset in the file" appears to belong to the neighbouring offset member)
Definition XrdClXRootDResponses.hh:949
Procedure execution status.
Definition XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124