XRootD
Loading...
Searching...
No Matches
XrdCephPosix.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3// Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25/*
26 * This interface provides wrapper methods for using ceph through a POSIX API.
27 */
28
29#include <sys/types.h>
30#include <sys/stat.h>
31#include <cerrno>
32#include <fcntl.h>
33#include <unistd.h>
34#include <stdlib.h>
35#include <stdarg.h>
36#include <memory>
37#include <radosstriper/libradosstriper.hpp>
38#include <map>
39#include <stdexcept>
40#include <string>
41#include <sstream>
42#include <sys/xattr.h>
43#include <time.h>
44#include <chrono>
45#include <limits>
46#include <pthread.h>
47
48#include "XrdSfs/XrdSfsAio.hh"
52#include <XrdOss/XrdOss.hh>
53#include "XrdOuc/XrdOucIOVec.hh"
56#include "XrdSfs/XrdSfsFlags.hh" // for the OFFLINE flag status
57
58
61 librados::NObjectIterator m_iterator;
62 librados::IoCtx *m_ioctx;
63};
64
// Bundle of state handed to a ceph asynchronous completion callback: the xrootd
// aio request, the user callback, the number of bytes involved, the file
// descriptor, an optional bufferlist, and the time of construction (taken with
// gettimeofday in the constructor so the completion can measure elapsed time).
// NOTE(review): the constructor initializes `aiop` and `callback`, but their
// member declarations are not visible in this listing — confirm against the
// full source.
66struct AioArgs {
67 AioArgs(XrdSfsAio* a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0) :
68 aiop(a), callback(b), nbBytes(n), fd(_fd), bl(_bl) { ::gettimeofday(&startTime, nullptr); }
71 size_t nbBytes;
72 int fd;
73 ::timeval startTime;
74 ceph::bufferlist *bl;
75};
76
// Global state of the ceph POSIX layer.
// NOTE(review): std::vector and std::multiset are used below, but <vector> and
// <set> are not among the includes visible in this listing — confirm they are
// pulled in.
//
// Striper / IoCtx dictionaries: one dictionary per pool index, keyed by the
// "user@pool,nbStripes,stripeUnit,objectSize" string built by the accessors.
80typedef std::map<std::string, libradosstriper::RadosStriper*> StriperDict;
81std::vector<StriperDict> g_radosStripers;
82typedef std::map<std::string, librados::IoCtx*> IOCtxDict;
83std::vector<IOCtxDict> g_ioCtx;
// One cluster handle per pool index; entries start null and are created on demand.
84std::vector<librados::Rados*> g_cluster;
// Pool index handed out next; it is advanced modulo g_maxCephPoolIdx on each use.
88unsigned int g_cephPoolIdx = 0;
// Wait time (compared against a std::time difference, i.e. seconds) above which
// a slow AIO read is reported in the log.
91unsigned int g_cephAioWaitThresh = 15;
// Number of pool-index slots allocated in the three vectors above.
95unsigned int g_maxCephPoolIdx = 1;
99
// Names of the files currently registered with O_WRONLY or O_RDWR.
101std::multiset<std::string> g_filesOpenForWrite;
// Open file table: file descriptor -> file reference.
103std::map<unsigned int, CephFileRef> g_fds;
// Next file descriptor value to hand out; incremented on every insertion.
105unsigned int g_nextCephFd = 0;
110
111//JW Counter for number of times a given cluster is resolved.
112std::map<unsigned int, unsigned long long> g_idxCntr;
113
118 if (g_radosStripers.size() == 0) {
119 // make sure we do not have a race condition here
121 // double check now that we have the lock
122 if (g_radosStripers.size() == 0) {
123 // initialization phase : allocate corresponding places in the vectors
124 for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
125 g_radosStripers.push_back(StriperDict());
126 g_ioCtx.push_back(IOCtxDict());
127 g_cluster.push_back(0);
128 }
129 }
130 }
131 unsigned int res = g_cephPoolIdx;
132 unsigned nextValue = g_cephPoolIdx+1;
133 if (nextValue >= g_maxCephPoolIdx) {
134 nextValue = 0;
135 }
136 g_cephPoolIdx = nextValue;
137 // JW logging of accesses:
138 ++g_idxCntr[res];
139 return res;
140}
141
143bool isOpenForWrite(std::string& name) {
145 return g_filesOpenForWrite.find(name) != g_filesOpenForWrite.end();
146}
147
151 std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
152 if (it != g_fds.end()) {
153 // We will release the lock upon exiting this function.
154 // The structure here is not protected from deletion, but we trust xrootd to
155 // ensure close (which does the deletion) will not be called before all previous
156 // calls are complete (including the async ones).
157 return &(it->second);
158 } else {
159 return 0;
160 }
161}
162
// Removes the entry registered under `fd` from the open file table (g_fds), if
// there is one.
// NOTE(review): the write-mode branch below has an empty body in this listing.
// Insertion adds fr.name to g_filesOpenForWrite for such files, so presumably
// the matching removal belongs here — confirm against the full source.
164void deleteFileRef(int fd, const CephFileRef &fr) {
166 if (fr.flags & (O_WRONLY|O_RDWR)) {
168 }
169 std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
170 if (it != g_fds.end()) {
171 g_fds.erase(it);
172 }
173}
174
181 g_fds[g_nextCephFd] = fr;
182 g_nextCephFd++;
183 if (fr.flags & (O_WRONLY|O_RDWR)) {
184 g_filesOpenForWrite.insert(fr.name);
185 }
186 return g_nextCephFd-1;
187}
188
191 "default", // default pool
192 "admin", // default user
193 1, // default nbStripes
194 4 * 1024 * 1024, // default stripeUnit : 4 MB
195 4 * 1024 * 1024}; // default objectSize : 4 MB
196
// Default ceph user id and pool name as plain strings.
// NOTE(review): neither variable is referenced in the visible part of this
// file; confirm they are still needed.
197std::string g_defaultUserId = "admin";
198std::string g_defaultPool = "default";
199
/// Logging hook. Stays null until ceph_posix_set_logfunc() installs a function.
static void (*g_logfunc) (char *, va_list argp) = nullptr;

/// printf-style front end for the logging hook.
/// Packs the variadic arguments into a va_list and forwards them, together with
/// `format`, to g_logfunc. Does nothing when no hook is installed.
static void logwrapper(char* format, ...) {
  if (g_logfunc != nullptr) {
    va_list args;
    va_start(args, format);
    g_logfunc(format, args);
    va_end(args);
  }
}
210
/// Converts a base-10 string to an unsigned long long.
///
/// @param s text to convert; it must consist of a number and nothing after it
/// @return the parsed value
/// @throws std::invalid_argument if no digits could be parsed (empty or
///         non-numeric input) or if characters remain after the number
/// @throws std::out_of_range if the value does not fit in unsigned long long
static unsigned long long int stoull(const std::string &s) {
  const char* const begin = s.c_str();
  char* end = nullptr;
  errno = 0;
  const unsigned long long int res = strtoull(begin, &end, 10);
  // strtoull reports "nothing converted" only by leaving end at the start of
  // the input; without this check "" would silently parse as 0.
  if (end == begin || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno) {
    throw std::out_of_range(s);
  }
  return res;
}
224
/// Converts a base-10 string to an unsigned int.
///
/// @param s text to convert; it must consist of a number and nothing after it
/// @return the parsed value
/// @throws std::invalid_argument if no digits could be parsed (empty or
///         non-numeric input) or if characters remain after the number
/// @throws std::out_of_range if the value does not fit in unsigned int
static unsigned int stoui(const std::string &s) {
  const char* const begin = s.c_str();
  char* end = nullptr;
  errno = 0;
  const unsigned long int res = strtoul(begin, &end, 10);
  // strtoul reports "nothing converted" only by leaving end at the start of
  // the input; without this check "" would silently parse as 0.
  if (end == begin || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno || res > std::numeric_limits<unsigned int>::max()) {
    throw std::out_of_range(s);
  }
  return (unsigned int)res;
}
238
239
240
242 //JW
243 // log the current state of the cluster:
244 // don't want to lock here, so the numbers may not be 100% self-consistent
245 int n_cluster = g_cluster.size();
246 int n_ioCtx = g_ioCtx.size();
247 int n_filesOpenForWrite = g_filesOpenForWrite.size();
248 int n_fds = g_fds.size();
249 int n_stripers = g_radosStripers.size();
250 int n_stripers_pool = 0;
251 for (size_t i = 0; i < g_radosStripers.size(); ++i) {
252 n_stripers_pool += g_radosStripers.at(i).size();
253 }
254 std::stringstream ss;
255 ss << "Counts: " << n_cluster << " " << n_ioCtx << " " << n_filesOpenForWrite << " "
256 << n_fds << " " << n_stripers << " " << n_stripers_pool << " " << n_stripers_pool
257 << " CountsbyCluster: [";
258 for (const auto& el : g_idxCntr) {
259 ss << el.first << ":" << el.second << ", " ;
260 } // it
261 ss<< "], ";
262
263 logwrapper((char*)"dumpClusterInfo : %s", ss.str().c_str());
264}
265
// Extracts the optional "userId@" prefix of `params` into file.userId.
// Returns the offset just past the '@' when the prefix is present. Otherwise
// returns 0 and, if `env` is non-null and has a "cephUserId" entry, takes the
// user id from there.
// NOTE(review): the statement that should follow "// default" is not visible
// in this listing; as shown, file.userId is left untouched when neither source
// provides a value.
268static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file) {
269 // default
271 // parsing
272 size_t atPos = params.find('@');
273 if (std::string::npos != atPos) {
274 file.userId = params.substr(0, atPos);
275 return atPos+1;
276 } else {
277 if (0 != env) {
278 char* cuser = env->Get("cephUserId");
279 if (0 != cuser) {
280 file.userId = cuser;
281 }
282 }
283 return 0;
284 }
285}
286
// Reads the pool name that starts at `offset` in `params` and runs up to the
// next ',' (or to the end of the string) into file.pool.
// When nothing is left at `offset`, the "cephPool" entry of `env` is used if
// `env` is non-null and the entry exists.
// Returns the offset at which the next field starts: just past the ',' when
// one was found, params.size() otherwise.
// NOTE(review): the statement that should follow "// default" is not visible
// in this listing.
289static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
290 // default
292 // parsing
293 size_t comPos = params.find(',', offset);
294 if (std::string::npos == comPos) {
295 if (params.size() == offset) {
296 if (NULL != env) {
297 char* cpool = env->Get("cephPool");
298 if (0 != cpool) {
299 file.pool = cpool;
300 }
301 }
302 } else {
303 file.pool = params.substr(offset);
304 }
305 return params.size();
306 } else {
307 file.pool = params.substr(offset, comPos-offset);
308 return comPos+1;
309 }
310}
311
// Reads the number of stripes that starts at `offset` in `params` and runs up
// to the next ',' (or to the end of the string) into file.nbStripes, converting
// it with stoui().
// When nothing is left at `offset`, the "cephNbStripes" entry of `env` is used
// if `env` is non-null and the entry exists.
// Returns the offset at which the next field starts: just past the ',' when
// one was found, params.size() otherwise.
// NOTE(review): the statement that should follow "// default" is not visible
// in this listing.
314// this may raise std::invalid_argument and std::out_of_range
315static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
316 // default
318 // parsing
319 size_t comPos = params.find(',', offset);
320 if (std::string::npos == comPos) {
321 if (params.size() == offset) {
322 if (NULL != env) {
323 char* cNbStripes = env->Get("cephNbStripes");
324 if (0 != cNbStripes) {
325 file.nbStripes = stoui(cNbStripes);
326 }
327 }
328 } else {
329 file.nbStripes = stoui(params.substr(offset));
330 }
331 return params.size();
332 } else {
333 file.nbStripes = stoui(params.substr(offset, comPos-offset));
334 return comPos+1;
335 }
336}
337
// Reads the stripe unit that starts at `offset` in `params` and runs up to the
// next ',' (or to the end of the string) into file.stripeUnit, converting it
// with the file-local ::stoull().
// When nothing is left at `offset`, the "cephStripeUnit" entry of `env` is used
// if `env` is non-null and the entry exists.
// Returns the offset at which the next field starts: just past the ',' when
// one was found, params.size() otherwise.
// NOTE(review): the statement that should follow "// default" is not visible
// in this listing.
340// this may raise std::invalid_argument and std::out_of_range
341static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
342 // default
344 // parsing
345 size_t comPos = params.find(',', offset);
346 if (std::string::npos == comPos) {
347 if (params.size() == offset) {
348 if (NULL != env) {
349 char* cStripeUnit = env->Get("cephStripeUnit");
350 if (0 != cStripeUnit) {
351 file.stripeUnit = ::stoull(cStripeUnit);
352 }
353 }
354 } else {
355 file.stripeUnit = ::stoull(params.substr(offset));
356 }
357 return params.size();
358 } else {
359 file.stripeUnit = ::stoull(params.substr(offset, comPos-offset));
360 return comPos+1;
361 }
362}
363
// Reads the object size, the last field of `params`, into file.objectSize:
// everything from `offset` to the end of the string is converted with the
// file-local ::stoull().
// When nothing is left at `offset`, the "cephObjectSize" entry of `env` is used
// if `env` is non-null and the entry exists.
// NOTE(review): the statement that should follow "// default" is not visible
// in this listing.
366// this may raise std::invalid_argument and std::out_of_range
367static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
368 // default
370 // parsing
371 if (params.size() == offset) {
372 if (NULL != env) {
373 char* cObjectSize = env->Get("cephObjectSize");
374 if (0 != cObjectSize) {
375 file.objectSize = ::stoull(cObjectSize);
376 }
377 }
378 } else {
379 file.objectSize = ::stoull(params.substr(offset));
380 }
381}
382
385void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file) {
386 // parse the params one by one
387 unsigned int afterUser = fillCephUserId(params, env, file);
388 unsigned int afterPool = fillCephPool(params, afterUser, env, file);
389 unsigned int afterNbStripes = fillCephNbStripes(params, afterPool, env, file);
390 unsigned int afterStripeUnit = fillCephStripeUnit(params, afterNbStripes, env, file);
391 fillCephObjectSize(params, afterStripeUnit, env, file);
392}
393
397void ceph_posix_set_defaults(const char* value) {
398 if (value) {
399 CephFile newdefault;
400 fillCephFileParams(value, NULL, newdefault);
401 g_defaultParams = newdefault;
402 }
403}
404
406void translateFileName(std::string &physName, std::string logName){
407 if (0 != g_namelib) {
408 char physCName[MAXPATHLEN+1];
409 int retc = g_namelib->lfn2pfn(logName.c_str(), physCName, sizeof(physCName));
410 if (retc) {
411 logwrapper((char*)"ceph_namelib : failed to translate %s using namelib plugin, using it as is", logName.c_str());
412 physName = logName;
413 } else {
414 logwrapper((char*)"ceph_namelib : translated %s to %s", logName.c_str(), physCName);
415 physName = physCName;
416 }
417 } else {
418 //logwrapper((char*)"ceph_namelib : No mapping done");
419 physName = logName;
420 }
421}
422
424void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file) {
425 // Syntax of the given path is :
426 // [[userId@]pool[,nbStripes[,stripeUnit[,objectSize]]]:]<actual path>
427 // for the missing parts, if env is not null the entries
428 // cephUserId, cephPool, cephNbStripes, cephStripeUnit, and cephObjectSize
429 // of env will be used.
430 // If env is null or no entry is found for what is missing, defaults are
431 // applied. These defaults are initially set to 'admin', 'default', 1, 4MB and 4MB
432 // but can be changed via a call to ceph_posix_set_defaults
433 std::string spath {path};
434 // If namelib is specified, apply translation to the whole path (which might include pool, etc)
435 translateFileName(spath,path);
436 size_t colonPos = spath.find(':');
437 if (std::string::npos == colonPos) {
438 // deal with name translation
439 file.name = spath;
440 fillCephFileParams("", env, file);
441 } else {
442 file.name = spath.substr(colonPos+1);
443 fillCephFileParams(spath.substr(0, colonPos), env, file);
444 }
445}
446
447static CephFile getCephFile(const char *path, XrdOucEnv *env) {
448 CephFile file;
449 fillCephFile(path, env, file);
450 return file;
451}
452
// Builds a file reference for `path`: the file description is parsed with
// fillCephFile(), `flags` and `mode` are recorded, and the position and
// statistics fields listed below are reset to zero.
// NOTE(review): the `offset` parameter is never read — fr.offset is always set
// to 0. The only visible caller passes 0, so this is harmless today.
// NOTE(review): some initialization lines are missing from this listing;
// confirm against the full source that every statistics field is reset.
453static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
454 mode_t mode, unsigned long long offset) {
455 CephFileRef fr;
456 fillCephFile(path, env, fr);
457 fr.flags = flags;
458 fr.mode = mode;
459 fr.offset = 0;
460 fr.maxOffsetWritten = 0;
462 fr.bytesWritten = 0;
463 fr.rdcount = 0;
464 fr.wrcount = 0;
465 fr.asyncRdStartCount = 0;
467 fr.asyncWrStartCount = 0;
469 fr.lastAsyncSubmission.tv_sec = 0;
470 fr.lastAsyncSubmission.tv_usec = 0;
471 fr.longestAsyncWriteTime = 0.0l;
473 return fr;
474}
475
476inline librados::Rados* checkAndCreateCluster(unsigned int cephPoolIdx,
477 std::string userId = g_defaultParams.userId) {
478 if (0 == g_cluster[cephPoolIdx]) {
479 // create connection to cluster
480 librados::Rados *cluster = new librados::Rados;
481 if (0 == cluster) {
482 return 0;
483 }
484 int rc = cluster->init(userId.c_str());
485 if (rc) {
486 logwrapper((char*)"checkAndCreateCluster : cluster init failed");
487 delete cluster;
488 return 0;
489 }
490 rc = cluster->conf_read_file(NULL);
491 if (rc) {
492 logwrapper((char*)"checkAndCreateCluster : cluster read config failed, rc = %d", rc);
493 cluster->shutdown();
494 delete cluster;
495 return 0;
496 }
497 cluster->conf_parse_env(NULL);
498 rc = cluster->connect();
499 if (rc) {
500 logwrapper((char*)"checkAndCreateCluster : cluster connect failed, rc = %d", rc);
501 cluster->shutdown();
502 delete cluster;
503 return 0;
504 }
505 g_cluster[cephPoolIdx] = cluster;
506 }
507 return g_cluster[cephPoolIdx];
508}
509
510int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
511 StriperDict &sDict = g_radosStripers[cephPoolIdx];
512 StriperDict::iterator it = sDict.find(userAtPool);
513 if (it == sDict.end()) {
514 // we need to create a new radosStriper
515 // Get a cluster
516 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx, file.userId);
517 if (0 == cluster) {
518 logwrapper((char*)"checkAndCreateStriper : checkAndCreateCluster failed");
519 return 0;
520 }
521 // create IoCtx for our pool
522 librados::IoCtx *ioctx = new librados::IoCtx;
523 if (0 == ioctx) {
524 logwrapper((char*)"checkAndCreateStriper : IoCtx instantiation failed");
525 cluster->shutdown();
526 delete cluster;
527 g_cluster[cephPoolIdx] = 0;
528 return 0;
529 }
530 int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
531 if (rc != 0) {
532 logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
533 cluster->shutdown();
534 delete cluster;
535 g_cluster[cephPoolIdx] = 0;
536 delete ioctx;
537 return 0;
538 }
539 // create RadosStriper connection
540 libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
541 if (0 == striper) {
542 logwrapper((char*)"checkAndCreateStriper : RadosStriper instantiation failed");
543 delete ioctx;
544 cluster->shutdown();
545 delete cluster;
546 g_cluster[cephPoolIdx] = 0;
547 return 0;
548 }
549 rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
550 if (rc != 0) {
551 logwrapper((char*)"checkAndCreateStriper : striper_create failed, rc = %d", rc);
552 delete striper;
553 delete ioctx;
554 cluster->shutdown();
555 delete cluster;
556 g_cluster[cephPoolIdx] = 0;
557 return 0;
558 }
559 // setup layout
560 rc = striper->set_object_layout_stripe_count(file.nbStripes);
561 if (rc != 0) {
562 logwrapper((char*)"checkAndCreateStriper : invalid nbStripes %d", file.nbStripes);
563 delete striper;
564 delete ioctx;
565 cluster->shutdown();
566 delete cluster;
567 g_cluster[cephPoolIdx] = 0;
568 return 0;
569 }
570 rc = striper->set_object_layout_stripe_unit(file.stripeUnit);
571 if (rc != 0) {
572 logwrapper((char*)"checkAndCreateStriper : invalid stripeUnit %d (must be non 0, multiple of 64K)", file.stripeUnit);
573 delete striper;
574 delete ioctx;
575 cluster->shutdown();
576 delete cluster;
577 g_cluster[cephPoolIdx] = 0;
578 return 0;
579 }
580 rc = striper->set_object_layout_object_size(file.objectSize);
581 if (rc != 0) {
582 logwrapper((char*)"checkAndCreateStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
583 delete striper;
584 delete ioctx;
585 cluster->shutdown();
586 delete cluster;
587 g_cluster[cephPoolIdx] = 0;
588 return 0;
589 }
590 IOCtxDict & ioDict = g_ioCtx[cephPoolIdx];
591 ioDict.emplace(userAtPool, ioctx);
592 sDict.emplace(userAtPool, striper);
593 }
594 return 1;
595}
596
// Returns the striper to use for `file`, creating it if needed, or 0 when it
// cannot be created.
// The lookup key is "userId@pool,nbStripes,stripeUnit,objectSize", and the pool
// index comes from getCephPoolIdxAndIncrease(), so successive calls may be
// served by different pool indexes.
// NOTE(review): a line directly after the signature is missing from this
// listing — confirm against the full source.
597static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) {
599 std::stringstream ss;
600 ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
601 << file.stripeUnit << ',' << file.objectSize;
602 std::string userAtPool = ss.str();
603 unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
604 if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
605 logwrapper((char*)"getRadosStriper : checkAndCreateStriper failed");
606 return 0;
607 }
608 return g_radosStripers[cephPoolIdx][userAtPool];
609}
610
// Returns the IoCtx to use for `file`, creating it (together with its striper)
// if needed, or 0 when it cannot be created.
// The lookup key is "userId@pool,nbStripes,stripeUnit,objectSize", and the pool
// index comes from getCephPoolIdxAndIncrease(), so the index used here need not
// match the one used by a preceding striper lookup.
// NOTE(review): a line directly after the signature is missing from this
// listing — confirm against the full source.
611static librados::IoCtx* getIoCtx(const CephFile& file) {
613 std::stringstream ss;
614 ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
615 << file.stripeUnit << ',' << file.objectSize;
616 std::string userAtPool = ss.str();
617 unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
618 if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
619 return 0;
620 }
621 return g_ioCtx[cephPoolIdx][userAtPool];
622}
623
626 for (unsigned int i= 0; i < g_maxCephPoolIdx; i++) {
627 for (StriperDict::iterator it2 = g_radosStripers[i].begin();
628 it2 != g_radosStripers[i].end();
629 it2++) {
630 delete it2->second;
631 }
632 for (IOCtxDict::iterator it2 = g_ioCtx[i].begin();
633 it2 != g_ioCtx[i].end();
634 it2++) {
635 delete it2->second;
636 }
637 delete g_cluster[i];
638 }
639 g_radosStripers.clear();
640 g_ioCtx.clear();
641 g_cluster.clear();
642}
643
644void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) {
645 g_logfunc = logfunc;
646};
647
648static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size);
649
664int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode){
665
666 CephFileRef fr = getCephFileRef(pathname, env, flags, mode, 0);
667
668 struct stat buf;
669 libradosstriper::RadosStriper *striper = getRadosStriper(fr); //Get a handle to the RADOS striper API
670 if (NULL == striper) {
671 logwrapper((char*)"Cannot create striper");
672 return -EINVAL;
673 }
674 dumpClusterInfo(); // JW enhanced logging
675
676 int rc = striper->stat(fr.name, (uint64_t*)&(buf.st_size), &(buf.st_atime)); //Get details about a file
677
678
679 bool fileExists = (rc != -ENOENT); //Make clear what condition we are testing
680
681 logwrapper((char*)"Access Mode: %s flags&O_ACCMODE %d ", pathname, flags);
682
683 if ((flags&O_ACCMODE) == O_RDONLY) { // Access mode is READ
684
685 if (fileExists) {
686 librados::bufferlist d_stripeUnit;
687 librados::bufferlist d_objectSize;
688 std::string obj_name;
689 librados::IoCtx *context = getIoCtx(fr);
690
691 // read first stripe of the object for xattr stripe unit and object size
692 // this will fail if the object was not written in stripes e.g. s3
693 // TBD: fallback to direct object (no stripe id appends to filename,
694 // replace striper metadata with corresponding metadata)
695 //
696 try {
697 obj_name = fr.name + std::string(".0000000000000000");
698 } catch (std::bad_alloc&) {
699 logwrapper((char*)"Can not create object string for file %s)", fr.name.c_str());
700 return -ENOMEM;
701 }
702 int ret = 0;
703 ret = context->getxattr(obj_name, "striper.layout.stripe_unit", d_stripeUnit);
704 ret = std::min(ret,context->getxattr(obj_name, "striper.layout.object_size", d_objectSize));
705 //log_func((char*)"size xattr for %s , %llu ,%llu", file_ref->name.c_str(), file_ref->objectSize, file_ref->stripeUnit );
706 if (ret<=0){
707 logwrapper((char*)"Could not find size or stripe_unit xattr for %s", fr.name.c_str());
708 }
709 else{
710 //librados's c_str() method does not return a NULL-terminated string, hence why we need to cleanup here
711 char cleanStripeUnit[MAXDIGITSIZE];
712 char cleanObjectSize[MAXDIGITSIZE];
713 unsigned int stripeUnitLength = std::min((unsigned int)MAXDIGITSIZE-1, d_stripeUnit.length());
714 unsigned int objectSizeLength = std::min((unsigned int)MAXDIGITSIZE-1, d_objectSize.length());
715 (void)strncpy( cleanStripeUnit, d_stripeUnit.c_str(), stripeUnitLength );
716 (void)strncpy( cleanObjectSize, d_objectSize.c_str(), objectSizeLength );
717 cleanStripeUnit[stripeUnitLength] = '\0';
718 cleanObjectSize[objectSizeLength] = '\0';
719 //only change defaults if different
720 if(fr.stripeUnit != std::stoull(cleanStripeUnit)){
721 logwrapper((char*)"WARNING: stripe unit of %s does not match defaults. object size is %s", pathname, cleanStripeUnit);
722 fr.stripeUnit = std::stoull(cleanStripeUnit);
723 }
724 if(fr.objectSize != std::stoull(cleanObjectSize)){
725 logwrapper((char*)"WARNING: object size of %s does not match defaults. object size is %s",pathname, cleanObjectSize);
726 fr.objectSize = std::stoull(cleanObjectSize);
727 }
728 }
729 int fd = insertFileRef(fr);
730 logwrapper((char*)"File descriptor %d associated to file %s opened in read mode", fd, pathname);
731 return fd;
732 } else {
733 return -ENOENT;
734 }
735
736 } else { // Access mode is WRITE
737 if (fileExists) {
738 if (flags & O_TRUNC) {
739 int rc = ceph_posix_unlink(env, pathname);
740 if (rc < 0 && rc != -ENOENT) {
741 return rc;
742 }
743 } else {
744 if (flags & O_EXCL) {
745 return -EACCES; // permission denied
746 } else {
747 return -EEXIST; // otherwise return just file exists
748 }
749 }
750 }
751 // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it
752 int fd = insertFileRef(fr);
753 logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname);
754 return fd;
755
756 }
757
758}
759
// Closes `fd`: logs the statistics gathered for the file, then removes its
// entry from the open file table.
// Returns 0 on success, -EBADF when `fd` is not an open descriptor.
760int ceph_posix_close(int fd) {
761 CephFileRef* fr = getFileRef(fd);
762 if (fr) {
763 ::timeval now;
764 ::gettimeofday(&now, nullptr);
766 double lastAsyncAge = 0.0;
767 // Only compute an age if the starting point was set.
// NOTE(review): the age is computed only when BOTH tv_sec and tv_usec are
// non-zero, so a submission time whose tv_usec happens to be 0 is treated as
// unset — `||` may have been intended.
768 if (fr->lastAsyncSubmission.tv_sec && fr->lastAsyncSubmission.tv_usec) {
769 lastAsyncAge = 1.0 * (now.tv_sec - fr->lastAsyncSubmission.tv_sec)
770 + 0.000001 * (now.tv_usec - fr->lastAsyncSubmission.tv_usec);
771 }
// NOTE(review): the format string has more conversion specifiers than there
// are arguments visible in this listing (two argument lines are missing) —
// confirm the argument list against the full source.
772 logwrapper((char*)"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
773 "async write ops %d/%d, async pending write bytes %ld, "
774 "async read ops %d/%d, bytes written/max offset %ld/%ld, "
775 "longest async write %f, longest callback invocation %f, last async op age %f",
776 fd, fr->name.c_str(), fr->rdcount, fr->wrcount,
779 fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge));
// `fr` points into the table entry that is erased here; it must not be used
// after this call.
780 deleteFileRef(fd, *fr);
781 return 0;
782 } else {
783 return -EBADF;
784 }
785}
786
787static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence) {
788 switch (whence) {
789 case SEEK_SET:
790 fr.offset = offset;
791 break;
792 case SEEK_CUR:
793 fr.offset += offset;
794 break;
795 default:
796 return -EINVAL;
797 }
798 return fr.offset;
799}
800
801off_t ceph_posix_lseek(int fd, off_t offset, int whence) {
802 CephFileRef* fr = getFileRef(fd);
803 if (fr) {
804 logwrapper((char*)"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
805 return (off_t)lseek_compute_offset(*fr, offset, whence);
806 } else {
807 return -EBADF;
808 }
809}
810
811off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence) {
812 CephFileRef* fr = getFileRef(fd);
813 if (fr) {
814 logwrapper((char*)"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
815 return lseek_compute_offset(*fr, offset, whence);
816 } else {
817 return -EBADF;
818 }
819}
820
// Writes `count` bytes from `buf` at the current offset of `fd` and advances
// the offset.
// Returns `count` on success, -EBADF when `fd` is unknown or not opened for
// writing, -EINVAL when no striper can be obtained, or the non-zero code
// returned by the striper write.
821ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
822 CephFileRef* fr = getFileRef(fd);
823 if (fr) {
// NOTE(review): `count` is a size_t but is formatted with %d.
824 logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
825 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
826 return -EBADF;
827 }
828 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
829 if (0 == striper) {
830 return -EINVAL;
831 }
// The data is copied into a bufferlist before being handed to the striper.
832 ceph::bufferlist bl;
833 bl.append((const char*)buf, count);
834 int rc = striper->write(fr->name, bl, count, fr->offset);
835 if (rc) return rc;
836 fr->offset += count;
// NOTE(review): a line is missing from this listing between the offset update
// and the statistics updates below — confirm against the full source.
838 fr->wrcount++;
839 fr->bytesWritten+=count;
// Highest byte position written so far; skipped while the offset is still 0.
840 if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten);
841 return count;
842 } else {
843 return -EBADF;
844 }
845}
846
// Writes `count` bytes from `buf` at the explicit position `offset`; the
// current offset of `fd` is not modified.
// Returns `count` on success, -EBADF when `fd` is unknown or not opened for
// writing, -EINVAL when no striper can be obtained, or the non-zero code
// returned by the striper write.
847ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) {
848 CephFileRef* fr = getFileRef(fd);
849 if (fr) {
850 // TODO implement proper logging level for this plugin - this should be only debug
851 //logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
852 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
853 return -EBADF;
854 }
855 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
856 if (0 == striper) {
857 return -EINVAL;
858 }
// The data is copied into a bufferlist before being handed to the striper.
859 ceph::bufferlist bl;
860 bl.append((const char*)buf, count);
861 int rc = striper->write(fr->name, bl, count, offset);
862 if (rc) return rc;
// NOTE(review): a line is missing from this listing before the statistics
// updates below — confirm against the full source.
864 fr->wrcount++;
865 fr->bytesWritten+=count;
// Highest byte position written so far; skipped when offset + count is 0.
866 if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten);
867 return count;
868 } else {
869 return -EBADF;
870 }
871}
872
// Completion callback for an asynchronous write. `arg` is the AioArgs object
// created at submission time; it is deleted at the end of this function.
// The write statistics of the file are updated (when the file reference still
// exists), then the user callback is invoked, and finally the time spent in
// that callback is recorded.
873static void ceph_aio_write_complete(rados_completion_t c, void *arg) {
874 AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
// NOTE(review): the completion's return value is stored in a size_t, so a
// negative error code becomes a large unsigned value before it is passed to the
// callback; the callback's parameter type is not visible here — confirm it
// converts back as intended.
875 size_t rc = rados_aio_get_return_value(c);
876 // Compute statistics before reportng to xrootd, so that a close cannot happen
877 // in the meantime.
878 CephFileRef* fr = getFileRef(awa->fd);
879 if (fr) {
// NOTE(review): lines are missing from this listing at the start of this
// branch — confirm against the full source.
883 fr->bytesWritten += awa->nbBytes;
884 if (awa->aiop->sfsAio.aio_nbytes)
885 fr->maxOffsetWritten = std::max(fr->maxOffsetWritten, uint64_t(awa->aiop->sfsAio.aio_offset + awa->aiop->sfsAio.aio_nbytes - 1));
886 ::timeval now;
887 ::gettimeofday(&now, nullptr);
// Elapsed time, in seconds, since the AioArgs object was constructed.
888 double writeTime = 0.000001 * (now.tv_usec - awa->startTime.tv_usec) + 1.0 * (now.tv_sec - awa->startTime.tv_sec);
889 fr->longestAsyncWriteTime = std::max(fr->longestAsyncWriteTime, writeTime);
890 }
891 ::timeval before, after;
892 if (fr) ::gettimeofday(&before, nullptr);
// Report the byte count on success (rc == 0), the return value otherwise.
893 awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
894 if (fr) {
895 ::gettimeofday(&after, nullptr);
896 double callbackInvocationTime = 0.000001 * (after.tv_usec - before.tv_usec) + 1.0 * (after.tv_sec - before.tv_sec);
898 fr->longestCallbackInvocation = std::max(fr->longestCallbackInvocation, callbackInvocationTime);
899 }
900 delete(awa);
901}
902
// Submits an asynchronous write described by `aiop`; `cb` is invoked from
// ceph_aio_write_complete once the write finishes.
// Returns the value of the striper's aio_write call, -EBADF when `fd` is
// unknown or not opened for writing, or -EINVAL when no striper or cluster
// handle can be obtained.
903ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
904 CephFileRef* fr = getFileRef(fd);
905 if (fr) {
906 // get the parameters from the Xroot aio object
907 size_t count = aiop->sfsAio.aio_nbytes;
908 const char *buf = (const char*)aiop->sfsAio.aio_buf;
909 size_t offset = aiop->sfsAio.aio_offset;
910 // TODO implement proper logging level for this plugin - this should be only debug
911 //logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
912 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
913 return -EBADF;
914 }
915 // get the striper object
916 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
917 if (0 == striper) {
918 return -EINVAL;
919 }
920 // prepare a bufferlist around the given buffer
921 ceph::bufferlist bl;
922 bl.append(buf, count);
923 // get the poolIdx to use
924 int cephPoolIdx = getCephPoolIdxAndIncrease();
925 // Get the cluster to use
// The cluster handle is only used to create the completion object; it is
// requested with the default user id.
926 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
927 if (0 == cluster) {
928 return -EINVAL;
929 }
930 // prepare a ceph AioCompletion object and do async call
// `args` is freed by ceph_aio_write_complete.
// NOTE(review): if aio_write fails without the completion callback ever
// running, `args` would leak — confirm the library's behaviour.
931 AioArgs *args = new AioArgs(aiop, cb, count, fd);
932 librados::AioCompletion *completion =
933 cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
934 // do the write
935 int rc = striper->aio_write(fr->name, completion, bl, count, offset);
936 completion->release();
// NOTE(review): a line is missing from this listing here; the counters below
// are updated regardless of `rc` — confirm against the full source.
938 fr->asyncWrStartCount++;
939 ::gettimeofday(&fr->lastAsyncSubmission, nullptr);
940 fr->bytesAsyncWritePending+=count;
941 return rc;
942 } else {
943 return -EBADF;
944 }
945}
946
// Vector read that bypasses the striper library: the `n` requests in `readV`
// are declared on one bulkAioRead object, submitted together and waited for.
// Returns the value of get_results() on success, -EBADF when `fd` is unknown or
// write-only, -ENOTSUP when the file uses more than one stripe, -EINVAL when no
// IoCtx can be obtained, -ENOMEM on allocation failure, or the negative code
// returned while declaring or submitting the requests.
947ssize_t ceph_nonstriper_readv(int fd, XrdOucIOVec *readV, int n) {
948 CephFileRef* fr = getFileRef(fd);
949 if (fr) {
950 // TODO implement proper logging level for this plugin - this should be only debug
951 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
952 if ((fr->flags & O_WRONLY) != 0) {
953 return -EBADF;
954 }
955 if (fr->nbStripes != 1) {
956 //Non-striper based read method works only with a single stripe
957 return -ENOTSUP;
958 }
959
960 ssize_t read_bytes;
961 int rc;
962
963 librados::IoCtx *ioctx = getIoCtx(*fr);
964 if (0 == ioctx) {
965 return -EINVAL;
966 }
967
968 try {
969 //Constructor can throw bad alloc
970 bulkAioRead readOp(ioctx, logwrapper, fr);
971
// Declare every request first; nothing is sent until submit below.
972 for (int i = 0; i < n; i++) {
973 rc = readOp.read(readV[i].data, readV[i].size, readV[i].offset);
974 if (rc < 0) {
975 logwrapper( (char*)"Can not declare read request\n");
976 return rc;
977 }
978 }
979
// Measure, in whole seconds, how long the submit-and-wait step takes.
980 std::time_t wait_time = std::time(0);
981 rc = readOp.submit_and_wait_for_complete();
982 wait_time = std::time(0) - wait_time;
983 if (wait_time > g_cephAioWaitThresh) {
// NOTE(review): the line opening the logging call is missing from this
// listing; the three lines below are its arguments.
985 (char*)"Waiting for AIO results in readv for %s took %ld seconds, too long!\n",
986 fr->name.c_str(),
987 wait_time
988 );
989 }
990 if (rc < 0) {
991 logwrapper( (char*)"Can not submit read requests\n");
992 return rc;
993 }
994 read_bytes = readOp.get_results();
// NOTE(review): a line is missing from this listing before the counter update.
996 //We consider readv as a single operation
997 fr->rdcount += 1;
998 return read_bytes;
999 } catch(std::bad_alloc&) {
1000 return -ENOMEM;
1001 }
1002 } else {
1003 return -EBADF;
1004 }
1005}
1006
1007ssize_t ceph_striper_readv(int fd, XrdOucIOVec *readV, int n) {
1011 ssize_t nbytes = 0, curCount = 0;
1012 for (int i=0; i<n; i++) {
1013 curCount = ceph_posix_pread(fd, (void *)readV[i].data, (size_t)readV[i].size, (off_t)readV[i].offset);
1014 if (curCount != readV[i].size) {
1015 if (curCount < 0) return curCount;
1016 return -ESPIPE;
1017 }
1018 nbytes += curCount;
1019 }
1020 return nbytes;
1021}
1022
1023ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
1024 CephFileRef* fr = getFileRef(fd);
1025 if (fr) {
1026 // TODO implement proper logging level for this plugin - this should be only debug
1027 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
1028 if ((fr->flags & O_WRONLY) != 0) {
1029 return -EBADF;
1030 }
1031 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1032 if (0 == striper) {
1033 return -EINVAL;
1034 }
1035 ceph::bufferlist bl;
1036 int rc = striper->read(fr->name, &bl, count, fr->offset);
1037 if (rc < 0) return rc;
1038 bl.begin().copy(rc, (char*)buf);
1039 XrdSysMutexHelper lock(fr->statsMutex);
1040 fr->offset += rc;
1041 fr->rdcount++;
1042 return rc;
1043 } else {
1044 return -EBADF;
1045 }
1046}
1047
1048ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t offset) {
1049 //The same as pread, but do not relies on rados striper library. Uses direct atomic
1050 //reads from ceph object (see BulkAioRead class for details).
1051 CephFileRef* fr = getFileRef(fd);
1052 if (fr) {
1053 // TODO implement proper logging level for this plugin - this should be only debug
1054 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
1055 if ((fr->flags & O_WRONLY) != 0) {
1056 return -EBADF;
1057 }
1058 if (fr->nbStripes != 1) {
1059 //Non-striper based read method works only with a single stripe
1060 return -ENOTSUP;
1061 }
1062
1063 int rc;
1064 ssize_t bytes_read;
1065
1066 librados::IoCtx *ioctx = getIoCtx(*fr);
1067 if (0 == ioctx) {
1068 return -EINVAL;
1069 }
1070
1071 try {
1072 //Constructor can throw bad alloc
1073 bulkAioRead readOp(ioctx, logwrapper, fr);
1074 rc = readOp.read(buf, count, offset);
1075 if (rc < 0) {
1076 logwrapper( (char*)"Can not declare read request\n");
1077 return rc;
1078 }
1079 std::time_t wait_time = std::time(0);
1080 rc = readOp.submit_and_wait_for_complete();
1081 wait_time = std::time(0) - wait_time;
1082 if (wait_time > g_cephAioWaitThresh) {
1083 logwrapper(
1084 (char*)"Waiting for AIO results in pread for %s took %ld seconds, too long!\n",
1085 fr->name.c_str(),
1086 wait_time
1087 );
1088 }
1089 if (rc < 0) {
1090 logwrapper( (char*)"Can not submit read request\n");
1091 return rc;
1092 }
1093 bytes_read = readOp.get_results();
1094
1095 if (bytes_read > 0) {
1096 XrdSysMutexHelper lock(fr->statsMutex);
1097 fr->rdcount++;
1098 } else {
1099 logwrapper( (char*)"Error while read: %d\n", bytes_read);
1100 }
1101 return bytes_read;
1102 } catch (std::bad_alloc&) {
1103 return -ENOMEM;
1104 }
1105 } else {
1106 return -EBADF;
1107 }
1108}
1109
1110ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
1111 CephFileRef* fr = getFileRef(fd);
1112 if (fr) {
1113 // TODO implement proper logging level for this plugin - this should be only debug
1114 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
1115 if ((fr->flags & O_WRONLY) != 0) {
1116 return -EBADF;
1117 }
1118 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1119 if (0 == striper) {
1120 return -EINVAL;
1121 }
1122 ceph::bufferlist bl;
1123 int rc = striper->read(fr->name, &bl, count, offset);
1124 if (rc < 0) return rc;
1125 bl.begin().copy(rc, (char*)buf);
1126 XrdSysMutexHelper lock(fr->statsMutex);
1127 fr->rdcount++;
1128 return rc;
1129 } else {
1130 return -EBADF;
1131 }
1132}
1133
1134ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper) {
1135 ssize_t rc {0};
1136 if (!allowStriper) {
1137 rc = ceph_posix_pread(fd,buf,count,offset);
1138 return rc;
1139 }
1140 rc = ceph_posix_nonstriper_pread(fd, buf, count,offset);
1141 if (-ENOENT == rc || -ENOTSUP == rc) {
1142 //This might be a sparse file or nbstripes > 1, so let's try striper read
1143 rc = ceph_posix_pread(fd, buf, count,offset);
1144 if (rc >= 0) {
1145 char err_str[100]; //99 symbols should be enough for the short message
1146 snprintf(err_str, 100, "WARNING! The file (fd %d) seem to be sparse, this is not expected", fd);
1147 logwrapper(err_str);
1148 }
1149 }
1150 return rc;
1151}
1152
1153
1154static void ceph_aio_read_complete(rados_completion_t c, void *arg) {
1155 AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
1156 size_t rc = rados_aio_get_return_value(c);
1157 if (awa->bl) {
1158 if (rc > 0) {
1159 awa->bl->begin().copy(rc, (char*)awa->aiop->sfsAio.aio_buf);
1160 }
1161 delete awa->bl;
1162 awa->bl = 0;
1163 }
1164 // Compute statistics before reportng to xrootd, so that a close cannot happen
1165 // in the meantime.
1166 CephFileRef* fr = getFileRef(awa->fd);
1167 if (fr) {
1168 XrdSysMutexHelper lock(fr->statsMutex);
1170 }
1171 awa->callback(awa->aiop, rc );
1172 delete(awa);
1173}
1174
1175ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
1176 CephFileRef* fr = getFileRef(fd);
1177 if (fr) {
1178 // get the parameters from the Xroot aio object
1179 size_t count = aiop->sfsAio.aio_nbytes;
1180 size_t offset = aiop->sfsAio.aio_offset;
1181 // TODO implement proper logging level for this plugin - this should be only debug
1182 //logwrapper((char*)"ceph_aio_read: for fd %d, count=%d", fd, count);
1183 if ((fr->flags & O_WRONLY) != 0) {
1184 return -EBADF;
1185 }
1186 // get the striper object
1187 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1188 if (0 == striper) {
1189 return -EINVAL;
1190 }
1191 // prepare a bufferlist to receive data
1192 ceph::bufferlist *bl = new ceph::bufferlist();
1193 // get the poolIdx to use
1194 int cephPoolIdx = getCephPoolIdxAndIncrease();
1195 // Get the cluster to use
1196 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
1197 if (0 == cluster) {
1198 return -EINVAL;
1199 }
1200 // prepare a ceph AioCompletion object and do async call
1201 AioArgs *args = new AioArgs(aiop, cb, count, fd, bl);
1202 librados::AioCompletion *completion =
1203 cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
1204 // do the read
1205 int rc = striper->aio_read(fr->name, completion, bl, count, offset);
1206 completion->release();
1207 XrdSysMutexHelper lock(fr->statsMutex);
1208 fr->asyncRdStartCount++;
1209 return rc;
1210 } else {
1211 return -EBADF;
1212 }
1213}
1214
1215int ceph_posix_fstat(int fd, struct stat *buf) {
1216 CephFileRef* fr = getFileRef(fd);
1217 if (fr) {
1218 logwrapper((char*)__FUNCTION__,": fd %d", fd);
1219 // minimal stat : only size and times are filled
1220 // atime, mtime and ctime are set all to the same value
1221 // mode is set arbitrarily to 0666 | S_IFREG
1222 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1223 if (0 == striper) {
1224 logwrapper((char*)"ceph_stat: getRadosStriper failed");
1225 return -EINVAL;
1226 }
1227 memset(buf, 0, sizeof(*buf));
1228 int rc = striper->stat(fr->name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1229 if (rc != 0) {
1230 return -rc;
1231 }
1232 buf->st_dev = 1;
1233 buf->st_ino = 1;
1234 buf->st_mtime = buf->st_atime;
1235 buf->st_ctime = buf->st_atime;
1236 buf->st_mode = 0666 | S_IFREG;
1237 return 0;
1238 } else {
1239 return -EBADF;
1240 }
1241}
1242
1243int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf) {
1244 logwrapper((char*)__FUNCTION__, pathname);
1245 // minimal stat : only size and times are filled
1246 // atime, mtime and ctime are set all to the same value
1247 // mode is set arbitrarily to 0666 | S_IFREG
1248 CephFile file = getCephFile(pathname, env);
1249 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1250 if (0 == striper) {
1251 return -EINVAL;
1252 }
1253 memset(buf, 0, sizeof(*buf));
1254 int rc = striper->stat(file.name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1255 if (rc != 0) {
1256 // for non existing file. Check that we did not open it for write recently
1257 // in that case, we return 0 size and current time
1258 if (-ENOENT == rc && isOpenForWrite(file.name)) {
1259 buf->st_size = 0;
1260 buf->st_atime = time(NULL);
1261 } else {
1262 return -rc;
1263 }
1264 }
1265 // XRootD assumes an 'offline' file if st_dev and st_ino
1266// are zero. Set to non-zero (meaningful) values to avoid this
1267 buf->st_dev = 1;
1268 buf->st_ino = 1;
1269 buf->st_mtime = buf->st_atime;
1270 buf->st_ctime = buf->st_atime;
1271 buf->st_mode = 0666 | S_IFREG;
1272 return 0;
1273}
1274
1275int ceph_posix_fsync(int fd) {
1276 CephFileRef* fr = getFileRef(fd);
1277 if (fr) {
1278 // no locking of fr as it is not used.
1279 logwrapper((char*)"ceph_sync: fd %d", fd);
1280 return 0;
1281 } else {
1282 return -EBADF;
1283 }
1284}
1285
1286int ceph_posix_fcntl(int fd, int cmd, ... /* arg */ ) {
1287 CephFileRef* fr = getFileRef(fd);
1288 if (fr) {
1289 logwrapper((char*)"ceph_fcntl: fd %d cmd=%d", fd, cmd);
1290 // minimal implementation
1291 switch (cmd) {
1292 case F_GETFL:
1293 return fr->mode;
1294 default:
1295 return -EINVAL;
1296 }
1297 } else {
1298 return -EBADF;
1299 }
1300}
1301
1302static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char* name,
1303 void* value, size_t size) {
1304 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1305 if (0 == striper) {
1306 return -EINVAL;
1307 }
1308 ceph::bufferlist bl;
1309 int rc = striper->getxattr(file.name, name, bl);
1310 if (rc < 0) return rc;
1311 size_t returned_size = (size_t)rc<size?rc:size;
1312 bl.begin().copy(returned_size, (char*)value);
1313 return returned_size;
1314}
1315
1316ssize_t ceph_posix_getxattr(XrdOucEnv* env, const char* path,
1317 const char* name, void* value,
1318 size_t size) {
1319 logwrapper((char*)"ceph_getxattr: path %s name=%s", path, name);
1320 return ceph_posix_internal_getxattr(getCephFile(path, env), name, value, size);
1321}
1322
1323ssize_t ceph_posix_fgetxattr(int fd, const char* name,
1324 void* value, size_t size) {
1325 CephFileRef* fr = getFileRef(fd);
1326 if (fr) {
1327 logwrapper((char*)"ceph_fgetxattr: fd %d name=%s", fd, name);
1328 return ceph_posix_internal_getxattr(*fr, name, value, size);
1329 } else {
1330 return -EBADF;
1331 }
1332}
1333
1334static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* name,
1335 const void* value, size_t size, int flags) {
1336 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1337 if (0 == striper) {
1338 return -EINVAL;
1339 }
1340 ceph::bufferlist bl;
1341 bl.append((const char*)value, size);
1342 int rc = striper->setxattr(file.name, name, bl);
1343 if (rc) {
1344 return -rc;
1345 }
1346 return 0;
1347}
1348
1349ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path,
1350 const char* name, const void* value,
1351 size_t size, int flags) {
1352 logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value);
1353 return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags);
1354}
1355
1357 const char* name, const void* value,
1358 size_t size, int flags) {
1359 CephFileRef* fr = getFileRef(fd);
1360 if (fr) {
1361 logwrapper((char*)"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value);
1362 return ceph_posix_internal_setxattr(*fr, name, value, size, flags);
1363 } else {
1364 return -EBADF;
1365 }
1366}
1367
1368static int ceph_posix_internal_removexattr(const CephFile &file, const char* name) {
1369 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1370 if (0 == striper) {
1371 return -EINVAL;
1372 }
1373 int rc = striper->rmxattr(file.name, name);
1374 if (rc) {
1375 return -rc;
1376 }
1377 return 0;
1378}
1379
1380int ceph_posix_removexattr(XrdOucEnv* env, const char* path,
1381 const char* name) {
1382 logwrapper((char*)"ceph_removexattr: path %s name=%s", path, name);
1383 return ceph_posix_internal_removexattr(getCephFile(path, env), name);
1384}
1385
1386int ceph_posix_fremovexattr(int fd, const char* name) {
1387 CephFileRef* fr = getFileRef(fd);
1388 if (fr) {
1389 logwrapper((char*)"ceph_fremovexattr: fd %d name=%s", fd, name);
1390 return ceph_posix_internal_removexattr(*fr, name);
1391 } else {
1392 return -EBADF;
1393 }
1394}
1395
1396static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz) {
1397 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1398 if (0 == striper) {
1399 return -EINVAL;
1400 }
1401 // call ceph
1402 std::map<std::string, ceph::bufferlist> attrset;
1403 int rc = striper->getxattrs(file.name, attrset);
1404 if (rc) {
1405 return -rc;
1406 }
1407 // build result
1408 *aPL = 0;
1409 int maxSize = 0;
1410 for (std::map<std::string, ceph::bufferlist>::const_iterator it = attrset.begin();
1411 it != attrset.end();
1412 it++) {
1413 XrdSysXAttr::AList* newItem = (XrdSysXAttr::AList*)malloc(sizeof(XrdSysXAttr::AList)+it->first.size());
1414 newItem->Next = *aPL;
1415 newItem->Vlen = it->second.length();
1416 if (newItem->Vlen > maxSize) {
1417 maxSize = newItem->Vlen;
1418 }
1419 newItem->Nlen = it->first.size();
1420 strncpy(newItem->Name, it->first.c_str(), newItem->Vlen+1);
1421 *aPL = newItem;
1422 }
1423 if (getSz) {
1424 return 0;
1425 } else {
1426 return maxSize;
1427 }
1428}
1429
1430int ceph_posix_listxattrs(XrdOucEnv* env, const char* path, XrdSysXAttr::AList **aPL, int getSz) {
1431 logwrapper((char*)"ceph_listxattrs: path %s", path);
1432 return ceph_posix_internal_listxattrs(getCephFile(path, env), aPL, getSz);
1433}
1434
1435int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz) {
1436 CephFileRef* fr = getFileRef(fd);
1437 if (fr) {
1438 logwrapper((char*)"ceph_flistxattrs: fd %d", fd);
1439 return ceph_posix_internal_listxattrs(*fr, aPL, getSz);
1440 } else {
1441 return -EBADF;
1442 }
1443}
1444
1446 while (aPL) {
1447 free(aPL->Name);
1448 XrdSysXAttr::AList *cur = aPL;
1449 aPL = aPL->Next;
1450 free(cur);
1451 }
1452}
1453
1454int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
1455 logwrapper((char*)"ceph_posix_statfs");
1456 // get the poolIdx to use
1457 int cephPoolIdx = getCephPoolIdxAndIncrease();
1458 // Get the cluster to use
1459 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
1460 if (0 == cluster) {
1461 return -EINVAL;
1462 }
1463 // call ceph stat
1464 librados::cluster_stat_t result;
1465 int rc = cluster->cluster_stat(result);
1466 if (0 == rc) {
1467 *totalSpace = result.kb * 1024;
1468 *freeSpace = result.kb_avail * 1024;
1469 }
1470 return rc;
1471}
1472
1491int ceph_posix_stat_pool(char const *poolName, long long *usedSpace) {
1492
1493 logwrapper((char*)__FUNCTION__, poolName);
1494 // get the poolIdx to use
1495 int cephPoolIdx = getCephPoolIdxAndIncrease();
1496 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
1497 if (0 == cluster) {
1498 return -EINVAL;
1499 }
1500
1501 std::list<std::string> poolNames({poolName});
1502 std::map<std::string, librados::pool_stat_t> stat;
1503
1504 if (cluster->get_pool_stats(poolNames, stat) < 0) {
1505
1506 logwrapper((char*)"Unable to get_pool_stats for pool ", poolName);
1507 return -EINVAL;
1508
1509 } else {
1510
1511 *usedSpace = stat[poolName].num_kb * 1024;
1512 return XrdOssOK;
1513
1514 }
1515}
1516
1517static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size) {
1518 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1519 if (0 == striper) {
1520 return -EINVAL;
1521 }
1522 return striper->trunc(file.name, size);
1523}
1524
1525int ceph_posix_ftruncate(int fd, unsigned long long size) {
1526 CephFileRef* fr = getFileRef(fd);
1527 if (fr) {
1528 logwrapper((char*)"ceph_posix_ftruncate: fd %d, size %d", fd, size);
1529 return ceph_posix_internal_truncate(*fr, size);
1530 } else {
1531 return -EBADF;
1532 }
1533}
1534
1535int ceph_posix_truncate(XrdOucEnv* env, const char *pathname, unsigned long long size) {
1536 logwrapper((char*)"ceph_posix_truncate : %s", pathname);
1537 // minimal stat : only size and times are filled
1538 CephFile file = getCephFile(pathname, env);
1539 return ceph_posix_internal_truncate(file, size);
1540}
1541
1542int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) {
1543 logwrapper((char*)"ceph_posix_unlink : %s", pathname);
1544 // start the timer
1545 auto timer_start = std::chrono::steady_clock::now();
1546
1547 // minimal stat : only size and times are filled
1548 CephFile file = getCephFile(pathname, env);
1549 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1550 if (0 == striper) {
1551 return -EINVAL;
1552 }
1553 int rc = striper->remove(file.name);
1554 auto end = std::chrono::steady_clock::now();
1555 auto deltime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - timer_start).count();
1556
1557 if (rc == 0) {
1558 logwrapper((char*)"ceph_posix_unlink : %s unlink successful: %d ms", pathname, deltime_ms);
1559 return 0;
1560 }
1561 if (rc != -EBUSY) {
1562 logwrapper((char*)"ceph_posix_unlink : %s unlink failed: %d ms; return code %d", pathname, deltime_ms, rc);
1563 return rc;
1564 }
1565 // if EBUSY returned, assume the file is locked; so try to remove the lock
1566 logwrapper((char*)"ceph_posix_unlink : unlink failed with -EBUSY %s, now trying to remove lock.", pathname);
1567
1568 // lock name is only exposed in the libradosstriper source file, so hardcode it here.
1569 rc = ceph_posix_internal_removexattr(file, "lock.striper.lock");
1570 if (rc !=0 ) {
1571 logwrapper((char*)"ceph_posix_unlink : unlink rmxattr failed %s, %d", pathname, rc);
1572 return rc;
1573 }
1574
1575 // now try to remove again
1576 rc = striper->remove(file.name);
1577 end = std::chrono::steady_clock::now();
1578 deltime_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - timer_start).count();
1579
1580 if (rc != 0) {
1581 logwrapper((char*)"ceph_posix_unlink : unlink failed after lock removal %s, %d ms", pathname, deltime_ms);
1582 } else {
1583 logwrapper((char*)"ceph_posix_unlink : unlink succeeded after lock removal %s, %d ms", pathname, deltime_ms);
1584 }
1585 return rc;
1586}
1587
1588DIR* ceph_posix_opendir(XrdOucEnv* env, const char *pathname) {
1589 logwrapper((char*)"ceph_posix_opendir : %s", pathname);
1590 // only accept root dir, as there is no concept of dirs in object stores
1591 CephFile file = getCephFile(pathname, env);
1592 if (file.name.size() != 1 || file.name[0] != '/') {
1593 errno = -ENOENT;
1594 return 0;
1595 }
1596 librados::IoCtx *ioctx = getIoCtx(file);
1597 if (0 == ioctx) {
1598 errno = EINVAL;
1599 return 0;
1600 }
1601 DirIterator* res = new DirIterator();
1602 res->m_iterator = ioctx->nobjects_begin();
1603 res->m_ioctx = ioctx;
1604 return (DIR*)res;
1605}
1606
1607int ceph_posix_readdir(DIR *dirp, char *buff, int blen) {
1608 librados::NObjectIterator &iterator = ((DirIterator*)dirp)->m_iterator;
1609 librados::IoCtx *ioctx = ((DirIterator*)dirp)->m_ioctx;
1610 while (iterator->get_oid().compare(iterator->get_oid().size()-17, 17, ".0000000000000000") &&
1611 iterator != ioctx->nobjects_end()) {
1612 iterator++;
1613 }
1614 if (iterator == ioctx->nobjects_end()) {
1615 buff[0] = 0;
1616 } else {
1617 int l = iterator->get_oid().size()-17;
1618 if (l < blen) blen = l;
1619 strncpy(buff, iterator->get_oid().c_str(), blen-1);
1620 buff[blen-1] = 0;
1621 iterator++;
1622 }
1623 return 0;
1624}
1625
1626int ceph_posix_closedir(DIR *dirp) {
1627 delete ((DirIterator*)dirp);
1628 return 0;
1629}
unsigned int g_cephPoolIdx
index of current Striper/IoCtx to be used
static libradosstriper::RadosStriper * getRadosStriper(const CephFile &file)
ssize_t ceph_posix_maybestriper_pread(int fd, void *buf, size_t count, off64_t offset, bool allowStriper)
ssize_t ceph_posix_write(int fd, const void *buf, size_t count)
CephFileRef * getFileRef(int fd)
look for a FileRef from its file descriptor
unsigned int getCephPoolIdxAndIncrease()
void ceph_posix_set_logfunc(void(*logfunc)(char *, va_list argp))
int ceph_posix_truncate(XrdOucEnv *env, const char *pathname, unsigned long long size)
CephFile g_defaultParams
global variable containing defaults for CephFiles
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_unlink(XrdOucEnv *env, const char *pathname)
XrdOucName2Name * g_namelib
void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_readdir(DIR *dirp, char *buff, int blen)
std::multiset< std::string > g_filesOpenForWrite
global variable holding a list of files currently opened for write
int ceph_posix_fcntl(int fd, int cmd,...)
static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char *name, const void *value, size_t size, int flags)
ssize_t ceph_posix_getxattr(XrdOucEnv *env, const char *path, const char *name, void *value, size_t size)
std::vector< librados::Rados * > g_cluster
static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static void ceph_aio_write_complete(rados_completion_t c, void *arg)
static CephFile getCephFile(const char *path, XrdOucEnv *env)
ssize_t ceph_nonstriper_readv(int fd, XrdOucIOVec *readV, int n)
XrdSysMutex g_striper_mutex
mutex protecting the striper and ioctx maps
std::map< std::string, libradosstriper::RadosStriper * > StriperDict
ssize_t ceph_posix_read(int fd, void *buf, size_t count)
static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset)
int ceph_posix_listxattrs(XrdOucEnv *env, const char *path, XrdSysXAttr::AList **aPL, int getSz)
int ceph_posix_fstat(int fd, struct stat *buf)
static unsigned int stoui(const std::string &s)
simple integer parsing, to be replaced by std::stoi when C++11 can be used
void ceph_posix_disconnect_all()
int ceph_posix_fsync(int fd)
XrdSysMutex g_init_mutex
mutex protecting initialization of ceph clusters
int ceph_posix_closedir(DIR *dirp)
static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
std::map< unsigned int, CephFileRef > g_fds
global variable holding a map of file descriptor to file reference
static librados::IoCtx * getIoCtx(const CephFile &file)
unsigned int g_nextCephFd
global variable remembering the next available file descriptor
static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char *name, void *value, size_t size)
std::vector< IOCtxDict > g_ioCtx
int insertFileRef(CephFileRef &fr)
DIR * ceph_posix_opendir(XrdOucEnv *env, const char *pathname)
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace)
int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile &file)
unsigned int g_maxCephPoolIdx
static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, mode_t mode, unsigned long long offset)
int ceph_posix_fremovexattr(int fd, const char *name)
librados::Rados * checkAndCreateCluster(unsigned int cephPoolIdx, std::string userId=g_defaultParams.userId)
void translateFileName(std::string &physName, std::string logName)
converts a logical filename to physical one if needed
int ceph_posix_close(int fd)
int ceph_posix_stat_pool(char const *poolName, long long *usedSpace)
Return the amount of space used in a pool.
librados::IoCtx * m_ioctx
std::map< unsigned int, unsigned long long > g_idxCntr
std::map< std::string, librados::IoCtx * > IOCtxDict
off_t ceph_posix_lseek(int fd, off_t offset, int whence)
void ceph_posix_set_defaults(const char *value)
void deleteFileRef(int fd, const CephFileRef &fr)
deletes a FileRef from the global table of file descriptors
int ceph_posix_fsetxattr(int fd, const char *name, const void *value, size_t size, int flags)
static void ceph_aio_read_complete(rados_completion_t c, void *arg)
int ceph_posix_ftruncate(int fd, unsigned long long size)
void dumpClusterInfo()
static int ceph_posix_internal_removexattr(const CephFile &file, const char *name)
std::vector< StriperDict > g_radosStripers
int ceph_posix_open(XrdOucEnv *env, const char *pathname, int flags, mode_t mode)
librados::NObjectIterator m_iterator
ssize_t ceph_posix_nonstriper_pread(int fd, void *buf, size_t count, off64_t offset)
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
static void logwrapper(char *format,...)
int ceph_posix_removexattr(XrdOucEnv *env, const char *path, const char *name)
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence)
ssize_t ceph_striper_readv(int fd, XrdOucIOVec *readV, int n)
int ceph_posix_stat(XrdOucEnv *env, const char *pathname, struct stat *buf)
bool isOpenForWrite(std::string &name)
check whether a file is open for write
static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file)
void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file)
fill a ceph file struct from a path and an environment
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
unsigned int g_cephAioWaitThresh
std::string g_defaultPool
XrdSysMutex g_fd_mutex
mutex protecting the map of file descriptors and the openForWrite multiset
ssize_t ceph_posix_setxattr(XrdOucEnv *env, const char *path, const char *name, const void *value, size_t size, int flags)
static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size)
static void(* g_logfunc)(char *, va_list argp)=0
global variable for the log function
std::string g_defaultUserId
int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz)
ssize_t ceph_posix_fgetxattr(int fd, const char *name, void *value, size_t size)
static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz)
static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence)
void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL)
small struct for directory listing
void() AioCB(XrdSfsAio *, size_t)
#define MAXDIGITSIZE
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:101
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value;.
int Nlen
The length of the attribute name that follows.
AList * Next
-> next element.
int read(void *out_buf, size_t size, off64_t offset)
int submit_and_wait_for_complete()
small struct for aio API callbacks
size_t nbBytes
::timeval startTime
AioCB * callback
AioArgs(XrdSfsAio *a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0)
XrdSfsAio * aiop
ceph::bufferlist * bl
uint64_t bytesAsyncWritePending
unsigned asyncRdStartCount
unsigned asyncWrStartCount
uint64_t maxOffsetWritten
::timeval lastAsyncSubmission
double longestCallbackInvocation
uint64_t bytesWritten
uint64_t offset
double longestAsyncWriteTime
unsigned wrcount
unsigned rdcount
XrdSysMutex statsMutex
unsigned asyncWrCompletionCount
unsigned asyncRdCompletionCount
small structs to store file metadata
std::string userId
unsigned int nbStripes
std::string pool
unsigned long long stripeUnit
std::string name
unsigned long long objectSize