fmeval.util

  1import os
  2import re
  3import ray
  4import multiprocessing as mp
  5import importlib.metadata
  6
  7from ray.actor import ActorHandle
  8from fmeval.constants import EVAL_RESULTS_PATH, DEFAULT_EVAL_RESULTS_PATH, PARALLELIZATION_FACTOR
  9from fmeval.exceptions import EvalAlgorithmInternalError, EvalAlgorithmClientError
 10
 11
 12def require(expression, msg: str):
 13    """
 14    Raise EvalAlgorithmClientError if expression is not True
 15    """
 16    if not expression:
 17        raise EvalAlgorithmClientError(msg)
 18
 19
 20def assert_condition(expression, msg: str):
 21    """
 22    Raise EvalAlgorithmInternalError if expression is not True
 23    """
 24    if not expression:
 25        raise EvalAlgorithmInternalError(msg)
 26
 27
 28def project_root(current_file: str) -> str:
 29    """
 30    :return: project root
 31    """
 32    curpath = os.path.abspath(os.path.dirname(current_file))
 33
 34    def is_project_root(path: str) -> bool:
 35        return os.path.exists(os.path.join(path, ".root"))
 36
 37    while not is_project_root(curpath):  # pragma: no cover
 38        parent = os.path.abspath(os.path.join(curpath, os.pardir))
 39        if parent == curpath:
 40            raise EvalAlgorithmInternalError("Got to the root and couldn't find a parent folder with .root")
 41        curpath = parent
 42    return curpath
 43
 44
 45def camel_to_snake(name):
 46    name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
 47    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()
 48
 49
 50def get_eval_results_path():
 51    """
 52    Util method to return results path for eval_algos. This method looks for EVAL_RESULTS_PATH environment variable,
 53    if present returns that else default path
 54    :returns: Local directory path of eval algo results
 55    """
 56    if os.environ.get(EVAL_RESULTS_PATH) is not None:
 57        os.makedirs(os.environ[EVAL_RESULTS_PATH], exist_ok=True)
 58        return os.environ[EVAL_RESULTS_PATH]
 59    else:
 60        os.makedirs(DEFAULT_EVAL_RESULTS_PATH, exist_ok=True)
 61        return DEFAULT_EVAL_RESULTS_PATH
 62
 63
 64def singleton(cls):
 65    """
 66    Decorator to make a class Singleton
 67    """
 68    instances = {}
 69
 70    def get_instance(*args, **kwargs):
 71        if cls not in instances:
 72            instances[cls] = cls(*args, **kwargs)
 73        return instances[cls]
 74
 75    return get_instance
 76
 77
 78def get_num_actors():
 79    try:
 80        num_actors = (
 81            int(os.environ[PARALLELIZATION_FACTOR]) if PARALLELIZATION_FACTOR in os.environ else (mp.cpu_count() - 1)
 82        )
 83    except ValueError:
 84        num_actors = mp.cpu_count() - 1
 85    return num_actors
 86
 87
 88def create_shared_resource(resource: object, num_cpus: int = 1) -> ActorHandle:
 89    """Create a Ray actor out of `resource`.
 90
 91    Typically, `resource` will be an object that consumes a significant amount of
 92    memory (ex: a BertscoreHelperModel instance) that you do not want to create
 93    on a per-transform (i.e. per-process) basis, but rather wish to have as a "global resource".
 94
 95    Conceptually, the object that is returned from this function can be thought
 96    of as the input object, except it now exists in shared memory, as opposed
 97    to the address space of the process it was created in. Note that this
 98    function returns a Ray actor handle, which must be interacted with using the
 99    Ray remote API.
100
101    :param resource: The object which we create a Ray actor from.
102        This object's class must implement the `__reduce__` method
103        with a return value of the form (ClassName, serialized_data),
104        where serialized_data is a tuple containing arguments to __init__,
105        in order to be compatible with this function.
106    :param num_cpus: The num_cpus parameter to pass to ray.remote().
107        This parameter represents the number of Ray logical CPUs
108        (see https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#physical-resources-and-logical-resources)
109        that the created actor will require.
110    :returns: The Ray actor handle corresponding to the created actor.
111    """
112    resource_cls, serialized_data = resource.__reduce__()  # type: ignore[misc]
113    wrapped_resource_cls = ray.remote(num_cpus=num_cpus)(resource_cls)
114    return wrapped_resource_cls.remote(*serialized_data)  # type: ignore
115
116
def cleanup_shared_resource(resource: ActorHandle) -> None:
    """Kill the Ray actor behind `resource`.

    Typically `resource` is a handle previously obtained from
    create_shared_resource (ex: a BertscoreHelperModel actor); killing the
    actor removes the shared resource.

    :param resource: A Ray actor handle to a shared resource.
    :returns: None
    """
    ray.kill(resource)
129
130
def get_fmeval_package_version() -> str:
    """
    Look up the version of the installed "fmeval" distribution.

    :returns: The current fmeval package version.
    """
    package_name = "fmeval"
    return importlib.metadata.version(package_name)
def require(expression, msg: str):
def require(expression, msg: str):
    """
    Validate a caller-supplied condition.

    :param expression: Value checked for truthiness.
    :param msg: Message carried by the raised error.
    :raises EvalAlgorithmClientError: if `expression` is falsy.
    """
    if expression:
        return
    raise EvalAlgorithmClientError(msg)

Raise EvalAlgorithmClientError if expression is not True

def assert_condition(expression, msg: str):
def assert_condition(expression, msg: str):
    """
    Validate an internal invariant.

    :param expression: Value checked for truthiness.
    :param msg: Message carried by the raised error.
    :raises EvalAlgorithmInternalError: if `expression` is falsy.
    """
    if expression:
        return
    raise EvalAlgorithmInternalError(msg)

Raise EvalAlgorithmInternalError if expression is not True

def project_root(current_file: str) -> str:
def project_root(current_file: str) -> str:
    """
    Locate the project root by walking up from the directory of `current_file`.

    A directory counts as the project root when it contains an entry named ".root".

    :param current_file: Path of a file located somewhere inside the project.
    :return: Absolute path of the closest ancestor directory containing ".root".
    :raises EvalAlgorithmInternalError: if the filesystem root is reached without
        finding a ".root" entry.
    """
    candidate = os.path.abspath(os.path.dirname(current_file))

    while not os.path.exists(os.path.join(candidate, ".root")):  # pragma: no cover
        one_level_up = os.path.abspath(os.path.join(candidate, os.pardir))
        # The parent of the filesystem root is the root itself: nowhere left to look.
        if one_level_up == candidate:
            raise EvalAlgorithmInternalError("Got to the root and couldn't find a parent folder with .root")
        candidate = one_level_up
    return candidate
Returns

project root

def camel_to_snake(name):
def camel_to_snake(name):
    """
    Convert a CamelCase / mixedCase identifier to snake_case.

    :param name: The identifier to convert.
    :returns: The lower-cased identifier with underscores inserted at word boundaries.
    """
    # First pass: put an underscore before a capitalised word (upper-case letter
    # followed by lower-case letters) that follows any character.
    capitalised_words_split = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
    # Second pass: split the remaining lower-case/digit -> upper-case transitions.
    all_boundaries_split = re.sub("([a-z0-9])([A-Z])", r"\1_\2", capitalised_words_split)
    return all_boundaries_split.lower()
def get_eval_results_path():
def get_eval_results_path():
    """
    Resolve the local directory where eval algorithm results are stored.

    The environment variable named by EVAL_RESULTS_PATH takes precedence when it is
    set; otherwise DEFAULT_EVAL_RESULTS_PATH is used. The chosen directory is created
    if it does not exist yet.

    :returns: Local directory path of eval algo results
    """
    configured_path = os.environ.get(EVAL_RESULTS_PATH)
    results_dir = DEFAULT_EVAL_RESULTS_PATH if configured_path is None else configured_path
    os.makedirs(results_dir, exist_ok=True)
    return results_dir

Utility method that returns the results path for eval_algos. It looks for the EVAL_RESULTS_PATH environment variable and returns its value if present; otherwise it returns the default path.

Returns

Local directory path of eval algo results

def singleton(cls):
def singleton(cls):
    """
    Class decorator that turns `cls` into a singleton.

    The first call to the decorated name constructs the instance with the given
    arguments; every later call returns that same instance, and the arguments of
    those later calls are not used.

    :param cls: The class to wrap.
    :returns: A factory function that returns the single instance of `cls`.
    """
    created = {}

    def get_instance(*args, **kwargs):
        if cls in created:
            return created[cls]
        created[cls] = cls(*args, **kwargs)
        return created[cls]

    return get_instance

Decorator to make a class Singleton

def get_num_actors():
def get_num_actors() -> int:
    """
    Determine how many actors to use for parallel work.

    The value of the environment variable named by PARALLELIZATION_FACTOR is used
    when it is set and parses as an integer. Otherwise the count defaults to one
    less than the CPU count reported by multiprocessing, but never less than 1:
    on a single-CPU host the plain `cpu_count() - 1` would evaluate to 0 actors.

    :returns: The number of actors.
    """
    configured = os.environ.get(PARALLELIZATION_FACTOR)
    if configured is not None:
        try:
            # An explicitly configured value is returned unchanged.
            return int(configured)
        except ValueError:
            # Not an integer: deliberately fall through to the CPU-based default.
            pass
    return max(1, mp.cpu_count() - 1)
def create_shared_resource(resource: object, num_cpus: int = 1) -> ray.actor.ActorHandle:
 89def create_shared_resource(resource: object, num_cpus: int = 1) -> ActorHandle:
 90    """Create a Ray actor out of `resource`.
 91
 92    Typically, `resource` will be an object that consumes a significant amount of
 93    memory (ex: a BertscoreHelperModel instance) that you do not want to create
 94    on a per-transform (i.e. per-process) basis, but rather wish to have as a "global resource".
 95
 96    Conceptually, the object that is returned from this function can be thought
 97    of as the input object, except it now exists in shared memory, as opposed
 98    to the address space of the process it was created in. Note that this
 99    function returns a Ray actor handle, which must be interacted with using the
100    Ray remote API.
101
102    :param resource: The object which we create a Ray actor from.
103        This object's class must implement the `__reduce__` method
104        with a return value of the form (ClassName, serialized_data),
105        where serialized_data is a tuple containing arguments to __init__,
106        in order to be compatible with this function.
107    :param num_cpus: The num_cpus parameter to pass to ray.remote().
108        This parameter represents the number of Ray logical CPUs
109        (see https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#physical-resources-and-logical-resources)
110        that the created actor will require.
111    :returns: The Ray actor handle corresponding to the created actor.
112    """
113    resource_cls, serialized_data = resource.__reduce__()  # type: ignore[misc]
114    wrapped_resource_cls = ray.remote(num_cpus=num_cpus)(resource_cls)
115    return wrapped_resource_cls.remote(*serialized_data)  # type: ignore

Create a Ray actor out of resource.

Typically, resource will be an object that consumes a significant amount of memory (ex: a BertscoreHelperModel instance) that you do not want to create on a per-transform (i.e. per-process) basis, but rather wish to have as a "global resource".

Conceptually, the object that is returned from this function can be thought of as the input object, except it now exists in shared memory, as opposed to the address space of the process it was created in. Note that this function returns a Ray actor handle, which must be interacted with using the Ray remote API.

Parameters
  • resource: The object which we create a Ray actor from. This object's class must implement the __reduce__ method with a return value of the form (ClassName, serialized_data), where serialized_data is a tuple containing arguments to __init__, in order to be compatible with this function.
  • num_cpus: The num_cpus parameter to pass to ray.remote(). This parameter represents the number of Ray logical CPUs (see https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#physical-resources-and-logical-resources) that the created actor will require.

Returns

The Ray actor handle corresponding to the created actor.
def cleanup_shared_resource(resource: ray.actor.ActorHandle) -> None:
def cleanup_shared_resource(resource: ActorHandle) -> None:
    """Kill the Ray actor behind `resource`.

    Typically `resource` is a handle previously obtained from
    create_shared_resource (ex: a BertscoreHelperModel actor); killing the
    actor removes the shared resource.

    :param resource: A Ray actor handle to a shared resource.
    :returns: None
    """
    ray.kill(resource)

Remove the resource from shared memory.

Concretely, this function kills the Ray actor corresponding to resource, which in most cases will be an actor created via create_shared_resource.

Parameters
  • resource: A Ray actor handle to a shared resource (ex: a BertscoreHelperModel).

Returns

None
def get_fmeval_package_version() -> str:
def get_fmeval_package_version() -> str:
    """
    Look up the version of the installed "fmeval" distribution.

    :returns: The current fmeval package version.
    """
    package_name = "fmeval"
    return importlib.metadata.version(package_name)

Returns

The current fmeval package version.