Skip to content

Commit deeae0d

Browse files
authored
Rucio replica sorting service using geolocation (#896)
1 parent 054ba7f commit deeae0d

File tree

11 files changed

+1606
-11
lines changed

11 files changed

+1606
-11
lines changed

did_finder_rucio/poetry.lock

Lines changed: 661 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

did_finder_rucio/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ python = "^3.9"
1111
rucio-clients = "^34.2.0"
1212
xmltodict = "^0.13.0"
1313
servicex-did-finder-lib = "^3.0.1"
14+
geoip2 = "^4.7.0"
1415
requests = ">=2.25.0,<3.0.0"
1516

1617
[tool.poetry.group.test]

did_finder_rucio/src/rucio_did_finder/celery.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from rucio_did_finder.lookup_request import LookupRequest
3535
from rucio_did_finder.rucio_adapter import RucioAdapter
3636
from servicex_did_finder_lib import DIDFinderApp
37+
from .replica_distance import ReplicaSorter
3738

3839
__log = logging.getLogger(__name__)
3940

@@ -43,14 +44,26 @@
4344
replica_client = ReplicaClient()
4445
rucio_adapter = RucioAdapter(did_client, replica_client, False)
4546

47+
if 'RUCIO_LATITUDE' in os.environ and 'RUCIO_LONGITUDE' in os.environ\
48+
and 'USE_REPLICA_SORTER' in os.environ:
49+
location = {'latitude': float(os.environ['RUCIO_LATITUDE']),
50+
'longitude': float(os.environ['RUCIO_LONGITUDE'])
51+
}
52+
replica_sorter = ReplicaSorter()
53+
else:
54+
location = None
55+
replica_sorter = None
56+
4657
app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adapter})
4758

4859

4960
def find_files(did_name, info, did_finder_args):
5061
lookup_request = LookupRequest(
5162
did=did_name,
5263
rucio_adapter=did_finder_args['rucio_adapter'],
53-
dataset_id=info['dataset-id']
64+
dataset_id=info['dataset-id'],
65+
replica_sorter=replica_sorter,
66+
location=location,
5467
)
5568
for file in lookup_request.lookup_files():
5669
yield file

did_finder_rucio/src/rucio_did_finder/lookup_request.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@
2828
import logging
2929
from datetime import datetime
3030
from rucio_did_finder.rucio_adapter import RucioAdapter
31+
from .replica_distance import ReplicaSorter
32+
from typing import Optional, Mapping
3133

3234

3335
class LookupRequest:
3436
def __init__(self, did: str,
3537
rucio_adapter: RucioAdapter,
36-
dataset_id: str = 'bogus-id'):
38+
dataset_id: str = 'bogus-id',
39+
replica_sorter: Optional[ReplicaSorter] = None,
40+
location: Optional[Mapping[str, float]] = None):
3741
'''Create the `LookupRequest` object that is responsible for returning
3842
lists of files. Processes things in chunks.
3943
@@ -51,6 +55,9 @@ def __init__(self, did: str,
5155
self.logger = logging.getLogger(__name__)
5256
self.logger.addHandler(logging.NullHandler())
5357

58+
self.location = location
59+
self.replica_sorter = replica_sorter
60+
5461
def lookup_files(self):
5562
"""
5663
lookup files.
@@ -68,6 +75,11 @@ def lookup_files(self):
6875
n_files += 1
6976
ds_size += af['file_size']
7077
total_paths += len(af['paths'])
78+
ipaths = af['paths'].copy()
79+
self.logger.debug(f'path before {ipaths}')
80+
if self.replica_sorter is not None and self.location is not None:
81+
af['paths'] = self.replica_sorter.sort_replicas(ipaths, self.location)
82+
self.logger.debug(f'path after {af["paths"]}')
7183
full_file_list.append(af)
7284
yield ds_files
7385

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
# Copyright (c) 2024, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
import logging
29+
import os
30+
31+
from typing import List, Mapping, Optional, Tuple
32+
from socket import gethostbyname
33+
import math
34+
from functools import lru_cache
35+
import tempfile
36+
from urllib.parse import urlparse
37+
import geoip2.database
38+
import geoip2.errors
39+
from collections import namedtuple
40+
41+
42+
Replica_distance = namedtuple('Replica_distance', 'replica distance')
43+
logger = logging.getLogger('ReplicaDistanceService')
44+
45+
46+
def _haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float):
47+
''' Assume inputs are in degrees; will convert to radians. Returns distance in radians '''
48+
dellat = math.radians(lat2-lat1)
49+
dellon = math.radians(lon2-lon1)
50+
hav_theta = ((1-math.cos(dellat))/2 +
51+
math.cos(math.radians(lat1))*math.cos(math.radians(lat2))*(1-math.cos(dellon))/2)
52+
53+
return 2*math.asin(math.sqrt(hav_theta))
54+
55+
56+
@lru_cache
57+
def _get_distance(database: Optional[geoip2.database.Reader],
58+
fqdn: str, my_lat: float, my_lon: float):
59+
"""
60+
Determine angular distance between server at fqdn and (my_lat, my_lon).
61+
If there is a failure of fdqn location lookup, will return pi
62+
(the largest possible physical result)
63+
"""
64+
if database is None:
65+
return math.pi
66+
try:
67+
loc_data = database.city(gethostbyname(fqdn)).location
68+
except geoip2.errors.AddressNotFoundError as e:
69+
logger.warning(f'Cannot geolocate {fqdn}, returning maximum distance.\nError: {e}')
70+
return math.pi
71+
site_lat, site_lon = loc_data.latitude, loc_data.longitude
72+
if site_lat is None or site_lon is None:
73+
return math.pi
74+
return _haversine_distance(site_lat, site_lon, my_lat, my_lon)
75+
76+
77+
class ReplicaSorter(object):
78+
_database: Optional[geoip2.database.Reader] = None
79+
# we keep the temporary directory around so it won't get randomly deleted by the GC
80+
_tmpdir: Optional[tempfile.TemporaryDirectory] = None
81+
82+
def __init__(self, db_url_tuple: Optional[Tuple[str, bool]] = None):
83+
"""
84+
Argument is an optional tuple of (URL, bool).
85+
The URL is assumed to be a file to download; the bool indicates whether
86+
it is ready to be used (True) or needs unpacking (False)
87+
"""
88+
if db_url_tuple is None:
89+
db_url_tuple = self.get_download_url_from_environment()
90+
self._download_data(db_url_tuple)
91+
92+
def sort_replicas(self, replicas: List[str], location: Mapping[str, float]) -> List[str]:
93+
"""
94+
Main method of this class.
95+
replicas: list of strings which are the URLs for the replicas for a file
96+
location: dict of the form {'latitude': xxx, 'longitude': yyy} where xxx and yyy are floats
97+
giving the latitude and longitude in signed degrees
98+
"""
99+
if not self._database:
100+
return replicas
101+
if len(replicas) == 1:
102+
return replicas
103+
fqdns = [(urlparse(replica).hostname, replica) for replica in replicas]
104+
distances = [Replica_distance(replica=replica,
105+
distance=_get_distance(self._database, fqdn,
106+
location['latitude'],
107+
location['longitude']
108+
)
109+
)
110+
for fqdn, replica in fqdns]
111+
distances.sort(key=lambda x: x.distance)
112+
return [_.replica for _ in distances]
113+
114+
@classmethod
115+
def get_download_url_from_key_and_edition(cls, license_key: str, edition: str):
116+
"""
117+
Construct the (url, unpacked) tuple to feed to the constructor from a license key
118+
and an edition of the MaxMind database.
119+
"""
120+
return (('https://download.maxmind.com/app/geoip_download?'
121+
f'edition_id={edition}&license_key={license_key}&suffix=tar.gz'),
122+
False)
123+
124+
@classmethod
125+
def get_download_url_from_environment(cls) -> Optional[Tuple[str, bool]]:
126+
"""
127+
Based on environment variables, this will give a tuple of the URL and a bool which is
128+
True if the file from the URL is ready to use as is, False if needs to be unpacked
129+
"""
130+
if url := os.environ.get('GEOIP_DB_URL', ''):
131+
return (url, True)
132+
key = os.environ.get('GEOIP_DB_LICENSE_KEY', '')
133+
edition = os.environ.get('GEOIP_DB_EDITION', '')
134+
if (key and edition):
135+
return cls.get_download_url_from_key_and_edition(key, edition)
136+
else:
137+
return None
138+
139+
def _download_data(self, db_url_tuple: Optional[Tuple[str, bool]]) -> None:
140+
"""
141+
Retrieves and unpacks the MaxMind databases and initializes the GeoIP reader
142+
"""
143+
from urllib.request import urlretrieve
144+
import tarfile
145+
import glob
146+
if db_url_tuple is None:
147+
return
148+
url, unpacked = db_url_tuple
149+
try:
150+
fname, _ = urlretrieve(url)
151+
except Exception as e:
152+
logger.error(f'Failure retrieving GeoIP database {url}.\nError: {e}')
153+
return
154+
try:
155+
if unpacked:
156+
self._database = geoip2.database.Reader(fname)
157+
else:
158+
tarball = tarfile.open(fname)
159+
self._tmpdir = tempfile.TemporaryDirectory()
160+
tarball.extractall(self._tmpdir.name)
161+
self._database = geoip2.database.Reader(glob.glob(os.path.join(self._tmpdir.name,
162+
'*/*mmdb')
163+
)[0])
164+
except Exception as e:
165+
logger.error(f'Failure initializing the GeoIP database reader.\nError: {e}')
166+
self._database = None
167+
return

did_finder_rucio/src/rucio_did_finder/rucio_adapter.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ def __init__(self, did_client, replica_client, report_logical_files=False):
4646

4747
def client_location(self):
4848
client_location = {}
49-
# setting the site actually seems to work
5049
if 'SITE_NAME' in os.environ:
5150
client_location['site'] = os.environ['SITE_NAME']
5251
latitude = os.environ.get('RUCIO_LATITUDE')
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright (c) 2024, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
29+
GEOIP_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City.mmdb'
30+
GEOIP_TGZ_URL = 'https://ponyisi.web.cern.ch/public/GeoLite2-City_20241015.tar.gz'
31+
32+
# some URLs (real FQDNs, not real file paths)
33+
REPLICAS = ['https://ccxrootdatlas.in2p3.fr:1094//pnfs/DAOD_PHYSLITE.37020764._000004.pool.root.1',
34+
'root://fax.mwt2.org:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
35+
'root://atlasdcache-kit.gridka.de:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1']
36+
37+
SORTED_REPLICAS = ['root://fax.mwt2.org:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1',
38+
'https://ccxrootdatlas.in2p3.fr:1094//pnfs/DAOD_PHYSLITE.37020764._000004.pool.root.1', # noqa: E501
39+
'root://atlasdcache-kit.gridka.de:1094//DAOD_PHYSLITE.37020764._000004.pool.root.1'] # noqa: E501
40+
41+
JUNK_REPLICAS = ['https://junk.does.not.exist.org/',
42+
'root://fax.mwt2.org:1094//pnfs/uchicago.edu/']
43+
SORTED_JUNK_REPLICAS = ['root://fax.mwt2.org:1094//pnfs/uchicago.edu/',
44+
'https://junk.does.not.exist.org/']
45+
46+
LOCATION = {'latitude': 41.78, 'longitude': -87.7}
47+
48+
49+
def test_sorting():
50+
"""Also test unpacking tgz database"""
51+
from rucio_did_finder.replica_distance import ReplicaSorter
52+
rs = ReplicaSorter((GEOIP_TGZ_URL, False))
53+
# Given location (Chicago) replicas should sort US, FR, DE
54+
sorted = rs.sort_replicas(REPLICAS, LOCATION)
55+
assert sorted == SORTED_REPLICAS
56+
# the nonexistent FQDN should sort at end
57+
sorted = rs.sort_replicas(JUNK_REPLICAS, LOCATION)
58+
assert sorted == SORTED_JUNK_REPLICAS
59+
60+
61+
def test_envvars():
62+
"""Only tests unpacked DB download"""
63+
from rucio_did_finder.replica_distance import ReplicaSorter
64+
import os
65+
os.environ['GEOIP_DB_URL'] = GEOIP_URL
66+
rs = ReplicaSorter()
67+
sorted = rs.sort_replicas(REPLICAS, LOCATION)
68+
assert sorted == SORTED_REPLICAS
69+
del os.environ['GEOIP_DB_URL']
70+
71+
72+
def test_bad_geodb():
73+
"""Tests what happens when we have a bad DB URL"""
74+
from rucio_did_finder.replica_distance import ReplicaSorter
75+
rs = ReplicaSorter(('https://junk.does.not.exist.org', False))
76+
assert rs._database is None
77+
sorted = rs.sort_replicas(REPLICAS, LOCATION)
78+
assert sorted == REPLICAS

helm/example_secrets.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ data:
1818
rabbitmq-password: << rabbitMQ password >>
1919
rabbitmq-erlang-cookie: << rabbitMQ erlang cookie >>
2020
postgresql-password: << postgresql password for postgres user >>
21+
geoip-license-key: << MaxMind license key for GeoIP database download >>

helm/servicex/templates/did-finder-rucio/deployment.yaml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,26 @@ spec:
4646
- name: SITE_NAME
4747
value: "{{ .Values.didFinder.rucio.site }}"
4848
{{- end }}
49+
{{- if .Values.didFinder.rucio.replicaSorterEnabled }}
50+
- name: USE_REPLICA_SORTER
51+
value: "1"
52+
{{- if .Values.didFinder.rucio.geoIPURL }}
53+
- name: GEOIP_DB_URL
54+
value: {{ .Values.didFinder.rucio.geoIPURL }}
55+
{{- else }}
56+
{{- if .Values.didFinder.rucio.geoIPEdition -}}
57+
- name: GEOIP_DB_EDITION
58+
value: {{ .Values.didFinder.rucio.geoIPEdition }}
59+
{{- end -}}
60+
{{- if .Values.secrets -}}
61+
- name: GEOIP_DB_LICENSE_KEY
62+
valueFrom:
63+
secretKeyRef:
64+
name: {{ .Values.secrets }}
65+
key: geoip-license-key
66+
{{- end }}
67+
{{- end }}
68+
{{- end }}
4969
- name: INSTANCE_NAME
5070
value: {{ .Release.Name }}
5171
volumeMounts:

helm/servicex/values.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ codeGen:
7777
defaultScienceContainerImage: sslhep/servicex_func_adl_cms_aod_transformer
7878
defaultScienceContainerTag: cmssw-5-3-32
7979

80-
8180
didFinder:
8281
CERNOpenData:
8382
enabled: true
@@ -99,6 +98,10 @@ didFinder:
9998
servicex_latitude: 41.78
10099
servicex_longitude: -87.7
101100
tag: develop
101+
replicaSorterEnabled: false
102+
geoIPURL: null
103+
geoIPEdition: null
104+
102105
gridAccount: <your account>
103106
logging:
104107
logstash:

0 commit comments

Comments
 (0)