XRootD
Loading...
Searching...
No Matches
XrdOfsTPCProg.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s T P C P r o g . c c */
4/* */
5/* (c) 2011 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <strings.h>
33#include <unistd.h>
34#include <sys/stat.h>
35
37#include "XrdOfs/XrdOfsTPC.hh"
41#include "XrdOfs/XrdOfsTrace.hh"
42#include "XrdOss/XrdOss.hh"
44#include "XrdOuc/XrdOucProg.hh"
45#include "XrdSys/XrdSysError.hh"
46#include "XrdSys/XrdSysFD.hh"
48
50
51/******************************************************************************/
52/* G l o b a l O b j e c t s */
53/******************************************************************************/
54
57extern XrdOss *XrdOfsOss;
58
59namespace XrdOfsTPCParms
60{
61extern XrdOfsTPCConfig Cfg;
62}
63
64using namespace XrdOfsTPCParms;
65
66/******************************************************************************/
67/* S t a t i c V a r i a b l e s */
68/******************************************************************************/
69
70XrdSysMutex XrdOfsTPCProg::pgmMutex;
71XrdOfsTPCProg *XrdOfsTPCProg::pgmIdle = 0;
72
73/******************************************************************************/
74/* E x t e r n a l L i n k a g e s */
75/******************************************************************************/
76
77void *XrdOfsTPCProgRun(void *pp)
78{
79 XrdOfsTPCProg *theProg = (XrdOfsTPCProg *)pp;
80 theProg->Run();
81 return (void *)0;
82}
83
84/******************************************************************************/
85/* L o c a l C l a s s e s */
86/******************************************************************************/
87
88namespace
89{
90class credFile
91{
92public:
93
94char *Path;
95char pEnv[MAXPATHLEN+65];
96
97 credFile(XrdOfsTPCJob *jP)
98 {if (jP->Info.Csz > 0 && jP->Info.Crd && jP->Info.Env)
99 {int n;
100 csMutex.Lock(); n = cSeq++; csMutex.UnLock();
101 snprintf(pEnv, sizeof(pEnv), "%s=%s%s#%d.creds",
102 jP->Info.Env, jP->credPath(), jP->Info.Org, n);
103 Path = index(pEnv,'=')+1;
104 } else Path = 0;
105 }
106
107 ~credFile() {if (Path) unlink(Path);}
108
109private:
110static XrdSysMutex csMutex;
111static int cSeq;
112};
113
114XrdSysMutex credFile::csMutex;
115int credFile::cSeq = 0;
116}
117
118/******************************************************************************/
119/* C o n s t r u c t o r */
120/******************************************************************************/
121
123 : Prog(&OfsEroute, errMon),
124 JobStream(&OfsEroute),
125 Next(Prev), Job(0)
126 {snprintf(Pname, sizeof(Pname), "TPC job %d: ", num);
127 Pname[sizeof(Pname)-1] = 0;
128 }
129
130/******************************************************************************/
131/* E x p o r t C r e d s */
132/******************************************************************************/
133
134int XrdOfsTPCProg::ExportCreds(const char *path)
135{
136static const int oOpts = (O_CREAT | O_TRUNC | O_WRONLY);
137static const mode_t oMode = (S_IRUSR | S_IWUSR);
138
139int fd, rc;
140
141// Open the file as if it were new
142//
143 fd = XrdSysFD_Open(path, oOpts, oMode);
144 if (fd < 0)
145 {rc = errno;
146 OfsEroute.Emsg("TPC", rc, "create credentials file", path);
147 return -rc;
148 }
149
150// Write out the credentials
151//
152 if (write(fd, Job->Info.Crd, Job->Info.Csz) < 0)
153 {rc = errno;
154 OfsEroute.Emsg("TPC", rc, "write credentials file", path);
155 } else rc = 0;
156
157// Close the file and return (we ignore close errors)
158//
159 close(fd);
160 return rc;
161}
162
163/******************************************************************************/
164/* I n i t */
165/******************************************************************************/
166
168{
169 int n;
170
171// Allocate copy program objects
172//
173 for (n = 0; n < Cfg.xfrMax; n++)
174 {pgmIdle = new XrdOfsTPCProg(pgmIdle, n, Cfg.errMon);
175 if (pgmIdle->Prog.Setup(Cfg.XfrProg, &OfsEroute)) return 0;
176 }
177
178// All done
179//
180 Cfg.doEcho = Cfg.doEcho || GTRACE(debug);
181 return 1;
182}
183
184/******************************************************************************/
185/* R u n */
186/******************************************************************************/
187
189{
191 struct stat Stat;
192 const char *clID, *at;
193 char *questSrc, *questLfn, *questDst;
194 int rc;
195 bool isIPv4, doMon = Cfg.tpcMon != 0;
196 char clBuff[592];
197
198// Run the current job and indicate it's ending status and possibly getting a
199// another job to run. Note "Job" will always be valid.
200//
201do{if (doMon)
202 {monInfo.Init();
203 gettimeofday(&monInfo.begT, 0);
204 }
205
206 rc = Xeq(isIPv4);
207
208 if (doMon)
209 {gettimeofday(&monInfo.endT, 0);
210 if ((questSrc = index(Job->Info.Key, '?'))) *questSrc = 0;
211 monInfo.srcURL = Job->Info.Key;
212 if ((questLfn = index(Job->Info.Lfn, '?'))) *questLfn = 0;
213 monInfo.dstURL = Job->Info.Lfn;
214 monInfo.endRC = rc;
215 if (Job->Info.Str) monInfo.strm = Job->Info.Str;
216 if (isIPv4) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
217
218 clID = Job->Info.Org;
219 if (clID && (at = index(clID, '@')) && !index(at+1, '.'))
220 {const char *dName = XrdNetIdentity::Domain();
221 if (dName)
222 {snprintf(clBuff, sizeof(clBuff), "%s%s", clID, dName);
223 clID = clBuff;
224 }
225 }
226 monInfo.clID = clID;
227
228 if ((questDst = index(Job->Info.Dst, '?'))) *questDst = 0;
229 if (!XrdOfsOss->Stat(Job->Info.Dst, &Stat)) monInfo.fSize = Stat.st_size;
230 if (questDst) *questDst = '?';
231 Cfg.tpcMon->Report(monInfo);
232 if (questLfn) *questLfn = '?';
233 if (questSrc) *questSrc = '?';
234 }
235
236 Job = Job->Done(this, eRec, rc);
237
238 } while(Job);
239
240// No more jobs to run. Place us on the idle queue. Upon return this thread
241// will end.
242//
243 pgmMutex.Lock();
244 Next = pgmIdle;
245 pgmIdle = this;
246 pgmMutex.UnLock();
247}
248
249/******************************************************************************/
250/* S t a r t */
251/******************************************************************************/
252
254{
255 XrdSysMutexHelper pgmMon(&pgmMutex);
256 XrdOfsTPCProg *pgmP;
257 pthread_t tid;
258
259// Get a new program object, if none left, tell the caller to try later
260//
261 if (!(pgmP = pgmIdle)) {rc = 0; return 0;}
262 pgmP->Job = jP;
263
264// Start a thread to run the job
265//
266 if ((rc = XrdSysThread::Run(&tid, XrdOfsTPCProgRun, (void *)pgmP, 0,
267 "TPC job")))
268 return 0;
269
270// We are all set, return the program being used
271//
272 pgmIdle = pgmP->Next;
273 return pgmP;
274}
275
276/******************************************************************************/
277/* X e q */
278/******************************************************************************/
279
280int XrdOfsTPCProg::Xeq(bool &isIPv4)
281{
282 EPNAME("Xeq");
283 credFile cFile(Job);
284 const char *Args[6], *eVec[6], **envArg;
285 char *lP, *Colon, *cksVal, sBuff[8], *tident = Job->Info.Org;
286 char *Quest = index(Job->Info.Key, '?');
287 int i, rc, aNum = 0;
288
289// If we have credentials, write them out to a file
290//
291 if (cFile.Path && (rc = ExportCreds(cFile.Path)))
292 {strcpy(eRec, "Copy failed; unable to pass credentials.");
293 return rc;
294 }
295
296// Echo out what we are doing if so desired
297//
298 if (Cfg.doEcho)
299 {if (Quest) *Quest = 0;
300 OfsEroute.Say(Pname,tident," copying ",Job->Info.Key," to ",Job->Info.Dst);
301 if (Quest) *Quest = '?';
302 }
303
304// Determine checksum option
305//
306 cksVal = (Job->Info.Cks ? Job->Info.Cks : Cfg.cksType);
307 if (cksVal)
308 {Args[aNum++] = "-C";
309 Args[aNum++] = cksVal;
310 }
311
312// Set streams option if need be
313//
314 if (Job->Info.Str)
315 {sprintf(sBuff, "%d", static_cast<int>(Job->Info.Str));
316 Args[aNum++] = "-S";
317 Args[aNum++] = sBuff;
318 }
319
320// Set remaining arguments
321//
322 Args[aNum++] = Job->Info.Key;
323 Args[aNum++] = Job->Info.Dst;
324
325// Always export the trace identifier of the original issuer
326//
327 char tidBuff[512];
328 snprintf(tidBuff, sizeof(tidBuff), "XRD_TIDENT=%s", tident);
329 eVec[0] = tidBuff;
330 envArg = eVec;
331 i = 1;
332
333// Export source protocol if present
334//
335 char sprBuff[128];
336 if (Job->Info.Spr)
337 {snprintf(sprBuff, sizeof(sprBuff), "XRDTPC_SPROT=%s", Job->Info.Spr);
338 eVec[i++] = sprBuff;
339 }
340
341// Export target protocol if present
342//
343 char tprBuff[128];
344 if (Job->Info.Tpr)
345 {snprintf(tprBuff, sizeof(tprBuff), "XRDTPC_TPROT=%s", Job->Info.Tpr);
346 eVec[i++] = tprBuff;
347 }
348
349// If we need to reproxy, export the path
350//
351 char rpxBuff[1024];
352 if (Job->Info.Rpx)
353 {snprintf(rpxBuff, sizeof(rpxBuff), "XRD_CPTARGET=%s", Job->Info.Rpx);
354 eVec[i++] = rpxBuff;
355 }
356
357// Determine if credentials are being passed, If so, pass where it is.
358//
359 if (cFile.Path) eVec[i++] = cFile.pEnv;
360 eVec[i] = 0;
361
362// Start the job.
363//
364 if ((rc = Prog.Run(&JobStream, Args, aNum, envArg)))
365 {strcpy(eRec, "Copy failed; unable to start job.");
366 OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
367 return rc;
368 }
369
370// Now we drain the output looking for an end of run line. This line should
371// be printed as an error message should the copy fail.
372//
373 *eRec = 0;
374 isIPv4 = false;
375 while((lP = JobStream.GetLine()))
376 {if (!strcmp(lP, "!-!IPv4")) isIPv4 = true;
377 if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
378 {strncpy(eRec, Colon+2, sizeof(eRec)-1);
379 eRec[sizeof(eRec)-1] = 0;
380 }
381 if (Cfg.doEcho && *lP) OfsEroute.Say(Pname, lP);
382 }
383
384// The job has completed. So, we must get the ending status.
385//
386 if ((rc = Prog.RunDone(JobStream)) < 0) rc = -rc;
387 DEBUG(Pname <<"ended with rc=" <<rc);
388
389// Check if we should generate a message
390//
391 if (rc && !(*eRec)) sprintf(eRec, "Copy failed with return code %d", rc);
392
393// Log failures and optionally remove the file (Info would do that as well
394// but much later on, so we do it now).
395//
396 if (rc)
397 {OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
398 if (Cfg.autoRM) XrdOfsOss->Unlink(Job->Info.Lfn);
399 } else Job->Info.Success();
400
401// All done
402//
403 return rc;
404}
#define tident
#define DEBUG(x)
#define EPNAME(x)
#define GTRACE(act)
struct stat Stat
Definition XrdCks.cc:49
XrdOss * XrdOfsOss
Definition XrdOfs.cc:163
XrdSysTrace OfsTrace
void * XrdOfsTPCProgRun(void *pp)
XrdSysError OfsEroute
XrdSysError OfsEroute(0)
XrdOss * XrdOfsOss
Definition XrdOfs.cc:163
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define unlink(a)
Definition XrdPosix.hh:113
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString Path
static const char * Domain(const char **eText=0)
const char * Env
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
static XrdOfsTPCProg * Start(XrdOfsTPCJob *jP, int &rc)
int Xeq(bool &isIPv4)
XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
static int Init()
static const char * credPath()
Definition XrdOfsTPC.hh:77
XrdOfsTPCInfo Info
Definition XrdOfsTPC.hh:109
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int RunDone(XrdOucStream &cmd) const
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
char * GetLine()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
void Report(TpcInfo &info)
XrdOfsTPCConfig Cfg
Definition XrdOfsTPC.cc:85
XrdXrootdTpcMon * tpcMon