GridFTP transfer with Globus

To serve your requests for extending the data pool, we use the Globus Toolkit, which provides a GridFTP server. GridFTP enables secure, efficient and fault-tolerant transfer of large data volumes.

The transfer process starts with a request submitted by you. This request should contain the dataset IDs (dataset_id) you would like to have replicated. Open an issue in the GitLab repository and attach a dictionary of the form:

dataset_id : { "data_node": ESGFNODE,
               "globus_url": URL
             }
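For illustration, such a request can be written as a Python dictionary and shared as JSON. The sketch below uses placeholder values (the dataset ID, data node and Globus URL are hypothetical, not real ESGF records) and a hypothetical helper `invalid_entries` that flags entries missing one of the two expected keys.

```python
import json

# Hypothetical example request: dataset ID, data node and Globus URL are placeholders.
request = {
    "CMIP6.CMIP.EXAMPLE-INST.EXAMPLE-MODEL.historical.r1i1p1f1.fx.sftlf.gn.v20190101": {
        "data_node": "esgf-node.example.org",
        "globus_url": "globus:00000000-0000-0000-0000-000000000000/CMIP6/CMIP/"
                      "EXAMPLE-INST/EXAMPLE-MODEL/historical/r1i1p1f1/fx/sftlf/gn/v20190101",
    }
}


def invalid_entries(req):
    """Return the dataset IDs whose entry lacks a data_node or a globus_url."""
    required = {"data_node", "globus_url"}
    return [
        dataset_id
        for dataset_id, entry in req.items()
        if not isinstance(entry, dict) or not required.issubset(entry)
    ]


# The request is shared as JSON, e.g. attached to the GitLab issue.
print(json.dumps(request, indent=4))
print(invalid_entries(request))  # → []
```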

After a review, a CDP manager will start the replication.

In this notebook, you learn

  • how to generate this request dictionary based on pyesgf,

  • how this dictionary is converted for file-based Globus transfer,

  • how a Globus transfer is started (as a scheme).

When the notebook is run in App Mode (appmode), you can create a replication request yourself. For this reason, the functions defined below are only executed at the end, when you click the button.

Replication request creation

  1. You set up a search request by specifying values for CMIP6 facets.

  2. The ESGF index is searched with pyesgf for matching datasets, using retracted=False and latest=True as default keyword arguments.

  3. For each match, either its Globus URL or its download URLs are saved in a dictionary, which is the first step towards the Globus transfer.

  1. You can specify any valid search facet, i.e. CMIP6 attribute, for your request in a dictionary datasets_to_download. In App Mode, five search facets can be set. Please also provide a username, which is used as a prefix for the output files.
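The facet handling can be sketched without widgets: empty inputs are dropped and at least three facets are required, as in the App Mode validation further down. `FACETS` and `facets_from_values` are hypothetical names; the example values are the ones listed as comments in the widget cell.

```python
# Facet names offered in App Mode (same list as tab_contents in the notebook).
FACETS = ["experiment_id", "source_id", "member_id", "table_id", "variable_id"]


def facets_from_values(values, min_facets=3):
    """Keep only non-empty facet values and require at least min_facets of them.

    `values` maps a facet name to the text typed by the user.
    """
    facets = {name: values[name] for name in FACETS if values.get(name)}
    if len(facets) < min_facets:
        raise ValueError(f"Please specify at least {min_facets} search facets")
    return facets


# experiment_id is left empty and is therefore not part of the search.
search = facets_from_values({
    "experiment_id": "",
    "source_id": "E3SM-1-0",
    "member_id": "r1i1p1f1",
    "table_id": "fx",
    "variable_id": "sftlf",
})
print(search)
```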

import ipywidgets as widgets
user=widgets.Text(description="Username")
user
tab_contents = ['experiment_id', 'source_id', 'member_id', 'table_id', 'variable_id']
children = [widgets.Text(description=name) for name in tab_contents]
tab = widgets.Tab()
tab.children = children
for i, title in enumerate(tab_contents):
    tab.set_title(i, title)
tab
# Example facet values:
#            "source_id":"E3SM-1-0",
#            "member_id":"r1i1p1f1",
#            "table_id":"fx",
#            "variable_id":"sftlf",
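  2. We use the Lawrence Livermore (LLNL) ESGF node as the search node, as it is the most stable node. Because we want to search through all ESGF nodes, we set distrib=True as a keyword argument. In the cell below, the DKRZ index node is currently active and the LLNL URL is kept as a comment.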

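from pyesgf.search import SearchConnection
# Index node used for all searches; the LLNL node is an alternative:
#index_search_url = 'http://esgf-node.llnl.gov/esg-search'
index_search_url = 'http://esgf-data.dkrz.de/esg-search'
# The connection itself is opened inside the functions below:
#conn = SearchConnection(index_search_url, distrib=True)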
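pyesgf translates a search into an HTTP query against the index node's esg-search REST API. The sketch below builds a comparable query URL with the standard library only; `build_search_query` is a hypothetical helper, and the exact parameters pyesgf sends may differ.

```python
from urllib.parse import urlencode


def build_search_query(base_url, facets, distrib=True):
    """Return an esg-search URL for datasets matching `facets`.

    retracted=false and latest=true mirror the defaults used in this notebook.
    """
    params = dict(facets)
    params.update({
        "type": "Dataset",
        "distrib": "true" if distrib else "false",
        "retracted": "false",
        "latest": "true",
        "format": "application/solr+json",
    })
    return base_url.rstrip("/") + "/search?" + urlencode(params)


url = build_search_query("http://esgf-data.dkrz.de/esg-search",
                         {"source_id": "E3SM-1-0", "variable_id": "sftlf"})
print(url)
```

Inspecting such a URL can help when a pyesgf search returns unexpected results.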

Data nodes whose Globus endpoints are known to be unusable are collected in a list so that they can be skipped.

noGoodEndpoints=["cmip.bcc.cma.cn",
                 "vesg.ipsl.upmc.fr",
                 "gfdl.noaa.gov",
                 "esgf-data.ucar.edu"]
  3. Three functions do the work:

  • collect_instances collects instance IDs, i.e. identifiers of datasets available in the ESGF federation. Only valid datasets (retracted=False, latest=True) are considered.

  • collect_globusURLs searches the ESGF index for each instance and records a Globus URL (or, if none is available, the HTTP download URLs) from a data node that is not excluded.

  • save_results_as_json writes the resulting dictionary to a JSON file.
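Excluding the nodes listed in noGoodEndpoints is a plain membership test on each record's data_node. A minimal sketch, assuming search records are available as dictionaries (such as the `.json` attribute of pyesgf results); `usable_records` and `EXCLUDED_NODES` are hypothetical names, the latter a copy of noGoodEndpoints.

```python
# Copy of the notebook's noGoodEndpoints list
EXCLUDED_NODES = ["cmip.bcc.cma.cn", "vesg.ipsl.upmc.fr", "gfdl.noaa.gov", "esgf-data.ucar.edu"]


def usable_records(records, excluded=EXCLUDED_NODES):
    """Drop records whose data_node is on the exclusion list (exact hostname match)."""
    excluded = set(excluded)
    return [r for r in records if r.get("data_node") not in excluded]


# Hypothetical search records: dataset A is served by two nodes.
records = [
    {"data_node": "gfdl.noaa.gov", "instance_id": "A"},
    {"data_node": "esgf-data.dkrz.de", "instance_id": "A"},
]
print([r["data_node"] for r in usable_records(records)])  # → ['esgf-data.dkrz.de']
```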

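def collect_instances(searchOptions):
    """Return the unique instance IDs of all valid datasets matching searchOptions."""
    instances=[]
    print("Connect to esgf index...")
    conn = SearchConnection(index_search_url, distrib=True)
    # Only consider current, non-retracted dataset versions
    ctx=conn.new_context(**searchOptions,
                         retracted=False, latest=True)
    recentctx=ctx.search()
    for dset in recentctx:
        instances.append(dset.json["instance_id"])
    # Replicas of the same dataset share an instance_id: remove duplicates
    instances=list(set(instances))
    return instances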
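With distrib=True, a replicated dataset is returned once per data node that holds a copy, and all copies share the same instance_id, which is why collect_instances removes duplicates. The sketch below, with a hypothetical helper and made-up records, shows an order-preserving alternative to `list(set(...))` using `dict.fromkeys`.

```python
def unique_instance_ids(records):
    """Return the distinct instance_id values in first-seen order."""
    return list(dict.fromkeys(r["instance_id"] for r in records))


# Hypothetical search results: dataset A appears on two data nodes.
records = [
    {"instance_id": "A", "data_node": "node1"},
    {"instance_id": "B", "data_node": "node1"},
    {"instance_id": "A", "data_node": "node2"},
]
print(unique_instance_ids(records))  # → ['A', 'B']
```

A stable order makes repeated runs of the same request easier to compare.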
from tqdm import tqdm

def collect_globusURLs(instances):
    """Map each instance ID to a Globus URL (or HTTP download URLs) on a usable data node."""
    globusDict={}
    print("Connect to esgf index...")
    conn = SearchConnection(index_search_url, distrib=True)
    for instance in tqdm(instances):
        globusDict.setdefault(instance, {})
        try:
            ctx = conn.new_context(instance_id=instance)
            recentctx=ctx.search()
            for dataset in recentctx:
                # Stop as soon as one replica has provided a Globus URL
                if "globus_url" in globusDict[instance]:
                    break
                # Skip replicas on data nodes with unusable Globus endpoints
                if dataset.json["data_node"] in noGoodEndpoints:
                    continue
                try:
                    files = dataset.file_context().search()
                    if len(files) == 0:
                        continue
                    if files[0].globus_url:
                        globusDict[instance]={
                            "data_node":files[0].json["data_node"],
                            # Strip the file name to get the dataset directory
                            "globus_url":'/'.join(files[0].globus_url.split('/')[0:-1])
                        }
                    else:
                        # Fall back to the HTTP download URLs of all files
                        globusDict[instance] = {"download_url": [file.download_url for file in files]}
                except Exception as e:
                    print("File search failed for "+instance+": "+str(e))
                    continue
        except Exception as e:
            print("Context failed for "+instance+": "+str(e))
            continue
    return globusDict
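The per-dataset decision in collect_globusURLs can be isolated in a pure function, which makes it testable without contacting ESGF. In this sketch, file records are hypothetical dictionaries standing in for pyesgf file results; `entry_from_files` is not part of the notebook.

```python
def entry_from_files(files):
    """Build one request entry from a dataset's (hypothetical) file records."""
    if not files:
        return {}
    first = files[0]
    if first.get("globus_url"):
        return {
            "data_node": first["data_node"],
            # Drop the file name so the URL points at the dataset directory
            "globus_url": "/".join(first["globus_url"].split("/")[:-1]),
        }
    # No Globus URL: fall back to the HTTP download URLs of all files
    return {"download_url": [f["download_url"] for f in files]}


files = [
    {"data_node": "esgf.example.org",
     "globus_url": "globus:ep-id/CMIP6/EXAMPLE/v20190101/tas_example.nc",
     "download_url": "https://esgf.example.org/thredds/fileServer/tas_example.nc"},
]
print(entry_from_files(files))
```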
import json

def save_results_as_json(globusDict, result_file):
    with open(result_file, 'w', encoding='utf-8') as f:
        json.dump(globusDict, f, ensure_ascii=False, indent=4)
    print("You successfully created a request for replication.")
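To check a saved request before converting it, the JSON file can be read back and compared with the dictionary that was written. A minimal round-trip sketch in a temporary directory; `write_request` and `load_request` are hypothetical helpers that use the same `json.dump` settings as save_results_as_json.

```python
import json
import os
import tempfile


def write_request(request, result_file):
    """Write the request like save_results_as_json does (without the message)."""
    with open(result_file, "w", encoding="utf-8") as f:
        json.dump(request, f, ensure_ascii=False, indent=4)


def load_request(result_file):
    """Read a saved request back into a dictionary."""
    with open(result_file, encoding="utf-8") as f:
        return json.load(f)


# Placeholder request with a single hypothetical dataset
request = {"CMIP6.EXAMPLE.v20190101": {"data_node": "esgf.example.org",
                                        "globus_url": "globus:ep-id/CMIP6/EXAMPLE/v20190101"}}
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "request.json")
    write_request(request, path)
    roundtrip_ok = load_request(path) == request
print(roundtrip_ok)
```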

Dictionary conversion for globus input

We aim at a file-based, recursive Globus data transfer on dataset level. This requires generating input files such that each file contains only URLs for one Globus endpoint, i.e. the GridFTP server of one other ESGF node. Each line in those files corresponds to one dataset.

Therefore, using pandas and re,

  1. the dictionary is grouped by data node, i.e. by endpoint,

  2. the source and destination are defined in each line,

  3. special configurations of specific endpoints are taken into account.
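The grouping step can be sketched with the standard library alone. Note that this hypothetical `group_by_endpoint` groups directly on the endpoint ID parsed from each globus_url (assumed format `globus:<endpoint-id>/<path>`), whereas the notebook groups by data_node and takes the endpoint from the first URL of each group.

```python
from collections import defaultdict


def group_by_endpoint(request):
    """Group dataset IDs by the Globus endpoint ID in their globus_url.

    Entries without a globus_url (e.g. download URLs only) are skipped.
    """
    groups = defaultdict(list)
    for dataset_id, entry in request.items():
        url = entry.get("globus_url") if isinstance(entry, dict) else None
        if not url:
            continue
        # "globus:<endpoint-id>/<path>" -> "<endpoint-id>"
        endpoint = url.split("/")[0].split(":")[1]
        groups[endpoint].append(dataset_id)
    return dict(groups)


# Hypothetical request with two endpoints and one HTTP-only dataset
request = {
    "A": {"data_node": "n1", "globus_url": "globus:ep1/CMIP6/a/v1"},
    "B": {"data_node": "n2", "globus_url": "globus:ep2/CMIP6/b/v1"},
    "C": {"data_node": "n1", "globus_url": "globus:ep1/CMIP6/c/v1"},
    "D": {"download_url": ["https://example.org/d.nc"]},
}
print(group_by_endpoint(request))
```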

import pandas as pd
import re

def convert_dict_to_globus_input(result_file, result_trunk):
    """Write one Globus batch file per data node from the saved request JSON."""
    requestDict = pd.read_json(result_file, orient="index")
    if "data_node" not in requestDict.columns:
        print("No dataset with a Globus URL found; nothing to convert.")
        return
    # Entries without a data_node (download URLs only) are dropped by groupby
    groups=requestDict.groupby("data_node")

    for groupname, group in groups:
        if "dkrz" in groupname:
            print(str(len(group))+" dataset(s) already published and available at DKRZ. These are skipped")
            continue
        lines=[]
        urls = group["globus_url"].tolist()
        # A globus_url looks like "globus:<endpoint-id>/<path>"
        endpoint=urls[0].split('/')[0].split(':')[1]
        for url in urls:
            globusPath=url.split('/')
            # The destination path starts at the 'CMIP6' directory
            try:
                startDir=next(i for i,v in enumerate(globusPath) if v.lower() == 'cmip6')
            except StopIteration:
                print("Globus URL cannot be used because 'cmip6' is not within: "+url)
                continue
            # The path ends with the version directory (v<digits>)
            endDirs=[i for i, item in enumerate(globusPath) if re.search(r'v\d', item)]
            if len(endDirs) != 1:
                print("Globus URL cannot be used because a valid version (v<digits>) is not within: "+url)
                continue
            endDir=endDirs[0]+1

            # CEDA's ESGF node is structured differently:
            if "ceda" in groupname.lower():
                endDir-=1

            destTrunk='/'.join(globusPath[startDir:endDir])
            sourceForRecursive='/'.join(globusPath[1:endDir])
            lines.append("--recursive /"+sourceForRecursive+" /~/mnt/lustre02/work/ik1017/Ingest/requests/"+result_trunk+"/"+destTrunk)
        if not lines:
            continue
        # One batch file per data node; the endpoint ID is part of the file name
        resultDfName=result_file+"_"+groupname+"_"+endpoint+".csv"
        with open(resultDfName, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines)+'\n')
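As a worked example of the path rules, the hypothetical function below applies them to a single, made-up Globus URL: the destination starts at the CMIP6 directory and ends at the version directory, and the source is everything after the endpoint ID. The username/request part `alice_1234` is also a placeholder.

```python
import re

# Destination root on the DKRZ side, as used in convert_dict_to_globus_input
DEST_ROOT = "/~/mnt/lustre02/work/ik1017/Ingest/requests/"


def globus_batch_line(url, result_trunk, ceda=False):
    """Return one batch line for a dataset-level Globus URL, or None if unusable."""
    path = url.split("/")
    # Destination starts at the 'CMIP6' directory
    start = next((i for i, p in enumerate(path) if p.lower() == "cmip6"), None)
    # Exactly one path element must look like a version (v<digits>)
    versions = [i for i, p in enumerate(path) if re.search(r"v\d", p)]
    if start is None or len(versions) != 1:
        return None
    end = versions[0] + 1
    if ceda:  # CEDA paths are cut one element earlier, as in the notebook
        end -= 1
    dest = "/".join(path[start:end])
    source = "/".join(path[1:end])  # path[0] is "globus:<endpoint-id>"
    return "--recursive /" + source + " " + DEST_ROOT + result_trunk + "/" + dest


url = ("globus:00000000-0000-0000-0000-000000000000/esg_dataroot/CMIP6/CMIP/"
       "INST/MODEL/historical/r1i1p1f1/fx/sftlf/gn/v20190101")
print(globus_batch_line(url, "alice_1234"))
```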
import uuid
from IPython.display import display

button = widgets.Button(description="Create replication request!")
buttondisplay = widgets.Output()
display(button, buttondisplay)

def collect_and_validate_facets_input():
    datasets_to_download={}
    for name, child in zip(tab_contents, tab.children):
        # Ignore facets left empty
        if child.value:
            datasets_to_download[name]=child.value
    if len(datasets_to_download) < 3:
        raise ValueError("Please specify at least three search facets")
    return datasets_to_download

def set_and_validate_user():
    username = user.value
    if not username:
        raise ValueError("Please specify a Username")
    return username

def on_button_clicked(b):
    with buttondisplay:
        button.disabled=True
        try:
            datasets_to_download=collect_and_validate_facets_input()
            username=set_and_validate_user()
            request_id=str(uuid.uuid4())
            print("Collecting instances...")
            instances=collect_instances(datasets_to_download)
            print("Collecting globus URLs...")
            globusDict=collect_globusURLs(instances)
            result_trunk=username+"_"+request_id
            result_file="/home/dkrz/k204210/"+result_trunk
            print("Writing results to "+result_file+" ...")
            save_results_as_json(globusDict, result_file)
            print("Converting the request...")
            convert_dict_to_globus_input(result_file, result_trunk)
            print("Done!")
        finally:
            # Re-enable the button even if validation or a search step failed
            button.disabled=False

button.on_click(on_button_clicked)

Starting globus transfer

  1. The globus submission process starts with a manual login by a DKRZ Data Pool admin.

  2. Corresponding globus endpoints are activated manually with ESGF credentials.

  3. A part of the data pool which is affected by the request is mirrored so that globus can synchronize all missing files.

The globus command for starting a transfer looks like:

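# endpoint: the source Globus endpoint ID, i.e. the last part of the input file name
# inputFile: one of the .csv batch files written by convert_dict_to_globus_input
# Note: globus-cli 3.x expects a file name after --batch ("--batch -" reads stdin)
dkrz_endpoint="ac6870f0-5d5e-11ea-960a-0afc9e7dd773"
!globus transfer -s exists --notify off --batch {endpoint} {dkrz_endpoint} < {inputFile}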
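Because each batch file name ends with the source endpoint ID, the endpoint can be recovered from the file name and the transfer command assembled per file. A sketch under two assumptions: globus-cli 3.x is installed, where `--batch` takes a file name, and the file names follow the pattern written by convert_dict_to_globus_input. The helper names and the example file name are hypothetical, and the command is only printed, not executed.

```python
import shlex
from pathlib import Path

# DKRZ destination endpoint, as given above
DKRZ_ENDPOINT = "ac6870f0-5d5e-11ea-960a-0afc9e7dd773"


def endpoint_from_input_file(input_file):
    """Return the endpoint ID from a name like <result_file>_<data_node>_<endpoint>.csv."""
    return Path(input_file).stem.rsplit("_", 1)[1]


def transfer_command(input_file, dest_endpoint=DKRZ_ENDPOINT):
    """Argument list for one batch transfer; assumes globus-cli 3.x (--batch FILE)."""
    return [
        "globus", "transfer",
        "-s", "exists",        # only transfer files missing at the destination
        "--notify", "off",
        "--batch", str(input_file),
        endpoint_from_input_file(input_file),
        dest_endpoint,
    ]


# Hypothetical input file name following the notebook's naming pattern
input_file = "/tmp/alice_1234_esgf.example.org_00000000-0000-0000-0000-000000000000.csv"
cmd = transfer_command(input_file)
print(shlex.join(cmd))
```

After the manual login and endpoint activation described above, such a list could be passed to `subprocess.run(cmd, check=True)`.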