2430 {
2432
2433 std::string checkSumMode;
2434 std::string checkSumType;
2435 std::string checkSumPreset;
2436 std::string zipSource;
2437 uint16_t parallelChunks;
2438 uint32_t chunkSize;
2439 uint64_t blockSize;
2440 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441 rmOnBadCksum, continue_, zipappend, doserver;
2442 int32_t nbXcpSources;
2443 long long xRate;
2444 long long xRateThreshold;
2445 uint16_t cpTimeout;
2446 std::vector<std::string> addcksums;
2447
2470
2471 if( zip )
2473
2474 if( xcp )
2476
2477 if( force && continue_ )
2479 "Invalid argument combination: continue + force." );
2480
2481 if( zipappend && ( continue_ || force ) )
2483 "Invalid argument combination: ( continue | force ) + zip-append." );
2484
2485
2486
2487
2488 std::unique_ptr<timer_sec_t> cptimer;
2489 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2490
2491
2492
2493
2494 if( rmOnBadCksum ) posc = true;
2495
2496
2497
2498
2499 if( checkSumType == "auto" )
2500 {
2502 if( checkSumType.empty() )
2504 else
2505 log->Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2506 }
2507
2508 if( cptimer && cptimer->elapsed() > cpTimeout )
2510
2511
2512
2513
2514 std::unique_ptr<Source> src;
2515 if( xcp )
2516 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2517 else if( zip )
2518 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2519 checkSumType, addcksums , doserver) );
2520 else if(
GetSource().GetProtocol() ==
"stdio" )
2521 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2522 else
2523 {
2524 if( dynamicSource )
2525 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2526 else
2527 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2528 }
2529
2530 XRootDStatus st = src->Initialize();
2531 if( !st.IsOK() ) return SourceError( st );
2532 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2533
2534 if( cptimer && cptimer->elapsed() > cpTimeout )
2536
2537 std::unique_ptr<Destination> dest;
2539
2540 if(
GetTarget().GetProtocol() ==
"stdio" )
2541 dest.reset( new StdOutDestination( checkSumType ) );
2542 else if( zipappend )
2543 {
2545 size_t pos = fn.rfind( '/' );
2546 if( pos != std::string::npos )
2547 fn = fn.substr( pos + 1 );
2548 int64_t size = src->GetSize();
2549 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2550 }
2551
2552
2553
2554 else
2555 {
2556 if( src->GetSize() >= 0 )
2557 {
2559 std::ostringstream o; o << src->GetSize();
2560 params["oss.asize"] = o.str();
2561 newDestUrl.SetParams( params );
2562
2563 }
2564 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2565 }
2566
2567 dest->SetForce( force );
2568 dest->SetPOSC( posc );
2569 dest->SetCoerce( coerce );
2570 dest->SetMakeDir( makeDir );
2571 dest->SetContinue( continue_ );
2572 st = dest->Initialize();
2573 if( !st.IsOK() ) return DestinationError( st );
2574
2575 if( cptimer && cptimer->elapsed() > cpTimeout )
2577
2578
2579
2580
2581 if( continue_ )
2582 {
2583 size -= dest->GetSize();
2585 if( !st.
IsOK() )
return SetResult( st );
2586 }
2587
2588 PageInfo pageInfo;
2589 uint64_t total_processed = 0;
2590 uint64_t processed = 0;
2592 uint16_t threshold_interval = parallelChunks;
2593 bool threshold_draining = false;
2594 timer_nsec_t threshold_timer;
2595 while( 1 )
2596 {
2597 st = src->GetChunk( pageInfo );
2599 return SourceError( st);
2600
2602 break;
2603
2604 if( cptimer && cptimer->elapsed() > cpTimeout )
2606
2607 if( xRate )
2608 {
2609 auto elapsed = (
time_nsec() - start ).count();
2610 double transferred = total_processed + pageInfo.GetLength();
2611 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2612
2613
2614
2615
2616 if( elapsed &&
2617 transferred > expected )
2618 {
2619 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2621 }
2622 }
2623
2624 if( xRateThreshold )
2625 {
2626 auto elapsed = threshold_timer.elapsed();
2627 double transferred = processed + pageInfo.GetLength();
2628 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2629
2630
2631
2632
2633 if( elapsed &&
2634 transferred < expected &&
2635 threshold_interval == 0 )
2636 {
2637 if( !threshold_draining )
2638 {
2639 log->Warning(
UtilityMsg,
"Transfer rate dropped below requested ehreshold,"
2640 " trying different source!" );
2641 XRootDStatus st = src->TryOtherServer();
2643 "The transfer rate dropped below "
2644 "requested threshold!" );
2645 threshold_draining = true;
2646
2647 }
2648 else
2649 {
2650 processed = 0;
2651 threshold_timer.reset();
2652 threshold_interval = parallelChunks;
2653 threshold_draining = false;
2654 }
2655 }
2656
2657 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2658 }
2659
2660 total_processed += pageInfo.GetLength();
2661 processed += pageInfo.GetLength();
2662
2663 st = dest->PutChunk( std::move( pageInfo ) );
2664 if( !st.IsOK() )
2665 {
2667 {
2669 pResults->
Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670 return SetResult( st );
2671 }
2672 return DestinationError( st );
2673 }
2674
2675 if( progress )
2676 {
2677 progress->JobProgress(
pJobId, total_processed, size );
2678 if( progress->ShouldCancel(
pJobId ) )
2680 }
2681 }
2682
2683 st = dest->Flush();
2684 if( !st.IsOK() )
2685 return DestinationError( st );
2686
2687
2688
2689
2691 {
2692 std::vector<xattr_t> xattrs;
2693 st = src->GetXAttr( xattrs );
2694 if( !st.IsOK() ) return SourceError( st );
2695 st = dest->SetXAttr( xattrs );
2696 if( !st.IsOK() ) return DestinationError( st );
2697 }
2698
2699
2700
2701
2702
2703 if( src->GetSize() >= 0 && size != total_processed )
2704 {
2705 log->Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2706 "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2708 }
2710
2711
2712
2713
2714 st = dest->Finalize();
2715 if( !st.IsOK() )
2716 return DestinationError( st );
2717
2718
2719
2720
2721 if( checkSumMode != "none" )
2722 {
2723 log->Debug(
UtilityMsg,
"Attempting checksum calculation, mode: %s.",
2724 checkSumMode.c_str() );
2725 std::string sourceCheckSum;
2726 std::string targetCheckSum;
2727
2728 if( cptimer && cptimer->elapsed() > cpTimeout )
2730
2731
2732
2733
2734 timeval oStart, oEnd;
2735 XRootDStatus st;
2736
2737 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2738 !checkSumPreset.empty() )
2739 {
2740 gettimeofday( &oStart, 0 );
2741 if( !checkSumPreset.empty() )
2742 {
2743 sourceCheckSum = checkSumType + ":";
2745 checkSumPreset );
2746 }
2747 else
2748 {
2749 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2750 }
2751 gettimeofday( &oEnd, 0 );
2752
2753 if( !st.IsOK() )
2754 return SourceError( st );
2755
2756 pResults->
Set(
"sourceCheckSum", sourceCheckSum );
2757 }
2758
2759 if( !addcksums.empty() )
2760 pResults->
Set(
"additionalCkeckSum", src->GetAddCks() );
2761
2762 if( cptimer && cptimer->elapsed() > cpTimeout )
2764
2765
2766
2767
2768 timeval tStart, tEnd;
2769
2770 if( checkSumMode == "end2end" || checkSumMode == "target" )
2771 {
2772 gettimeofday( &tStart, 0 );
2773 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2774 if( !st.IsOK() )
2775 return DestinationError( st );
2776 gettimeofday( &tEnd, 0 );
2777 pResults->
Set(
"targetCheckSum", targetCheckSum );
2778 }
2779
2780 if( cptimer && cptimer->elapsed() > cpTimeout )
2782
2783
2784
2785
2786 auto sanitize_cksum = []( char c )
2787 {
2788 std::locale loc;
2789 if( std::isalpha( c ) ) return std::tolower( c, loc );
2790 return c;
2791 };
2792
2793 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794 sourceCheckSum.begin(), sanitize_cksum );
2795
2796 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797 targetCheckSum.begin(), sanitize_cksum );
2798
2799
2800
2801
2802 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2803 {
2804 bool match = false;
2805 if( sourceCheckSum == targetCheckSum )
2806 match = true;
2807
2809 if( mon )
2810 {
2811 Monitor::CheckSumInfo i;
2814 i.cksum = sourceCheckSum;
2817 i.isOK = match;
2819 }
2820
2821 if( !match )
2822 {
2823 if( rmOnBadCksum )
2824 {
2825 FileSystem fs( newDestUrl );
2826 st = fs.Rm( newDestUrl.GetPath() );
2827 if( !st.IsOK() )
2828 log->Error(
UtilityMsg,
"Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2829 else
2830 log->Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2831 }
2832
2833 st = dest->Finalize();
2834 if( !st.IsOK() )
2835 log->Error(
UtilityMsg,
"Failed to finalize the destination: %s", st.ToString().c_str() );
2836
2838 }
2839
2840 log->Info(
UtilityMsg,
"Checksum verification: succeeded." );
2841 }
2842 }
2843
2844 return SetResult();
2845 }
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pProperties
static Monitor * GetMonitor()
Get the monitor object.
@ EvCheckSum
CheckSumInfo: File checksummed.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
const std::string & GetPath() const
Get the path.
std::map< std::string, std::string > ParamsMap
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.