xrootd
Loading...
Searching...
No Matches
XrdTpcStream.hh
Go to the documentation of this file.
1
10#include <memory>
11#include <vector>
12#include <string>
13
14#include <cstring>
15
16struct stat;
17
18class XrdSfsFile;
19class XrdSysError;
20
21namespace TPC {
22class Stream {
23public:
24 Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
25 : m_open_for_write(false),
26 m_avail_count(max_blocks),
27 m_fh(std::move(fh)),
28 m_offset(0),
29 m_log(log)
30 {
31 m_buffers.reserve(max_blocks);
32 for (size_t idx=0; idx < max_blocks; idx++) {
33 m_buffers.push_back(new Entry(buffer_size));
34 }
35 m_open_for_write = true;
36 }
37
39
40 int Stat(struct stat *);
41
42 int Read(off_t offset, char *buffer, size_t size);
43
44 // Writes a buffer of a given size to an offset.
45 // This will often keep the buffer in memory in order to present the underlying
46 // filesystem with a single stream of data (required for HDFS); further,
47 // it will also buffer to align the writes on a 1MB boundary (required
48 // for some RADOS configurations). When force is set to true, it will
49 // skip the buffering and always write (this should only be done at the
50 // end of a stream!).
51 //
52 // Returns the number of bytes written; on error, returns -1 and sets
53 // the error code and error message for the stream
54 ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);
55
56 size_t AvailableBuffers() const {return m_avail_count;}
57
58 void DumpBuffers() const;
59
60 // Flush and finalize the stream. If all data has been sent to the underlying
61 // file handle, close() will be invoked on the file handle.
62 //
63 // Further write operations on this stream will result in an error.
64 // If any memory buffers remain, an error occurs.
65 //
66 // Returns true on success; false otherwise.
67 bool Finalize();
68
69 std::string GetErrorMessage() const {return m_error_buf;}
70
71private:
72
73 class Entry {
74 public:
75 Entry(size_t capacity) :
76 m_offset(-1),
77 m_capacity(capacity),
78 m_size(0)
79 {}
80
81 bool Available() const {return m_offset == -1;}
82
83 int Write(Stream &stream, bool force) {
84 if (Available() || !CanWrite(stream)) {return 0;}
85 // Only full buffer writes are accepted unless the stream forces a flush
86 // (i.e., we are at EOF) because the multistream code uses buffer occupancy
87 // to determine how many streams are currently in-flight. If we do an early
88 // write, then the buffer will be empty and the multistream code may decide
89 // to start another request (which we don't have the capacity to serve!).
90 if (!force && (m_size != m_capacity)) {
91 return 0;
92 }
93 ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
94 // Currently the only valid negative value is SFS_ERROR (-1); checking for
95 // all negative values to future-proof the code.
96 if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
97 return -1;
98 }
99 m_offset = -1;
100 m_size = 0;
101 m_buffer.clear();
102 return retval;
103 }
104
105 size_t Accept(off_t offset, const char *buf, size_t size) {
106 // Validate acceptance criteria.
107 if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
108 return 0;
109 }
110 size_t to_accept = m_capacity - m_size;
111 if (to_accept == 0) {return 0;}
112 if (size > to_accept) {
113 size = to_accept;
114 }
115
116 // Inflate the underlying buffer if needed.
117 ssize_t new_bytes_needed = (m_size + size) - m_buffer.size();
118 if (new_bytes_needed > 0) {
119 m_buffer.resize(m_capacity);
120 }
121
122 // Finally, do the copy.
123 memcpy(&m_buffer[0] + m_size, buf, size);
124 m_size += size;
125 if (m_offset == -1) {
126 m_offset = offset;
127 }
128 return size;
129 }
130
132 if (!Available()) {return;}
133#if __cplusplus > 199711L
134 m_buffer.shrink_to_fit();
135#endif
136 }
137
138 void Move(Entry &other) {
139 m_buffer.swap(other.m_buffer);
140 m_offset = other.m_offset;
141 m_size = other.m_size;
142 }
143
144 off_t GetOffset() const {return m_offset;}
145 size_t GetCapacity() const {return m_capacity;}
146 size_t GetSize() const {return m_size;}
147
148 private:
149
150 Entry(const Entry&) = delete;
151
152 bool CanWrite(Stream &stream) const {
153 return (m_size > 0) && (m_offset == stream.m_offset);
154 }
155
156 off_t m_offset; // Offset within file that m_buffer[0] represents.
158 size_t m_size; // Number of bytes held in buffer.
159 std::vector<char> m_buffer;
160 };
161
162 ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);
163
166 std::unique_ptr<XrdSfsFile> m_fh;
167 off_t m_offset;
168 std::vector<Entry*> m_buffers;
170 std::string m_error_buf;
171};
172}
#define stat(a, b)
Definition XrdPosix.hh:96
Definition XrdTpcStream.hh:73
void Move(Entry &other)
Definition XrdTpcStream.hh:138
size_t m_size
Definition XrdTpcStream.hh:158
int Write(Stream &stream, bool force)
Definition XrdTpcStream.hh:83
off_t m_offset
Definition XrdTpcStream.hh:156
Entry(size_t capacity)
Definition XrdTpcStream.hh:75
off_t GetOffset() const
Definition XrdTpcStream.hh:144
std::vector< char > m_buffer
Definition XrdTpcStream.hh:159
bool CanWrite(Stream &stream) const
Definition XrdTpcStream.hh:152
Entry(const Entry &)=delete
bool Available() const
Definition XrdTpcStream.hh:81
size_t GetSize() const
Definition XrdTpcStream.hh:146
size_t GetCapacity() const
Definition XrdTpcStream.hh:145
void ShrinkIfUnused()
Definition XrdTpcStream.hh:131
size_t Accept(off_t offset, const char *buf, size_t size)
Definition XrdTpcStream.hh:105
size_t m_capacity
Definition XrdTpcStream.hh:157
Definition XrdTpcStream.hh:22
std::unique_ptr< XrdSfsFile > m_fh
Definition XrdTpcStream.hh:166
ssize_t WriteImpl(off_t offset, const char *buffer, size_t size)
bool m_open_for_write
Definition XrdTpcStream.hh:164
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
off_t m_offset
Definition XrdTpcStream.hh:167
void DumpBuffers() const
size_t m_avail_count
Definition XrdTpcStream.hh:165
XrdSysError & m_log
Definition XrdTpcStream.hh:169
int Stat(struct stat *)
std::vector< Entry * > m_buffers
Definition XrdTpcStream.hh:168
int Read(off_t offset, char *buffer, size_t size)
bool Finalize()
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition XrdTpcStream.hh:24
std::string GetErrorMessage() const
Definition XrdTpcStream.hh:69
size_t AvailableBuffers() const
Definition XrdTpcStream.hh:56
std::string m_error_buf
Definition XrdTpcStream.hh:170
Definition XrdSfsInterface.hh:369
Definition XrdSysError.hh:90
Definition XrdTpcState.hh:17
Definition XrdOucJson.hh:4517