
♻️ Using external generators

Basic idea

OmniOpt2 can use external generators: external programs that generate the new points to be examined. This lets you use any algorithm you like, in any programming language you want, as long as you follow the standards required by OmniOpt2.
OmniOpt2 passes the external generator a JSON file that contains all previously generated data, the seed, the constraints, and all parameters with their types.
You specify your program with the --external_generator parameter; the command must be base64-encoded. For it to take effect, --model must be set to EXTERNAL_GENERATOR. See the last two parameters here:
./omniopt \
    --partition=alpha \
    --experiment_name=EXTERNAL_GENERATOR_test \
    --mem_gb=1 \
    --time=60 \
    --worker_timeout=60 \
    --max_eval=2 \
    --num_parallel_jobs=5 \
    --gpus=1 \
    --num_random_steps=1 \
    --follow \
    --live_share \
    --send_anonymized_usage_stats \
    --result_names RESULT=max \
    --run_program=ZWNobyAiUkVTVUxUOiAlKHgpJSh5KSIgJiYgZWNobyAiUkVTVUxUMjogJXoi \
    --cpus_per_task=1 \
    --nodes_per_job=1 \
    --generate_all_jobs_at_once \
    --revert_to_random_when_seemingly_exhausted \
    --run_mode=local \
    --decimalrounding=4 \
    --occ_type=euclid \
    --main_process_gb=8 \
    --max_nr_of_zero_results=1 \
    --slurm_signal_delay_s=0 \
    --n_estimators_randomforest=100 \
    --parameter x range 123 100000000 int false \
    --parameter y range 1234 4321 \
    --parameter z range 111 222 int \
    --experiment_constraint "x >= y" \
    --seed 1234 \
    --model=EXTERNAL_GENERATOR \
    --external_generator $(echo "python3 $(pwd)/.tests/example_external.py" | base64 -w0)
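
If you build the OmniOpt2 call from a script instead of the shell, the base64 step can be done in Python as well. The following is a minimal sketch that mirrors the shell pipeline above, including the trailing newline that echo emits. The helper name encode_generator_command is made up for this example and is not part of OmniOpt2.

```python
import base64
import os


def encode_generator_command(command: str) -> str:
    """Return the base64 form of a command, like `echo "<command>" | base64 -w0`."""
    # echo appends a newline, so it is included here to mirror the shell pipeline.
    raw = (command + "\n").encode("utf-8")
    # b64encode never inserts line breaks, which matches base64 -w0.
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Script location taken from the example call above; adjust it to your setup.
    script = os.path.join(os.getcwd(), ".tests", "example_external.py")
    print(encode_generator_command(f"python3 {script}"))
```

The printed string is the value to pass to --external_generator so that OmniOpt2 can call your program.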

OmniOpt2 then calls your program with a temporary directory as its first parameter. This directory contains a JSON file called input.json that looks like this:
{
    "parameters": {
        "x": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                123,
                100000000
            ]
        },
        "y": {
            "parameter_type": "RANGE",
            "type": "FLOAT",
            "range": [
                1234.0,
                4321.0
            ]
        },
        "z": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                111,
                222
            ]
        }
    },
    "constraints": [
        "y <= x"
    ],
    "seed": 1234,
    "trials": [
        [
            {
                "x": 46164761,
                "y": 2179.7038996219635,
                "z": 221
            }
        ],
        [
            {
                "RESULT": 461647612179.7039
            }
        ]
    ],
    "objectives": {
        "RESULT": "max"
    }
}
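
A generator usually starts by loading this file and matching each evaluated point to its result. The sketch below shows one way to do that. It assumes the layout seen in the example above, where the first inner list of "trials" holds the parameter dicts and the second holds the result dicts in the same order; this is inferred from a single sample, so check it against your own input.json. The function names load_input and pair_trials are hypothetical.

```python
import json
import os


def load_input(folder: str) -> dict:
    """Read input.json from the folder OmniOpt2 passes as first argument."""
    with open(os.path.join(folder, "input.json"), encoding="utf-8") as f:
        return json.load(f)


def pair_trials(trials: list) -> list:
    """Pair each evaluated point with its result.

    Assumption: trials[0] is the list of parameter dicts and trials[1] the
    list of result dicts, in the same order, as in the example above.
    """
    if len(trials) != 2:
        # Unknown or empty layout: report no usable history.
        return []
    points, results = trials
    return list(zip(points, results))
```

With the example above, pair_trials(data["trials"]) yields one pair: the point with x, y and z, and the dict holding its RESULT value.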

Your program must read this JSON file, create new hyperparameters, and write them to a file called results.json in the same folder. The parameters, constraints and so on depend, of course, on how you run OmniOpt2 and on its parameters.
The results.json file that your program must write to the folder given as parameter may look like this:
{
    "parameters": {
        "x": 1234,
        "y": "5431",
        "z": "111"
    }
}
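
Before writing results.json, it can help to check the suggested point against the parameter definitions from input.json. The following sketch does this for the three parameter types that appear on this page. The function names are made up for illustration, and the checks are an assumption about what counts as a valid point; OmniOpt2 itself may be stricter or more lenient.

```python
def value_is_valid(definition: dict, value) -> bool:
    """Check one value against its parameter definition from input.json."""
    kind = definition["parameter_type"]
    if kind == "RANGE":
        low, high = definition["range"]
        # bool is a subclass of int in Python, so it is rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        if definition["type"] == "INT" and not isinstance(value, int):
            return False
        return low <= value <= high
    if kind == "CHOICE":
        return value in definition["values"]
    if kind == "FIXED":
        return value == definition["value"]
    return False


def invalid_parameters(parameters: dict, point: dict) -> list:
    """Return the names of parameters that are missing or out of bounds."""
    return [
        name
        for name, definition in parameters.items()
        if name not in point or not value_is_valid(definition, point[name])
    ]
```

An empty list means that every parameter is present and within its definition, so the point can be written to results.json.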

This file is then read, parsed and used to run a new hyperparameter set. x, y and z are the hyperparameter names; those, too, depend on your OmniOpt2 run.
After the SOBOL phase, the program is invoked anew for each new hyperparameter set.
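
Because the program starts from scratch on every call, a generator that seeds its random number generator with the seed from input.json alone would suggest the same point each time. One possible way around this, sketched below, is to mix the seed with the trial history. It assumes that the "trials" entry has changed between two calls; the function name rng_for_invocation is hypothetical.

```python
import json
import random
from typing import Optional


def rng_for_invocation(seed: Optional[int], trials: list) -> random.Random:
    """Build a generator that is reproducible but differs between invocations."""
    if seed is None:
        # No seed was given for the run, so fall back to OS entropy.
        return random.Random()
    # Serialize the history deterministically and combine it with the run seed.
    # String seeds are hashed by the random module, so the same input always
    # yields the same stream.
    history = json.dumps(trials, sort_keys=True)
    return random.Random(f"{seed}:{history}")
```

A generator would then draw all its values from the returned object, for example rng.randint(low, high), instead of from the module-level random functions.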

Another example run command and input.json file

./omniopt \
	--partition=alpha \
	--experiment_name=EXTERNAL_GENERATOR_test \
	--mem_gb=1 \
	--time=60 \
	--worker_timeout=60 \
	--max_eval=2 \
	--num_parallel_jobs=5 \
	--gpus=1 \
	--num_random_steps=1 \
	--follow \
	--live_share \
	--send_anonymized_usage_stats \
	--result_names RESULT=max \
	--run_program=ZWNobyAiUkVTVUxUOiAlKHgpJSh5KSIgJiYgZWNobyAiUkVTVUxUMjogJXoi \
	--cpus_per_task=1 \
	--nodes_per_job=1 \
	--generate_all_jobs_at_once \
	--revert_to_random_when_seemingly_exhausted \
	--run_mode=local \
	--decimalrounding=4 \
	--occ_type=euclid \
	--main_process_gb=8 \
	--max_nr_of_zero_results=1 \
	--slurm_signal_delay_s=0 \
	--n_estimators_randomforest=100 \
	--parameter x range 123 100000000 int false \
	--parameter y choice 5431,1234 \
	--parameter z fixed 111 \
	--model=EXTERNAL_GENERATOR \
    --external_generator $(echo "python3 $(pwd)/.tests/example_external.py" | base64 -w0)

{
    "parameters": {
        "x": {
            "parameter_type": "RANGE",
            "type": "INT",
            "range": [
                123,
                100000000
            ]
        },
        "y": {
            "parameter_type": "CHOICE",
            "type": "STRING",
            "values": [
                "5431",
                "1234"
            ]
        },
        "z": {
            "parameter_type": "FIXED",
            "type": "STRING",
            "value": "111"
        }
    },
    "constraints": [],
    "seed": null,
    "trials": [
        [
            {
                "x": 55988092,
                "y": 1234,
                "z": 111
            }
        ],
        [
            {
                "RESULT": 559880921234.0
            }
        ]
    ]
}

Example programs

PSEUDORANDOM

This is an example Python program that generates random points that lie within the ranges and parameter boundaries of your experiment:
import sys
import os
import json
import random
from typing import Union, Optional

def check_constraint(constraint: str, params: dict) -> bool:
    return eval(constraint, {}, params)

def constraints_not_ok(constraints: list, point: dict) -> bool:
    if not constraints or constraints is None or len(constraints) == 0:
        return True

    for constraint in constraints:
        if not check_constraint(constraint, point):
            return True

    return False

def generate_random_value(parameter: dict) -> Optional[Union[int, float, str]]:
    try:
        if parameter['parameter_type'] == 'RANGE':
            range_min, range_max = parameter['range']
            if parameter['type'] == 'INT':
                return random.randint(range_min, range_max)

            if parameter['type'] == 'FLOAT':
                return random.uniform(range_min, range_max)
        elif parameter['parameter_type'] == 'CHOICE':
            values = parameter['values']
            if parameter['type'] == 'INT':
                return random.choice(values)

            if parameter['type'] == 'STRING':
                return random.choice(values)

            return random.choice(values)
        elif parameter['parameter_type'] == 'FIXED':
            return parameter['value']
    except KeyError as e:
        print(f"KeyError: Missing {e} in parameter")
        sys.exit(4)

    return None

def generate_random_point(data: dict) -> dict:
    constraints = data["constraints"]
    point: dict = {}

    param_data = data["parameters"]

    i = 0

    if len(constraints):
        while not point or constraints_not_ok(constraints, point):
            for param_name in list(param_data.keys()):
                point[param_name] = generate_random_value(param_data[param_name])

            if i > 100:  # if after 100 trials nothing was found, stop trying
                break

            i = i + 1
    else:
        for param_name in list(param_data.keys()):
            point[param_name] = generate_random_value(param_data[param_name])

    return point

def main() -> None:
    if len(sys.argv) != 2:
        print("Usage: python script.py <path>")
        sys.exit(1)

    path = sys.argv[1]

    if not os.path.isdir(path):
        print(f"Error: The path '{path}' is not a valid folder.")
        sys.exit(2)

    json_file_path = os.path.join(path, 'input.json')
    results_file_path = os.path.join(path, 'results.json')

    try:
        with open(json_file_path, mode='r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {json_file_path} not found.")
        sys.exit(3)
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON in {json_file_path}.")
        sys.exit(4)

    random_point = generate_random_point(data)

    with open(results_file_path, mode='w', encoding="utf-8") as f:
        json.dump({"parameters": random_point}, f, indent=4)

if __name__ == "__main__":
    main()
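
To try a generator without starting a full OmniOpt2 run, you can imitate the call described on this page: write an input.json into a temporary folder, run the program with that folder as its only argument, and read results.json back. The following is a minimal sketch of such a harness. The function name run_generator is made up for this example, and OmniOpt2 may set up the environment differently than this harness does.

```python
import json
import os
import subprocess
import tempfile


def run_generator(argv: list, input_data: dict) -> dict:
    """Run a generator on the given input and return the point it suggests."""
    with tempfile.TemporaryDirectory() as folder:
        input_path = os.path.join(folder, "input.json")
        with open(input_path, mode="w", encoding="utf-8") as f:
            json.dump(input_data, f)

        # The temporary folder is passed as the first and only argument.
        subprocess.run(argv + [folder], check=True)

        results_path = os.path.join(folder, "results.json")
        with open(results_path, mode="r", encoding="utf-8") as f:
            return json.load(f)["parameters"]
```

For the program above, a call could look like run_generator(["python3", ".tests/example_external.py"], input_data), where input_data is a dict shaped like the input.json examples on this page.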

TPE (Tree-structured Parzen Estimator)

This is another example program. It uses Optuna and its Tree-structured Parzen Estimator (TPE) sampler to generate new points:
import sys
import os
import json
import logging
from typing import Optional

try:
    import optuna
    from optuna.trial import create_trial

    from optuna.distributions import (
        BaseDistribution,
        IntUniformDistribution,
        FloatDistribution,  # Optuna ≥3.6
    )
except ModuleNotFoundError:
    print("Optuna not found. Cannot continue.")
    sys.exit(1)

try:
    from beartype import beartype
except ModuleNotFoundError:
    print("beartype not found. Cannot continue.")
    sys.exit(1)

logging.getLogger("optuna").setLevel(logging.WARNING)

@beartype
def check_constraint(constraint: str, params: dict) -> bool:
    return eval(constraint, {}, params)

@beartype
def constraints_not_ok(constraints: list, point: dict) -> bool:
    if not constraints or constraints is None or len(constraints) == 0:
        return True

    for constraint in constraints:
        if not check_constraint(constraint, point):
            return True

    return False

@beartype
def tpe_suggest_point(trial: optuna.Trial, parameters: dict) -> dict:
    point = {}
    for param_name, param in parameters.items():
        ptype = param['parameter_type']
        pvaltype = param['type']

        try:
            if ptype == 'RANGE':
                rmin, rmax = param['range']
                if pvaltype == 'INT':
                    point[param_name] = trial.suggest_int(param_name, rmin, rmax)
                elif pvaltype == 'FLOAT':
                    point[param_name] = trial.suggest_float(param_name, rmin, rmax)
                else:
                    raise ValueError(f"Unsupported type {pvaltype} for RANGE")

            elif ptype == 'CHOICE':
                values = param['values']
                point[param_name] = trial.suggest_categorical(param_name, values)

            elif ptype == 'FIXED':
                point[param_name] = param['value']

            else:
                raise ValueError(f"Unknown parameter_type {ptype}")
        except KeyboardInterrupt:
            print("You pressed CTRL-c.")
            sys.exit(1)

    return point

@beartype
def generate_tpe_point(data: dict, max_trials: int = 100) -> dict:
    parameters = data["parameters"]
    constraints = data.get("constraints", [])
    seed = data.get("seed", None)
    trials_data = data.get("trials", [])
    objectives = data.get("objectives", {})

    direction, result_key = parse_objectives(objectives)
    study = create_study_with_seed(seed, direction)

    for trial_entry in trials_data:
        add_existing_trial_to_study(study, trial_entry, parameters, result_key)

    study.optimize(lambda trial: wrapped_objective(trial, parameters, constraints, direction), n_trials=max_trials)

    return get_best_or_new_point(study, parameters, direction)

@beartype
def parse_objectives(objectives: dict) -> tuple[str, str]:
    if len(objectives) != 1:
        raise ValueError("Only single-objective optimization is supported.")
    result_key, result_goal = next(iter(objectives.items()))
    if result_goal.lower() not in ("min", "max"):
        raise ValueError(f"Unsupported objective direction: {result_goal}")
    direction = "maximize" if result_goal.lower() == "max" else "minimize"
    return direction, result_key

@beartype
def create_study_with_seed(seed: Optional[int], direction: str) -> optuna.study.study.Study:
    return optuna.create_study(
        sampler=optuna.samplers.TPESampler(seed=seed),
        direction=direction
    )

@beartype
def wrapped_objective(trial: optuna.Trial, parameters: dict, constraints: list, direction: str) -> float:
    point = tpe_suggest_point(trial, parameters)
    # Penalize points that violate a constraint so that they never become
    # the best trial; without constraints every point is acceptable.
    if constraints and constraints_not_ok(constraints, point):
        return 1e6 if direction == "minimize" else -1e6
    return 0.0

@beartype
def add_existing_trial_to_study(study: optuna.study.study.Study, trial_entry: list, parameters: dict, result_key: str) -> None:
    if len(trial_entry) != 2:
        return
    param_dict, result_dict = trial_entry

    if not result_dict or result_key not in result_dict:
        return

    if not all(k in param_dict for k in parameters):
        return

    final_value = result_dict[result_key]

    trial_params: dict[str, object] = {}
    trial_distributions: dict[str, BaseDistribution] = {}

    for name, p in parameters.items():
        value = param_dict[name]

        if p["parameter_type"] == "FIXED":
            trial_params[name] = value
            continue

        dist: BaseDistribution
        if p["parameter_type"] == "RANGE":
            if p["type"] == "INT":
                dist = IntUniformDistribution(p["range"][0], p["range"][1])
            elif p["type"] == "FLOAT":
                # pick the right class for your Optuna version
                dist = FloatDistribution(p["range"][0], p["range"][1])
            else:
                continue
        elif p["parameter_type"] == "CHOICE":
            dist = optuna.distributions.CategoricalDistribution(p["values"])
        else:
            continue

        trial_params[name] = value
        trial_distributions[name] = dist  # keys are str, values are BaseDistribution

    study.add_trial(
        create_trial(
            params=trial_params,
            distributions=trial_distributions,
            value=final_value
        )
    )

@beartype
def get_best_or_new_point(study: optuna.study.study.Study, parameters: dict, direction: str) -> dict:
    best_trial_value = study.best_trial.value
    if best_trial_value is not None:
        # Only accept the best trial if it did not receive the constraint penalty.
        if (direction == "minimize" and best_trial_value < 1e6) or \
                (direction == "maximize" and best_trial_value > -1e6):
            return study.best_params
    return tpe_suggest_point(study.best_trial, parameters)

@beartype
def main() -> None:
    if len(sys.argv) != 2:
        print("Usage: python script.py <path>")
        sys.exit(1)

    path = sys.argv[1]

    if not os.path.isdir(path):
        print(f"Error: The path '{path}' is not a valid folder.")
        sys.exit(2)

    json_file_path = os.path.join(path, 'input.json')
    results_file_path = os.path.join(path, 'results.json')

    try:
        with open(json_file_path, mode='r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: {json_file_path} not found.")
        sys.exit(3)
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON in {json_file_path}.")
        sys.exit(4)

    random_point = generate_tpe_point(data)

    with open(results_file_path, mode='w', encoding="utf-8") as f:
        json.dump({"parameters": random_point}, f, indent=4)

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("You pressed CTRL-c.")
        sys.exit(1)

Caveats

The external generator does not work with custom generation strategies.