♻️ Using external generators

Basic idea

OmniOpt2 can use external generators: external programs that generate the new points to be examined. You can therefore use any algorithm in any programming language, as long as your program follows the interface OmniOpt2 requires.
OmniOpt2 writes a JSON file that contains all previously generated data, the seed, the constraints, and all parameters with their types. The external generator reads this file and proposes the next point.
You specify your program with the --external_generator parameter; the command must be base64-encoded. For it to take effect, --model must be set to EXTERNAL_GENERATOR. See the last two parameters here:
./omniopt \
    --partition=alpha \
    --experiment_name=EXTERNAL_GENERATOR_test \
    --mem_gb=1 \
    --time=60 \
    --worker_timeout=60 \
    --max_eval=2 \
    --num_parallel_jobs=5 \
    --gpus=1 \
    --num_random_steps=1 \
    --follow \
    --live_share \
    --send_anonymized_usage_stats \
    --result_names RESULT=max \
    --run_program=ZWNobyAiUkVTVUxUOiAlKHgpJSh5KSIgJiYgZWNobyAiUkVTVUxUMjogJXoi \
    --cpus_per_task=1 \
    --nodes_per_job=1 \
    --generate_all_jobs_at_once \
    --revert_to_random_when_seemingly_exhausted \
    --run_mode=local \
    --decimalrounding=4 \
    --occ_type=euclid \
    --main_process_gb=8 \
    --max_nr_of_zero_results=1 \
    --slurm_signal_delay_s=0 \
    --n_estimators_randomforest=100 \
    --parameter x range 123 100000000 int false \
    --parameter y range 1234 4321 \
    --parameter z range 111 222 int \
    --experiment_constraint "x >= y" \
    --seed 1234 \
    --model=EXTERNAL_GENERATOR \
    --external_generator $(echo "python3 $(pwd)/.tests/example_external.py" | base64 -w0)
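
If you prefer to build the base64 value outside the shell, the following is a minimal sketch in Python. The helper names encode_command and decode_command are hypothetical and not part of OmniOpt2. With trailing_newline=True the result matches what echo piped into base64 -w0 produces, because echo appends a newline. Whether OmniOpt2 also accepts a command without that newline is an assumption you should verify.

```python
import base64


def encode_command(command: str, trailing_newline: bool = True) -> str:
    """Return the base64 form of a shell command as a single line.

    trailing_newline=True mimics `echo "..." | base64 -w0`, because echo
    appends a newline before the text reaches base64.
    """
    text = command + "\n" if trailing_newline else command
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def decode_command(encoded: str) -> str:
    """Decode a base64 command string back to readable text."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    # Hypothetical path, used only for illustration.
    print(encode_command("python3 /path/to/example_external.py"))
    # The --run_program value from the call above, decoded for inspection.
    print(decode_command("ZWNobyAiUkVTVUxUOiAlKHgpJSh5KSIgJiYgZWNobyAiUkVTVUxUMjogJXoi"))
```

Decoding an existing value this way is a quick check that the command OmniOpt2 receives is the one you intended.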

The external generator is then called with a temporary directory as its first parameter. That directory contains a JSON file called input.json like this:
{
    "parameters": {
        "x": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                123,
                100000000
            ]
        },
        "y": {
            "parameter_type": "RANGE",
            "type": "FLOAT",
            "range": [
                1234.0,
                4321.0
            ]
        },
        "z": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                111,
                222
            ]
        }
    },
    "constraints": [
        "y <= x"
    ],
    "seed": 1234,
    "trials": [
        [
            {
                "x": 46164761,
                "y": 2179.7038996219635,
                "z": 221
            }
        ],
        [
            {
                "RESULT": 461647612179.7039
            }
        ]
    ],
    "objectives": {
        "RESULT": "max"
    }
}
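
To get a feel for this structure, a generator can start by loading the file and listing the search space. The sketch below assumes only the keys visible in the example (parameters, parameter_type, type, range, values, value). The function names load_input and describe_parameters are hypothetical.

```python
import json
import os


def load_input(directory: str) -> dict:
    """Read input.json from the directory passed as the first argument."""
    with open(os.path.join(directory, "input.json"), encoding="utf-8") as f:
        return json.load(f)


def describe_parameters(data: dict) -> list[str]:
    """Return one human-readable line per parameter in the search space."""
    lines = []
    for name, spec in data["parameters"].items():
        kind = spec["parameter_type"]
        if kind == "RANGE":
            low, high = spec["range"]
            detail = f"{low} to {high}"
        elif kind == "CHOICE":
            detail = "one of " + ", ".join(str(v) for v in spec["values"])
        elif kind == "FIXED":
            detail = f"always {spec['value']}"
        else:
            # Any other parameter_type is not covered by the examples here.
            detail = "unrecognized parameter_type"
        lines.append(f"{name} ({kind}, {spec['type']}): {detail}")
    return lines
```

Printing these lines to stderr during development makes it easier to see what OmniOpt2 actually handed to your program.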

Your program must read this JSON file, create new hyperparameters, and write them to a file called results.json in the same folder. The parameters, constraints and so on depend, of course, on how you run OmniOpt2 and on its parameters.
The results.json file that your program must write to the folder given as parameter may look like this:
{
    "parameters": {
        "x": 1234,
        "y": "5431",
        "z": "111"
    }
}

This file is then read, parsed and used to run a new hyperparameter set. x, y and z are the hyperparameter names; these, too, depend on your OmniOpt2 run.
For each new hyperparameter set (after the SOBOL phase), the program is invoked anew.
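
Before writing results.json it can help to check the point against the search space from input.json. The following is a sketch of such a local sanity check, not the validation OmniOpt2 itself performs. The function names point_is_valid and write_results are hypothetical, and the rule that INT ranges only accept Python integers is an assumption.

```python
import json
import os


def point_is_valid(point: dict, parameters: dict) -> bool:
    """Check that every parameter is present and inside its declared domain."""
    for name, spec in parameters.items():
        if name not in point:
            return False
        value = point[name]
        kind = spec["parameter_type"]
        if kind == "RANGE":
            low, high = spec["range"]
            # bool is a subclass of int in Python, so exclude it explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False
            if not low <= value <= high:
                return False
            if spec["type"] == "INT" and not isinstance(value, int):
                return False
        elif kind == "CHOICE":
            if value not in spec["values"]:
                return False
        elif kind == "FIXED":
            if value != spec["value"]:
                return False
        else:
            return False
    return True


def write_results(directory: str, point: dict) -> str:
    """Write the point as results.json into the given directory."""
    target = os.path.join(directory, "results.json")
    with open(target, mode="w", encoding="utf-8") as f:
        json.dump({"parameters": point}, f, indent=4)
    return target
```

A generator could call point_is_valid right before write_results and exit with a non-zero code if the check fails, so that a bad point is noticed early.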

Another example call and its input.json file

./omniopt \
	--partition=alpha \
	--experiment_name=EXTERNAL_GENERATOR_test \
	--mem_gb=1 \
	--time=60 \
	--worker_timeout=60 \
	--max_eval=2 \
	--num_parallel_jobs=5 \
	--gpus=1 \
	--num_random_steps=1 \
	--follow \
	--live_share \
	--send_anonymized_usage_stats \
	--result_names RESULT=max \
	--run_program=ZWNobyAiUkVTVUxUOiAlKHgpJSh5KSIgJiYgZWNobyAiUkVTVUxUMjogJXoi \
	--cpus_per_task=1 \
	--nodes_per_job=1 \
	--generate_all_jobs_at_once \
	--revert_to_random_when_seemingly_exhausted \
	--run_mode=local \
	--decimalrounding=4 \
	--occ_type=euclid \
	--main_process_gb=8 \
	--max_nr_of_zero_results=1 \
	--slurm_signal_delay_s=0 \
	--n_estimators_randomforest=100 \
	--parameter x range 123 100000000 int false \
	--parameter y choice 5431,1234 \
	--parameter z fixed 111 \
	--model=EXTERNAL_GENERATOR \
	--external_generator $(echo "python3 $(pwd)/.tests/example_external.py" | base64 -w0)

{
    "parameters": {
        "x": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                123,
                100000000
            ]
        },
        "y": {
            "parameter_type": "CHOICE",
            "type": "STRING",
            "values": [
                "5431",
                "1234"
            ]
        },
        "z": {
            "parameter_type": "FIXED",
            "type": "STRING",
            "value": "111"
        }
    },
    "constraints": [],
    "seed": null,
    "trials": [
        [
            {
                "x": 55988092,
                "y": 1234,
                "z": 111
            }
        ],
        [
            {
                "RESULT": 559880921234.0
            }
        ]
    ]
}
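
Both examples show a single finished trial, stored as one list with the parameter set and one list with the result. The sketch below pairs parameters with results under the assumption that, with more trials, the first list holds all parameter sets and the second list holds the matching results in the same order. That layout is inferred from the examples, not confirmed here. Because the second example has no objectives entry, the result name and direction are passed in explicitly. The names pair_trials and best_trial are hypothetical.

```python
from typing import Optional


def pair_trials(data: dict) -> list[tuple[dict, dict]]:
    """Pair each parameter set with its result.

    Assumption: data["trials"] is [list_of_parameter_dicts, list_of_result_dicts],
    aligned by index.
    """
    trials = data.get("trials") or []
    if len(trials) != 2:
        return []
    params_list, results_list = trials
    return list(zip(params_list, results_list))


def best_trial(data: dict, result_name: str, direction: str) -> Optional[tuple[dict, dict]]:
    """Return the (parameters, result) pair with the best value, or None."""
    pairs = [(p, r) for p, r in pair_trials(data) if result_name in r]
    if not pairs:
        return None
    pick = max if direction == "max" else min
    return pick(pairs, key=lambda pair: pair[1][result_name])
```

A generator that exploits earlier results, for example by sampling near the best point so far, could start from best_trial.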

Example programs

PSEUDORANDOM

This is an example Python program that generates random points that lie within the ranges and parameter boundaries of your experiment:
import sys
import os
import json
import random
from typing import Union, Optional
def check_constraint(constraint: str, params: dict) -> bool:
    return eval(constraint, {}, params)
def constraints_not_ok(constraints: list, point: dict) -> bool:
    # Without constraints, no point can violate them.
    if not constraints:
        return False
    # The point is rejected as soon as one constraint evaluates to False.
    for constraint in constraints:
        if not check_constraint(constraint, point):
            return True
    return False
def generate_random_value(parameter: dict) -> Optional[Union[int, float, str]]:
    try:
        if parameter['parameter_type'] == 'RANGE':
            range_min, range_max = parameter['range']
            if parameter['type'] == 'INT':
                return random.randint(range_min, range_max)
            if parameter['type'] == 'FLOAT':
                return random.uniform(range_min, range_max)
        elif parameter['parameter_type'] == 'CHOICE':
            values = parameter['values']
            if parameter['type'] == 'INT':
                return random.choice(values)
            if parameter['type'] == 'STRING':
                return random.choice(values)
            return random.choice(values)
        elif parameter['parameter_type'] == 'FIXED':
            return parameter['value']
    except KeyError as e:
        print(f"KeyError: Missing {e} in parameter")
        sys.exit(4)
    return None
def generate_random_point(data: dict) -> dict:
    constraints = data["constraints"]
    point: dict = {}
    param_data = data["parameters"]
    i = 0
    if len(constraints):
        while not point or constraints_not_ok(constraints, point):
            for param_name in list(param_data.keys()):
                point[param_name] = generate_random_value(param_data[param_name])
            if i > 100: # if after 100 trials nothing was found, stop trying
                break
            i = i + 1
    else:
        for param_name in list(param_data.keys()):
            point[param_name] = generate_random_value(param_data[param_name])
    return point
def main() -> None:
    if len(sys.argv) != 2:
        print("Usage: python script.py <path>")
        sys.exit(1)
    path = sys.argv[1]
    if not os.path.isdir(path):
        print(f"Error: The path '{path}' is not a valid folder.")
        sys.exit(2)
    json_file_path = os.path.join(path, 'input.json')
    results_file_path = os.path.join(path, 'results.json')
    try:
        with open(json_file_path, mode='r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {json_file_path} not found.")
        sys.exit(3)
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON in {json_file_path}.")
        sys.exit(4)
    random_point = generate_random_point(data)
    with open(results_file_path, mode='w', encoding="utf-8") as f:
        json.dump({"parameters": random_point}, f, indent=4)
if __name__ == "__main__":
    main()
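
The program above evaluates constraints with eval, which executes arbitrary Python. If that is a concern, a restricted evaluator is one alternative. The following sketch assumes that constraints are plain arithmetic comparisons over parameter names, like "y <= x" in the first example. The function name check_constraint_safely is hypothetical, and anything outside numbers, names, + - * / and comparisons is rejected.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

_BINARY = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_COMPARE = {ast.LtE: operator.le, ast.GtE: operator.ge, ast.Lt: operator.lt,
            ast.Gt: operator.gt, ast.Eq: operator.eq}


def _value(node: ast.AST, params: dict) -> Number:
    """Evaluate a numeric sub-expression; raise ValueError for anything else."""
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
    elif isinstance(node, ast.Name):
        return params[node.id]  # KeyError if the parameter is unknown
    elif isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
        return -_value(node.operand, params)
    elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
        left = _value(node.left, params)
        right = _value(node.right, params)
        return _BINARY[type(node.op)](left, right)
    raise ValueError(f"Unsupported expression: {ast.dump(node)}")


def check_constraint_safely(constraint: str, params: dict) -> bool:
    """Return True if the comparison holds for the given parameter values."""
    tree = ast.parse(constraint, mode="eval").body
    if not isinstance(tree, ast.Compare):
        raise ValueError("Constraint must be a comparison")
    left = _value(tree.left, params)
    # Chained comparisons such as "1 <= x <= 10" are checked pair by pair.
    for op, comparator in zip(tree.ops, tree.comparators):
        if type(op) not in _COMPARE:
            raise ValueError("Unsupported comparison operator")
        right = _value(comparator, params)
        if not _COMPARE[type(op)](left, right):
            return False
        left = right
    return True
```

This function takes the same arguments as check_constraint, so it could be swapped in there. The trade-off is that constraints using syntax beyond this small subset raise a ValueError instead of being evaluated.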

TPE (Tree-structured Parzen Estimator)

This is another example program. It uses Optuna and its Tree-structured Parzen Estimator (TPE) sampler to generate new points:
import sys
import os
import json
import logging
from typing import Optional
try:
    import optuna
    from optuna.trial import create_trial
except ModuleNotFoundError:
    print("Optuna not found. Cannot continue.")
    sys.exit(1)
try:
    from beartype import beartype
except ModuleNotFoundError:
    print("beartype not found. Cannot continue.")
    sys.exit(1)
logging.getLogger("optuna").setLevel(logging.WARNING)
@beartype
def check_constraint(constraint: str, params: dict) -> bool:
    return eval(constraint, {}, params)
@beartype
def constraints_not_ok(constraints: list, point: dict) -> bool:
    if not constraints or constraints is None or len(constraints) == 0:
        return True
    for constraint in constraints:
        if not check_constraint(constraint, point):
            return True
    return False
@beartype
def tpe_suggest_point(trial: optuna.Trial, parameters: dict) -> dict:
    point = {}
    for param_name, param in parameters.items():
        ptype = param['parameter_type']
        pvaltype = param['type']
        try:
            if ptype == 'RANGE':
                rmin, rmax = param['range']
                if pvaltype == 'INT':
                    point[param_name] = trial.suggest_int(param_name, rmin, rmax)
                elif pvaltype == 'FLOAT':
                    point[param_name] = trial.suggest_float(param_name, rmin, rmax)
                else:
                    raise ValueError(f"Unsupported type {pvaltype} for RANGE")
            elif ptype == 'CHOICE':
                values = param['values']
                point[param_name] = trial.suggest_categorical(param_name, values)
            elif ptype == 'FIXED':
                point[param_name] = param['value']
            else:
                raise ValueError(f"Unknown parameter_type {ptype}")
        except KeyboardInterrupt:
            print("You pressed CTRL-c.")
            sys.exit(1)
    return point
@beartype
def generate_tpe_point(data: dict, max_trials: int = 100) -> dict:
    parameters = data["parameters"]
    constraints = data.get("constraints", [])
    seed = data.get("seed", None)
    trials_data = data.get("trials", [])
    objectives = data.get("objectives", {})
    direction, result_key = parse_objectives(objectives)
    study = create_study_with_seed(seed, direction)
    for trial_entry in trials_data:
        add_existing_trial_to_study(study, trial_entry, parameters, result_key)
    study.optimize(lambda trial: wrapped_objective(trial, parameters, constraints, direction), n_trials=max_trials)
    return get_best_or_new_point(study, parameters, direction)
@beartype
def parse_objectives(objectives: dict) -> tuple[str, str]:
    if len(objectives) != 1:
        raise ValueError("Only single-objective optimization is supported.")
    result_key, result_goal = next(iter(objectives.items()))
    if result_goal.lower() not in ("min", "max"):
        raise ValueError(f"Unsupported objective direction: {result_goal}")
    direction = "maximize" if result_goal.lower() == "max" else "minimize"
    return direction, result_key
@beartype
def create_study_with_seed(seed: Optional[int], direction: str) -> optuna.study.study.Study:
    return optuna.create_study(
        sampler=optuna.samplers.TPESampler(seed=seed),
        direction=direction
    )
@beartype
def wrapped_objective(trial: optuna.Trial, parameters: dict, constraints: list, direction: str) -> float:
    point = tpe_suggest_point(trial, parameters)
    # Penalize points that violate a constraint; all valid points score 0.0.
    if constraints and constraints_not_ok(constraints, point):
        return 1e6 if direction == "minimize" else -1e6
    return 0.0
@beartype
def add_existing_trial_to_study(study: optuna.study.study.Study, trial_entry: list, parameters: dict, result_key: str) -> None:
    if len(trial_entry) != 2:
        return
    param_dict, result_dict = trial_entry
    if not result_dict or result_key not in result_dict:
        return
    if not all(k in param_dict for k in parameters):
        return
    final_value = result_dict[result_key]
    trial_params = {}
    trial_distributions = {}
    for name, p in parameters.items():
        value = param_dict[name]
        if p["parameter_type"] == "FIXED":
            # Fixed parameters have no distribution. Optuna requires params and
            # distributions to have the same keys, so they are left out.
            continue
        if p["parameter_type"] == "RANGE":
            # IntDistribution and FloatDistribution need Optuna 3.0 or newer; they
            # replace the deprecated IntUniformDistribution and UniformDistribution.
            if p["type"] == "INT":
                dist = optuna.distributions.IntDistribution(p["range"][0], p["range"][1])
            elif p["type"] == "FLOAT":
                dist = optuna.distributions.FloatDistribution(p["range"][0], p["range"][1])
            else:
                continue
        elif p["parameter_type"] == "CHOICE":
            dist = optuna.distributions.CategoricalDistribution(p["values"])
        else:
            continue
        trial_params[name] = value
        trial_distributions[name] = dist
    study.add_trial(
        create_trial(
            params=trial_params,
            distributions=trial_distributions,
            value=final_value
        )
    )
@beartype
def get_best_or_new_point(study: optuna.study.study.Study, parameters: dict, direction: str) -> dict:
    best_trial_value = study.best_trial.value
    if best_trial_value is not None:
        if (direction == "minimize" and best_trial_value < 1e6) or \
           (direction == "maximize" and best_trial_value > -1e6):
            return study.best_params
    # study.best_trial is a FrozenTrial, not an optuna.Trial; ask the study for a fresh trial.
    return tpe_suggest_point(study.ask(), parameters)
@beartype
def main() -> None:
    if len(sys.argv) != 2:
        print("Usage: python script.py <path>")
        sys.exit(1)
    path = sys.argv[1]
    if not os.path.isdir(path):
        print(f"Error: The path '{path}' is not a valid folder.")
        sys.exit(2)
    json_file_path = os.path.join(path, 'input.json')
    results_file_path = os.path.join(path, 'results.json')
    try:
        with open(json_file_path, mode='r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {json_file_path} not found.")
        sys.exit(3)
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON in {json_file_path}.")
        sys.exit(4)
    random_point = generate_tpe_point(data)
    with open(results_file_path, mode='w', encoding="utf-8") as f:
        json.dump({"parameters": random_point}, f, indent=4)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("You pressed CTRL-c.")
        sys.exit(1)
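
Either example program can be tried locally before it is passed to OmniOpt2. The sketch below imitates the calling convention described on this page: it writes input.json into a temporary directory, runs the generator with that directory as its only argument, and reads results.json back. The function name run_generator and the timeout value are hypothetical. How OmniOpt2 itself launches the process may differ in detail.

```python
import json
import os
import subprocess
import tempfile


def run_generator(command: list[str], input_data: dict, timeout_s: float = 60.0) -> dict:
    """Run a generator once and return the parameters it proposed."""
    with tempfile.TemporaryDirectory() as workdir:
        input_path = os.path.join(workdir, "input.json")
        with open(input_path, mode="w", encoding="utf-8") as f:
            json.dump(input_data, f, indent=4)
        # The temporary directory is appended as the first and only argument.
        subprocess.run(command + [workdir], check=True, timeout=timeout_s)
        results_path = os.path.join(workdir, "results.json")
        with open(results_path, encoding="utf-8") as f:
            return json.load(f)["parameters"]
```

For example, run_generator(["python3", ".tests/example_external.py"], data) returns the proposed point for a dictionary data shaped like the input.json examples. A non-zero exit code of the generator raises subprocess.CalledProcessError, and a missing results.json raises FileNotFoundError.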

Caveats

External generators do not work with custom generation strategies.