XRootD
Loading...
Searching...
No Matches
XrdHttpTpcMultistream.cc
Go to the documentation of this file.
1
5#include "XrdHttpTpcTPC.hh"
6#include "XrdHttpTpcState.hh"
7
9
10#include <curl/curl.h>
11
12#include <algorithm>
13#include <sstream>
14#include <stdexcept>
15
16
17using namespace TPC;
18
/**
 * Exception type signalling that the libcurl machinery could not be set up.
 *
 * It derives from std::runtime_error, so handlers written for
 * std::runtime_error also catch it; a handler for this type must therefore
 * appear before a std::runtime_error handler to be selected.
 */
class CurlHandlerSetupError : public std::runtime_error {
public:
    /// Build the exception from a human-readable description of the failure.
    CurlHandlerSetupError(const std::string &msg)
        : std::runtime_error(msg)
    {
    }

    /// Nothing to release beyond what std::runtime_error already owns.
    virtual ~CurlHandlerSetupError() noexcept
    {
    }
};
27
28namespace {
29class MultiCurlHandler {
30public:
31 MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
32 m_handle(curl_multi_init()),
33 m_states(states),
34 m_log(log),
35 m_bytes_transferred(0),
36 m_error_code(0),
37 m_status_code(0)
38 {
39 if (m_handle == NULL) {
40 throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
41 }
42 m_avail_handles.reserve(states.size());
43 m_active_handles.reserve(states.size());
44 for (std::vector<State*>::const_iterator state_iter = states.begin();
45 state_iter != states.end();
46 state_iter++) {
47 m_avail_handles.push_back((*state_iter)->GetHandle());
48 }
49 }
50
51 ~MultiCurlHandler()
52 {
53 if (!m_handle) {return;}
54 for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
55 it != m_active_handles.end();
56 it++) {
57 curl_multi_remove_handle(m_handle, *it);
58 }
59 curl_multi_cleanup(m_handle);
60 }
61
62 MultiCurlHandler(const MultiCurlHandler &) = delete;
63
64 CURLM *Get() const {return m_handle;}
65
66 void FinishCurlXfer(CURL *curl) {
67 CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
68 if (mres) {
69 std::stringstream ss;
70 ss << "Failed to remove transfer from set: "
71 << curl_multi_strerror(mres);
72 throw std::runtime_error(ss.str());
73 }
74 for (std::vector<State*>::iterator state_iter = m_states.begin();
75 state_iter != m_states.end();
76 state_iter++) {
77 if (curl == (*state_iter)->GetHandle()) {
78 m_bytes_transferred += (*state_iter)->BytesTransferred();
79 int error_code = (*state_iter)->GetErrorCode();
80 if (error_code && !m_error_code) {
81 m_error_code = error_code;
82 m_error_message = (*state_iter)->GetErrorMessage();
83 }
84 int status_code = (*state_iter)->GetStatusCode();
85 if (status_code >= 400 && !m_status_code) {
86 m_status_code = status_code;
87 m_error_message = (*state_iter)->GetErrorMessage();
88 }
89 (*state_iter)->ResetAfterRequest();
90 break;
91 }
92 }
93 for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
94 iter != m_active_handles.end();
95 ++iter)
96 {
97 if (*iter == curl) {
98 m_active_handles.erase(iter);
99 break;
100 }
101 }
102 m_avail_handles.push_back(curl);
103 }
104
105 off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
106 int &running_handles) {
107 bool started_new_xfer = false;
108 do {
109 size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
110 if (xfer_size == 0) {return current_offset;}
111 if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
112 // In this case, we need to start new transfers but weren't able to.
113 if (running_handles == 0) {
114 if (!CanStartTransfer(true)) {
115 m_log.Emsg("StartTransfers", "Unable to start transfers.");
116 }
117 }
118 break;
119 } else {
120 running_handles += 1;
121 }
122 current_offset += xfer_size;
123 } while (true);
124 return current_offset;
125 }
126
127 int Flush() {
128 int last_error = 0;
129 for (std::vector<State*>::iterator state_it = m_states.begin();
130 state_it != m_states.end();
131 state_it++)
132 {
133 int error = (*state_it)->Flush();
134 if (error) {last_error = error;}
135 }
136 return last_error;
137 }
138
139 off_t BytesTransferred() const {
140 return m_bytes_transferred;
141 }
142
143 int GetStatusCode() const {
144 return m_status_code;
145 }
146
147 int GetErrorCode() const {
148 return m_error_code;
149 }
150
151 void SetErrorCode(int error_code) {
152 m_error_code = error_code;
153 }
154
155 std::string GetErrorMessage() const {
156 return m_error_message;
157 }
158
159 void SetErrorMessage(const std::string &error_msg) {
160 m_error_message = error_msg;
161 }
162
163private:
164
165 bool StartTransfer(off_t offset, size_t size) {
166 if (!CanStartTransfer(false)) {return false;}
167 for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
168 handle_it != m_avail_handles.end();
169 handle_it++) {
170 for (std::vector<State*>::iterator state_it = m_states.begin();
171 state_it != m_states.end();
172 state_it++) {
173 if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
174 (*state_it)->SetTransferParameters(offset, size);
175 ActivateHandle(**state_it);
176 return true;
177 }
178 }
179 }
180 return false;
181 }
182
183 void ActivateHandle(State &state) {
184 CURL *curl = state.GetHandle();
185 m_active_handles.push_back(curl);
186 CURLMcode mres;
187 mres = curl_multi_add_handle(m_handle, curl);
188 if (mres) {
189 std::stringstream ss;
190 ss << "Failed to add transfer to libcurl multi-handle"
191 << curl_multi_strerror(mres);
192 throw std::runtime_error(ss.str());
193 }
194 for (auto iter = m_avail_handles.begin();
195 iter != m_avail_handles.end();
196 ++iter)
197 {
198 if (*iter == curl) {
199 m_avail_handles.erase(iter);
200 break;
201 }
202 }
203 }
204
205 bool CanStartTransfer(bool log_reason) const {
206 size_t idle_handles = m_avail_handles.size();
207 size_t transfer_in_progress = 0;
208 for (std::vector<State*>::const_iterator state_iter = m_states.begin();
209 state_iter != m_states.end();
210 state_iter++) {
211 for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
212 handle_iter != m_active_handles.end();
213 handle_iter++) {
214 if (*handle_iter == (*state_iter)->GetHandle()) {
215 transfer_in_progress += (*state_iter)->BodyTransferInProgress();
216 break;
217 }
218 }
219 }
220 if (!idle_handles) {
221 if (log_reason) {
222 m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
223 }
224 return false;
225 }
226 ssize_t available_buffers = m_states[0]->AvailableBuffers();
227 // To be conservative, set aside buffers for any transfers that have been activated
228 // but don't have their first responses back yet.
229 available_buffers -= (m_active_handles.size() - transfer_in_progress);
230 if (log_reason && (available_buffers == 0)) {
231 std::stringstream ss;
232 ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
233 m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
234 << ", Transfers in progress: " << transfer_in_progress;
235 m_log.Emsg("CanStartTransfer", ss.str().c_str());
236 if (m_states[0]->AvailableBuffers() == 0) {
237 m_states[0]->DumpBuffers();
238 }
239 }
240 return available_buffers > 0;
241 }
242
243 CURLM *m_handle;
244 std::vector<CURL *> m_avail_handles;
245 std::vector<CURL *> m_active_handles;
246 std::vector<State*> &m_states;
247 XrdSysError &m_log;
248 off_t m_bytes_transferred;
249 int m_error_code;
250 int m_status_code;
251 std::string m_error_message;
252};
253}
254
255
256int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
257 size_t streams, std::vector<State*> &handles,
258 std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
259{
260 bool success;
261 // The content-length was set thanks to the call to GetContentLengthTPCPull() before calling this function
262 off_t content_size = state.GetContentLength();
263 off_t current_offset = 0;
264
265 size_t concurrency = streams * m_pipelining_multiplier;
266
267 handles.reserve(concurrency);
268 handles.push_back(new State());
269 handles[0]->Move(state);
270 for (size_t idx = 1; idx < concurrency; idx++) {
271 handles.push_back(handles[0]->Duplicate());
272 curl_handles.emplace_back(handles.back()->GetHandle());
273 }
274
275 // Notify the packet marking manager that the transfer will start after this point
276 rec.pmarkManager.startTransfer();
277
278 // Create the multi-handle and add in the current transfer to it.
279 MultiCurlHandler mch(handles, m_log);
280 CURLM *multi_handle = mch.Get();
281
282 curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
283 curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
284
285 // Start response to client prior to the first call to curl_multi_perform
286 int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
287 if (retval) {
288 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
289 "Failed to send the initial response to the TPC client");
290 return retval;
291 } else {
292 logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
293 "Initial transfer response sent to the TPC client");
294 }
295
296 // Start assigning transfers
297 int running_handles = 0;
298 current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
299
300 // Transfer loop: use curl to actually run the transfer, but periodically
301 // interrupt things to send back performance updates to the client.
302 time_t last_marker = 0;
303 // Track the time since the transfer last made progress
304 off_t last_advance_bytes = 0;
305 time_t last_advance_time = time(NULL);
306 time_t transfer_start = last_advance_time;
307 CURLcode res = static_cast<CURLcode>(-1);
308 CURLMcode mres = CURLM_OK;
309 do {
310 time_t now = time(NULL);
311 time_t next_marker = last_marker + m_marker_period;
312 if (now >= next_marker) {
313 if (current_offset > last_advance_bytes) {
314 last_advance_bytes = current_offset;
315 last_advance_time = now;
316 }
317 if (SendPerfMarker(req, rec, handles, current_offset)) {
318 logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
319 "Failed to send a perf marker to the TPC client");
320 return -1;
321 }
322 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
323 if (now > last_advance_time + timeout) {
324 const char *log_prefix = rec.log_prefix.c_str();
325 bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
326
327 mch.SetErrorCode(10);
328 std::stringstream ss;
329 ss << "Transfer failed because no bytes have been "
330 << (tpc_pull ? "received from the source (pull mode) in "
331 : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
332 mch.SetErrorMessage(ss.str());
333 break;
334 }
335 last_marker = now;
336 }
337
338 mres = curl_multi_perform(multi_handle, &running_handles);
339 if (mres == CURLM_CALL_MULTI_PERFORM) {
340 // curl_multi_perform should be called again immediately. On newer
341 // versions of curl, this is no longer used.
342 continue;
343 } else if (mres != CURLM_OK) {
344 break;
345 }
346
347 rec.pmarkManager.beginPMarks();
348
349
350 // Harvest any messages, looking for CURLMSG_DONE.
351 CURLMsg *msg;
352 do {
353 int msgq = 0;
354 msg = curl_multi_info_read(multi_handle, &msgq);
355 if (msg && (msg->msg == CURLMSG_DONE)) {
356 CURL *easy_handle = msg->easy_handle;
357 res = msg->data.result;
358 mch.FinishCurlXfer(easy_handle);
359 // If any requests fail, cut off the entire transfer.
360 if (res != CURLE_OK) {
361 break;
362 }
363 }
364 } while (msg);
365 if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
366 std::stringstream ss;
367 ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
368 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
369 ss.str());
370 break;
371 }
372
373 if (running_handles < static_cast<int>(concurrency)) {
374 // Issue new transfers if there is still pending work to do.
375 // Otherwise, continue running until there are no handles left.
376 if (current_offset != content_size) {
377 current_offset = mch.StartTransfers(current_offset, content_size,
378 m_block_size, running_handles);
379 if (!running_handles) {
380 std::stringstream ss;
381 ss << "No handles are able to run. Streams=" << streams << ", concurrency="
382 << concurrency;
383
384 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
385 }
386 } else if (running_handles == 0) {
387 logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
388 "Unable to start new transfers; breaking loop.");
389 break;
390 }
391 }
392
393 int64_t max_sleep_time = next_marker - time(NULL);
394 if (max_sleep_time <= 0) {
395 continue;
396 }
397 int fd_count;
398 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
399 &fd_count);
400 if (mres != CURLM_OK) {
401 break;
402 }
403 } while (running_handles);
404
405 if (mres != CURLM_OK) {
406 std::stringstream ss;
407 ss << "Internal libcurl multi-handle error: "
408 << curl_multi_strerror(mres);
409 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
410 throw std::runtime_error(ss.str());
411 }
412
413 // Harvest any messages, looking for CURLMSG_DONE.
414 CURLMsg *msg;
415 do {
416 int msgq = 0;
417 msg = curl_multi_info_read(multi_handle, &msgq);
418 if (msg && (msg->msg == CURLMSG_DONE)) {
419 CURL *easy_handle = msg->easy_handle;
420 mch.FinishCurlXfer(easy_handle);
421 if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
422 res = msg->data.result; // Transfer result will be examined below.
423 }
424 } while (msg);
425
426 if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
427 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
428 "Internal state error in libcurl");
429 throw std::runtime_error("Internal state error in libcurl");
430 }
431
432 mch.Flush();
433
434 rec.bytes_transferred = mch.BytesTransferred();
435 rec.tpc_status = mch.GetStatusCode();
436
437 // Generate the final response back to the client.
438 std::stringstream ss;
439 success = false;
440 if (mch.GetStatusCode() >= 400) {
441 std::string err = mch.GetErrorMessage();
442 std::stringstream ss2;
443 ss2 << "Remote side failed with status code " << mch.GetStatusCode();
444 if (!err.empty()) {
445 std::replace(err.begin(), err.end(), '\n', ' ');
446 ss2 << "; error message: \"" << err << "\"";
447 }
448 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
449 ss << generateClientErr(ss2, rec);
450 } else if (mch.GetErrorCode()) {
451 std::string err = mch.GetErrorMessage();
452 if (err.empty()) {err = "(no error message provided)";}
453 else {std::replace(err.begin(), err.end(), '\n', ' ');}
454 std::stringstream ss2;
455 ss2 << "Error when interacting with local filesystem: " << err;
456 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
457 ss << generateClientErr(ss2, rec);
458 } else if (res != CURLE_OK) {
459 std::stringstream ss2;
460 ss2 << "Request failed when processing";
461 std::stringstream ss3;
462 ss3 << ss2.str() << ":" << curl_easy_strerror(res);
463 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
464 ss << generateClientErr(ss2, rec, res);
465 } else if (current_offset != content_size) {
466 std::stringstream ss2;
467 ss2 << "Internal logic error led to early abort; current offset is " <<
468 current_offset << " while full size is " << content_size;
469 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
470 ss << generateClientErr(ss2, rec);
471 } else {
472 if (!handles[0]->Finalize()) {
473 std::stringstream ss2;
474 ss2 << "Failed to finalize and close file handle.";
475 ss << generateClientErr(ss2, rec);
476 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
477 ss2.str());
478 } else {
479 ss << "success: Created";
480 success = true;
481 }
482 }
483
484 if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
485 logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
486 "Failed to send last update to remote client");
487 return retval;
488 } else if (success) {
489 logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
490 rec.status = 0;
491 }
492 return req.ChunkResp(NULL, 0);
493}
494
495
496int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
497 size_t streams, TPCLogRecord &rec)
498{
499 std::vector<ManagedCurlHandle> curl_handles;
500 std::vector<State*> handles;
501 std::stringstream err_ss;
502 try {
503 int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
504 for (std::vector<State*>::iterator state_iter = handles.begin();
505 state_iter != handles.end();
506 state_iter++) {
507 delete *state_iter;
508 }
509 return retval;
510 } catch (CurlHandlerSetupError &e) {
511 for (std::vector<State*>::iterator state_iter = handles.begin();
512 state_iter != handles.end();
513 state_iter++) {
514 delete *state_iter;
515 }
516
517 rec.status = 500;
518 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
519 std::stringstream ss;
520 ss << e.what();
521 err_ss << generateClientErr(ss, rec);
522 return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
523 } catch (std::runtime_error &e) {
524 for (std::vector<State*>::iterator state_iter = handles.begin();
525 state_iter != handles.end();
526 state_iter++) {
527 delete *state_iter;
528 }
529
530 logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
531 std::stringstream ss;
532 ss << e.what();
533 err_ss << generateClientErr(ss, rec);
534 int retval;
535 if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
536 return retval;
537 }
538 return req.ChunkResp(NULL, 0);
539 }
540}
void CURL
#define Duplicate(x, y)
CurlHandlerSetupError(const std::string &msg)
virtual ~CurlHandlerSetupError() noexcept
CURL * GetHandle() const
int GetErrorCode() const
off_t GetContentLength() const
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.