xrootd
Loading...
Searching...
No Matches
XrdClZipCache.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
26#define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
27
29#include <zlib.h>
30#include <exception>
31#include <string>
32#include <vector>
33#include <mutex>
34#include <queue>
35#include <tuple>
36
37namespace XrdCl
38{
39 //---------------------------------------------------------------------------
41 //---------------------------------------------------------------------------
42 struct ZipError : public std::exception
43 {
47
49 };
50
51 //---------------------------------------------------------------------------
53 //---------------------------------------------------------------------------
55 {
56 public:
57
58 typedef std::vector<char> buffer_t;
59
60 private:
61
62 typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
63 typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
64
66 {
67 inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
68 {
69 return std::get<1>( lhs ) > std::get<1>( rhs );
70 }
71 };
72
73 typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
74
75 public:
76
78 {
79 strm.zalloc = Z_NULL;
80 strm.zfree = Z_NULL;
81 strm.opaque = Z_NULL;
82 strm.avail_in = 0;
83 strm.next_in = Z_NULL;
84 strm.avail_out = 0;
85 strm.next_out = Z_NULL;
86
87 // make sure zlib doesn't look for gzip headers, in order to do so
88 // pass negative window bits !!!
89 int rc = inflateInit2( &strm, -MAX_WBITS );
90 XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
91 if( !st.IsOK() ) throw ZipError( st );
92 }
93
95 {
96 inflateEnd( &strm );
97 }
98
99 inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
100 {
101 std::unique_lock<std::mutex> lck( mtx );
102 rdreqs.emplace( offset, length, buffer, handler );
103 Decompress();
104 }
105
106 inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
107 {
108 std::unique_lock<std::mutex> lck( mtx );
109 rdrsps.emplace( st, offset, std::move( buffer ) );
110 Decompress();
111 }
112
113 private:
114
115 inline bool HasInput() const
116 {
117 return strm.avail_in != 0;
118 }
119
120 inline bool HasOutput() const
121 {
122 return strm.avail_out != 0;
123 }
124
125 inline void Input( const read_resp_t &rdrsp )
126 {
127 const buffer_t &buffer = std::get<2>( rdrsp );
128 strm.avail_in = buffer.size();
129 strm.next_in = (Bytef*)buffer.data();
130 }
131
132 inline void Output( const read_args_t &rdreq )
133 {
134 strm.avail_out = std::get<1>( rdreq );
135 strm.next_out = (Bytef*)std::get<2>( rdreq );
136 }
137
138 inline bool Consecutive( const read_resp_t &resp ) const
139 {
140 return ( std::get<1>( resp ) == inabsoff );
141 }
142
144 {
145 while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
146 {
147 if( !HasOutput() && !rdreqs.empty() )
148 Output( rdreqs.front() );
149
150 if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) ) // the response might come out of order so we need to check the offset
151 Input( rdrsps.top() );
152
153 if( !HasInput() || !HasOutput() ) return;
154
155 // check the response status
156 XRootDStatus st = std::get<0>( rdrsps.top() );
157 if( !st.IsOK() ) return CallHandler( st );
158
159 // the available space in output buffer before inflating
160 uInt avail_before = strm.avail_in;
161 // decompress the data
162 int rc = inflate( &strm, Z_SYNC_FLUSH );
163 st = ToXRootDStatus( rc, "inflate" );
164 if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
165 // update the absolute input offset by the number of bytes we consumed
166 inabsoff += avail_before - strm.avail_in;
167
168 if( !strm.avail_out ) // the output buffer is empty meaning a request has been fulfilled
170
171 // the input buffer is empty meaning a response has been consumed
172 // (we need to check if there are any elements in the responses
173 // queue as the input buffer might have been set directly by the user)
174 if( !strm.avail_in && !rdrsps.empty() )
175 rdrsps.pop();
176 }
177 }
178
179 static inline AnyObject* PkgRsp( ChunkInfo *chunk )
180 {
181 if( !chunk ) return nullptr;
182 AnyObject *rsp = new AnyObject();
183 rsp->Set( chunk );
184 return rsp;
185 }
186
187 inline void CallHandler( const XRootDStatus &st )
188 {
189 if( rdreqs.empty() ) return;
190 read_args_t args = std::move( rdreqs.front() );
191 rdreqs.pop();
192
193 ChunkInfo *chunk = nullptr;
194 if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
195 std::get<1>( args ),
196 std::get<2>( args ) );
197
198 ResponseHandler *handler = std::get<3>( args );
199 handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
200 }
201
202 XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
203 {
204 std::string msg = "[zlib] " + func + " : ";
205
206 switch( rc )
207 {
208 case Z_STREAM_END :
209 case Z_OK : return XrdCl::XRootDStatus();
210 case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
211 case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
212 case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
213 case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
214 case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
215 case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
217 }
218 }
219
220 z_stream strm; // the zlib stream we will use for reading
221
222 std::mutex mtx;
223 uint64_t inabsoff; //< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
224 std::queue<read_args_t> rdreqs; //< pending read requests (we only allow read requests to be submitted in order)
225 resp_queue_t rdrsps; //< pending read responses (due to multiple-streams the read response may come out of order)
226 };
227
228}
229
230#endif /* SRC_XRDZIP_XRDZIPINFLCACHE_HH_ */
Definition XrdClAnyObject.hh:33
void Set(Type object, bool own=true)
Definition XrdClAnyObject.hh:59
Handle an async response.
Definition XrdClXRootDResponses.hh:1126
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition XrdClXRootDResponses.hh:1155
Request status.
Definition XrdClXRootDResponses.hh:219
Utility class for inflating a compressed buffer.
Definition XrdClZipCache.hh:55
std::tuple< uint64_t, uint32_t, void *, ResponseHandler * > read_args_t
Definition XrdClZipCache.hh:62
bool HasInput() const
Definition XrdClZipCache.hh:115
XrdCl::XRootDStatus ToXRootDStatus(int rc, const std::string &func)
Definition XrdClZipCache.hh:202
uint64_t inabsoff
Definition XrdClZipCache.hh:223
void QueueRsp(const XRootDStatus &st, uint64_t offset, buffer_t &&buffer)
Definition XrdClZipCache.hh:106
void Output(const read_args_t &rdreq)
Definition XrdClZipCache.hh:132
bool HasOutput() const
Definition XrdClZipCache.hh:120
~ZipCache()
Definition XrdClZipCache.hh:94
bool Consecutive(const read_resp_t &resp) const
Definition XrdClZipCache.hh:138
std::mutex mtx
Definition XrdClZipCache.hh:222
std::tuple< XRootDStatus, uint64_t, buffer_t > read_resp_t
Definition XrdClZipCache.hh:63
ZipCache()
Definition XrdClZipCache.hh:77
void Decompress()
Definition XrdClZipCache.hh:143
std::priority_queue< read_resp_t, std::vector< read_resp_t >, greater_read_resp_t > resp_queue_t
Definition XrdClZipCache.hh:73
void CallHandler(const XRootDStatus &st)
Definition XrdClZipCache.hh:187
void QueueReq(uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler)
Definition XrdClZipCache.hh:99
void Input(const read_resp_t &rdrsp)
Definition XrdClZipCache.hh:125
z_stream strm
Definition XrdClZipCache.hh:220
std::vector< char > buffer_t
Definition XrdClZipCache.hh:58
resp_queue_t rdrsps
Definition XrdClZipCache.hh:225
static AnyObject * PkgRsp(ChunkInfo *chunk)
Definition XrdClZipCache.hh:179
std::queue< read_args_t > rdreqs
Definition XrdClZipCache.hh:224
Definition XrdClAction.hh:34
const uint16_t errUnknown
Unknown error.
Definition XrdClStatus.hh:50
const uint16_t stError
An error occurred that could potentially be retried.
Definition XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition XrdClStatus.hh:31
const uint16_t errInvalidArgs
Definition XrdClStatus.hh:58
const uint16_t suContinue
Definition XrdClStatus.hh:39
Describe a data chunk for vector read.
Definition XrdClXRootDResponses.hh:917
bool IsOK() const
We're fine.
Definition XrdClStatus.hh:124
Definition XrdClZipCache.hh:66
bool operator()(const read_resp_t &lhs, const read_resp_t &rhs) const
Definition XrdClZipCache.hh:67
An exception for carrying the XRootDStatus of InflCache.
Definition XrdClZipCache.hh:43
XrdCl::XRootDStatus status
Definition XrdClZipCache.hh:48
ZipError(const XrdCl::XRootDStatus &status)
Definition XrdClZipCache.hh:44