Source code for wrapyfi.utils.core_utils

import os
from pathlib import Path
from glob import glob
import threading
import importlib.util
import sys
from typing import Union, Callable, Any, Optional, List


WRAPYFI_PLUGIN_PATHS = "WRAPYFI_PLUGIN_PATHS"
WRAPYFI_MWARE_PATHS = "WRAPYFI_MWARE_PATHS"

lock = threading.Lock()


[docs] def deepcopy( obj: Any, exclude_keys: Optional[Union[list, tuple]] = None, shallow_keys: Optional[Union[list, tuple]] = None, ): """ Deep copy an object, excluding specified keys. :param obj: Any: The object to deep copy :param exclude_keys: Union[list, tuple]: A list of keys to exclude from the deep copy :param shallow_keys: Union[list, tuple]: A list of keys to shallow copy """ import copy if exclude_keys is None: return copy.deepcopy(obj) else: if isinstance(obj, list): return [deepcopy(item, exclude_keys) for item in obj] elif isinstance(obj, tuple): return tuple(deepcopy(item, exclude_keys) for item in obj) elif isinstance(obj, set): return {deepcopy(item, exclude_keys) for item in obj} elif isinstance(obj, dict): _shallows = shallow_keys or [] ret = { key: deepcopy(val, exclude_keys) for key, val in obj.items() if key not in exclude_keys + _shallows } ret.update({key: val for key, val in obj.items() if key in _shallows}) return ret else: return copy.deepcopy(obj)
[docs] def get_default_args(fnc: Callable[..., Any]): """ Get the default arguments for a function. :param fnc: Callable[..., Any]: The function to get the default arguments for """ import inspect signature = inspect.signature(fnc) return { k: v.default for k, v in signature.parameters.items() if v.default is not inspect.Parameter.empty }
[docs] def match_args( args: Union[list, tuple], kwargs: dict, src_args: Union[list, tuple], src_kwargs: dict, ): """ Match and Substitute Arguments and Keyword Arguments using Specified Source Values. Navigate through the provided `args` and `kwargs`, identifying entries prefixed with "$" and substituting them with values from `src_args` and `src_kwargs` respectively, to dynamically modify the function call parameters using the source values. :param args: Union[list, tuple]: A list of arguments, potentially containing strings that indicate substitutable entries. Substitutable entries are prefixed with "$" and followed by either: - A digit (indicating an index to reference a value from `src_args`), or - Non-digit characters (indicating a key to reference a value from `src_kwargs`). :param kwargs: dict: A dictionary of keyword arguments, where values might be strings indicating substitutable entries, similar to the entries in `args`. :param src_args: Union[list, tuple]: A list of source arguments, intended to be referenced by substitutable entries within `args`. :param src_kwargs: dict: A dictionary of source keyword arguments, intended to be referenced by substitutable entries within `args` and `kwargs`. :return: Tuple[list, dict]: A tuple containing: - list: The new arguments, formed by substituting specified entries from `args` using `src_args` and `src_kwargs`. - dict: The new keyword arguments, formed by substituting specified entries from `kwargs` using `src_args` and `src_kwargs`. """ new_args = [] new_kwargs = {} for arg in args: if arg[0] == "$" and arg[1:].isdigit(): new_args.append(src_args[int(arg[1:])]) elif arg[0] == "$" and not arg[1:].isdigit(): new_args.append(src_kwargs[arg[1:]]) else: new_args.append(arg) for kwarg_key, kwarg_val in kwargs.items(): if isinstance(kwarg_val, str) and "$" in kwarg_val and kwarg_val[1:].isdigit(): new_kwargs[kwarg_key] = src_args[int(kwarg_val[1:])] elif ( isinstance(kwarg_val, str) and "$" in kwarg_val and not kwarg_val[1:].isdigit() ): new_kwargs[kwarg_key] = src_kwargs[kwarg_val[1:]] else: new_kwargs[kwarg_key] = kwarg_val return tuple(new_args), new_kwargs
[docs] def dynamic_module_import(modules: List[str], globals: dict): """ Dynamically import modules. :param modules: List[str]: A list of module names to import :param globals: dict: The globals dictionary to update """ for module_name in modules: if not module_name.endswith(".py") or module_name.endswith("__.py"): continue module_name = module_name[:-3] module_name = module_name.replace("/", ".") try: module = __import__(module_name, fromlist=["*"]) except ImportError as e: # print(module_name + " could not be imported.", e) continue if hasattr(module, "__all__"): all_names = module.__all__ else: all_names = [name for name in dir(module) if not name.startswith("_")] globals.update({name: getattr(module, name) for name in all_names})
[docs] def scan_external(module_paths: str, component: str): """ Scan external directories specified in module_paths for a specified component directory and dynamically import the modules. Args: :param module_paths: str: Colon-separated paths for external directories to scan. :param component: str: Type of module to scan for i.e., "listeners", "publishers", "servers", "clients", "plugins" """ # Split the provided paths and iterate through them extern_modules_paths = module_paths.split(":") for mod_group_idx, extern_module_path in enumerate(extern_modules_paths): # Define the directory to search based on the component type extern_base_dir = Path(extern_module_path) / component extern_modules = list(extern_base_dir.glob("*.py")) for mod_idx, extern_module in enumerate(extern_modules): # Generate a unique name for each module spec_name = ( f"wrapyfi.extern{mod_group_idx}.{component}.{extern_module.stem}" ) # Dynamically load and execute the module spec = importlib.util.spec_from_file_location(spec_name, extern_module) module = importlib.util.module_from_spec(spec) sys.modules[spec.name] = module spec.loader.exec_module(module) # Register the loaded module in the global scope dynamic_module_import([spec.name], globals())
[docs] class SingletonOptimized(type): """ A singleton metaclass that is thread-safe and optimized for speed. Source: https://stackoverflow.com/a/6798042 """ _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: with lock: if cls not in cls._instances: cls._instances[cls] = super(SingletonOptimized, cls).__call__( *args, **kwargs ) return cls._instances[cls]
[docs] class Plugin(object): """ Base class for encoding and decoding plugins. """
[docs] def encode(self, *args, **kwargs): """ Encode data into a base64 string. :param args: tuple: Additional arguments :param kwargs: dict: Additional keyword arguments :return: Tuple[bool, dict]: A tuple containing: - bool: True if the encoding was successful, False otherwise - dict: A dictionary containing: - '__wrapyfi__': A tuple containing the class name and encoded data string """ raise NotImplementedError
[docs] def decode(self, *args, **kwargs): """ Decode a base64 string back into data. :param args: tuple: Additional arguments :param kwargs: dict: Additional keyword arguments :return: Tuple[bool, object]: A tuple containing: - bool: True if the decoding was successful, False otherwise - object: The decoded data """ raise NotImplementedError
[docs] class PluginRegistrar(object): """ Class for registering encoding and decoding plugins. """ encoder_registry = {} decoder_registry = {}
[docs] @staticmethod def register(types=None): """ Register a plugin for encoding and decoding a specific type. :param types: tuple: The type(s) to register the plugin for """ def wrapper(cls): if types is not None: for cls_type in types: PluginRegistrar.encoder_registry[cls_type] = cls PluginRegistrar.decoder_registry[str(cls.__name__)] = cls return cls return wrapper
[docs] @staticmethod def scan(): """ Scan the plugins directory (Wrapyfi builtin and external) for plugins to register. This method is called automatically when the module is imported. """ base_dir = Path(__file__).parent / "plugins" modules = glob(str(base_dir / "*.py"), recursive=True) modules = [ "wrapyfi.plugins." + Path(module).relative_to(base_dir).as_posix() for module in modules ] dynamic_module_import(modules, globals()) scan_external(os.environ.get(WRAPYFI_PLUGIN_PATHS, ""), "plugins")