Skip to content

Migrate DID Finders to use Celery #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 87 additions & 72 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,105 +4,112 @@

## Introduction

ServiceX DID finders take a dataset name and turn them into files to be transformed. They interact with ServiceX via the Rabbit MQ message broker. As such, the integration into ServiceX and the API must be the same for all DID finders. This library abstracts away some of that interaction so that the code that interacts with ServiceX can be separated from the code that translates a dataset identifier into a list of files.
ServiceX DID finders take a dataset name and turn them into files to be transformed. They are
implemented as a Celery application with a task called `do_lookup`. Developers of a specific
DID Finder implementation need to write a generator function which yields a dictionary for each
file in the dataset.

The DID Finder author need write only a line of initialization code and then a small routine that translates a DID into a list of files.
The Task interacts with ServiceX through the App's REST endpoint to add files to the dataset and
a separate REST endpoint to signal that the dataset is complete.

## Usage
The app caches DID lookups. The `dataset_id` is the primary key for the cache table.

A very simple [demo](https://github.com/ssl-hep/ServiceX_DID_Finder_Demo) has been created to show how to make a basic DID finder for ServiceX. You can use that as a starting point if authoring one.
Invocations of the `do_lookup` task accepts the following arguments:
* `did`: The dataset identifier to look up
* `dataset_id`: The ID of the dataset in the database
* `endpoint`: The ServiceX endpoint to send the results to
* `user_did_finder`: The user callback that is a generator function that yields file information dictionaries.

Create an async callback method that `yield`s file info dictionaries. For example:
## Creating a DID Finder
You start with a new Python project. You will need to add this library as a dependency to the project
by adding the following to your `pyproject.tom` file:

```
servicex-did-finder-lib = "^3.0"
```

Create a celery app that will run your DID finder. This app will be responsible for starting the
Celery worker and registering your DID finder function as a task. Here is an example of how to do
this. Celery prefers that the app is in a file called `celery.py` in a module in your project. Here
is an example of how to do this:

## celery.py:
```python
async def my_callback(did_name: str, info: Dict[str, Any]):
for i in range(0, 10):
yield {
'paths': [f"root://atlas-experiment.cern.ch/dataset1/file{i}.root"],
'adler32': b183712731,
'file_size': 0,
'file_events': 0,
}

from servicex_did_finder_lib import DIDFinderApp
rucio_adaptor = RucioAdaptor()
app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adaptor})
```

Attach the DID finder to the app by using the `did_lookup_task` decorator. This decorator will
register the function as a Celery task. Here is an example of how to do this:

```python
@app.did_lookup_task(name="did_finder_rucio.lookup_dataset")
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
self.do_lookup(did=did, dataset_id=dataset_id,
endpoint=endpoint, user_did_finder=find_files)
```

You will need to implement the `find_files` function. This function is a generator that yields
file information dictionaries.

The arguments to the method are straight forward:

* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
* `info` contains a dict of various info about the request that asked for this DID.
* `info` contains a dict of various info about the database ID for this dataset.
* `did_finder_args` contains the arguments that were passed to the DID finder at startup. This is a way to pass command line arguments to your file finder

Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:
Yield the results as you find them. The fields you need to pass back to the library are as follows:

* `paths`: An ordered list of URIs that a transformer in ServiceX can access to get at the file. Often these are either `root://` or `http://` schema URI's. When accessing the file, URIs will be tried in ordered listed.
* `adler32`: A CRC number for the file. This CRC is calculated in a special way by rucio and is not used. Leave as 0 if you do not know it.
* `file_size`: Number of bytes of the file. Used to calculate statistics. Leave as zero if you do not know it (or it is expensive to look up).
* `file_events`: Number of events in the file. Used to calculate statistics. Leave as zero if you do not know it (or it is expensive to look up).

Once the callback is working privately, it is time to build a container. ServiceX will start the container and pass, one way or the other, a few arguments to it. Several arguments are required for the DID finder to work (like `rabbitUrl`). The library will automatically parse these during initialization.

To initialize the library and start listening for and processing messages from RabbitMQ, you must initialize the library. In all cases, the call to `start_did_finder` will not return.

If you do not have to process any command line arguments to configure the service, then the following is good enough:
Here's a simple example of a did handler generator:

```python
start_did_finder('my_finder', my_callback)
def find_files(did_name: str,
info: Dict[str, Any],
did_finder_args: Dict[str, Any]
) -> Generator[Dict[str, Any], None]:
__log.info('DID Lookup request received.', extra={
'requestId': info['request-id'], 'dataset': did_name})

urls = xrd.glob(cache_prefix + did_name)
if len(urls) == 0:
raise RuntimeError(f"No files found matching {did_name} for request "
f"{info['request-id']} - are you sure it is correct?")

for url in urls:
yield {
'paths': [url],
'adler32': 0, # No clue
'file_size': 0, # We could look up the size but that would be slow
'file_events': 0, # And this we do not know
}
```

The first argument is the name of the schema. The user will use `my_finder://dataset_name` to access this DID lookup (and `my_callback` is called with `dataset_name` as `did_name`). Please make sure everything is lower case: schema in URI's are not case sensitive.

The second argument is the call back.
## Extra Command Line Arguments
Sometimes you need to pass additional information to your DID Finder from the command line. You do
this by creating your own `ArgParser`
```python
import argparse
# Parse command-line arguments
parser = argparse.ArgumentParser(description='DIDFinderApp')
parser.add_argument('--custom-arg', help='Custom argument for DIDFinderApp')
args, unknown = parser.parse_known_args()

If you do need to configure your DID finder from command line arguments, then use the python `argparse` library, being sure to hook in the did finder library as follows (this code is used to start the rucio finder):
# Create the app instance
app = DIDFinderApp('myApp', did_finder_args={"custom-arg": args.custom_arg})

```python
# Parse the command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--site', dest='site', action='store',
default=None,
help='XCache Site)')
parser.add_argument('--prefix', dest='prefix', action='store',
default='',
help='Prefix to add to Xrootd URLs')
parser.add_argument('--threads', dest='threads', action='store',
default=10, type=int, help="Number of threads to spawn")
add_did_finder_cnd_arguments(parser)

args = parser.parse_args()

site = args.site
prefix = args.prefix
threads = args.threads
logger.info("ServiceX DID Finder starting up: "
f"Threads: {threads} Site: {site} Prefix: {prefix}")

# Initialize the finder
did_client = DIDClient()
replica_client = ReplicaClient()
rucio_adapter = RucioAdapter(did_client, replica_client)

# Run the DID Finder
try:
logger.info('Starting rucio DID finder')

async def callback(did_name, info):
async for f in find_files(rucio_adapter, site, prefix, threads,
did_name, info):
yield f

start_did_finder('rucio',
callback,
parsed_args=args)

finally:
logger.info('Done running rucio DID finder')
```

In particular note:
These parsed args will be passed to your `find_files` function as a dictionary in
the `did_finder_args` parameter.

1. The call to `add_did_finder_cnd_arguments` to setup the arguments required by the finder library.
2. Parsing of the arguments using the usual `parse_args` method
3. Passing the parsed arguments to `start_did_finder`.

Another pattern in the above code that one might find useful - a thread-safe way of passing global arguments into the callback. Given Python's Global Interpreter Lock, this is probably not necessary.

### Proper Logging

Expand All @@ -124,7 +131,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
}
```

The `start_did_finder` will configure the python root logger properly.
The `DIDFinderApp` will configure the python root logger properly.

## URI Format

Expand All @@ -135,3 +142,11 @@ All the incoming DID's are expected to be URI's without the schema. As such, the
* `get` - If the value is `all` (the default) then all files in the dataset must be returned. If the value is `available`, then only files that are accessible need be returned.

As am example, if the following URI is given to ServiceX, "rucio://dataset_name?files=20&get=available", then the first 20 available files of the dataset will be processed by the rest of servicex.

## Stressful DID Finder
As an example, there is in this repo a simple DID finder that can be used to test the system. It is called `stressful_did_finder.py`. It will return a large number of files, and will take a long time to run. It is useful for testing the system under load.
I'm not quite sure how to use it yet, but I'm sure it will be useful.

It accepts the following arguments:
* `--num-files` - The number of files to return as part of each request. Default is 10.
* `--file-path` - The DID Finder returns the same file over and over. This is the file to return in the response
Loading
Loading