XRootD
Loading...
Searching...
No Matches
XrdCephPosix.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2014-2015 by European Organization for Nuclear Research (CERN)
3// Author: Sebastien Ponce <sebastien.ponce@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25/*
26 * This interface provides wrapper methods for using ceph through a POSIX API.
27 */
28
29#include <sys/types.h>
30#include <sys/stat.h>
31#include <cerrno>
32#include <fcntl.h>
33#include <unistd.h>
34#include <stdlib.h>
35#include <stdarg.h>
36#include <memory>
37#include <radosstriper/libradosstriper.hpp>
38#include <map>
39#include <stdexcept>
40#include <string>
41#include <sstream>
42#include <sys/xattr.h>
43#include <time.h>
44#include <limits>
45#include <pthread.h>
46#include "XrdSfs/XrdSfsAio.hh"
50
52
// Describes one striped object: its name plus the pool, user id and striping
// layout used to reach it.
54struct CephFile {
55 std::string name; // object name; fillCephFile stores the (possibly translated) path here
56 std::string pool; // pool name handed to ioctx_create
57 std::string userId; // user id handed to librados::Rados::init
58 unsigned int nbStripes; // passed to set_object_layout_stripe_count
59 unsigned long long stripeUnit; // passed to set_object_layout_stripe_unit
60 unsigned long long objectSize; // passed to set_object_layout_object_size
61};
61};
62
64 int flags;
65 mode_t mode;
66 uint64_t offset;
67 // This mutex protects against parallel updates of the stats.
71 uint64_t bytesWritten;
72 unsigned rdcount;
73 unsigned wrcount;
81};
82
85 librados::NObjectIterator m_iterator;
86 librados::IoCtx *m_ioctx;
87};
88
// Bundle handed to the asynchronous completion callbacks: the xrootd aio
// request, the callback to invoke, the requested byte count, the file
// descriptor and (optionally) the bufferlist receiving the data.
// The constructor also records the creation time in startTime.
// NOTE(review): the members aiop and callback are initialised below but their
// declarations (listing lines 93-94) are not present in this extract.
90struct AioArgs {
91 AioArgs(XrdSfsAio* a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0) :
92 aiop(a), callback(b), nbBytes(n), fd(_fd), bl(_bl) { ::gettimeofday(&startTime, nullptr); }
95 size_t nbBytes; // number of bytes requested for the operation
96 int fd; // descriptor the operation was issued on
97 ::timeval startTime; // set by the constructor via gettimeofday
98 ceph::bufferlist *bl; // optional buffer; null (0) by default
99};
100
// Striper objects, one dictionary per pool index, keyed by the
// "user@pool,nbStripes,stripeUnit,objectSize" string built in getRadosStriper.
104typedef std::map<std::string, libradosstriper::RadosStriper*> StriperDict;
105std::vector<StriperDict> g_radosStripers;
// IoCtx objects, stored with the same index and key as the stripers.
106typedef std::map<std::string, librados::IoCtx*> IOCtxDict;
107std::vector<IOCtxDict> g_ioCtx;
// One cluster connection per pool index (0 until created).
108std::vector<librados::Rados*> g_cluster;
// Index that will be returned by the next round-robin pick.
112unsigned int g_cephPoolIdx = 0;
// Number of slots allocated in the three vectors above.
116unsigned int g_maxCephPoolIdx = 1;
120
// Names of the files currently registered with O_WRONLY or O_RDWR.
122std::multiset<std::string> g_filesOpenForWrite;
// Open file table: descriptor -> file reference.
124std::map<unsigned int, CephFileRef> g_fds;
// Descriptor that will be assigned to the next inserted file reference.
126unsigned int g_nextCephFd = 0;
131
136 if (g_radosStripers.size() == 0) {
137 // make sure we do not have a race condition here
139 // double check now that we have the lock
140 if (g_radosStripers.size() == 0) {
141 // initialization phase : allocate corresponding places in the vectors
142 for (unsigned int i = 0; i < g_maxCephPoolIdx; i++) {
143 g_radosStripers.push_back(StriperDict());
144 g_ioCtx.push_back(IOCtxDict());
145 g_cluster.push_back(0);
146 }
147 }
148 }
149 unsigned int res = g_cephPoolIdx;
150 unsigned nextValue = g_cephPoolIdx+1;
151 if (nextValue >= g_maxCephPoolIdx) {
152 nextValue = 0;
153 }
154 g_cephPoolIdx = nextValue;
155 return res;
156}
157
// Returns true when name is present in g_filesOpenForWrite.
// NOTE(review): listing line 160 is absent from this extract; presumably it
// takes the lock protecting the set - confirm against the complete file.
159bool isOpenForWrite(std::string& name) {
161 return g_filesOpenForWrite.find(name) != g_filesOpenForWrite.end();
162}
163
167 std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
168 if (it != g_fds.end()) {
169 // We will release the lock upon exiting this function.
170 // The structure here is not protected from deletion, but we trust xrootd to
171 // ensure close (which does the deletion) will not be called before all previous
172 // calls are complete (including the async ones).
173 return &(it->second);
174 } else {
175 return 0;
176 }
177}
178
// Removes the entry for fd from g_fds, if there is one.
// NOTE(review): listing lines 181 and 183 are absent from this extract, which
// leaves the write-mode branch below empty; presumably they take the lock and
// drop fr.name from g_filesOpenForWrite - confirm against the complete file.
180void deleteFileRef(int fd, const CephFileRef &fr) {
182 if (fr.flags & (O_WRONLY|O_RDWR)) {
184 }
185 std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
186 if (it != g_fds.end()) {
187 g_fds.erase(it);
188 }
189}
190
197 g_fds[g_nextCephFd] = fr;
198 g_nextCephFd++;
199 if (fr.flags & (O_WRONLY|O_RDWR)) {
200 g_filesOpenForWrite.insert(fr.name);
201 }
202 return g_nextCephFd-1;
203}
204
207 "default", // default pool
208 "admin", // default user
209 1, // default nbStripes
210 4 * 1024 * 1024, // default stripeUnit : 4 MB
211 4 * 1024 * 1024}; // default objectSize : 4 MB
212
// Default user id and pool name as plain strings.
// NOTE(review): nothing in the visible part of the file reads these two
// variables; confirm whether they are still used.
213std::string g_defaultUserId = "admin";
214std::string g_defaultPool = "default";
215
/// Logging sink; logging is disabled for as long as this stays null.
static void (*g_logfunc) (char *, va_list argp) = 0;

/// Forwards a printf-style message and its arguments to the registered
/// logging function. Does nothing when no function is registered.
static void logwrapper(char* format, ...) {
  if (g_logfunc != 0) {
    va_list args;
    va_start(args, format);
    g_logfunc(format, args);
    va_end(args);
  }
}
226
/// Converts a base-10 string to an unsigned long long.
///
/// Unlike a bare strtoull call, this rejects input without any digit (for
/// instance the empty string, which strtoull reports as 0) and negative
/// numbers (which strtoull silently wraps around).
///
/// @param s text to convert; leading whitespace is tolerated
/// @return the converted value
/// @throws std::invalid_argument if s holds no number, a negative number or
///         trailing characters
/// @throws std::out_of_range if the value does not fit
static unsigned long long int stoull(const std::string &s) {
  // strtoull skips these characters (C locale) before looking for a sign
  const std::string::size_type first = s.find_first_not_of(" \t\n\v\f\r");
  if (std::string::npos != first && '-' == s[first]) {
    throw std::invalid_argument(s);
  }
  const char* begin = s.c_str();
  char* end = 0;
  errno = 0;
  const unsigned long long int res = strtoull(begin, &end, 10);
  // end == begin means that no digit at all was consumed
  if (end == begin || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno) {
    throw std::out_of_range(s);
  }
  return res;
}
240
/// Converts a base-10 string to an unsigned int.
///
/// Unlike a bare strtoul call, this rejects input without any digit (for
/// instance the empty string, which strtoul reports as 0) and negative
/// numbers (which strtoul silently wraps around).
///
/// @param s text to convert; leading whitespace is tolerated
/// @return the converted value
/// @throws std::invalid_argument if s holds no number, a negative number or
///         trailing characters
/// @throws std::out_of_range if the value does not fit in an unsigned int
static unsigned int stoui(const std::string &s) {
  // strtoul skips these characters (C locale) before looking for a sign
  const std::string::size_type first = s.find_first_not_of(" \t\n\v\f\r");
  if (std::string::npos != first && '-' == s[first]) {
    throw std::invalid_argument(s);
  }
  const char* begin = s.c_str();
  char* end = 0;
  errno = 0;
  const unsigned long int res = strtoul(begin, &end, 10);
  // end == begin means that no digit at all was consumed
  if (end == begin || 0 != *end) {
    throw std::invalid_argument(s);
  }
  if (ERANGE == errno || res > std::numeric_limits<unsigned int>::max()) {
    throw std::out_of_range(s);
  }
  return static_cast<unsigned int>(res);
}
254
// Fills file.userId from params.
// When params contains '@', the user id is the text in front of it and the
// index of the character following '@' is returned. Otherwise the
// "cephUserId" entry of env is used when env is non-null and has one, and 0
// is returned.
// NOTE(review): listing line 259 (right after "default") is absent from this
// extract; presumably it assigns the default user id - confirm against the
// complete file.
257static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file) {
258 // default
260 // parsing
261 size_t atPos = params.find('@');
262 if (std::string::npos != atPos) {
263 file.userId = params.substr(0, atPos);
264 return atPos+1;
265 } else {
266 if (0 != env) {
267 char* cuser = env->Get("cephUserId");
268 if (0 != cuser) {
269 file.userId = cuser;
270 }
271 }
272 return 0;
273 }
274}
275
// Fills file.pool from the part of params starting at offset.
// The pool is the text up to the next ',' (or up to the end of params when
// there is none). When nothing is left at offset, the "cephPool" entry of env
// is used if env is non-null and has one.
// Returns the index following the ',' or params.size() when no ',' was found.
// NOTE(review): listing line 280 (right after "default") is absent from this
// extract; presumably it assigns the default pool - confirm.
278static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
279 // default
281 // parsing
282 size_t comPos = params.find(',', offset);
283 if (std::string::npos == comPos) {
284 if (params.size() == offset) {
285 if (NULL != env) {
286 char* cpool = env->Get("cephPool");
287 if (0 != cpool) {
288 file.pool = cpool;
289 }
290 }
291 } else {
292 file.pool = params.substr(offset);
293 }
294 return params.size();
295 } else {
296 file.pool = params.substr(offset, comPos-offset);
297 return comPos+1;
298 }
299}
300
// Fills file.nbStripes from the part of params starting at offset.
// The value is the text up to the next ',' (or up to the end of params),
// converted with stoui. When nothing is left at offset, the "cephNbStripes"
// entry of env is used if env is non-null and has one.
// Returns the index following the ',' or params.size() when no ',' was found.
// NOTE(review): listing line 306 (right after "default") is absent from this
// extract; presumably it assigns the default stripe count - confirm.
303// this may raise std::invalid_argument and std::out_of_range
304static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
305 // default
307 // parsing
308 size_t comPos = params.find(',', offset);
309 if (std::string::npos == comPos) {
310 if (params.size() == offset) {
311 if (NULL != env) {
312 char* cNbStripes = env->Get("cephNbStripes");
313 if (0 != cNbStripes) {
314 file.nbStripes = stoui(cNbStripes);
315 }
316 }
317 } else {
318 file.nbStripes = stoui(params.substr(offset));
319 }
320 return params.size();
321 } else {
322 file.nbStripes = stoui(params.substr(offset, comPos-offset));
323 return comPos+1;
324 }
325}
326
// Fills file.stripeUnit from the part of params starting at offset.
// The value is the text up to the next ',' (or up to the end of params),
// converted with the file-local ::stoull. When nothing is left at offset, the
// "cephStripeUnit" entry of env is used if env is non-null and has one.
// Returns the index following the ',' or params.size() when no ',' was found.
// NOTE(review): listing line 332 (right after "default") is absent from this
// extract; presumably it assigns the default stripe unit - confirm.
329// this may raise std::invalid_argument and std::out_of_range
330static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
331 // default
333 // parsing
334 size_t comPos = params.find(',', offset);
335 if (std::string::npos == comPos) {
336 if (params.size() == offset) {
337 if (NULL != env) {
338 char* cStripeUnit = env->Get("cephStripeUnit");
339 if (0 != cStripeUnit) {
340 file.stripeUnit = ::stoull(cStripeUnit);
341 }
342 }
343 } else {
344 file.stripeUnit = ::stoull(params.substr(offset));
345 }
346 return params.size();
347 } else {
348 file.stripeUnit = ::stoull(params.substr(offset, comPos-offset));
349 return comPos+1;
350 }
351}
352
// Fills file.objectSize from the part of params starting at offset.
// Everything from offset to the end of params is converted with the
// file-local ::stoull. When nothing is left at offset, the "cephObjectSize"
// entry of env is used if env is non-null and has one.
// NOTE(review): listing line 358 (right after "default") is absent from this
// extract; presumably it assigns the default object size - confirm.
355// this may raise std::invalid_argument and std::out_of_range
356static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file) {
357 // default
359 // parsing
360 if (params.size() == offset) {
361 if (NULL != env) {
362 char* cObjectSize = env->Get("cephObjectSize");
363 if (0 != cObjectSize) {
364 file.objectSize = ::stoull(cObjectSize);
365 }
366 }
367 } else {
368 file.objectSize = ::stoull(params.substr(offset));
369 }
370}
371
374void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file) {
375 // parse the params one by one
376 unsigned int afterUser = fillCephUserId(params, env, file);
377 unsigned int afterPool = fillCephPool(params, afterUser, env, file);
378 unsigned int afterNbStripes = fillCephNbStripes(params, afterPool, env, file);
379 unsigned int afterStripeUnit = fillCephStripeUnit(params, afterNbStripes, env, file);
380 fillCephObjectSize(params, afterStripeUnit, env, file);
381}
382
386void ceph_posix_set_defaults(const char* value) {
387 if (value) {
388 CephFile newdefault;
389 fillCephFileParams(value, NULL, newdefault);
390 g_defaultParams = newdefault;
391 }
392}
393
395void translateFileName(std::string &physName, std::string logName){
396 if (0 != g_namelib) {
397 char physCName[MAXPATHLEN+1];
398 int retc = g_namelib->lfn2pfn(logName.c_str(), physCName, sizeof(physCName));
399 if (retc) {
400 logwrapper((char*)"ceph_namelib : failed to translate %s using namelib plugin, using it as is", logName.c_str());
401 physName = logName;
402 } else {
403 physName = physCName;
404 }
405 } else {
406 physName = logName;
407 }
408}
409
411void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file) {
412 // Syntax of the given path is :
413 // [[userId@]pool[,nbStripes[,stripeUnit[,objectSize]]]:]<actual path>
414 // for the missing parts, if env is not null the entries
415 // cephUserId, cephPool, cephNbStripes, cephStripeUnit, and cephObjectSize
416 // of env will be used.
417 // If env is null or no entry is found for what is missing, defaults are
418 // applied. These defaults are initially set to 'admin', 'default', 1, 4MB and 4MB
419 // but can be changed via a call to ceph_posix_set_defaults
420 std::string spath = path;
421 size_t colonPos = spath.find(':');
422 if (std::string::npos == colonPos) {
423 // deal with name translation
424 translateFileName(file.name, spath);
425 fillCephFileParams("", env, file);
426 } else {
427 translateFileName(file.name, spath.substr(colonPos+1));
428 fillCephFileParams(spath.substr(0, colonPos), env, file);
429 }
430}
431
432static CephFile getCephFile(const char *path, XrdOucEnv *env) {
433 CephFile file;
434 fillCephFile(path, env, file);
435 return file;
436}
437
// Builds a file reference for path: the CephFile part is filled by
// fillCephFile, flags and mode are stored as given, and the position and all
// statistics visible here start at zero.
// NOTE(review): the offset parameter is never read - fr.offset is always set
// to 0.
// NOTE(review): listing lines 446, 451, 453 and 457 are absent from this
// extract; presumably they initialise further counters - confirm.
438static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags,
439 mode_t mode, unsigned long long offset) {
440 CephFileRef fr;
441 fillCephFile(path, env, fr);
442 fr.flags = flags;
443 fr.mode = mode;
444 fr.offset = 0;
445 fr.maxOffsetWritten = 0;
447 fr.bytesWritten = 0;
448 fr.rdcount = 0;
449 fr.wrcount = 0;
450 fr.asyncRdStartCount = 0;
452 fr.asyncWrStartCount = 0;
454 fr.lastAsyncSubmission.tv_sec = 0;
455 fr.lastAsyncSubmission.tv_usec = 0;
456 fr.longestAsyncWriteTime = 0.0l;
458 return fr;
459}
460
461inline librados::Rados* checkAndCreateCluster(unsigned int cephPoolIdx,
462 std::string userId = g_defaultParams.userId) {
463 if (0 == g_cluster[cephPoolIdx]) {
464 // create connection to cluster
465 librados::Rados *cluster = new librados::Rados;
466 if (0 == cluster) {
467 return 0;
468 }
469 int rc = cluster->init(userId.c_str());
470 if (rc) {
471 logwrapper((char*)"checkAndCreateCluster : cluster init failed");
472 delete cluster;
473 return 0;
474 }
475 rc = cluster->conf_read_file(NULL);
476 if (rc) {
477 logwrapper((char*)"checkAndCreateCluster : cluster read config failed, rc = %d", rc);
478 cluster->shutdown();
479 delete cluster;
480 return 0;
481 }
482 cluster->conf_parse_env(NULL);
483 rc = cluster->connect();
484 if (rc) {
485 logwrapper((char*)"checkAndCreateCluster : cluster connect failed, rc = %d", rc);
486 cluster->shutdown();
487 delete cluster;
488 return 0;
489 }
490 g_cluster[cephPoolIdx] = cluster;
491 }
492 return g_cluster[cephPoolIdx];
493}
494
495int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile& file) {
496 StriperDict &sDict = g_radosStripers[cephPoolIdx];
497 StriperDict::iterator it = sDict.find(userAtPool);
498 if (it == sDict.end()) {
499 // we need to create a new radosStriper
500 // Get a cluster
501 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx, file.userId);
502 if (0 == cluster) {
503 logwrapper((char*)"checkAndCreateStriper : checkAndCreateCluster failed");
504 return 0;
505 }
506 // create IoCtx for our pool
507 librados::IoCtx *ioctx = new librados::IoCtx;
508 if (0 == ioctx) {
509 logwrapper((char*)"checkAndCreateStriper : IoCtx instantiation failed");
510 cluster->shutdown();
511 delete cluster;
512 g_cluster[cephPoolIdx] = 0;
513 return 0;
514 }
515 int rc = g_cluster[cephPoolIdx]->ioctx_create(file.pool.c_str(), *ioctx);
516 if (rc != 0) {
517 logwrapper((char*)"checkAndCreateStriper : ioctx_create failed, rc = %d", rc);
518 cluster->shutdown();
519 delete cluster;
520 g_cluster[cephPoolIdx] = 0;
521 delete ioctx;
522 return 0;
523 }
524 // create RadosStriper connection
525 libradosstriper::RadosStriper *striper = new libradosstriper::RadosStriper;
526 if (0 == striper) {
527 logwrapper((char*)"checkAndCreateStriper : RadosStriper instantiation failed");
528 delete ioctx;
529 cluster->shutdown();
530 delete cluster;
531 g_cluster[cephPoolIdx] = 0;
532 return 0;
533 }
534 rc = libradosstriper::RadosStriper::striper_create(*ioctx, striper);
535 if (rc != 0) {
536 logwrapper((char*)"checkAndCreateStriper : striper_create failed, rc = %d", rc);
537 delete striper;
538 delete ioctx;
539 cluster->shutdown();
540 delete cluster;
541 g_cluster[cephPoolIdx] = 0;
542 return 0;
543 }
544 // setup layout
545 rc = striper->set_object_layout_stripe_count(file.nbStripes);
546 if (rc != 0) {
547 logwrapper((char*)"checkAndCreateStriper : invalid nbStripes %d", file.nbStripes);
548 delete striper;
549 delete ioctx;
550 cluster->shutdown();
551 delete cluster;
552 g_cluster[cephPoolIdx] = 0;
553 return 0;
554 }
555 rc = striper->set_object_layout_stripe_unit(file.stripeUnit);
556 if (rc != 0) {
557 logwrapper((char*)"checkAndCreateStriper : invalid stripeUnit %d (must be non 0, multiple of 64K)", file.stripeUnit);
558 delete striper;
559 delete ioctx;
560 cluster->shutdown();
561 delete cluster;
562 g_cluster[cephPoolIdx] = 0;
563 return 0;
564 }
565 rc = striper->set_object_layout_object_size(file.objectSize);
566 if (rc != 0) {
567 logwrapper((char*)"checkAndCreateStriper : invalid objectSize %d (must be non 0, multiple of stripe_unit)", file.objectSize);
568 delete striper;
569 delete ioctx;
570 cluster->shutdown();
571 delete cluster;
572 g_cluster[cephPoolIdx] = 0;
573 return 0;
574 }
575 IOCtxDict & ioDict = g_ioCtx[cephPoolIdx];
576 ioDict.emplace(userAtPool, ioctx);
577 sDict.emplace(userAtPool, striper);
578 }
579 return 1;
580}
581
// Returns the striper to use for file, or 0 when it could not be created.
// The lookup key is "userId@pool,nbStripes,stripeUnit,objectSize" and the
// slot is whatever getCephPoolIdxAndIncrease hands out for this call.
// NOTE(review): listing line 583 is absent from this extract; presumably it
// takes a lock - confirm against the complete file.
582static libradosstriper::RadosStriper* getRadosStriper(const CephFile& file) {
584 std::stringstream ss;
585 ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
586 << file.stripeUnit << ',' << file.objectSize;
587 std::string userAtPool = ss.str();
588 unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
589 if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
590 logwrapper((char*)"getRadosStriper : checkAndCreateStriper failed");
591 return 0;
592 }
593 return g_radosStripers[cephPoolIdx][userAtPool];
594}
595
// Returns the IoCtx to use for file, or 0 when it could not be created.
// Uses the same key and slot selection as getRadosStriper; the IoCtx is
// created together with the striper by checkAndCreateStriper.
// NOTE(review): listing line 597 is absent from this extract; presumably it
// takes a lock - confirm against the complete file.
596static librados::IoCtx* getIoCtx(const CephFile& file) {
598 std::stringstream ss;
599 ss << file.userId << '@' << file.pool << ',' << file.nbStripes << ','
600 << file.stripeUnit << ',' << file.objectSize;
601 std::string userAtPool = ss.str();
602 unsigned int cephPoolIdx = getCephPoolIdxAndIncrease();
603 if (checkAndCreateStriper(cephPoolIdx, userAtPool, file) == 0) {
604 return 0;
605 }
606 return g_ioCtx[cephPoolIdx][userAtPool];
607}
608
611 for (unsigned int i= 0; i < g_maxCephPoolIdx; i++) {
612 for (StriperDict::iterator it2 = g_radosStripers[i].begin();
613 it2 != g_radosStripers[i].end();
614 it2++) {
615 delete it2->second;
616 }
617 for (IOCtxDict::iterator it2 = g_ioCtx[i].begin();
618 it2 != g_ioCtx[i].end();
619 it2++) {
620 delete it2->second;
621 }
622 delete g_cluster[i];
623 }
624 g_radosStripers.clear();
625 g_ioCtx.clear();
626 g_cluster.clear();
627}
628
629void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) {
630 g_logfunc = logfunc;
631};
632
633static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size);
634
649int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode){
650
651 CephFileRef fr = getCephFileRef(pathname, env, flags, mode, 0);
652
653 struct stat buf;
654 libradosstriper::RadosStriper *striper = getRadosStriper(fr); //Get a handle to the RADOS striper API
655
656 if (NULL == striper) {
657 logwrapper((char*)"Cannot create striper");
658 return -EINVAL;
659 }
660
661 int rc = striper->stat(fr.name, (uint64_t*)&(buf.st_size), &(buf.st_atime)); //Get details about a file
662
663
664 bool fileExists = (rc != -ENOENT); //Make clear what condition we are testing
665
666 if ((flags&O_ACCMODE) == O_RDONLY) { // Access mode is READ
667
668 if (fileExists) {
669 int fd = insertFileRef(fr);
670 logwrapper((char*)"File descriptor %d associated to file %s opened in read mode", fd, pathname);
671 return fd;
672 } else {
673 return -ENOENT;
674 }
675
676 } else { // Access mode is WRITE
677 if (fileExists) {
678 if (flags & O_TRUNC) {
679 int rc = ceph_posix_unlink(env, pathname);
680 if (rc < 0 && rc != -ENOENT) {
681 return rc;
682 }
683 } else {
684 return -EEXIST;
685 }
686 }
687 // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it
688 int fd = insertFileRef(fr);
689 logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname);
690 return fd;
691
692 }
693
694}
695
// Closes fd: logs the statistics gathered for the file, then removes its
// file reference. Returns 0, or -EBADF when fd is not a known descriptor.
// NOTE(review): listing lines 701, 713 and 714 are absent from this extract;
// as shown, the log call has fewer arguments than conversion specifiers.
696int ceph_posix_close(int fd) {
697 CephFileRef* fr = getFileRef(fd);
698 if (fr) {
699 ::timeval now;
700 ::gettimeofday(&now, nullptr);
702 double lastAsyncAge = 0.0;
703 // Only compute an age if the starting point was set.
// NOTE(review): this requires tv_sec AND tv_usec to be non-zero, so a
// submission stamped with tv_usec == 0 is treated as "not set"; '||' may
// have been intended - confirm.
704 if (fr->lastAsyncSubmission.tv_sec && fr->lastAsyncSubmission.tv_usec) {
705 lastAsyncAge = 1.0 * (now.tv_sec - fr->lastAsyncSubmission.tv_sec)
706 + 0.000001 * (now.tv_usec - fr->lastAsyncSubmission.tv_usec);
707 }
708 logwrapper((char*)"ceph_close: closed fd %d for file %s, read ops count %d, write ops count %d, "
709 "async write ops %d/%d, async pending write bytes %ld, "
710 "async read ops %d/%d, bytes written/max offset %ld/%ld, "
711 "longest async write %f, longest callback invocation %f, last async op age %f",
712 fd, fr->name.c_str(), fr->rdcount, fr->wrcount,
715 fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge));
716 deleteFileRef(fd, *fr);
717 return 0;
718 } else {
719 return -EBADF;
720 }
721}
722
723static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence) {
724 switch (whence) {
725 case SEEK_SET:
726 fr.offset = offset;
727 break;
728 case SEEK_CUR:
729 fr.offset += offset;
730 break;
731 default:
732 return -EINVAL;
733 }
734 return fr.offset;
735}
736
737off_t ceph_posix_lseek(int fd, off_t offset, int whence) {
738 CephFileRef* fr = getFileRef(fd);
739 if (fr) {
740 logwrapper((char*)"ceph_lseek: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
741 return (off_t)lseek_compute_offset(*fr, offset, whence);
742 } else {
743 return -EBADF;
744 }
745}
746
747off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence) {
748 CephFileRef* fr = getFileRef(fd);
749 if (fr) {
750 logwrapper((char*)"ceph_lseek64: for fd %d, offset=%lld, whence=%d", fd, offset, whence);
751 return lseek_compute_offset(*fr, offset, whence);
752 } else {
753 return -EBADF;
754 }
755}
756
// Writes count bytes from buf at the current position of fd, then advances
// the position and updates the write statistics.
// Returns count on success, -EBADF for an unknown descriptor or one opened
// without O_WRONLY/O_RDWR, -EINVAL when no striper could be obtained, or the
// non-zero code returned by the striper write.
// NOTE(review): the log call passes a size_t for "%d".
// NOTE(review): listing line 773 is absent from this extract; presumably it
// takes the lock protecting the statistics - confirm.
757ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
758 CephFileRef* fr = getFileRef(fd);
759 if (fr) {
760 logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
761 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
762 return -EBADF;
763 }
764 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
765 if (0 == striper) {
766 return -EINVAL;
767 }
768 ceph::bufferlist bl;
769 bl.append((const char*)buf, count);
770 int rc = striper->write(fr->name, bl, count, fr->offset);
771 if (rc) return rc;
772 fr->offset += count;
774 fr->wrcount++;
775 fr->bytesWritten+=count;
// the guard avoids computing offset - 1 on an unsigned zero
776 if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten);
777 return count;
778 } else {
779 return -EBADF;
780 }
781}
782
// Writes count bytes from buf at the given offset of fd. The position stored
// in the file reference is not modified; the write statistics are updated.
// Returns count on success, -EBADF for an unknown descriptor or one opened
// without O_WRONLY/O_RDWR, -EINVAL when no striper could be obtained, or the
// non-zero code returned by the striper write.
// NOTE(review): listing line 799 is absent from this extract; presumably it
// takes the lock protecting the statistics - confirm.
783ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) {
784 CephFileRef* fr = getFileRef(fd);
785 if (fr) {
786 // TODO implement proper logging level for this plugin - this should be only debug
787 //logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
788 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
789 return -EBADF;
790 }
791 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
792 if (0 == striper) {
793 return -EINVAL;
794 }
795 ceph::bufferlist bl;
796 bl.append((const char*)buf, count);
797 int rc = striper->write(fr->name, bl, count, offset);
798 if (rc) return rc;
800 fr->wrcount++;
801 fr->bytesWritten+=count;
// the guard avoids computing "end - 1" when the end position is zero
802 if (offset + count) fr->maxOffsetWritten = std::max(uint64_t(offset + count - 1), fr->maxOffsetWritten);
803 return count;
804 } else {
805 return -EBADF;
806 }
807}
808
// Completion callback of an asynchronous write. arg is the AioArgs allocated
// by ceph_aio_write; it is deleted here once the xrootd callback has run.
// The callback receives nbBytes when the completion reports 0, otherwise the
// reported value.
// NOTE(review): the completion's return value is stored in a size_t, so a
// negative error code reaches the callback as a large unsigned value unless
// the callback's parameter type converts it back - confirm.
// NOTE(review): bytesWritten and maxOffsetWritten are updated before the
// result is looked at, i.e. also for failed writes.
// NOTE(review): listing lines 816-818 and 833 are absent from this extract;
// presumably they lock and update further statistics - confirm.
809static void ceph_aio_write_complete(rados_completion_t c, void *arg) {
810 AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
811 size_t rc = rados_aio_get_return_value(c);
812 // Compute statistics before reporting to xrootd, so that a close cannot happen
813 // in the meantime.
814 CephFileRef* fr = getFileRef(awa->fd);
815 if (fr) {
819 fr->bytesWritten += awa->nbBytes;
820 if (awa->aiop->sfsAio.aio_nbytes)
821 fr->maxOffsetWritten = std::max(fr->maxOffsetWritten, uint64_t(awa->aiop->sfsAio.aio_offset + awa->aiop->sfsAio.aio_nbytes - 1));
822 ::timeval now;
823 ::gettimeofday(&now, nullptr);
// elapsed time since the AioArgs was constructed, in seconds
824 double writeTime = 0.000001 * (now.tv_usec - awa->startTime.tv_usec) + 1.0 * (now.tv_sec - awa->startTime.tv_sec);
825 fr->longestAsyncWriteTime = std::max(fr->longestAsyncWriteTime, writeTime);
826 }
827 ::timeval before, after;
828 if (fr) ::gettimeofday(&before, nullptr);
829 awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
// NOTE(review): fr is dereferenced again after the callback although the
// comment above suggests a close may happen once xrootd has been told -
// confirm that fr is still valid here.
830 if (fr) {
831 ::gettimeofday(&after, nullptr);
832 double callbackInvocationTime = 0.000001 * (after.tv_usec - before.tv_usec) + 1.0 * (after.tv_sec - before.tv_sec);
834 fr->longestCallbackInvocation = std::max(fr->longestCallbackInvocation, callbackInvocationTime);
835 }
836 delete(awa);
837}
838
// Starts an asynchronous write described by aiop on fd; cb is invoked from
// ceph_aio_write_complete once the write has completed.
// Returns the code of the striper's aio_write call, -EBADF for an unknown
// descriptor or one opened without O_WRONLY/O_RDWR, or -EINVAL when no
// striper or cluster could be obtained.
// NOTE(review): the cluster is looked up without a user id, so the default
// argument of checkAndCreateCluster applies if the slot is still empty.
// NOTE(review): the asynchronous statistics are updated and rc is returned
// without checking whether aio_write succeeded; whether args is released in
// the failure case cannot be told from here.
// NOTE(review): listing line 873 is absent from this extract; presumably it
// takes the lock protecting the statistics - confirm.
839ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
840 CephFileRef* fr = getFileRef(fd);
841 if (fr) {
842 // get the parameters from the Xroot aio object
843 size_t count = aiop->sfsAio.aio_nbytes;
844 const char *buf = (const char*)aiop->sfsAio.aio_buf;
845 size_t offset = aiop->sfsAio.aio_offset;
846 // TODO implement proper logging level for this plugin - this should be only debug
847 //logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
848 if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
849 return -EBADF;
850 }
851 // get the striper object
852 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
853 if (0 == striper) {
854 return -EINVAL;
855 }
856 // prepare a bufferlist around the given buffer
857 ceph::bufferlist bl;
858 bl.append(buf, count);
859 // get the poolIdx to use
860 int cephPoolIdx = getCephPoolIdxAndIncrease();
861 // Get the cluster to use
862 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
863 if (0 == cluster) {
864 return -EINVAL;
865 }
866 // prepare a ceph AioCompletion object and do async call
867 AioArgs *args = new AioArgs(aiop, cb, count, fd);
868 librados::AioCompletion *completion =
869 cluster->aio_create_completion(args, ceph_aio_write_complete, NULL);
870 // do the write
871 int rc = striper->aio_write(fr->name, completion, bl, count, offset);
872 completion->release();
874 fr->asyncWrStartCount++;
875 ::gettimeofday(&fr->lastAsyncSubmission, nullptr);
876 fr->bytesAsyncWritePending+=count;
877 return rc;
878 } else {
879 return -EBADF;
880 }
881}
882
// Reads up to count bytes into buf from the current position of fd and
// advances the position by the number of bytes read.
// Returns the number of bytes read, -EBADF for an unknown descriptor or one
// opened with O_WRONLY, -EINVAL when no striper could be obtained, or the
// negative code returned by the striper read.
// NOTE(review): listing line 899 is absent from this extract; presumably it
// takes the lock protecting the position and statistics - confirm.
883ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
884 CephFileRef* fr = getFileRef(fd);
885 if (fr) {
886 // TODO implement proper logging level for this plugin - this should be only debug
887 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
888 if ((fr->flags & O_WRONLY) != 0) {
889 return -EBADF;
890 }
891 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
892 if (0 == striper) {
893 return -EINVAL;
894 }
895 ceph::bufferlist bl;
896 int rc = striper->read(fr->name, &bl, count, fr->offset);
897 if (rc < 0) return rc;
// copy the rc bytes actually read out of the bufferlist
898 bl.begin().copy(rc, (char*)buf);
900 fr->offset += rc;
901 fr->rdcount++;
902 return rc;
903 } else {
904 return -EBADF;
905 }
906}
907
// Reads up to count bytes into buf from the given offset of fd. The position
// stored in the file reference is not modified.
// Returns the number of bytes read, -EBADF for an unknown descriptor or one
// opened with O_WRONLY, -EINVAL when no striper could be obtained, or the
// negative code returned by the striper read.
// NOTE(review): listing line 924 is absent from this extract; presumably it
// takes the lock protecting the statistics - confirm.
908ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
909 CephFileRef* fr = getFileRef(fd);
910 if (fr) {
911 // TODO implement proper logging level for this plugin - this should be only debug
912 //logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
913 if ((fr->flags & O_WRONLY) != 0) {
914 return -EBADF;
915 }
916 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
917 if (0 == striper) {
918 return -EINVAL;
919 }
920 ceph::bufferlist bl;
921 int rc = striper->read(fr->name, &bl, count, offset);
922 if (rc < 0) return rc;
// copy the rc bytes actually read out of the bufferlist
923 bl.begin().copy(rc, (char*)buf);
925 fr->rdcount++;
926 return rc;
927 } else {
928 return -EBADF;
929 }
930}
931
// Completion callback of an asynchronous read. arg is the AioArgs allocated
// by ceph_aio_read. The data is copied from the bufferlist into the aio
// buffer, the bufferlist is deleted, the xrootd callback is invoked with the
// completion's return value and finally the AioArgs is deleted.
// NOTE(review): the return value is stored in a size_t, so "rc > 0" is also
// true for a negative error code; the copy would then be attempted with a
// huge length - confirm how errors are expected to be reported here.
// NOTE(review): listing lines 946-947 are absent from this extract, which
// leaves the "if (fr)" body empty; presumably they update statistics.
932static void ceph_aio_read_complete(rados_completion_t c, void *arg) {
933 AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
934 size_t rc = rados_aio_get_return_value(c);
935 if (awa->bl) {
936 if (rc > 0) {
937 awa->bl->begin().copy(rc, (char*)awa->aiop->sfsAio.aio_buf);
938 }
939 delete awa->bl;
940 awa->bl = 0;
941 }
942 // Compute statistics before reporting to xrootd, so that a close cannot happen
943 // in the meantime.
944 CephFileRef* fr = getFileRef(awa->fd);
945 if (fr) {
948 }
949 awa->callback(awa->aiop, rc );
950 delete(awa);
951}
952
// Starts an asynchronous read described by aiop on fd; cb is invoked from
// ceph_aio_read_complete once the read has completed.
// Returns the code of the striper's aio_read call, -EBADF for an unknown
// descriptor or one opened with O_WRONLY, or -EINVAL when no striper or
// cluster could be obtained.
// NOTE(review): the cluster is looked up without a user id, so the default
// argument of checkAndCreateCluster applies if the slot is still empty.
// NOTE(review): listing line 985 is absent from this extract; presumably it
// takes the lock protecting the statistics - confirm.
953ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
954 CephFileRef* fr = getFileRef(fd);
955 if (fr) {
956 // get the parameters from the Xroot aio object
957 size_t count = aiop->sfsAio.aio_nbytes;
958 size_t offset = aiop->sfsAio.aio_offset;
959 // TODO implement proper logging level for this plugin - this should be only debug
960 //logwrapper((char*)"ceph_aio_read: for fd %d, count=%d", fd, count);
961 if ((fr->flags & O_WRONLY) != 0) {
962 return -EBADF;
963 }
964 // get the striper object
965 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
966 if (0 == striper) {
967 return -EINVAL;
968 }
969 // prepare a bufferlist to receive data
970 ceph::bufferlist *bl = new ceph::bufferlist();
971 // get the poolIdx to use
972 int cephPoolIdx = getCephPoolIdxAndIncrease();
973 // Get the cluster to use
974 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
// NOTE(review): bl was allocated above and is not deleted on this return path
975 if (0 == cluster) {
976 return -EINVAL;
977 }
978 // prepare a ceph AioCompletion object and do async call
979 AioArgs *args = new AioArgs(aiop, cb, count, fd, bl);
980 librados::AioCompletion *completion =
981 cluster->aio_create_completion(args, ceph_aio_read_complete, NULL);
982 // do the read
983 int rc = striper->aio_read(fr->name, completion, bl, count, offset);
984 completion->release();
986 fr->asyncRdStartCount++;
987 return rc;
988 } else {
989 return -EBADF;
990 }
991}
992
993int ceph_posix_fstat(int fd, struct stat *buf) {
994 CephFileRef* fr = getFileRef(fd);
995 if (fr) {
996 logwrapper((char*)"ceph_stat: fd %d", fd);
997 // minimal stat : only size and times are filled
998 // atime, mtime and ctime are set all to the same value
999 // mode is set arbitrarily to 0666 | S_IFREG
1000 libradosstriper::RadosStriper *striper = getRadosStriper(*fr);
1001 if (0 == striper) {
1002 logwrapper((char*)"ceph_stat: getRadosStriper failed");
1003 return -EINVAL;
1004 }
1005 memset(buf, 0, sizeof(*buf));
1006 int rc = striper->stat(fr->name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1007 if (rc != 0) {
1008 return -rc;
1009 }
1010 buf->st_mtime = buf->st_atime;
1011 buf->st_ctime = buf->st_atime;
1012 buf->st_mode = 0666 | S_IFREG;
1013 return 0;
1014 } else {
1015 return -EBADF;
1016 }
1017}
1018
1019int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf) {
1020 logwrapper((char*)"ceph_stat: %s", pathname);
1021 // minimal stat : only size and times are filled
1022 // atime, mtime and ctime are set all to the same value
1023 // mode is set arbitrarily to 0666 | S_IFREG
1024 CephFile file = getCephFile(pathname, env);
1025 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1026 if (0 == striper) {
1027 return -EINVAL;
1028 }
1029 memset(buf, 0, sizeof(*buf));
1030 int rc = striper->stat(file.name, (uint64_t*)&(buf->st_size), &(buf->st_atime));
1031 if (rc != 0) {
1032 // for non existing file. Check that we did not open it for write recently
1033 // in that case, we return 0 size and current time
1034 if (-ENOENT == rc && isOpenForWrite(file.name)) {
1035 buf->st_size = 0;
1036 buf->st_atime = time(NULL);
1037 } else {
1038 return -rc;
1039 }
1040 }
1041 buf->st_mtime = buf->st_atime;
1042 buf->st_ctime = buf->st_atime;
1043 buf->st_mode = 0666 | S_IFREG;
1044 return 0;
1045}
1046
1047int ceph_posix_fsync(int fd) {
1048 CephFileRef* fr = getFileRef(fd);
1049 if (fr) {
1050 // no locking of fr as it is not used.
1051 logwrapper((char*)"ceph_sync: fd %d", fd);
1052 return 0;
1053 } else {
1054 return -EBADF;
1055 }
1056}
1057
1058int ceph_posix_fcntl(int fd, int cmd, ... /* arg */ ) {
1059 CephFileRef* fr = getFileRef(fd);
1060 if (fr) {
1061 logwrapper((char*)"ceph_fcntl: fd %d cmd=%d", fd, cmd);
1062 // minimal implementation
1063 switch (cmd) {
1064 case F_GETFL:
1065 return fr->mode;
1066 default:
1067 return -EINVAL;
1068 }
1069 } else {
1070 return -EBADF;
1071 }
1072}
1073
1074static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char* name,
1075 void* value, size_t size) {
1076 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1077 if (0 == striper) {
1078 return -EINVAL;
1079 }
1080 ceph::bufferlist bl;
1081 int rc = striper->getxattr(file.name, name, bl);
1082 if (rc < 0) return rc;
1083 size_t returned_size = (size_t)rc<size?rc:size;
1084 bl.begin().copy(returned_size, (char*)value);
1085 return returned_size;
1086}
1087
1088ssize_t ceph_posix_getxattr(XrdOucEnv* env, const char* path,
1089 const char* name, void* value,
1090 size_t size) {
1091 logwrapper((char*)"ceph_getxattr: path %s name=%s", path, name);
1092 return ceph_posix_internal_getxattr(getCephFile(path, env), name, value, size);
1093}
1094
1095ssize_t ceph_posix_fgetxattr(int fd, const char* name,
1096 void* value, size_t size) {
1097 CephFileRef* fr = getFileRef(fd);
1098 if (fr) {
1099 logwrapper((char*)"ceph_fgetxattr: fd %d name=%s", fd, name);
1100 return ceph_posix_internal_getxattr(*fr, name, value, size);
1101 } else {
1102 return -EBADF;
1103 }
1104}
1105
1106static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char* name,
1107 const void* value, size_t size, int flags) {
1108 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1109 if (0 == striper) {
1110 return -EINVAL;
1111 }
1112 ceph::bufferlist bl;
1113 bl.append((const char*)value, size);
1114 int rc = striper->setxattr(file.name, name, bl);
1115 if (rc) {
1116 return -rc;
1117 }
1118 return 0;
1119}
1120
1121ssize_t ceph_posix_setxattr(XrdOucEnv* env, const char* path,
1122 const char* name, const void* value,
1123 size_t size, int flags) {
1124 logwrapper((char*)"ceph_setxattr: path %s name=%s value=%s", path, name, value);
1125 return ceph_posix_internal_setxattr(getCephFile(path, env), name, value, size, flags);
1126}
1127
1129 const char* name, const void* value,
1130 size_t size, int flags) {
1131 CephFileRef* fr = getFileRef(fd);
1132 if (fr) {
1133 logwrapper((char*)"ceph_fsetxattr: fd %d name=%s value=%s", fd, name, value);
1134 return ceph_posix_internal_setxattr(*fr, name, value, size, flags);
1135 } else {
1136 return -EBADF;
1137 }
1138}
1139
1140static int ceph_posix_internal_removexattr(const CephFile &file, const char* name) {
1141 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1142 if (0 == striper) {
1143 return -EINVAL;
1144 }
1145 int rc = striper->rmxattr(file.name, name);
1146 if (rc) {
1147 return -rc;
1148 }
1149 return 0;
1150}
1151
1152int ceph_posix_removexattr(XrdOucEnv* env, const char* path,
1153 const char* name) {
1154 logwrapper((char*)"ceph_removexattr: path %s name=%s", path, name);
1155 return ceph_posix_internal_removexattr(getCephFile(path, env), name);
1156}
1157
1158int ceph_posix_fremovexattr(int fd, const char* name) {
1159 CephFileRef* fr = getFileRef(fd);
1160 if (fr) {
1161 logwrapper((char*)"ceph_fremovexattr: fd %d name=%s", fd, name);
1162 return ceph_posix_internal_removexattr(*fr, name);
1163 } else {
1164 return -EBADF;
1165 }
1166}
1167
1168static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz) {
1169 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1170 if (0 == striper) {
1171 return -EINVAL;
1172 }
1173 // call ceph
1174 std::map<std::string, ceph::bufferlist> attrset;
1175 int rc = striper->getxattrs(file.name, attrset);
1176 if (rc) {
1177 return -rc;
1178 }
1179 // build result
1180 *aPL = 0;
1181 int maxSize = 0;
1182 for (std::map<std::string, ceph::bufferlist>::const_iterator it = attrset.begin();
1183 it != attrset.end();
1184 it++) {
1185 XrdSysXAttr::AList* newItem = (XrdSysXAttr::AList*)malloc(sizeof(XrdSysXAttr::AList)+it->first.size());
1186 newItem->Next = *aPL;
1187 newItem->Vlen = it->second.length();
1188 if (newItem->Vlen > maxSize) {
1189 maxSize = newItem->Vlen;
1190 }
1191 newItem->Nlen = it->first.size();
1192 strncpy(newItem->Name, it->first.c_str(), newItem->Vlen+1);
1193 *aPL = newItem;
1194 }
1195 if (getSz) {
1196 return 0;
1197 } else {
1198 return maxSize;
1199 }
1200}
1201
1202int ceph_posix_listxattrs(XrdOucEnv* env, const char* path, XrdSysXAttr::AList **aPL, int getSz) {
1203 logwrapper((char*)"ceph_listxattrs: path %s", path);
1204 return ceph_posix_internal_listxattrs(getCephFile(path, env), aPL, getSz);
1205}
1206
1207int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz) {
1208 CephFileRef* fr = getFileRef(fd);
1209 if (fr) {
1210 logwrapper((char*)"ceph_flistxattrs: fd %d", fd);
1211 return ceph_posix_internal_listxattrs(*fr, aPL, getSz);
1212 } else {
1213 return -EBADF;
1214 }
1215}
1216
1218 while (aPL) {
1219 free(aPL->Name);
1220 XrdSysXAttr::AList *cur = aPL;
1221 aPL = aPL->Next;
1222 free(cur);
1223 }
1224}
1225
1226int ceph_posix_statfs(long long *totalSpace, long long *freeSpace) {
1227 logwrapper((char*)"ceph_posix_statfs");
1228 // get the poolIdx to use
1229 int cephPoolIdx = getCephPoolIdxAndIncrease();
1230 // Get the cluster to use
1231 librados::Rados* cluster = checkAndCreateCluster(cephPoolIdx);
1232 if (0 == cluster) {
1233 return -EINVAL;
1234 }
1235 // call ceph stat
1236 librados::cluster_stat_t result;
1237 int rc = cluster->cluster_stat(result);
1238 if (0 == rc) {
1239 *totalSpace = result.kb * 1024;
1240 *freeSpace = result.kb_avail * 1024;
1241 }
1242 return rc;
1243}
1244
1245static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size) {
1246 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1247 if (0 == striper) {
1248 return -EINVAL;
1249 }
1250 return striper->trunc(file.name, size);
1251}
1252
1253int ceph_posix_ftruncate(int fd, unsigned long long size) {
1254 CephFileRef* fr = getFileRef(fd);
1255 if (fr) {
1256 logwrapper((char*)"ceph_posix_ftruncate: fd %d, size %d", fd, size);
1257 return ceph_posix_internal_truncate(*fr, size);
1258 } else {
1259 return -EBADF;
1260 }
1261}
1262
1263int ceph_posix_truncate(XrdOucEnv* env, const char *pathname, unsigned long long size) {
1264 logwrapper((char*)"ceph_posix_truncate : %s", pathname);
1265 // minimal stat : only size and times are filled
1266 CephFile file = getCephFile(pathname, env);
1267 return ceph_posix_internal_truncate(file, size);
1268}
1269
1270int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) {
1271 logwrapper((char*)"ceph_posix_unlink : %s", pathname);
1272 // minimal stat : only size and times are filled
1273 CephFile file = getCephFile(pathname, env);
1274 libradosstriper::RadosStriper *striper = getRadosStriper(file);
1275 if (0 == striper) {
1276 return -EINVAL;
1277 }
1278 return striper->remove(file.name);
1279}
1280
1281DIR* ceph_posix_opendir(XrdOucEnv* env, const char *pathname) {
1282 logwrapper((char*)"ceph_posix_opendir : %s", pathname);
1283 // only accept root dir, as there is no concept of dirs in object stores
1284 CephFile file = getCephFile(pathname, env);
1285 if (file.name.size() != 1 || file.name[0] != '/') {
1286 errno = -ENOENT;
1287 return 0;
1288 }
1289 librados::IoCtx *ioctx = getIoCtx(file);
1290 if (0 == ioctx) {
1291 errno = EINVAL;
1292 return 0;
1293 }
1294 DirIterator* res = new DirIterator();
1295 res->m_iterator = ioctx->nobjects_begin();
1296 res->m_ioctx = ioctx;
1297 return (DIR*)res;
1298}
1299
1300int ceph_posix_readdir(DIR *dirp, char *buff, int blen) {
1301 librados::NObjectIterator &iterator = ((DirIterator*)dirp)->m_iterator;
1302 librados::IoCtx *ioctx = ((DirIterator*)dirp)->m_ioctx;
1303 while (iterator->get_oid().compare(iterator->get_oid().size()-17, 17, ".0000000000000000") &&
1304 iterator != ioctx->nobjects_end()) {
1305 iterator++;
1306 }
1307 if (iterator == ioctx->nobjects_end()) {
1308 buff[0] = 0;
1309 } else {
1310 int l = iterator->get_oid().size()-17;
1311 if (l < blen) blen = l;
1312 strncpy(buff, iterator->get_oid().c_str(), blen-1);
1313 buff[blen-1] = 0;
1314 iterator++;
1315 }
1316 return 0;
1317}
1318
1319int ceph_posix_closedir(DIR *dirp) {
1320 delete ((DirIterator*)dirp);
1321 return 0;
1322}
unsigned int g_cephPoolIdx
index of current Striper/IoCtx to be used
static libradosstriper::RadosStriper * getRadosStriper(const CephFile &file)
ssize_t ceph_posix_write(int fd, const void *buf, size_t count)
CephFileRef * getFileRef(int fd)
look for a FileRef from its file descriptor
unsigned int getCephPoolIdxAndIncrease()
void ceph_posix_set_logfunc(void(*logfunc)(char *, va_list argp))
int ceph_posix_truncate(XrdOucEnv *env, const char *pathname, unsigned long long size)
CephFile g_defaultParams
global variable containing defaults for CephFiles
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_unlink(XrdOucEnv *env, const char *pathname)
XrdOucName2Name * g_namelib
void fillCephFileParams(const std::string &params, XrdOucEnv *env, CephFile &file)
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb)
int ceph_posix_readdir(DIR *dirp, char *buff, int blen)
std::multiset< std::string > g_filesOpenForWrite
global variable holding a list of files currently opened for write
int ceph_posix_fcntl(int fd, int cmd,...)
static ssize_t ceph_posix_internal_setxattr(const CephFile &file, const char *name, const void *value, size_t size, int flags)
ssize_t ceph_posix_getxattr(XrdOucEnv *env, const char *path, const char *name, void *value, size_t size)
std::vector< librados::Rados * > g_cluster
static int fillCephStripeUnit(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static void ceph_aio_write_complete(rados_completion_t c, void *arg)
static CephFile getCephFile(const char *path, XrdOucEnv *env)
XrdSysMutex g_striper_mutex
mutex protecting the striper and ioctx maps
std::map< std::string, libradosstriper::RadosStriper * > StriperDict
ssize_t ceph_posix_read(int fd, void *buf, size_t count)
static int fillCephPool(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset)
int ceph_posix_listxattrs(XrdOucEnv *env, const char *path, XrdSysXAttr::AList **aPL, int getSz)
int ceph_posix_fstat(int fd, struct stat *buf)
static unsigned int stoui(const std::string &s)
simple integer parsing, to be replaced by std::stoi when C++11 can be used
void ceph_posix_disconnect_all()
int ceph_posix_fsync(int fd)
XrdSysMutex g_init_mutex
mutex protecting initialization of ceph clusters
int ceph_posix_closedir(DIR *dirp)
static int fillCephNbStripes(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
std::map< unsigned int, CephFileRef > g_fds
global variable holding a map of file descriptor to file reference
static librados::IoCtx * getIoCtx(const CephFile &file)
unsigned int g_nextCephFd
global variable remembering the next available file descriptor
static ssize_t ceph_posix_internal_getxattr(const CephFile &file, const char *name, void *value, size_t size)
std::vector< IOCtxDict > g_ioCtx
int insertFileRef(CephFileRef &fr)
DIR * ceph_posix_opendir(XrdOucEnv *env, const char *pathname)
int ceph_posix_statfs(long long *totalSpace, long long *freeSpace)
int checkAndCreateStriper(unsigned int cephPoolIdx, std::string &userAtPool, const CephFile &file)
unsigned int g_maxCephPoolIdx
static void fillCephObjectSize(const std::string &params, unsigned int offset, XrdOucEnv *env, CephFile &file)
static CephFileRef getCephFileRef(const char *path, XrdOucEnv *env, int flags, mode_t mode, unsigned long long offset)
int ceph_posix_fremovexattr(int fd, const char *name)
librados::Rados * checkAndCreateCluster(unsigned int cephPoolIdx, std::string userId=g_defaultParams.userId)
void translateFileName(std::string &physName, std::string logName)
converts a logical filename to physical one if needed
int ceph_posix_close(int fd)
librados::IoCtx * m_ioctx
std::map< std::string, librados::IoCtx * > IOCtxDict
off_t ceph_posix_lseek(int fd, off_t offset, int whence)
void ceph_posix_set_defaults(const char *value)
void deleteFileRef(int fd, const CephFileRef &fr)
deletes a FileRef from the global table of file descriptors
int ceph_posix_fsetxattr(int fd, const char *name, const void *value, size_t size, int flags)
static void ceph_aio_read_complete(rados_completion_t c, void *arg)
int ceph_posix_ftruncate(int fd, unsigned long long size)
static int ceph_posix_internal_removexattr(const CephFile &file, const char *name)
std::vector< StriperDict > g_radosStripers
int ceph_posix_open(XrdOucEnv *env, const char *pathname, int flags, mode_t mode)
librados::NObjectIterator m_iterator
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
static void logwrapper(char *format,...)
int ceph_posix_removexattr(XrdOucEnv *env, const char *path, const char *name)
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence)
int ceph_posix_stat(XrdOucEnv *env, const char *pathname, struct stat *buf)
bool isOpenForWrite(std::string &name)
check whether a file is open for write
static int fillCephUserId(const std::string &params, XrdOucEnv *env, CephFile &file)
void fillCephFile(const char *path, XrdOucEnv *env, CephFile &file)
fill a ceph file struct from a path and an environment
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
std::string g_defaultPool
XrdSysMutex g_fd_mutex
mutex protecting the map of file descriptors and the openForWrite multiset
ssize_t ceph_posix_setxattr(XrdOucEnv *env, const char *path, const char *name, const void *value, size_t size, int flags)
static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size)
static void(* g_logfunc)(char *, va_list argp)=0
global variable for the log function
std::string g_defaultUserId
int ceph_posix_flistxattrs(int fd, XrdSysXAttr::AList **aPL, int getSz)
ssize_t ceph_posix_fgetxattr(int fd, const char *name, void *value, size_t size)
static int ceph_posix_internal_listxattrs(const CephFile &file, XrdSysXAttr::AList **aPL, int getSz)
static off64_t lseek_compute_offset(CephFileRef &fr, off64_t offset, int whence)
void ceph_posix_freexattrlist(XrdSysXAttr::AList *aPL)
small struct for directory listing
void() AioCB(XrdSfsAio *, size_t)
#define stat(a, b)
Definition XrdPosix.hh:96
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
virtual int lfn2pfn(const char *lfn, char *buff, int blen)=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
char Name[1]
Start of the name (size of struct is dynamic)
int Vlen
The length of the attribute value.
int Nlen
The length of the attribute name that follows.
AList * Next
-> next element.
small struct for aio API callbacks
size_t nbBytes
::timeval startTime
AioCB * callback
AioArgs(XrdSfsAio *a, AioCB *b, size_t n, int _fd, ceph::bufferlist *_bl=0)
XrdSfsAio * aiop
ceph::bufferlist * bl
uint64_t bytesAsyncWritePending
unsigned asyncRdStartCount
unsigned asyncWrStartCount
uint64_t maxOffsetWritten
::timeval lastAsyncSubmission
double longestCallbackInvocation
uint64_t bytesWritten
uint64_t offset
double longestAsyncWriteTime
unsigned wrcount
unsigned rdcount
XrdSysMutex statsMutex
unsigned asyncWrCompletionCount
unsigned asyncRdCompletionCount
small structs to store file metadata
std::string userId
unsigned int nbStripes
std::string pool
unsigned long long stripeUnit
std::string name
unsigned long long objectSize