
API Reference

nearai

parse_location

parse_location(entry_location: str) -> EntryLocation

Create an EntryLocation from a string in the format namespace/name/version.

Source code in nearai/lib.py
def parse_location(entry_location: str) -> EntryLocation:
    """Create a EntryLocation from a string in the format namespace/name/version."""
    match = entry_location_pattern.match(entry_location)

    if match is None:
        raise ValueError(f"Invalid entry format: {entry_location}. Should have the format <namespace>/<name>/<version>")

    return EntryLocation(
        namespace=match.group("namespace"),
        name=match.group("name"),
        version=match.group("version"),
    )
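For example, a well-formed entry string splits into its three components, while anything else raises ValueError (the entry values below are illustrative):

from nearai.lib import parse_location

location = parse_location("alice.near/example-agent/0.0.1")
assert location.namespace == "alice.near"
assert location.name == "example-agent"
assert location.version == "0.0.1"

# A string without all three components raises ValueError:
# parse_location("example-agent")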

agent

Agent

Bases: object

Source code in nearai/agent.py
class Agent(object):
    def __init__(self, path: str):  # noqa: D107
        self.name: str = ""
        self.version: str = ""
        self.env_vars: Dict[str, Any] = {}

        self.model = ""
        self.model_provider = ""
        self.model_temperature: Optional[float] = None
        self.model_max_tokens: Optional[int] = None
        self.welcome_title: Optional[str] = None
        self.welcome_description: Optional[str] = None

        self.path = path
        self.load_agent_metadata()
        self.namespace = get_namespace(Path(self.path))

        temp_dir = os.path.join(tempfile.gettempdir(), str(int(time.time())))

        # Copy all agent files including subfolders
        shutil.copytree(path, temp_dir, dirs_exist_ok=True)

        self.temp_dir = temp_dir

    def load_agent_metadata(self) -> None:
        """Load agent details from metadata.json."""
        metadata_path = os.path.join(self.path, "metadata.json")
        check_metadata(Path(metadata_path))
        with open(metadata_path) as f:
            metadata: Dict[str, Any] = json.load(f)
            self.metadata = metadata

            try:
                self.name = metadata["name"]
                self.version = metadata["version"]
            except KeyError as e:
                raise ValueError(f"Missing key in metadata: {e}") from None

            details = metadata.get("details", {})
            agent = details.get("agent", {})
            welcome = agent.get("welcome", {})

            self.env_vars = details.get("env_vars", {})
            self.welcome_title = welcome.get("title")
            self.welcome_description = welcome.get("description")

            if agent_metadata := details.get("agent", None):
                if defaults := agent_metadata.get("defaults", None):
                    self.model = defaults.get("model", self.model)
                    self.model_provider = defaults.get("model_provider", self.model_provider)
                    self.model_temperature = defaults.get("model_temperature", self.model_temperature)
                    self.model_max_tokens = defaults.get("model_max_tokens", self.model_max_tokens)

        if not self.version or not self.name:
            raise ValueError("Both 'version' and 'name' must be non-empty in metadata.")

    def run(self, env: Any, task: Optional[str] = None) -> None:  # noqa: D102
        if not os.path.exists(os.path.join(self.path, AGENT_FILENAME)):
            raise ValueError(f"Agent run error: {AGENT_FILENAME} does not exist")

        # combine agent.env_vars and env.env_vars
        total_env_vars = {**self.env_vars, **env.env_vars}

        # save os env vars
        os.environ.update(total_env_vars)
        # save env.env_vars
        env.env_vars = total_env_vars

        context = {"env": env, "agent": self, "task": task}

        original_cwd = os.getcwd()
        try:
            os.chdir(self.temp_dir)
            sys.path.insert(0, self.temp_dir)
            runpy.run_path(AGENT_FILENAME, init_globals=context, run_name="__main__")
        finally:
            os.chdir(original_cwd)
            sys.path.pop(0)
load_agent_metadata
load_agent_metadata() -> None

Load agent details from metadata.json.

Source code in nearai/agent.py
def load_agent_metadata(self) -> None:
    """Load agent details from metadata.json."""
    metadata_path = os.path.join(self.path, "metadata.json")
    check_metadata(Path(metadata_path))
    with open(metadata_path) as f:
        metadata: Dict[str, Any] = json.load(f)
        self.metadata = metadata

        try:
            self.name = metadata["name"]
            self.version = metadata["version"]
        except KeyError as e:
            raise ValueError(f"Missing key in metadata: {e}") from None

        details = metadata.get("details", {})
        agent = details.get("agent", {})
        welcome = agent.get("welcome", {})

        self.env_vars = details.get("env_vars", {})
        self.welcome_title = welcome.get("title")
        self.welcome_description = welcome.get("description")

        if agent_metadata := details.get("agent", None):
            if defaults := agent_metadata.get("defaults", None):
                self.model = defaults.get("model", self.model)
                self.model_provider = defaults.get("model_provider", self.model_provider)
                self.model_temperature = defaults.get("model_temperature", self.model_temperature)
                self.model_max_tokens = defaults.get("model_max_tokens", self.model_max_tokens)

    if not self.version or not self.name:
        raise ValueError("Both 'version' and 'name' must be non-empty in metadata.")
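For reference, a minimal metadata.json accepted by this loader might look like the following. Only name and version are required; the other values shown (model, provider, welcome text) are illustrative, and check_metadata may enforce additional registry fields:

{
  "name": "example-agent",
  "version": "0.0.1",
  "details": {
    "env_vars": {"EXAMPLE_API_KEY": "example-value"},
    "agent": {
      "welcome": {
        "title": "Example Agent",
        "description": "An illustrative agent."
      },
      "defaults": {
        "model": "example-model",
        "model_provider": "example-provider",
        "model_temperature": 1.0,
        "model_max_tokens": 16384
      }
    }
  }
}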

cli

AgentCli

Source code in nearai/cli.py
class AgentCli:
    def inspect(self, path: str) -> None:
        """Inspect environment from given path."""
        from nearai.environment import Environment

        env = Environment(path, [], CONFIG, create_files=False)
        env.inspect()

    def save_folder(self, path: str, name: Optional[str] = None) -> None:
        """Saves all subfolders with agent task runs (must contain non-empty chat.txt)."""
        from nearai.environment import Environment

        env = Environment(path, [], CONFIG, create_files=False)
        env.save_folder(name)

    def save_from_history(self, name: Optional[str] = None) -> None:
        """Reads piped history, finds agent task runs, writes start_command.log files, and saves to registry. For detailed usage, run: nearai agent save_from_history --help.

        This command:
        1. Finds agent task runs (must contain non-empty chat.txt)
        2. Writes start_command.log files
        3. Saves to registry

        Only 'interactive' is supported.
        Assumes format:
        ' <line_number>  <program_name> agent interactive <comma_separated_agents> <path> <other_args>'
        Run:
        $ history | grep "agent interactive" | sed "s:~:$HOME:g" | nearai agent save_from_history environment_interactive_runs_from_lambda_00
        """  # noqa: E501
        from nearai.environment import Environment

        env = Environment("/", [], CONFIG, create_files=False)
        # Read from stdin (piped input)
        lines = sys.stdin.readlines()
        env.save_from_history(lines, name)

    def interactive(
        self,
        agents: str,
        path: Optional[str] = "",
        record_run: str = "true",
        env_vars: Optional[Dict[str, Any]] = None,
        load_env: str = "",
        local: bool = False,
        tool_resources: Optional[Dict[str, Any]] = None,
        print_system_log: bool = True,
    ) -> None:
        """Runs agent interactively with environment from given path."""
        from nearai.environment import Environment

        _agents = [load_agent(agent, local) for agent in agents.split(",")]
        if not path:
            if len(_agents) == 1:
                path = _agents[0].path
            else:
                raise ValueError("Local path is required when running multiple agents")
        env = Environment(
            path,
            _agents,
            CONFIG,
            env_vars=env_vars,
            tool_resources=tool_resources,
            print_system_log=print_system_log,
        )

        env.run_interactive(record_run, load_env)

    def task(
        self,
        agents: str,
        task: str,
        path: Optional[str] = "",
        max_iterations: int = 10,
        record_run: str = "true",
        env_vars: Optional[Dict[str, Any]] = None,
        load_env: str = "",
        local: bool = False,
    ) -> None:
        """Runs agent non interactively with environment from given path."""
        from nearai.environment import Environment

        _agents = [load_agent(agent, local) for agent in agents.split(",")]
        if not path:
            if len(_agents) == 1:
                path = _agents[0].path
            else:
                raise ValueError("Local path is required when running multiple agents")
        env = Environment(path, _agents, CONFIG, env_vars=env_vars)
        env.run_task(task, record_run, load_env, max_iterations)

    def run_remote(
        self,
        agents: str,
        new_message: str = "",
        environment_id: str = "",
        provider: str = "aws_lambda",
        params: object = None,
    ) -> None:
        """Invoke a Container based AWS lambda function to run agents on a given environment."""
        if not CONFIG.auth:
            print("Please login with `nearai login`")
            return
        if provider != "aws_lambda":
            print(f"Provider {provider} is not supported.")
            return
        if not params:
            params = {"max_iterations": 2}
        wrapper = LambdaWrapper(boto3.client("lambda", region_name="us-east-2"))
        try:
            new_environment = wrapper.invoke_function(
                "agent-runner-docker",
                {
                    "agents": agents,
                    "environment_id": environment_id,
                    "auth": CONFIG.auth.model_dump(),
                    "new_message": new_message,
                    "params": params,
                },
            )
            print(f"Agent run finished. New environment is {new_environment}")
        except Exception as e:
            print(f"Error running agent remotely: {e}")
inspect
inspect(path: str) -> None

Inspect environment from given path.

Source code in nearai/cli.py
def inspect(self, path: str) -> None:
    """Inspect environment from given path."""
    from nearai.environment import Environment

    env = Environment(path, [], CONFIG, create_files=False)
    env.inspect()
interactive
interactive(agents: str, path: Optional[str] = '', record_run: str = 'true', env_vars: Optional[Dict[str, Any]] = None, load_env: str = '', local: bool = False, tool_resources: Optional[Dict[str, Any]] = None, print_system_log: bool = True) -> None

Runs agent interactively with environment from given path.

Source code in nearai/cli.py
def interactive(
    self,
    agents: str,
    path: Optional[str] = "",
    record_run: str = "true",
    env_vars: Optional[Dict[str, Any]] = None,
    load_env: str = "",
    local: bool = False,
    tool_resources: Optional[Dict[str, Any]] = None,
    print_system_log: bool = True,
) -> None:
    """Runs agent interactively with environment from given path."""
    from nearai.environment import Environment

    _agents = [load_agent(agent, local) for agent in agents.split(",")]
    if not path:
        if len(_agents) == 1:
            path = _agents[0].path
        else:
            raise ValueError("Local path is required when running multiple agents")
    env = Environment(
        path,
        _agents,
        CONFIG,
        env_vars=env_vars,
        tool_resources=tool_resources,
        print_system_log=print_system_log,
    )

    env.run_interactive(record_run, load_env)
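Assuming the CLI maps method parameters to positional and --flag arguments (as the save_from_history example below suggests), an interactive session with a single registry agent might be started like this; the entry is illustrative:

$ nearai agent interactive alice.near/example-agent/0.0.1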
run_remote
run_remote(agents: str, new_message: str = '', environment_id: str = '', provider: str = 'aws_lambda', params: object = None) -> None

Invoke a container-based AWS Lambda function to run agents on a given environment.

Source code in nearai/cli.py
def run_remote(
    self,
    agents: str,
    new_message: str = "",
    environment_id: str = "",
    provider: str = "aws_lambda",
    params: object = None,
) -> None:
    """Invoke a Container based AWS lambda function to run agents on a given environment."""
    if not CONFIG.auth:
        print("Please login with `nearai login`")
        return
    if provider != "aws_lambda":
        print(f"Provider {provider} is not supported.")
        return
    if not params:
        params = {"max_iterations": 2}
    wrapper = LambdaWrapper(boto3.client("lambda", region_name="us-east-2"))
    try:
        new_environment = wrapper.invoke_function(
            "agent-runner-docker",
            {
                "agents": agents,
                "environment_id": environment_id,
                "auth": CONFIG.auth.model_dump(),
                "new_message": new_message,
                "params": params,
            },
        )
        print(f"Agent run finished. New environment is {new_environment}")
    except Exception as e:
        print(f"Error running agent remotely: {e}")
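An illustrative remote run, under the same assumed parameter-to-flag mapping (entry and message are placeholders):

$ nearai agent run_remote alice.near/example-agent/0.0.1 --new_message="Hello, agent"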
save_folder
save_folder(path: str, name: Optional[str] = None) -> None

Saves all subfolders with agent task runs (must contain non-empty chat.txt).

Source code in nearai/cli.py
def save_folder(self, path: str, name: Optional[str] = None) -> None:
    """Saves all subfolders with agent task runs (must contain non-empty chat.txt)."""
    from nearai.environment import Environment

    env = Environment(path, [], CONFIG, create_files=False)
    env.save_folder(name)
save_from_history
save_from_history(name: Optional[str] = None) -> None

Reads piped history, finds agent task runs, writes start_command.log files, and saves to registry. For detailed usage, run: nearai agent save_from_history --help.

This command:

1. Finds agent task runs (must contain non-empty chat.txt)
2. Writes start_command.log files
3. Saves to registry

Only 'interactive' is supported.
Assumes format:
' <line_number>  <program_name> agent interactive <comma_separated_agents> <path> <other_args>'
Run:
$ history | grep "agent interactive" | sed "s:~:$HOME:g" | nearai agent save_from_history environment_interactive_runs_from_lambda_00

Source code in nearai/cli.py
def save_from_history(self, name: Optional[str] = None) -> None:
    """Reads piped history, finds agent task runs, writes start_command.log files, and saves to registry. For detailed usage, run: nearai agent save_from_history --help.

    This command:
    1. Finds agent task runs (must contain non-empty chat.txt)
    2. Writes start_command.log files
    3. Saves to registry

    Only 'interactive' is supported.
    Assumes format:
    ' <line_number>  <program_name> agent interactive <comma_separated_agents> <path> <other_args>'
    Run:
    $ history | grep "agent interactive" | sed "s:~:$HOME:g" | nearai agent save_from_history environment_interactive_runs_from_lambda_00
    """  # noqa: E501
    from nearai.environment import Environment

    env = Environment("/", [], CONFIG, create_files=False)
    # Read from stdin (piped input)
    lines = sys.stdin.readlines()
    env.save_from_history(lines, name)
task
task(agents: str, task: str, path: Optional[str] = '', max_iterations: int = 10, record_run: str = 'true', env_vars: Optional[Dict[str, Any]] = None, load_env: str = '', local: bool = False) -> None

Runs agent non-interactively with environment from given path.

Source code in nearai/cli.py
def task(
    self,
    agents: str,
    task: str,
    path: Optional[str] = "",
    max_iterations: int = 10,
    record_run: str = "true",
    env_vars: Optional[Dict[str, Any]] = None,
    load_env: str = "",
    local: bool = False,
) -> None:
    """Runs agent non interactively with environment from given path."""
    from nearai.environment import Environment

    _agents = [load_agent(agent, local) for agent in agents.split(",")]
    if not path:
        if len(_agents) == 1:
            path = _agents[0].path
        else:
            raise ValueError("Local path is required when running multiple agents")
    env = Environment(path, _agents, CONFIG, env_vars=env_vars)
    env.run_task(task, record_run, load_env, max_iterations)
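An illustrative non-interactive run over a single task (entry and task are placeholders):

$ nearai agent task alice.near/example-agent/0.0.1 "Summarize the README" --max_iterations=5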

BenchmarkCli

Source code in nearai/cli.py
class BenchmarkCli:
    def __init__(self):
        """Initialize Benchmark API."""
        self.client = BenchmarkApi()

    def _get_or_create_benchmark(self, benchmark_name: str, solver_name: str, args: Dict[str, Any], force: bool) -> int:
        if CONFIG.auth is None:
            print("Please login with `nearai login`")
            exit(1)
        namespace = CONFIG.auth.account_id

        # Sort the args to have a consistent representation.
        solver_args = json.dumps(OrderedDict(sorted(args.items())))

        benchmark_id = self.client.get_benchmark_v1_benchmark_get_get(
            namespace=namespace,
            benchmark_name=benchmark_name,
            solver_name=solver_name,
            solver_args=solver_args,
        )

        if benchmark_id == -1 or force:
            benchmark_id = self.client.create_benchmark_v1_benchmark_create_get(
                benchmark_name=benchmark_name,
                solver_name=solver_name,
                solver_args=solver_args,
            )

        assert benchmark_id != -1
        return benchmark_id

    def run(
        self,
        dataset: str,
        solver_strategy: str,
        max_concurrent: int = -1,
        force: bool = False,
        subset: Optional[str] = None,
        check_compatibility: bool = True,
        record: bool = False,
        **solver_args: Any,
    ) -> None:
        """Run benchmark on a dataset with a solver strategy.

        It will cache the results in the database and subsequent runs will pull the results from the cache.
        If force is set to True, it will run the benchmark again and update the cache.
        """
        from nearai.benchmark import BenchmarkExecutor, DatasetInfo
        from nearai.dataset import load_dataset
        from nearai.solvers import SolverStrategy, SolverStrategyRegistry

        args = dict(solver_args)
        if subset is not None:
            args["subset"] = subset

        benchmark_id = self._get_or_create_benchmark(
            benchmark_name=dataset,
            solver_name=solver_strategy,
            args=args,
            force=force,
        )

        solver_strategy_class: SolverStrategy | None = SolverStrategyRegistry.get(solver_strategy, None)
        assert (
            solver_strategy_class
        ), f"Solver strategy {solver_strategy} not found. Available strategies: {list(SolverStrategyRegistry.keys())}"

        name = dataset
        if solver_strategy_class.scoring_method == SolverScoringMethod.Custom:
            dataset = str(get_dataset(dataset))
        else:
            dataset = load_dataset(dataset)

        solver_strategy_obj: SolverStrategy = solver_strategy_class(dataset_ref=dataset, **solver_args)  # type: ignore
        if check_compatibility:
            assert name in solver_strategy_obj.compatible_datasets() or any(
                map(lambda n: n in name, solver_strategy_obj.compatible_datasets())
            ), f"Solver strategy {solver_strategy} is not compatible with dataset {name}"

        be = BenchmarkExecutor(DatasetInfo(name, subset, dataset), solver_strategy_obj, benchmark_id=benchmark_id)

        cpu_count = os.cpu_count()
        max_concurrent = (cpu_count if cpu_count is not None else 1) if max_concurrent < 0 else max_concurrent
        be.run(max_concurrent=max_concurrent, record=record)

    def list(
        self,
        namespace: Optional[str] = None,
        benchmark: Optional[str] = None,
        solver: Optional[str] = None,
        args: Optional[str] = None,
        total: int = 32,
        offset: int = 0,
    ):
        """List all executed benchmarks."""
        result = self.client.list_benchmarks_v1_benchmark_list_get(
            namespace=namespace,
            benchmark_name=benchmark,
            solver_name=solver,
            solver_args=args,
            total=total,
            offset=offset,
        )

        header = ["id", "namespace", "benchmark", "solver", "args", "score", "solved", "total"]
        table = []
        for benchmark_output in result:
            score = 100 * benchmark_output.solved / benchmark_output.total
            table.append(
                [
                    fill(str(benchmark_output.id)),
                    fill(benchmark_output.namespace),
                    fill(benchmark_output.benchmark),
                    fill(benchmark_output.solver),
                    fill(benchmark_output.args),
                    fill(f"{score:.2f}%"),
                    fill(str(benchmark_output.solved)),
                    fill(str(benchmark_output.total)),
                ]
            )

        print(tabulate(table, headers=header, tablefmt="simple_grid"))
__init__
__init__()

Initialize Benchmark API.

Source code in nearai/cli.py
def __init__(self):
    """Initialize Benchmark API."""
    self.client = BenchmarkApi()
list
list(namespace: Optional[str] = None, benchmark: Optional[str] = None, solver: Optional[str] = None, args: Optional[str] = None, total: int = 32, offset: int = 0)

List all executed benchmarks.

Source code in nearai/cli.py
def list(
    self,
    namespace: Optional[str] = None,
    benchmark: Optional[str] = None,
    solver: Optional[str] = None,
    args: Optional[str] = None,
    total: int = 32,
    offset: int = 0,
):
    """List all executed benchmarks."""
    result = self.client.list_benchmarks_v1_benchmark_list_get(
        namespace=namespace,
        benchmark_name=benchmark,
        solver_name=solver,
        solver_args=args,
        total=total,
        offset=offset,
    )

    header = ["id", "namespace", "benchmark", "solver", "args", "score", "solved", "total"]
    table = []
    for benchmark_output in result:
        score = 100 * benchmark_output.solved / benchmark_output.total
        table.append(
            [
                fill(str(benchmark_output.id)),
                fill(benchmark_output.namespace),
                fill(benchmark_output.benchmark),
                fill(benchmark_output.solver),
                fill(benchmark_output.args),
                fill(f"{score:.2f}%"),
                fill(str(benchmark_output.solved)),
                fill(str(benchmark_output.total)),
            ]
        )

    print(tabulate(table, headers=header, tablefmt="simple_grid"))
run
run(dataset: str, solver_strategy: str, max_concurrent: int = -1, force: bool = False, subset: Optional[str] = None, check_compatibility: bool = True, record: bool = False, **solver_args: Any) -> None

Run benchmark on a dataset with a solver strategy.

It will cache the results in the database and subsequent runs will pull the results from the cache. If force is set to True, it will run the benchmark again and update the cache.

Source code in nearai/cli.py
def run(
    self,
    dataset: str,
    solver_strategy: str,
    max_concurrent: int = -1,
    force: bool = False,
    subset: Optional[str] = None,
    check_compatibility: bool = True,
    record: bool = False,
    **solver_args: Any,
) -> None:
    """Run benchmark on a dataset with a solver strategy.

    It will cache the results in the database and subsequent runs will pull the results from the cache.
    If force is set to True, it will run the benchmark again and update the cache.
    """
    from nearai.benchmark import BenchmarkExecutor, DatasetInfo
    from nearai.dataset import load_dataset
    from nearai.solvers import SolverStrategy, SolverStrategyRegistry

    args = dict(solver_args)
    if subset is not None:
        args["subset"] = subset

    benchmark_id = self._get_or_create_benchmark(
        benchmark_name=dataset,
        solver_name=solver_strategy,
        args=args,
        force=force,
    )

    solver_strategy_class: SolverStrategy | None = SolverStrategyRegistry.get(solver_strategy, None)
    assert (
        solver_strategy_class
    ), f"Solver strategy {solver_strategy} not found. Available strategies: {list(SolverStrategyRegistry.keys())}"

    name = dataset
    if solver_strategy_class.scoring_method == SolverScoringMethod.Custom:
        dataset = str(get_dataset(dataset))
    else:
        dataset = load_dataset(dataset)

    solver_strategy_obj: SolverStrategy = solver_strategy_class(dataset_ref=dataset, **solver_args)  # type: ignore
    if check_compatibility:
        assert name in solver_strategy_obj.compatible_datasets() or any(
            map(lambda n: n in name, solver_strategy_obj.compatible_datasets())
        ), f"Solver strategy {solver_strategy} is not compatible with dataset {name}"

    be = BenchmarkExecutor(DatasetInfo(name, subset, dataset), solver_strategy_obj, benchmark_id=benchmark_id)

    cpu_count = os.cpu_count()
    max_concurrent = (cpu_count if cpu_count is not None else 1) if max_concurrent < 0 else max_concurrent
    be.run(max_concurrent=max_concurrent, record=record)
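An illustrative invocation; the dataset entry and solver name are placeholders, and valid solver names come from SolverStrategyRegistry:

$ nearai benchmark run alice.near/example-dataset/0.0.1 ExampleSolverStrategy --max_concurrent=4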

CLI

Source code in nearai/cli.py
class CLI:
    def __init__(self) -> None:  # noqa: D107
        self.registry = RegistryCli()
        self.login = LoginCLI()
        self.logout = LogoutCLI()
        self.hub = HubCLI()

        self.config = ConfigCli()
        self.benchmark = BenchmarkCli()
        self.evaluation = EvaluationCli()
        self.agent = AgentCli()
        self.finetune = FinetuneCli()
        self.tensorboard = TensorboardCli()
        self.vllm = VllmCli()

    def location(self) -> None:  # noqa: D102
        """Show location where nearai is installed."""
        from nearai import cli_path

        print(cli_path())

    def version(self):
        """Show nearai version."""
        print(importlib.metadata.version("nearai"))
location
location() -> None

Show location where nearai is installed.

Source code in nearai/cli.py
def location(self) -> None:  # noqa: D102
    """Show location where nearai is installed."""
    from nearai import cli_path

    print(cli_path())
version
version()

Show nearai version.

Source code in nearai/cli.py
def version(self):
    """Show nearai version."""
    print(importlib.metadata.version("nearai"))

ConfigCli

Source code in nearai/cli.py
class ConfigCli:
    def set(self, key: str, value: str, local: bool = False) -> None:
        """Add key-value pair to the config file."""
        update_config(key, value, local)

    def get(self, key: str) -> None:
        """Get value of a key in the config file."""
        print(CONFIG.get(key))

    def show(self) -> None:  # noqa: D102
        for key, value in asdict(CONFIG).items():
            print(f"{key}: {value}")
get
get(key: str) -> None

Get value of a key in the config file.

Source code in nearai/cli.py
def get(self, key: str) -> None:
    """Get value of a key in the config file."""
    print(CONFIG.get(key))
set
set(key: str, value: str, local: bool = False) -> None

Add key-value pair to the config file.

Source code in nearai/cli.py
def set(self, key: str, value: str, local: bool = False) -> None:
    """Add key-value pair to the config file."""
    update_config(key, value, local)
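For example, with the api_url key defined on Config (see the config module below):

$ nearai config set api_url https://api.near.ai
$ nearai config get api_url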

EvaluationCli

Source code in nearai/cli.py
class EvaluationCli:
    def table(
        self,
        namespace: str = "",
        tags: str = "",
        all_key_columns: bool = False,
        all_metrics: bool = False,
        num_columns: int = 6,
        metric_name_max_length: int = 30,
    ) -> None:
        """Prints table of evaluations."""
        rows, columns, important_columns = evaluation_table(namespace, tags)
        print_evaluation_table(
            rows, columns, important_columns, all_key_columns, all_metrics, num_columns, metric_name_max_length
        )
table
table(namespace: str = '', tags: str = '', all_key_columns: bool = False, all_metrics: bool = False, num_columns: int = 6, metric_name_max_length: int = 30) -> None

Prints table of evaluations.

Source code in nearai/cli.py
def table(
    self,
    namespace: str = "",
    tags: str = "",
    all_key_columns: bool = False,
    all_metrics: bool = False,
    num_columns: int = 6,
    metric_name_max_length: int = 30,
) -> None:
    """Prints table of evaluations."""
    rows, columns, important_columns = evaluation_table(namespace, tags)
    print_evaluation_table(
        rows, columns, important_columns, all_key_columns, all_metrics, num_columns, metric_name_max_length
    )

HubCLI

Source code in nearai/cli.py
class HubCLI:
    def chat(self, **kwargs):
        """Chat with model from NearAI hub.

        Args:
        ----
            query (str): User's query to model
            endpoint (str): NearAI HUB's url
            model (str): Name of a model
            provider (str): Name of a provider
            info (bool): Display system info
            kwargs (Dict[str, Any]): All cli keyword arguments

        """
        hub = Hub(CONFIG)
        hub.chat(kwargs)
chat
chat(**kwargs)

Chat with model from NearAI hub.


Parameters:

query (str): User's query to model
endpoint (str): NearAI HUB's url
model (str): Name of a model
provider (str): Name of a provider
info (bool): Display system info
kwargs (Dict[str, Any]): All cli keyword arguments
Source code in nearai/cli.py
def chat(self, **kwargs):
    """Chat with model from NearAI hub.

    Args:
    ----
        query (str): User's query to model
        endpoint (str): NearAI HUB's url
        model (str): Name of a model
        provider (str): Name of a provider
        info (bool): Display system info
        kwargs (Dict[str, Any]): All cli keyword arguments

    """
    hub = Hub(CONFIG)
    hub.chat(kwargs)
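An illustrative chat invocation using the documented keyword arguments (the model name is a placeholder):

$ nearai hub chat --query="What is NEAR AI?" --model=example-model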

LoginCLI

Source code in nearai/cli.py
class LoginCLI:
    def __call__(self, **kwargs):
        """Login with NEAR Mainnet account.

        Args:
        ----
            remote (bool): Remote login allows signing message with NEAR Account on a remote machine
            auth_url (str): Url to the auth portal
            accountId (str): AccountId in .near-credentials folder to signMessage
            privateKey (str): Private Key to sign a message
            kwargs (Dict[str, Any]): All cli keyword arguments

        """
        from nearai.login import generate_and_save_signature, login_with_file_credentials, login_with_near_auth

        remote = kwargs.get("remote", False)
        account_id = kwargs.get("accountId", None)
        private_key = kwargs.get("privateKey", None)

        if not remote and account_id and private_key:
            generate_and_save_signature(account_id, private_key)
        elif not remote and account_id:
            login_with_file_credentials(account_id)
        else:
            auth_url = kwargs.get("auth_url", "https://auth.near.ai")
            login_with_near_auth(remote, auth_url)

    def status(self):
        """Load NEAR account authorization data."""
        from nearai.login import print_login_status

        print_login_status()

    def save(self, **kwargs):
        """Save NEAR account authorization data.

        Args:
        ----
            accountId (str): Near Account
            signature (str): Signature
            publicKey (str): Public Key used to sign
            callbackUrl (str): Callback Url
            nonce (str): nonce
            kwargs (Dict[str, Any]): All cli keyword arguments

        """
        from nearai.login import update_auth_config

        account_id = kwargs.get("accountId")
        signature = kwargs.get("signature")
        public_key = kwargs.get("publicKey")
        callback_url = kwargs.get("callbackUrl")
        nonce = kwargs.get("nonce")

        if account_id and signature and public_key and callback_url and nonce:
            update_auth_config(account_id, signature, public_key, callback_url, nonce)
        else:
            print("Missing data")
__call__
__call__(**kwargs)

Login with NEAR Mainnet account.


Parameters:

remote (bool): Remote login allows signing message with NEAR Account on a remote machine
auth_url (str): Url to the auth portal
accountId (str): AccountId in .near-credentials folder to signMessage
privateKey (str): Private Key to sign a message
kwargs (Dict[str, Any]): All cli keyword arguments
Source code in nearai/cli.py
def __call__(self, **kwargs):
    """Login with NEAR Mainnet account.

    Args:
    ----
        remote (bool): Remote login allows signing message with NEAR Account on a remote machine
        auth_url (str): Url to the auth portal
        accountId (str): AccountId in .near-credentials folder to signMessage
        privateKey (str): Private Key to sign a message
        kwargs (Dict[str, Any]): All cli keyword arguments

    """
    from nearai.login import generate_and_save_signature, login_with_file_credentials, login_with_near_auth

    remote = kwargs.get("remote", False)
    account_id = kwargs.get("accountId", None)
    private_key = kwargs.get("privateKey", None)

    if not remote and account_id and private_key:
        generate_and_save_signature(account_id, private_key)
    elif not remote and account_id:
        login_with_file_credentials(account_id)
    else:
        auth_url = kwargs.get("auth_url", "https://auth.near.ai")
        login_with_near_auth(remote, auth_url)
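Illustrative invocations covering the three branches above (account and key values are placeholders):

$ nearai login                                # browser flow via https://auth.near.ai
$ nearai login --accountId=example.near       # sign with keys from ~/.near-credentials
$ nearai login --accountId=example.near --privateKey=ed25519:...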
save
save(**kwargs)

Save NEAR account authorization data.


Parameters:

accountId (str): Near Account
signature (str): Signature
publicKey (str): Public Key used to sign
callbackUrl (str): Callback Url
nonce (str): nonce
kwargs (Dict[str, Any]): All cli keyword arguments
Source code in nearai/cli.py
def save(self, **kwargs):
    """Save NEAR account authorization data.

    Args:
    ----
        accountId (str): Near Account
        signature (str): Signature
        publicKey (str): Public Key used to sign
        callbackUrl (str): Callback Url
        nonce (str): nonce
        kwargs (Dict[str, Any]): All cli keyword arguments

    """
    from nearai.login import update_auth_config

    account_id = kwargs.get("accountId")
    signature = kwargs.get("signature")
    public_key = kwargs.get("publicKey")
    callback_url = kwargs.get("callbackUrl")
    nonce = kwargs.get("nonce")

    if account_id and signature and public_key and callback_url and nonce:
        update_auth_config(account_id, signature, public_key, callback_url, nonce)
    else:
        print("Missing data")
status
status()

Load NEAR account authorization data.

Source code in nearai/cli.py
def status(self):
    """Load NEAR account authorization data."""
    from nearai.login import print_login_status

    print_login_status()

LogoutCLI

Source code in nearai/cli.py
class LogoutCLI:
    def __call__(self, **kwargs):
        """Clear NEAR account auth data."""
        from nearai.config import load_config_file, save_config_file

        config = load_config_file()
        if not config.get("auth") or not config["auth"].get("account_id"):
            print("Auth data does not exist.")
        else:
            config.pop("auth", None)
            save_config_file(config)
            print("Auth data removed")
__call__
__call__(**kwargs)

Clear NEAR account auth data.

Source code in nearai/cli.py
def __call__(self, **kwargs):
    """Clear NEAR account auth data."""
    from nearai.config import load_config_file, save_config_file

    config = load_config_file()
    if not config.get("auth") or not config["auth"].get("account_id"):
        print("Auth data does not exist.")
    else:
        config.pop("auth", None)
        save_config_file(config)
        print("Auth data removed")

RegistryCli

Source code in nearai/cli.py
class RegistryCli:
    def info(self, entry: str) -> None:
        """Show information about an item."""
        entry_location = parse_location(entry)
        metadata = registry.info(entry_location)

        if metadata is None:
            print(f"Entry {entry} not found.")
            return

        print(metadata.model_dump_json(indent=2))

    def metadata_template(self, local_path: str = ".", category: str = "", description: str = ""):
        """Create a metadata template."""
        path = Path(local_path)

        metadata_path = path / "metadata.json"

        # Get the name of the folder
        folder_name = path.name

        with open(metadata_path, "w") as f:
            metadata: Dict[str, Any] = {
                "name": folder_name,
                "version": "0.0.1",
                "description": description,
                "category": category,
                "tags": [],
                "details": {},
                "show_entry": True,
            }

            if category == "agent":
                metadata["details"]["agent"] = {}
                metadata["details"]["agent"]["defaults"] = {
                    "model": DEFAULT_MODEL,
                    "model_provider": DEFAULT_PROVIDER,
                    "model_temperature": DEFAULT_MODEL_TEMPERATURE,
                    "model_max_tokens": DEFAULT_MODEL_MAX_TOKENS,
                }

            json.dump(metadata, f, indent=2)

    def list(
        self,
        namespace: str = "",
        category: str = "",
        tags: str = "",
        total: int = 32,
        offset: int = 0,
        show_all: bool = False,
        show_latest_version: bool = True,
        star: str = "",
    ) -> None:
        """List available items."""
        # Make sure tags is a comma-separated list of tags
        tags_l = parse_tags(tags)
        tags = ",".join(tags_l)

        entries = registry.list(
            namespace=namespace,
            category=category,
            tags=tags,
            total=total + 1,
            offset=offset,
            show_all=show_all,
            show_latest_version=show_latest_version,
            starred_by=star,
        )

        more_rows = len(entries) > total
        entries = entries[:total]

        header = ["entry", "category", "description", "tags"]

        table = []
        for entry in entries:
            table.append(
                [
                    fill(f"{entry.namespace}/{entry.name}/{entry.version}"),
                    fill(entry.category, 20),
                    fill(entry.description, 50),
                    fill(", ".join(entry.tags), 20),
                ]
            )

        if more_rows:
            table.append(["...", "...", "...", "..."])

        print(tabulate(table, headers=header, tablefmt="simple_grid"))

    def update(self, local_path: str = ".") -> None:
        """Update metadata of a registry item."""
        path = Path(local_path)

        if CONFIG.auth is None:
            print("Please login with `nearai login`")
            exit(1)

        metadata_path = path / "metadata.json"
        check_metadata(metadata_path)

        with open(metadata_path) as f:
            metadata: Dict[str, Any] = json.load(f)

        namespace = CONFIG.auth.account_id

        entry_location = EntryLocation.model_validate(
            dict(
                namespace=namespace,
                name=metadata.pop("name"),
                version=metadata.pop("version"),
            )
        )

        entry_metadata = EntryMetadataInput.model_validate(metadata)
        result = registry.update(entry_location, entry_metadata)
        print(json.dumps(result, indent=2))

    def upload(self, local_path: str = ".") -> None:
        """Upload item to the registry."""
        registry.upload(Path(local_path), show_progress=True)

    def download(self, entry_location: str, force: bool = False) -> None:
        """Download item."""
        registry.download(entry_location, force=force, show_progress=True)
download
download(entry_location: str, force: bool = False) -> None

Download item.

Source code in nearai/cli.py
def download(self, entry_location: str, force: bool = False) -> None:
    """Download item."""
    registry.download(entry_location, force=force, show_progress=True)
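For example (the entry is illustrative):

$ nearai registry download alice.near/example-agent/0.0.1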
info
info(entry: str) -> None

Show information about an item.

Source code in nearai/cli.py
def info(self, entry: str) -> None:
    """Show information about an item."""
    entry_location = parse_location(entry)
    metadata = registry.info(entry_location)

    if metadata is None:
        print(f"Entry {entry} not found.")
        return

    print(metadata.model_dump_json(indent=2))
list
list(namespace: str = '', category: str = '', tags: str = '', total: int = 32, offset: int = 0, show_all: bool = False, show_latest_version: bool = True, star: str = '') -> None

List available items.

Source code in nearai/cli.py
def list(
    self,
    namespace: str = "",
    category: str = "",
    tags: str = "",
    total: int = 32,
    offset: int = 0,
    show_all: bool = False,
    show_latest_version: bool = True,
    star: str = "",
) -> None:
    """List available items."""
    # Make sure tags is a comma-separated list of tags
    tags_l = parse_tags(tags)
    tags = ",".join(tags_l)

    entries = registry.list(
        namespace=namespace,
        category=category,
        tags=tags,
        total=total + 1,
        offset=offset,
        show_all=show_all,
        show_latest_version=show_latest_version,
        starred_by=star,
    )

    more_rows = len(entries) > total
    entries = entries[:total]

    header = ["entry", "category", "description", "tags"]

    table = []
    for entry in entries:
        table.append(
            [
                fill(f"{entry.namespace}/{entry.name}/{entry.version}"),
                fill(entry.category, 20),
                fill(entry.description, 50),
                fill(", ".join(entry.tags), 20),
            ]
        )

    if more_rows:
        table.append(["...", "...", "...", "..."])

    print(tabulate(table, headers=header, tablefmt="simple_grid"))
metadata_template
metadata_template(local_path: str = '.', category: str = '', description: str = '')

Create a metadata template.

Source code in nearai/cli.py
def metadata_template(self, local_path: str = ".", category: str = "", description: str = ""):
    """Create a metadata template."""
    path = Path(local_path)

    metadata_path = path / "metadata.json"

    # Get the name of the folder
    folder_name = path.name

    with open(metadata_path, "w") as f:
        metadata: Dict[str, Any] = {
            "name": folder_name,
            "version": "0.0.1",
            "description": description,
            "category": category,
            "tags": [],
            "details": {},
            "show_entry": True,
        }

        if category == "agent":
            metadata["details"]["agent"] = {}
            metadata["details"]["agent"]["defaults"] = {
                "model": DEFAULT_MODEL,
                "model_provider": DEFAULT_PROVIDER,
                "model_temperature": DEFAULT_MODEL_TEMPERATURE,
                "model_max_tokens": DEFAULT_MODEL_MAX_TOKENS,
            }

        json.dump(metadata, f, indent=2)
update
update(local_path: str = '.') -> None

Update metadata of a registry item.

Source code in nearai/cli.py
def update(self, local_path: str = ".") -> None:
    """Update metadata of a registry item."""
    path = Path(local_path)

    if CONFIG.auth is None:
        print("Please login with `nearai login`")
        exit(1)

    metadata_path = path / "metadata.json"
    check_metadata(metadata_path)

    with open(metadata_path) as f:
        metadata: Dict[str, Any] = json.load(f)

    namespace = CONFIG.auth.account_id

    entry_location = EntryLocation.model_validate(
        dict(
            namespace=namespace,
            name=metadata.pop("name"),
            version=metadata.pop("version"),
        )
    )

    entry_metadata = EntryMetadataInput.model_validate(metadata)
    result = registry.update(entry_location, entry_metadata)
    print(json.dumps(result, indent=2))
upload
upload(local_path: str = '.') -> None

Upload item to the registry.

Source code in nearai/cli.py
def upload(self, local_path: str = ".") -> None:
    """Upload item to the registry."""
    registry.upload(Path(local_path), show_progress=True)

check_update

check_update()

Check if there is a new version of nearai CLI available.

Source code in nearai/cli.py
def check_update():
    """Check if there is a new version of nearai CLI available."""
    try:
        api = DefaultApi()
        latest = api.version_v1_version_get()
        current = importlib.metadata.version("nearai")

        if latest != current:
            print(f"New version of nearai CLI available: {latest}. Current version: {current}")
            print("Run `pip install --upgrade nearai` to update.")

    except Exception as _:
        pass

completion

InferenceRouter

Bases: object

Source code in nearai/completion.py
class InferenceRouter(object):
    def __init__(self, config: Config) -> None:  # noqa: D107
        self._config = config
        if self._config.nearai_hub is None:
            self._config.nearai_hub = NearAiHubConfig()
        self._endpoint: Any

    def get_auth_str(self, auth: Optional[AuthData] = None) -> str:
        """Get authentication string from provided auth or config object.

        Args:
        ----
            auth (Optional[AuthData]): Authentication data. If None, uses config auth.

        Returns:
        -------
            str: JSON string containing authentication data.

        """
        _auth = auth
        if auth is None:
            _auth = self._config.auth

        bearer_data = {
            key: getattr(_auth, key)
            for key in ["account_id", "public_key", "signature", "callback_url", "message", "nonce", "recipient"]
        }

        return json.dumps(bearer_data)

    def completions(
        self,
        model: str,
        messages: Iterable[ChatCompletionMessageParam],
        stream: bool = False,
        temperature: Optional[float] = None,
        auth: Optional[AuthData] = None,
        max_tokens: Optional[int] = None,
        **kwargs: Any,
    ) -> Union[ModelResponse, CustomStreamWrapper]:
        """Takes a `model` and `messages` and returns completions.

        `model` can be:
        1. full path `provider::model_full_path`.
        2. `model_short_name`. Default provider will be used.
        """
        if self._config.nearai_hub is None:
            raise ValueError("Missing NearAI Hub config")
        provider, _ = get_provider_model(self._config.nearai_hub.default_provider, model)

        auth_bearer_token = self.get_auth_str(auth)

        if temperature is None:
            temperature = DEFAULT_MODEL_TEMPERATURE

        if max_tokens is None:
            max_tokens = DEFAULT_MODEL_MAX_TOKENS

        # NOTE(#246): this is to disable "Provider List" messages.
        litellm.suppress_debug_info = True

        self._endpoint = lambda model, messages, stream, temperature, max_tokens, **kwargs: litellm_completion(
            model,
            messages,
            stream=stream,
            custom_llm_provider=self._config.nearai_hub.custom_llm_provider,
            input_cost_per_token=0,
            output_cost_per_token=0,
            temperature=temperature,
            max_tokens=max_tokens,
            base_url=self._config.nearai_hub.base_url,
            provider=provider,
            api_key=auth_bearer_token,
            **kwargs,
        )

        try:
            result: Union[ModelResponse, CustomStreamWrapper] = self._endpoint(
                model=model, messages=messages, stream=stream, temperature=temperature, max_tokens=max_tokens, **kwargs
            )
        except Exception as e:
            raise ValueError(f"Bad request: {e}") from None

        return result

    def query_vector_store(
        self, vector_store_id: str, query: str, auth: Optional[AuthData] = None
    ) -> List[SimilaritySearch]:
        """Query a vector store."""
        if self._config.nearai_hub is None:
            raise ValueError("Missing NearAI Hub config")

        auth_bearer_token = self.get_auth_str(auth)

        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {auth_bearer_token}"}

        data = {"query": query}

        endpoint = f"{self._config.nearai_hub.base_url}/vector_stores/{vector_store_id}/search"

        try:
            response = requests.post(endpoint, headers=headers, json=data)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise ValueError(f"Error querying vector store: {e}") from None
completions
completions(model: str, messages: Iterable[ChatCompletionMessageParam], stream: bool = False, temperature: Optional[float] = None, auth: Optional[AuthData] = None, max_tokens: Optional[int] = None, **kwargs: Any) -> Union[ModelResponse, CustomStreamWrapper]

Takes a model and messages and returns completions.

model can be:

1. Full path provider::model_full_path.
2. model_short_name (the default provider will be used).

Source code in nearai/completion.py
def completions(
    self,
    model: str,
    messages: Iterable[ChatCompletionMessageParam],
    stream: bool = False,
    temperature: Optional[float] = None,
    auth: Optional[AuthData] = None,
    max_tokens: Optional[int] = None,
    **kwargs: Any,
) -> Union[ModelResponse, CustomStreamWrapper]:
    """Takes a `model` and `messages` and returns completions.

    `model` can be:
    1. full path `provider::model_full_path`.
    2. `model_short_name`. Default provider will be used.
    """
    if self._config.nearai_hub is None:
        raise ValueError("Missing NearAI Hub config")
    provider, _ = get_provider_model(self._config.nearai_hub.default_provider, model)

    auth_bearer_token = self.get_auth_str(auth)

    if temperature is None:
        temperature = DEFAULT_MODEL_TEMPERATURE

    if max_tokens is None:
        max_tokens = DEFAULT_MODEL_MAX_TOKENS

    # NOTE(#246): this is to disable "Provider List" messages.
    litellm.suppress_debug_info = True

    self._endpoint = lambda model, messages, stream, temperature, max_tokens, **kwargs: litellm_completion(
        model,
        messages,
        stream=stream,
        custom_llm_provider=self._config.nearai_hub.custom_llm_provider,
        input_cost_per_token=0,
        output_cost_per_token=0,
        temperature=temperature,
        max_tokens=max_tokens,
        base_url=self._config.nearai_hub.base_url,
        provider=provider,
        api_key=auth_bearer_token,
        **kwargs,
    )

    try:
        result: Union[ModelResponse, CustomStreamWrapper] = self._endpoint(
            model=model, messages=messages, stream=stream, temperature=temperature, max_tokens=max_tokens, **kwargs
        )
    except Exception as e:
        raise ValueError(f"Bad request: {e}") from None

    return result
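A minimal usage sketch, assuming the shared CONFIG instance is importable from nearai.config and using an illustrative provider::model path:

from nearai.completion import InferenceRouter
from nearai.config import CONFIG  # assumption: shared Config instance

router = InferenceRouter(CONFIG)
response = router.completions(
    model="example-provider::example-model",  # illustrative full path
    messages=[{"role": "user", "content": "Hello!"}],
    temperature=0.2,
)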
get_auth_str
get_auth_str(auth: Optional[AuthData] = None) -> str

Get authentication string from provided auth or config object.


Parameters:

auth (Optional[AuthData]): Authentication data. If None, uses config auth.

Returns:

str: JSON string containing authentication data.
Source code in nearai/completion.py
def get_auth_str(self, auth: Optional[AuthData] = None) -> str:
    """Get authentication string from provided auth or config object.

    Args:
    ----
        auth (Optional[AuthData]): Authentication data. If None, uses config auth.

    Returns:
    -------
        str: JSON string containing authentication data.

    """
    _auth = auth
    if auth is None:
        _auth = self._config.auth

    bearer_data = {
        key: getattr(_auth, key)
        for key in ["account_id", "public_key", "signature", "callback_url", "message", "nonce", "recipient"]
    }

    return json.dumps(bearer_data)
query_vector_store
query_vector_store(vector_store_id: str, query: str, auth: Optional[AuthData] = None) -> List[SimilaritySearch]

Query a vector store.

Source code in nearai/completion.py
def query_vector_store(
    self, vector_store_id: str, query: str, auth: Optional[AuthData] = None
) -> List[SimilaritySearch]:
    """Query a vector store."""
    if self._config.nearai_hub is None:
        raise ValueError("Missing NearAI Hub config")

    auth_bearer_token = self.get_auth_str(auth)

    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {auth_bearer_token}"}

    data = {"query": query}

    endpoint = f"{self._config.nearai_hub.base_url}/vector_stores/{vector_store_id}/search"

    try:
        response = requests.post(endpoint, headers=headers, json=data)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        raise ValueError(f"Error querying vector store: {e}") from None
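Continuing the sketch above (the vector store id is a placeholder):

results = router.query_vector_store("vs_example_id", "How do I deploy an agent?")
for match in results:
    print(match)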

config

AuthData

Bases: BaseModel

Source code in nearai/config.py
class AuthData(BaseModel):
    account_id: str
    signature: str
    public_key: str
    callback_url: str
    nonce: str
    recipient: str
    message: str

    def generate_bearer_token(self):
        """Generates a JSON-encoded bearer token containing authentication data."""
        required_keys = {"account_id", "public_key", "signature", "callback_url", "message", "nonce", "recipient"}

        for key in required_keys:
            if getattr(self, key) is None:
                raise ValueError(f"Missing required auth data: {key}")

        bearer_data = {key: getattr(self, key) for key in required_keys}

        return json.dumps(bearer_data)
generate_bearer_token
generate_bearer_token()

Generates a JSON-encoded bearer token containing authentication data.

Source code in nearai/config.py
def generate_bearer_token(self):
    """Generates a JSON-encoded bearer token containing authentication data."""
    required_keys = {"account_id", "public_key", "signature", "callback_url", "message", "nonce", "recipient"}

    for key in required_keys:
        if getattr(self, key) is None:
            raise ValueError(f"Missing required auth data: {key}")

    bearer_data = {key: getattr(self, key) for key in required_keys}

    return json.dumps(bearer_data)
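A sketch with placeholder values for the seven required fields:

from nearai.config import AuthData

auth = AuthData(
    account_id="example.near",
    signature="ed25519:...",   # placeholder
    public_key="ed25519:...",  # placeholder
    callback_url="https://auth.near.ai",
    nonce="1700000000000",
    recipient="ai.near",       # placeholder
    message="Welcome to NEAR AI",
)
token = auth.generate_bearer_token()  # JSON string with the seven fields above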

Config

Bases: BaseModel

Source code in nearai/config.py
class Config(BaseModel):
    origin: Optional[str] = None
    api_url: Optional[str] = "https://api.near.ai"
    inference_url: str = "http://localhost:5000/v1/"
    inference_api_key: str = "n/a"
    nearai_hub: Optional[NearAiHubConfig] = NearAiHubConfig()
    confirm_commands: bool = True
    auth: Optional[AuthData] = None

    def update_with(self, extra_config: Dict[str, Any], map_key: Callable[[str], str] = lambda x: x) -> "Config":
        """Update the config with the given dictionary."""
        dict_repr = self.model_dump()
        keys = list(map(map_key, dict_repr.keys()))

        for key in keys:
            value = extra_config.get(key, None)

            if value:
                # This will skip empty values, even if they are set in the `extra_config`
                dict_repr[key] = value

        return Config.model_validate(dict_repr)

    def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
        """Get the value of a key in the config if it exists."""
        return getattr(self, key, default)
get
get(key: str, default: Optional[Any] = None) -> Optional[Any]

Get the value of a key in the config if it exists.

Source code in nearai/config.py
def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
    """Get the value of a key in the config if it exists."""
    return getattr(self, key, default)
update_with
update_with(extra_config: Dict[str, Any], map_key: Callable[[str], str] = lambda x: x) -> Config

Update the config with the given dictionary.

Source code in nearai/config.py
def update_with(self, extra_config: Dict[str, Any], map_key: Callable[[str], str] = lambda x: x) -> "Config":
    """Update the config with the given dictionary."""
    dict_repr = self.model_dump()
    keys = list(map(map_key, dict_repr.keys()))

    for key in keys:
        value = extra_config.get(key, None)

        if value:
            # This will skip empty values, even if they are set in the `extra_config`
            dict_repr[key] = value

    return Config.model_validate(dict_repr)
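For example, overriding a single field (empty values in extra_config are skipped, as noted in the code):

from nearai.config import Config

config = Config()
updated = config.update_with({"api_url": "https://api.example.org"})
print(updated.api_url)  # https://api.example.org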

NearAiHubConfig

Bases: BaseModel

NearAiHub Config.

login_with_near (Optional[bool]): Indicates whether to attempt login using Near Auth.

api_key (Optional[str]): The API key to use if Near Auth is not being utilized

base_url (Optional[str]): NearAI Hub url

default_provider (Optional[str]): Default provider name

default_model (Optional[str]): Default model name

custom_llm_provider (Optional[str]): provider to be used by litellm proxy

Source code in nearai/config.py
class NearAiHubConfig(BaseModel):
    """NearAiHub Config.

    login_with_near (Optional[bool]): Indicates whether to attempt login using Near Auth.

    api_key (Optional[str]): The API key to use if Near Auth is not being utilized

    base_url (Optional[str]): NearAI Hub url

    default_provider (Optional[str]): Default provider name

    default_model (Optional[str]): Default model name

    custom_llm_provider (Optional[str]): provider to be used by litellm proxy
    """

    base_url: str = "https://api.near.ai/v1"
    default_provider: str = DEFAULT_PROVIDER
    default_model: str = DEFAULT_PROVIDER_MODEL
    custom_llm_provider: str = "openai"
    login_with_near: Optional[bool] = True
    api_key: Optional[str] = ""
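
For example, a config pointing at a self-hosted hub that authenticates with an API key instead of Near Auth might look like this (the URL and key are placeholders):

hub = NearAiHubConfig(
    base_url="https://hub.example/v1",  # placeholder URL
    login_with_near=False,
    api_key="my-api-key",  # placeholder key
)
config = Config(nearai_hub=hub)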

dataset

get_dataset

get_dataset(name: str, verbose: bool = True) -> Path

Download the dataset from the registry, fetching it locally if it hasn't been downloaded yet.

:param name: The name of the dataset entry to download, in the format namespace/name/version. :return: The path to the downloaded dataset.

Source code in nearai/dataset.py
def get_dataset(name: str, verbose: bool = True) -> Path:
    """Download the dataset from the registry and download it locally if it hasn't been downloaded yet.

    :param name: The name of the entry to download the dataset. The format should be namespace/name/version.
    :return: The path to the downloaded dataset
    """
    return registry.download(name, verbose=verbose)

load_dataset

load_dataset(alias_or_name: str, verbose: bool = True) -> Union[Dataset, DatasetDict]

Load a dataset from the registry.

Source code in nearai/dataset.py
def load_dataset(alias_or_name: str, verbose: bool = True) -> Union[Dataset, DatasetDict]:
    """Load a dataset from the registry."""
    path = get_dataset(alias_or_name, verbose=verbose)
    return load_from_disk(path.as_posix())
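
Example usage — the registry entry name below is purely illustrative:

from nearai.dataset import get_dataset, load_dataset

# Download the dataset (or reuse the cached copy) and get its local path.
path = get_dataset("example.near/my-dataset/0.0.1", verbose=False)  # hypothetical entry

# Load it from disk as a Dataset or DatasetDict.
ds = load_dataset("example.near/my-dataset/0.0.1", verbose=False)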

environment

Environment

Bases: object

Source code in nearai/environment.py
class Environment(object):
    def __init__(  # noqa: D107
        self,
        path: str,
        agents: List[Agent],
        config: Config,
        create_files: bool = True,
        env_vars: Optional[Dict[str, Any]] = None,
        tool_resources: Optional[Dict[str, Any]] = None,
        print_system_log: bool = False,
    ) -> None:
        self._path = path
        self._agents = agents
        self._done = False
        self._config = config
        self._inference = InferenceRouter(config)
        self._tools = ToolRegistry()
        self.register_standard_tools()
        self.env_vars: Dict[str, Any] = env_vars if env_vars else {}
        self._last_used_model = ""
        self.tool_resources: Dict[str, Any] = tool_resources if tool_resources else {}
        self.print_system_log = print_system_log

        if self._config.nearai_hub is None:
            self._config.nearai_hub = NearAiHubConfig()

        if create_files:
            os.makedirs(self._path, exist_ok=True)
            open(os.path.join(self._path, CHAT_FILENAME), "a").close()
        os.chdir(self._path)

    @staticmethod
    def _generate_run_id() -> str:
        return uuid.uuid4().hex

    def get_tool_registry(self) -> ToolRegistry:  # noqa: D102
        """Returns the tool registry, a dictionary of tools that can be called by the agent."""
        return self._tools

    def register_standard_tools(self) -> None:  # noqa: D102
        reg = self.get_tool_registry()
        reg.register_tool(self.exec_command)
        reg.register_tool(self.read_file)
        reg.register_tool(self.write_file)
        reg.register_tool(self.request_user_input)
        reg.register_tool(self.list_files)
        reg.register_tool(self.verify_message)
        reg.register_tool(self.query_vector_store)

    def add_message(self, role: str, message: str, filename: str = CHAT_FILENAME, **kwargs: Any) -> None:
        """Add a message to the chat file."""
        with open(os.path.join(self._path, filename), "a") as f:
            f.write(json.dumps({"role": role, "content": message, **kwargs}) + DELIMITER)

    def add_system_log(self, log: str, level: int = logging.INFO) -> None:
        """Add system log with timestamp and log level."""
        logger = logging.getLogger("system_logger")
        if not logger.handlers:
            # Configure the logger if it hasn't been set up yet
            logger.setLevel(logging.DEBUG)
            file_handler = logging.FileHandler(os.path.join(self._path, SYSTEM_LOG_FILENAME))
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

            if self.print_system_log:
                console_handler = logging.StreamHandler()
                console_handler.setFormatter(formatter)
                logger.addHandler(console_handler)

        # Log the message
        logger.log(level, log)

    def add_agent_log(self, log: str, level: int = logging.INFO) -> None:
        """Add agent log with timestamp and log level."""
        logger = logging.getLogger("agent_logger")
        if not logger.handlers:
            # Configure the logger if it hasn't been set up yet
            logger.setLevel(logging.DEBUG)
            file_handler = logging.FileHandler(os.path.join(self._path, AGENT_LOG_FILENAME))
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        # Log the message
        logger.log(level, log)

    def _add_agent_start_system_log(self, agent_idx: int) -> None:
        """Add agent start system log."""
        agent = self._agents[agent_idx]
        message = f"Starting an agent {agent.name}"
        if agent.model != "":
            model = self.get_model_for_inference(agent.model)
            self._last_used_model = model
            message += f" that will connect to {model}"
            if agent.model_temperature:
                message += f", temperature={agent.model_temperature}"
            if agent.model_max_tokens:
                message += f", max_tokens={agent.model_max_tokens}"
        self.add_system_log(message)

    def list_terminal_commands(self, filename: str = TERMINAL_FILENAME) -> List[Any]:
        """Returns the terminal commands from the terminal file."""
        return self.list_messages(filename)

    def list_messages(self, filename: str = CHAT_FILENAME) -> List[Any]:
        """Returns messages from a specified file."""
        path = os.path.join(self._path, filename)

        if not os.path.exists(path):
            return []

        with open(path, "r") as f:
            return [json.loads(message) for message in f.read().split(DELIMITER) if message]

    def verify_message(
        self, account_id: str, public_key: str, signature: str, message: str, nonce: str, callback_url: str
    ) -> bool:
        """Verify user message signed with NEAR Account."""
        return near.verify_signed_message(
            account_id, public_key, signature, message, nonce, self._agents[0].name, callback_url
        )

    def list_files(self, path: str) -> List[str]:
        """Lists files in the environment.

        path: The path to list files from.
        """
        return os.listdir(os.path.join(self._path, path))

    def get_path(self) -> str:  # noqa: D102
        """Returns the path of the current directory."""
        return self._path

    def read_file(self, filename: str) -> str:
        """Read a file from the environment.

        filename: The name of the file to read.
        """
        if not os.path.exists(os.path.join(self._path, filename)):
            return ""
        try:
            with open(os.path.join(self._path, filename), "r") as f:
                return f.read()
        except Exception as e:
            return f"failed to read file: {e}"

    def write_file(self, filename: str, content: str) -> str:
        """Writes a file to the environment.

        filename: The name of the file to write to
        content: The content to write to the file.
        """
        path = Path(self._path) / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            f.write(content)
        return f"Successfully wrote {len(content) if content else 0} characters to {filename}"

    def query_vector_store(self, vector_store_id: str, query: str):
        """Query a vector store.

        vector_store_id: The id of the vector store to query.
        query: The query to search for.
        """
        return self._inference.query_vector_store(vector_store_id, query)

    def exec_command(self, command: str) -> Dict[str, Union[str, int]]:
        """Executes a command in the environment and logs the output.

        The environment does not allow running interactive programs. It will run a program for up to 2 seconds, then interrupt it if it is still running or waiting for user input.
        command: The command to execute, like 'ls -l' or 'python3 tests.py'
        """  # noqa: E501
        if self._config.get("confirm_commands", True):
            yes_no = input("> Do you want to run the following command? (Y/n): " + command)
            if yes_no != "" and yes_no.lower() != "y":
                return {
                    "command": command,
                    "returncode": 999,
                    "stdout": "",
                    "stderr": "declined by user",
                }

        try:
            process = subprocess.Popen(
                shlex.split(command),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,
                universal_newlines=True,
                cwd=self._path,
            )
        except Exception as e:
            return {
                "command": command,
                "returncode": 999,
                "stdout": "",
                "stderr": "Failed to execute: " + str(e),
            }

        msg = ""

        def kill_process_tree(p: Any) -> None:
            nonlocal msg
            msg = "Killing process due to timeout"

            process = psutil.Process(p.pid)
            for proc in process.children(recursive=True):
                proc.kill()
            process.kill()

        timer = threading.Timer(2, kill_process_tree, (process,))
        timer.start()
        process.wait()
        timer.cancel()

        result = {
            "command": command,
            "stdout": process.stdout.read() if process.stdout and hasattr(process.stdout, "read") else "",
            "stderr": process.stderr.read() if process.stderr and hasattr(process.stderr, "read") else "",
            "returncode": process.returncode,
            "msg": msg,
        }
        with open(os.path.join(self._path, TERMINAL_FILENAME), "a") as f:
            f.write(json.dumps(result) + DELIMITER)
        return result

    def get_model_for_inference(self, model: str = "") -> str:
        """Returns 'provider::model_full_path' or 'model_short_name' if provider is default or not given."""
        provider = self._agents[0].model_provider if self._agents else ""
        if model == "":
            model = self._agents[0].model if self._agents else ""
        if model == "":
            return DEFAULT_PROVIDER_MODEL

        # TODO(#225): convert model_short_name -> model_full_path before passing to AI Hub.
        # Until that is implemented, assume the model given in metadata for a non-default provider
        # is already model_full_path, or model_short_name as used by Fireworks.
        if provider == "" or provider == DEFAULT_PROVIDER:
            return model
        return provider + PROVIDER_MODEL_SEP + model

    def _run_inference_completions(
        self,
        messages: Iterable[ChatCompletionMessageParam] | str,
        model: Iterable[ChatCompletionMessageParam] | str,
        stream: bool,
        **kwargs: Any,
    ) -> Union[ModelResponse, CustomStreamWrapper]:
        """Run inference completions for given parameters."""
        if isinstance(messages, str):
            self.add_system_log("Deprecated completions call. Pass `messages` as a first parameter.", logging.WARNING)
            messages_or_model = messages
            model_or_messages = model
            model = cast(str, messages_or_model)
            messages = cast(Iterable[ChatCompletionMessageParam], model_or_messages)
        else:
            model = cast(str, model)
            messages = cast(Iterable[ChatCompletionMessageParam], messages)
        model = self.get_model_for_inference(model)
        if model != self._last_used_model:
            self._last_used_model = model
            self.add_system_log(f"Connecting to {model}")
        return self._inference.completions(
            model,
            messages,
            stream=stream,
            temperature=self._agents[0].model_temperature if self._agents else None,
            max_tokens=self._agents[0].model_max_tokens if self._agents else None,
            **kwargs,
        )

    # TODO(286): `messages` may be model and `model` may be messages temporarily to support deprecated API.
    def completions(
        self,
        messages: Iterable[ChatCompletionMessageParam] | str,
        model: Iterable[ChatCompletionMessageParam] | str = "",
        stream: bool = False,
        **kwargs: Any,
    ) -> Union[ModelResponse, CustomStreamWrapper]:
        """Returns all completions for given messages using the given model."""
        return self._run_inference_completions(messages, model, stream, **kwargs)

    # TODO(286): `messages` may be model and `model` may be messages temporarily to support deprecated API.
    def completions_and_run_tools(
        self,
        messages: Iterable[ChatCompletionMessageParam] | str,
        model: Iterable[ChatCompletionMessageParam] | str = "",
        tools: Optional[List] = None,
        **kwargs: Any,
    ) -> ModelResponse:
        """Returns all completions for given messages using the given model and runs tools."""
        raw_response = self._run_inference_completions(messages, model, stream=False, tools=tools, **kwargs)
        assert isinstance(raw_response, ModelResponse), "Expected ModelResponse"
        response: ModelResponse = raw_response
        assert all(map(lambda choice: isinstance(choice, Choices), response.choices)), "Expected Choices"
        choices: List[Choices] = response.choices  # type: ignore
        response_message = choices[0].message
        if hasattr(response_message, "tool_calls") and response_message.tool_calls:
            for tool_call in response_message.tool_calls:
                function_name = tool_call.function.name
                assert function_name, "Tool call must have a function name"
                function_args = json.loads(tool_call.function.arguments)
                function_response = self._tools.call_tool(function_name, **function_args)

                if function_response:
                    function_response_json = json.dumps(function_response) if function_response else ""
                    self.add_message("tool", function_response_json, tool_call_id=tool_call.id, name=function_name)
        return response

    # TODO(286): `messages` may be model and `model` may be messages temporarily to support deprecated API.
    def completion(
        self,
        messages: Iterable[ChatCompletionMessageParam] | str,
        model: Iterable[ChatCompletionMessageParam] | str = "",
        auth: Dict | Optional[AuthData] = None,
    ) -> str:
        """Returns a completion for the given messages using the given model."""
        if isinstance(auth, Dict):
            auth = AuthData(**auth)
        raw_response = self.completions(messages, model, auth=auth)
        assert isinstance(raw_response, ModelResponse), "Expected ModelResponse"
        response: ModelResponse = raw_response
        assert all(map(lambda choice: isinstance(choice, Choices), response.choices)), "Expected Choices"
        choices: List[Choices] = response.choices  # type: ignore
        response_message = choices[0].message.content
        assert response_message, "No completions returned"
        return response_message

    # TODO(286): `messages` may be model and `model` may be messages temporarily to support deprecated API.
    def completion_and_run_tools(
        self,
        messages: Iterable[ChatCompletionMessageParam] | str,
        model: Iterable[ChatCompletionMessageParam] | str = "",
        tools: Optional[List] = None,
        **kwargs: Any,
    ) -> str:
        """Returns a completion for the given messages using the given model and runs tools."""
        completion_tools_response = self.completions_and_run_tools(messages, model, tools, **kwargs)
        assert all(
            map(lambda choice: isinstance(choice, Choices), completion_tools_response.choices)
        ), "Expected Choices"
        choices: List[Choices] = completion_tools_response.choices  # type: ignore
        response_message = choices[0].message.content
        assert response_message, "No completions returned"
        return response_message

    def call_agent(self, agent_path: int, task: str) -> None:
        """Calls agent with given task."""
        self._agents[agent_path].run(self, task=task)

    def get_agents(self) -> List[Agent]:
        """Returns list of agents available in environment."""
        return self._agents

    def is_done(self) -> bool:  # noqa: D102
        return self._done

    def mark_done(self) -> None:  # noqa: D102
        self._done = True

    def create_snapshot(self) -> bytes:
        """Create an in memory snapshot."""
        with tempfile.NamedTemporaryFile(suffix=".tar.gz") as f:
            with tarfile.open(fileobj=f, mode="w:gz") as tar:
                tar.add(self._path, arcname=".")
            f.flush()
            f.seek(0)
            snapshot = f.read()
        return snapshot

    def save_to_registry(
        self,
        path: str,
        run_type: str,
        run_id: str,
        base_id: Optional[Union[str, int]] = None,
        run_name: Optional[str] = None,
    ) -> Optional[bytes]:
        """Save Environment to Registry."""
        if self._config.auth is None:
            print("Warning: Authentication is not set up. Run not saved to registry. To log in, run `nearai login`")
            return None

        agent_name = self._agents[0].name if self._agents else "unknown"
        generated_name = f"environment_run_{agent_name}_{run_id}"
        name = run_name or generated_name

        tempdir = Path(tempfile.mkdtemp())
        environment_path = tempdir / "environment.tar.gz"

        with open(environment_path, "w+b") as f:
            with tarfile.open(fileobj=f, mode="w:gz") as tar:
                tar.add(path, arcname=".")
            f.flush()
            f.seek(0)
            snapshot = f.read()
            tar_filename = f.name

            timestamp = datetime.now(timezone.utc).isoformat()

            entry_location = registry.upload(
                tempdir,
                EntryMetadata.from_dict(
                    {
                        "name": name,
                        "version": "0.0.1",
                        "description": f"Agent {run_type} run {agent_name}",
                        "category": "environment",
                        "tags": ["environment"],
                        "details": {
                            "base_id": base_id,
                            "timestamp": timestamp,
                            "agents": [agent.name for agent in self._agents],
                            "run_id": run_id,
                            "run_type": run_type,
                            "filename": tar_filename,
                        },
                        "show_entry": True,
                    }
                ),
                show_progress=True,
            )

            location_str = plain_location(entry_location)

            print(f"Saved environment {entry_location} to registry. To load use flag `--load-env={location_str}`.")

        rmtree(tempdir)
        return snapshot

    def load_snapshot(self, snapshot: bytes) -> None:
        """Load Environment from Snapshot."""
        shutil.rmtree(self._path, ignore_errors=True)

        with tempfile.NamedTemporaryFile(suffix=".tar.gz") as f:
            f.write(snapshot)
            f.flush()
            f.seek(0)

            with tarfile.open(fileobj=f, mode="r:gz") as tar:
                tar.extractall(self._path)

    def load_from_registry(self, load_env: str) -> str:  # noqa: D102
        print(f"Loading environment from {load_env} to {self._path}")

        directory = registry.download(load_env)
        assert directory is not None, "Failed to download environment"

        files = os.listdir(directory)
        tarfile_file = next(f for f in files if f.endswith(".tar.gz"))

        with tarfile.open(directory / tarfile_file, "r") as tar:
            tar.extractall(self._path)
        return directory.name

    def __str__(self) -> str:  # noqa: D105
        return f"Environment({self._path})"

    def run_agent(self, task: Optional[str]) -> None:  # noqa: D102
        self._agents[0].run(self, task=task)

    def request_user_input(self) -> None:
        """Must be called to request input from the user."""
        self.set_next_actor("user")

    def clear_temp_agent_files(self) -> None:  # noqa: D102
        """Remove temp agent files created to be used in `runpy`."""
        for agent in self._agents:
            if agent.temp_dir and os.path.exists(agent.temp_dir):
                shutil.rmtree(agent.temp_dir)

    def set_next_actor(self, who: str) -> None:  # noqa: D102
        """Set the next actor / action in the dialogue."""
        next_action_fn = os.path.join(self._path, ".next_action")

        with open(next_action_fn, "w") as f:
            f.write(who)

    def get_next_actor(self) -> str:  # noqa: D102
        next_action_fn = os.path.join(self._path, ".next_action")

        if os.path.exists(next_action_fn):
            with open(next_action_fn) as f:
                return f.read().strip(" \n")
        else:
            # By default the user starts the conversation.
            return "user"

    def run_interactive(self, record_run: str = "", load_env: str = "") -> None:
        """Run an interactive session within the given environment."""
        run_id = self._generate_run_id()
        if load_env:
            base_id = self.load_from_registry(load_env)
        else:
            base_id = None
        last_message_idx = 0

        self._add_agent_start_system_log(agent_idx=0)

        if self._agents[0].welcome_description:
            if self._agents[0].welcome_title:
                print(f"{self._agents[0].welcome_title}: {self._agents[0].welcome_description}")
            else:
                print(self._agents[0].welcome_description)

        def print_messages(last_message_idx: int) -> int:
            messages = self.list_messages()
            for item in messages[last_message_idx:]:
                print(f"[{item['role']}]: {item['content']}", flush=True)
            return len(messages)

        last_message_idx = print_messages(last_message_idx)

        iteration_count = 0
        while True:
            if self.get_next_actor() != "user":
                messages = self.list_messages()
                new_message = None if not messages else messages[-1]["content"]

                iteration_count += 1
                self.run_agent(new_message)

                last_message_idx = print_messages(last_message_idx)
                if self.is_done():
                    break

            else:
                new_message = input("> ")
                if new_message == "exit":
                    break
                self.add_message("user", new_message)

                self.set_next_actor("agent")

        self.clear_temp_agent_files()

        if record_run:
            run_name = record_run if record_run and record_run != "true" else None
            self.save_to_registry(self._path, "interactive", run_id, base_id, run_name)

    def run_task(
        self,
        task: str,
        record_run: str = "",
        load_env: str = "",
        max_iterations: int = 10,
    ) -> None:
        """Runs a task within the given environment."""
        run_id = self._generate_run_id()
        if load_env:
            base_id = self.load_from_registry(load_env)
        else:
            base_id = None
        iteration = 0

        self._add_agent_start_system_log(agent_idx=0)

        if task:
            self.add_message("user", task)

        while iteration < max_iterations and not self.is_done():
            iteration += 1
            self._agents[0].run(self, task=task)

        if record_run:
            run_name = record_run if record_run and record_run != "true" else None
            self.save_to_registry(self._path, "task", run_id, base_id, run_name)

    def inspect(self) -> None:  # noqa: D102
        filename = Path(os.path.abspath(__file__)).parent / "streamlit_inspect.py"
        subprocess.call(["streamlit", "run", filename, "--", self._path])

    def contains_non_empty_chat_txt(self, directory: str) -> bool:  # noqa: D102
        chat_txt_path = os.path.join(directory, "chat.txt")
        return os.path.isfile(chat_txt_path) and os.path.getsize(chat_txt_path) > 0

    def save_folder(self, name: Optional[str] = None) -> None:  # noqa: D102
        path = self._path
        temp_dir = None

        def copy_relevant_folders(src: str, dest: str) -> None:
            for item in os.listdir(src):
                s = os.path.join(src, item)
                d = os.path.join(dest, item)
                if os.path.isdir(s):
                    if self.contains_non_empty_chat_txt(s):
                        shutil.copytree(s, d)
                    else:
                        os.makedirs(d, exist_ok=True)
                        copy_relevant_folders(s, d)
                        if not os.listdir(d):
                            os.rmdir(d)

        if not self.contains_non_empty_chat_txt(path):
            temp_dir = tempfile.mkdtemp()
            copy_relevant_folders(path, temp_dir)
            path = temp_dir

        try:
            if not os.listdir(path):
                raise ValueError(f"No files found in {path}")

            self.save_to_registry(
                path, "folders" if temp_dir else "folder", self.generate_folder_hash_id(path), None, name
            )
        finally:
            if temp_dir:
                shutil.rmtree(temp_dir)

    def save_from_history(self, lines: List[str], name: Optional[str] = None) -> None:  # noqa: D102
        # Parse lines and extract relevant information
        pattern = r"^\s*(?:\d+\s+)?(\S+)\s+environment\s+interactive\s+(\S+)\s+(\S+)(.*?)$"
        relevant_paths = {}
        for line in lines:
            match = re.match(pattern, line)
            if match:
                program_name, agents, path, other_args = match.groups()
                path = path.strip("/")
                if self.contains_non_empty_chat_txt(path):
                    command = f"{program_name} environment interactive {agents} {path} {other_args}"
                    relevant_paths[path] = {"command": command.strip()}

        if not relevant_paths:
            raise ValueError("No relevant paths with non-empty chat.txt files found in history")

        for path, info in relevant_paths.items():
            print(path)
            # Write start_command.log
            with open(os.path.join(path, "start_command.log"), "w") as f:
                f.write(info["command"])

        # Create temporary directory and copy relevant folders
        temp_dir = tempfile.mkdtemp()
        try:
            for path, _info in relevant_paths.items():
                dest = os.path.join(temp_dir, path.replace("/", "_").strip("_"))
                shutil.copytree(path, dest)
            self.save_to_registry(temp_dir, "folders", self.generate_folder_hash_id(temp_dir), None, name)

        finally:
            shutil.rmtree(temp_dir)

    def generate_folder_hash_id(self, path: str) -> str:
        """Returns id similar to _generate_run_id(), but based on files and their contents in path, including subfolders."""  # noqa: E501
        hash_obj = hashlib.md5()

        for root, _dirs, files in os.walk(path):
            for file in sorted(files):
                file_path = os.path.join(root, file)
                with open(file_path, "rb") as f:
                    while chunk := f.read(8192):
                        hash_obj.update(chunk)

        return hash_obj.hexdigest()
add_agent_log
add_agent_log(log: str, level: int = logging.INFO) -> None

Add agent log with timestamp and log level.

Source code in nearai/environment.py
def add_agent_log(self, log: str, level: int = logging.INFO) -> None:
    """Add agent log with timestamp and log level."""
    logger = logging.getLogger("agent_logger")
    if not logger.handlers:
        # Configure the logger if it hasn't been set up yet
        logger.setLevel(logging.DEBUG)
        file_handler = logging.FileHandler(os.path.join(self._path, AGENT_LOG_FILENAME))
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # Log the message
    logger.log(level, log)
add_message
add_message(role: str, message: str, filename: str = CHAT_FILENAME, **kwargs: Any) -> None

Add a message to the chat file.

Source code in nearai/environment.py
def add_message(self, role: str, message: str, filename: str = CHAT_FILENAME, **kwargs: Any) -> None:
    """Add a message to the chat file."""
    with open(os.path.join(self._path, filename), "a") as f:
        f.write(json.dumps({"role": role, "content": message, **kwargs}) + DELIMITER)
add_system_log
add_system_log(log: str, level: int = logging.INFO) -> None

Add system log with timestamp and log level.

Source code in nearai/environment.py
def add_system_log(self, log: str, level: int = logging.INFO) -> None:
    """Add system log with timestamp and log level."""
    logger = logging.getLogger("system_logger")
    if not logger.handlers:
        # Configure the logger if it hasn't been set up yet
        logger.setLevel(logging.DEBUG)
        file_handler = logging.FileHandler(os.path.join(self._path, SYSTEM_LOG_FILENAME))
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        if self.print_system_log:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)

    # Log the message
    logger.log(level, log)
call_agent
call_agent(agent_path: int, task: str) -> None

Calls agent with given task.

Source code in nearai/environment.py
def call_agent(self, agent_path: int, task: str) -> None:
    """Calls agent with given task."""
    self._agents[agent_path].run(self, task=task)
clear_temp_agent_files
clear_temp_agent_files() -> None

Remove temp agent files created to be used in runpy.

Source code in nearai/environment.py
def clear_temp_agent_files(self) -> None:  # noqa: D102
    """Remove temp agent files created to be used in `runpy`."""
    for agent in self._agents:
        if agent.temp_dir and os.path.exists(agent.temp_dir):
            shutil.rmtree(agent.temp_dir)
completion
completion(messages: Iterable[ChatCompletionMessageParam] | str, model: Iterable[ChatCompletionMessageParam] | str = '', auth: Dict | Optional[AuthData] = None) -> str

Returns a completion for the given messages using the given model.

Source code in nearai/environment.py
def completion(
    self,
    messages: Iterable[ChatCompletionMessageParam] | str,
    model: Iterable[ChatCompletionMessageParam] | str = "",
    auth: Dict | Optional[AuthData] = None,
) -> str:
    """Returns a completion for the given messages using the given model."""
    if isinstance(auth, Dict):
        auth = AuthData(**auth)
    raw_response = self.completions(messages, model, auth=auth)
    assert isinstance(raw_response, ModelResponse), "Expected ModelResponse"
    response: ModelResponse = raw_response
    assert all(map(lambda choice: isinstance(choice, Choices), response.choices)), "Expected Choices"
    choices: List[Choices] = response.choices  # type: ignore
    response_message = choices[0].message.content
    assert response_message, "No completions returned"
    return response_message
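
Example usage — a minimal sketch assuming `env` is an initialized Environment; the model name is a placeholder that must be resolvable by the configured provider:

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize the chat so far."},
]

# Returns the content of the first choice, or raises if nothing came back.
reply = env.completion(messages, model="llama-v3p1-70b-instruct")  # placeholder model
env.add_message("assistant", reply)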
completion_and_run_tools
completion_and_run_tools(messages: Iterable[ChatCompletionMessageParam] | str, model: Iterable[ChatCompletionMessageParam] | str = '', tools: Optional[List] = None, **kwargs: Any) -> str

Returns a completion for the given messages using the given model and runs tools.

Source code in nearai/environment.py
def completion_and_run_tools(
    self,
    messages: Iterable[ChatCompletionMessageParam] | str,
    model: Iterable[ChatCompletionMessageParam] | str = "",
    tools: Optional[List] = None,
    **kwargs: Any,
) -> str:
    """Returns a completion for the given messages using the given model and runs tools."""
    completion_tools_response = self.completions_and_run_tools(messages, model, tools, **kwargs)
    assert all(
        map(lambda choice: isinstance(choice, Choices), completion_tools_response.choices)
    ), "Expected Choices"
    choices: List[Choices] = completion_tools_response.choices  # type: ignore
    response_message = choices[0].message.content
    assert response_message, "No completions returned"
    return response_message
completions
completions(messages: Iterable[ChatCompletionMessageParam] | str, model: Iterable[ChatCompletionMessageParam] | str = '', stream: bool = False, **kwargs: Any) -> Union[ModelResponse, CustomStreamWrapper]

Returns all completions for given messages using the given model.

Source code in nearai/environment.py
def completions(
    self,
    messages: Iterable[ChatCompletionMessageParam] | str,
    model: Iterable[ChatCompletionMessageParam] | str = "",
    stream: bool = False,
    **kwargs: Any,
) -> Union[ModelResponse, CustomStreamWrapper]:
    """Returns all completions for given messages using the given model."""
    return self._run_inference_completions(messages, model, stream, **kwargs)
completions_and_run_tools
completions_and_run_tools(messages: Iterable[ChatCompletionMessageParam] | str, model: Iterable[ChatCompletionMessageParam] | str = '', tools: Optional[List] = None, **kwargs: Any) -> ModelResponse

Returns all completions for given messages using the given model and runs tools.

Source code in nearai/environment.py
def completions_and_run_tools(
    self,
    messages: Iterable[ChatCompletionMessageParam] | str,
    model: Iterable[ChatCompletionMessageParam] | str = "",
    tools: Optional[List] = None,
    **kwargs: Any,
) -> ModelResponse:
    """Returns all completions for given messages using the given model and runs tools."""
    raw_response = self._run_inference_completions(messages, model, stream=False, tools=tools, **kwargs)
    assert isinstance(raw_response, ModelResponse), "Expected ModelResponse"
    response: ModelResponse = raw_response
    assert all(map(lambda choice: isinstance(choice, Choices), response.choices)), "Expected Choices"
    choices: List[Choices] = response.choices  # type: ignore
    response_message = choices[0].message
    if hasattr(response_message, "tool_calls") and response_message.tool_calls:
        for tool_call in response_message.tool_calls:
            function_name = tool_call.function.name
            assert function_name, "Tool call must have a function name"
            function_args = json.loads(tool_call.function.arguments)
            function_response = self._tools.call_tool(function_name, **function_args)

            if function_response:
                function_response_json = json.dumps(function_response) if function_response else ""
                self.add_message("tool", function_response_json, tool_call_id=tool_call.id, name=function_name)
    return response
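
Example usage — a sketch of the tool loop assuming `env` is an initialized Environment; the `tools` payload follows the OpenAI function-calling schema forwarded by litellm, and the spec below is an illustrative assumption:

from datetime import datetime, timezone

def get_time() -> str:
    """Return the current UTC time as an ISO string."""
    return datetime.now(timezone.utc).isoformat()

# Register the tool alongside the standard ones.
env.get_tool_registry().register_tool(get_time)

# Hand-written OpenAI-style tool spec (assumed format).
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_time",
            "description": "Return the current UTC time.",
            "parameters": {"type": "object", "properties": {}},
        },
    }
]

response = env.completions_and_run_tools(
    [{"role": "user", "content": "What time is it?"}],
    tools=tools,
)
# Any tool calls in the response were executed, and their results were
# appended to the chat file as `tool` messages.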
create_snapshot
create_snapshot() -> bytes

Create an in memory snapshot.

Source code in nearai/environment.py
def create_snapshot(self) -> bytes:
    """Create an in memory snapshot."""
    with tempfile.NamedTemporaryFile(suffix=".tar.gz") as f:
        with tarfile.open(fileobj=f, mode="w:gz") as tar:
            tar.add(self._path, arcname=".")
        f.flush()
        f.seek(0)
        snapshot = f.read()
    return snapshot
exec_command
exec_command(command: str) -> Dict[str, Union[str, int]]

Executes a command in the environment and logs the output.

The environment does not allow running interactive programs. It will run a program for up to 2 seconds, then interrupt it if it is still running or waiting for user input. command: The command to execute, like 'ls -l' or 'python3 tests.py'

Source code in nearai/environment.py
def exec_command(self, command: str) -> Dict[str, Union[str, int]]:
    """Executes a command in the environment and logs the output.

    The environment does not allow running interactive programs. It will run a program for up to 2 seconds, then interrupt it if it is still running or waiting for user input.
    command: The command to execute, like 'ls -l' or 'python3 tests.py'
    """  # noqa: E501
    if self._config.get("confirm_commands", True):
        yes_no = input("> Do you want to run the following command? (Y/n): " + command)
        if yes_no != "" and yes_no.lower() != "y":
            return {
                "command": command,
                "returncode": 999,
                "stdout": "",
                "stderr": "declined by user",
            }

    try:
        process = subprocess.Popen(
            shlex.split(command),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
            universal_newlines=True,
            cwd=self._path,
        )
    except Exception as e:
        return {
            "command": command,
            "returncode": 999,
            "stdout": "",
            "stderr": "Failed to execute: " + str(e),
        }

    msg = ""

    def kill_process_tree(p: Any) -> None:
        nonlocal msg
        msg = "Killing process due to timeout"

        process = psutil.Process(p.pid)
        for proc in process.children(recursive=True):
            proc.kill()
        process.kill()

    timer = threading.Timer(2, kill_process_tree, (process,))
    timer.start()
    process.wait()
    timer.cancel()

    result = {
        "command": command,
        "stdout": process.stdout.read() if process.stdout and hasattr(process.stdout, "read") else "",
        "stderr": process.stderr.read() if process.stderr and hasattr(process.stderr, "read") else "",
        "returncode": process.returncode,
        "msg": msg,
    }
    with open(os.path.join(self._path, TERMINAL_FILENAME), "a") as f:
        f.write(json.dumps(result) + DELIMITER)
    return result
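
The returned dictionary makes the outcome easy to inspect; for example (output values will vary):

result = env.exec_command("ls -l")
if result["returncode"] == 0:
    print(result["stdout"])
else:
    # returncode 999 marks a declined or failed launch; "msg" is set when a
    # long-running process is killed on timeout.
    print(result["stderr"], result["msg"])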
generate_folder_hash_id
generate_folder_hash_id(path: str) -> str

Returns an id similar to _generate_run_id(), but based on files and their contents in path, including subfolders.

Source code in nearai/environment.py
def generate_folder_hash_id(self, path: str) -> str:
    """Returns id similar to _generate_run_id(), but based on files and their contents in path, including subfolders."""  # noqa: E501
    hash_obj = hashlib.md5()

    for root, _dirs, files in os.walk(path):
        for file in sorted(files):
            file_path = os.path.join(root, file)
            with open(file_path, "rb") as f:
                while chunk := f.read(8192):
                    hash_obj.update(chunk)

    return hash_obj.hexdigest()
get_agents
get_agents() -> List[Agent]

Returns list of agents available in environment.

Source code in nearai/environment.py
def get_agents(self) -> List[Agent]:
    """Returns list of agents available in environment."""
    return self._agents
get_model_for_inference
get_model_for_inference(model: str = '') -> str

Returns 'provider::model_full_path' or 'model_short_name' if provider is default or not given.

Source code in nearai/environment.py
def get_model_for_inference(self, model: str = "") -> str:
    """Returns 'provider::model_full_path' or 'model_short_name' if provider is default or not given."""
    provider = self._agents[0].model_provider if self._agents else ""
    if model == "":
        model = self._agents[0].model if self._agents else ""
    if model == "":
        return DEFAULT_PROVIDER_MODEL

    # TODO(#225): convert model_short_name -> model_full_path before passing to AI Hub.
    # Until that is implemented, assume the model given in metadata for a non-default provider
    # is already model_full_path, or model_short_name as used by Fireworks.
    if provider == "" or provider == DEFAULT_PROVIDER:
        return model
    return provider + PROVIDER_MODEL_SEP + model
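
Illustrative outcomes, assuming `env` is an initialized Environment; the separator and provider names below are assumptions for illustration:

# No agent model configured and no argument: the default provider model is returned.
env.get_model_for_inference("")

# Default (or empty) provider: the model name passes through unchanged.
env.get_model_for_inference("my-model")  # -> "my-model"

# Non-default provider on the first agent: the provider is prepended, e.g.
# "some-provider::my-model" if PROVIDER_MODEL_SEP is "::" (an assumption).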
get_path
get_path() -> str

Returns the path of the current directory.

Source code in nearai/environment.py
def get_path(self) -> str:  # noqa: D102
    """Returns the path of the current directory."""
    return self._path
get_tool_registry
get_tool_registry() -> ToolRegistry

Returns the tool registry, a dictionary of tools that can be called by the agent.

Source code in nearai/environment.py
def get_tool_registry(self) -> ToolRegistry:  # noqa: D102
    """Returns the tool registry, a dictionary of tools that can be called by the agent."""
    return self._tools
list_files
list_files(path: str) -> List[str]

Lists files in the environment.

path: The path to list files from.

Source code in nearai/environment.py
def list_files(self, path: str) -> List[str]:
    """Lists files in the environment.

    path: The path to list files from.
    """
    return os.listdir(os.path.join(self._path, path))
list_messages
list_messages(filename: str = CHAT_FILENAME) -> List[Any]

Returns messages from a specified file.

Source code in nearai/environment.py
def list_messages(self, filename: str = CHAT_FILENAME) -> List[Any]:
    """Returns messages from a specified file."""
    path = os.path.join(self._path, filename)

    if not os.path.exists(path):
        return []

    with open(path, "r") as f:
        return [json.loads(message) for message in f.read().split(DELIMITER) if message]
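
Messages are stored as DELIMITER-separated JSON objects, so anything written by add_message reads back as a list of dicts; the extra `quality` field below is illustrative:

env.add_message("user", "hello")
env.add_message("assistant", "hi there", quality=1)  # extra kwargs are stored alongside

for msg in env.list_messages():
    print(msg["role"], msg["content"])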
list_terminal_commands
list_terminal_commands(filename: str = TERMINAL_FILENAME) -> List[Any]

Returns the terminal commands from the terminal file.

Source code in nearai/environment.py
def list_terminal_commands(self, filename: str = TERMINAL_FILENAME) -> List[Any]:
    """Returns the terminal commands from the terminal file."""
    return self.list_messages(filename)
load_snapshot
load_snapshot(snapshot: bytes) -> None

Load Environment from Snapshot.

Source code in nearai/environment.py
def load_snapshot(self, snapshot: bytes) -> None:
    """Load Environment from Snapshot."""
    shutil.rmtree(self._path, ignore_errors=True)

    with tempfile.NamedTemporaryFile(suffix=".tar.gz") as f:
        f.write(snapshot)
        f.flush()
        f.seek(0)

        with tarfile.open(fileobj=f, mode="r:gz") as tar:
            tar.extractall(self._path)
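
Snapshots round-trip through bytes, so an environment can be captured and restored in-process; a brief sketch assuming `env` is an initialized Environment:

snapshot = env.create_snapshot()  # gzipped tar of the environment directory, as bytes

# ... run agents, mutate files, etc. ...

env.load_snapshot(snapshot)  # wipes the directory and restores the captured state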
query_vector_store
query_vector_store(vector_store_id: str, query: str)

Query a vector store.

vector_store_id: The id of the vector store to query. query: The query to search for.

Source code in nearai/environment.py
def query_vector_store(self, vector_store_id: str, query: str):
    """Query a vector store.

    vector_store_id: The id of the vector store to query.
    query: The query to search for.
    """
    return self._inference.query_vector_store(vector_store_id, query)
read_file
read_file(filename: str) -> str

Read a file from the environment.

filename: The name of the file to read.

Source code in nearai/environment.py
def read_file(self, filename: str) -> str:
    """Read a file from the environment.

    filename: The name of the file to read.
    """
    if not os.path.exists(os.path.join(self._path, filename)):
        return ""
    try:
        with open(os.path.join(self._path, filename), "r") as f:
            return f.read()
    except Exception as e:
        return f"failed to read file: {e}"
request_user_input
request_user_input() -> None

Must be called to request input from the user.

Source code in nearai/environment.py
def request_user_input(self) -> None:
    """Must be called to request input from the user."""
    self.set_next_actor("user")
run_interactive
run_interactive(record_run: str = '', load_env: str = '') -> None

Run an interactive session within the given environment.

Source code in nearai/environment.py
def run_interactive(self, record_run: str = "", load_env: str = "") -> None:
    """Run an interactive session within the given environment."""
    run_id = self._generate_run_id()
    if load_env:
        base_id = self.load_from_registry(load_env)
    else:
        base_id = None
    last_message_idx = 0

    self._add_agent_start_system_log(agent_idx=0)

    if self._agents[0].welcome_description:
        if self._agents[0].welcome_title:
            print(f"{self._agents[0].welcome_title}: {self._agents[0].welcome_description}")
        else:
            print(self._agents[0].welcome_description)

    def print_messages(last_message_idx: int) -> int:
        messages = self.list_messages()
        for item in messages[last_message_idx:]:
            print(f"[{item['role']}]: {item['content']}", flush=True)
        return len(messages)

    last_message_idx = print_messages(last_message_idx)

    iteration_count = 0
    while True:
        if self.get_next_actor() != "user":
            messages = self.list_messages()
            new_message = None if not messages else messages[-1]["content"]

            iteration_count += 1
            self.run_agent(new_message)

            last_message_idx = print_messages(last_message_idx)
            if self.is_done():
                break

        else:
            new_message = input("> ")
            if new_message == "exit":
                break
            self.add_message("user", new_message)

            self.set_next_actor("agent")

    self.clear_temp_agent_files()

    if record_run:
        run_name = record_run if record_run and record_run != "true" else None
        self.save_to_registry(self._path, "interactive", run_id, base_id, run_name)
run_task
run_task(task: str, record_run: str = '', load_env: str = '', max_iterations: int = 10) -> None

Runs a task within the given environment.

Source code in nearai/environment.py
def run_task(
    self,
    task: str,
    record_run: str = "",
    load_env: str = "",
    max_iterations: int = 10,
) -> None:
    """Runs a task within the given environment."""
    run_id = self._generate_run_id()
    if load_env:
        base_id = self.load_from_registry(load_env)
    else:
        base_id = None
    iteration = 0

    self._add_agent_start_system_log(agent_idx=0)

    if task:
        self.add_message("user", task)

    while iteration < max_iterations and not self.is_done():
        iteration += 1
        self._agents[0].run(self, task=task)

    if record_run:
        run_name = record_run if record_run and record_run != "true" else None
        self.save_to_registry(self._path, "task", run_id, base_id, run_name)
save_to_registry
save_to_registry(path: str, run_type: str, run_id: str, base_id: Optional[Union[str, int]] = None, run_name: Optional[str] = None) -> Optional[bytes]

Save Environment to Registry.

Source code in nearai/environment.py
def save_to_registry(
    self,
    path: str,
    run_type: str,
    run_id: str,
    base_id: Optional[Union[str, int]] = None,
    run_name: Optional[str] = None,
) -> Optional[bytes]:
    """Save Environment to Registry."""
    if self._config.auth is None:
        print("Warning: Authentication is not set up. Run not saved to registry. To log in, run `nearai login`")
        return None

    agent_name = self._agents[0].name if self._agents else "unknown"
    generated_name = f"environment_run_{agent_name}_{run_id}"
    name = run_name or generated_name

    tempdir = Path(tempfile.mkdtemp())
    environment_path = tempdir / "environment.tar.gz"

    with open(environment_path, "w+b") as f:
        with tarfile.open(fileobj=f, mode="w:gz") as tar:
            tar.add(path, arcname=".")
        f.flush()
        f.seek(0)
        snapshot = f.read()
        tar_filename = f.name

        timestamp = datetime.now(timezone.utc).isoformat()

        entry_location = registry.upload(
            tempdir,
            EntryMetadata.from_dict(
                {
                    "name": name,
                    "version": "0.0.1",
                    "description": f"Agent {run_type} run {agent_name}",
                    "category": "environment",
                    "tags": ["environment"],
                    "details": {
                        "base_id": base_id,
                        "timestamp": timestamp,
                        "agents": [agent.name for agent in self._agents],
                        "run_id": run_id,
                        "run_type": run_type,
                        "filename": tar_filename,
                    },
                    "show_entry": True,
                }
            ),
            show_progress=True,
        )

        location_str = plain_location(entry_location)

        print(f"Saved environment {entry_location} to registry. To load use flag `--load-env={location_str}`.")

    rmtree(tempdir)
    return snapshot
set_next_actor
set_next_actor(who: str) -> None

Set the next actor / action in the dialogue.

Source code in nearai/environment.py
def set_next_actor(self, who: str) -> None:  # noqa: D102
    """Set the next actor / action in the dialogue."""
    next_action_fn = os.path.join(self._path, ".next_action")

    with open(next_action_fn, "w") as f:
        f.write(who)
verify_message
verify_message(account_id: str, public_key: str, signature: str, message: str, nonce: str, callback_url: str) -> bool

Verify user message signed with NEAR Account.

Source code in nearai/environment.py
def verify_message(
    self, account_id: str, public_key: str, signature: str, message: str, nonce: str, callback_url: str
) -> bool:
    """Verify user message signed with NEAR Account."""
    return near.verify_signed_message(
        account_id, public_key, signature, message, nonce, self._agents[0].name, callback_url
    )
write_file
write_file(filename: str, content: str) -> str

Writes a file to the environment.

filename: The name of the file to write to content: The content to write to the file.

Source code in nearai/environment.py
def write_file(self, filename: str, content: str) -> str:
    """Writes a file to the environment.

    filename: The name of the file to write to
    content: The content to write to the file.
    """
    path = Path(self._path) / filename
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        f.write(content)
    return f"Successfully wrote {len(content) if content else 0} characters to {filename}"

evaluation

evaluation_table

evaluation_table(namespace: str = '', tags: str = '') -> Tuple[Dict[tuple[tuple[str, Any], ...], Dict[str, str]], List[str], List[str]]

Returns rows, columns, and important columns.

Source code in nearai/evaluation.py
def evaluation_table(
    namespace: str = "", tags: str = ""
) -> Tuple[Dict[tuple[tuple[str, Any], ...], Dict[str, str]], List[str], List[str]]:
    """Returns rows, columns, and important columns."""
    # Make sure tags is a comma-separated list of tags
    tags_l = parse_tags(tags)
    tags = ",".join(tags_l)

    entries = registry.list(
        namespace=namespace,
        category="evaluation",
        tags=tags,
        total=10000,
        offset=0,
        show_all=False,
        show_latest_version=True,
    )
    rows: Dict[tuple[tuple[str, Any], ...], Dict[str, str]] = {}
    metric_names: Set[str] = set()
    important_metric_names: Set[str] = set()
    for entry in entries:
        evaluation_name = f"{entry.namespace}/{entry.name}/{entry.version}"
        evaluation_path = registry.download(evaluation_name, verbose=False)
        metrics_path = evaluation_path / "metrics.json"
        with open(metrics_path, "r") as f:
            metrics = json.load(f)
            key = {
                "model": metrics[EVALUATED_ENTRY_METADATA].get("model", ""),
                "agent": metrics[EVALUATED_ENTRY_METADATA].get("agent", ""),
                "namespace": metrics[EVALUATED_ENTRY_METADATA].get("namespace", ""),
                "version": metrics[EVALUATED_ENTRY_METADATA].get("version", ""),
                "provider": metrics[EVALUATED_ENTRY_METADATA].get("provider", ""),
            }

            # Convert the key dictionary to a tuple to use as a key in rows
            key_tuple = tuple(key.items())

            # Initialize the inner dictionary if this key doesn't exist
            if key_tuple not in rows:
                rows[key_tuple] = {}

            # Add all other metrics that are not EVALUATED_ENTRY_METADATA
            for metric_name, metric_value in metrics.items():
                if metric_name == EVALUATED_ENTRY_METADATA:
                    continue
                if _is_important_metric(metric_name, metrics):
                    important_metric_names.add(metric_name)
                rows[key_tuple][metric_name] = str(metric_value)
                metric_names.add(metric_name)

    sorted_metric_names = sorted(metric_names)
    columns = ["model", "agent", "namespace", "version", "provider"] + sorted_metric_names
    important_columns = ["model", "agent"] + sorted(important_metric_names)
    return rows, columns, important_columns

print_evaluation_table

print_evaluation_table(rows: Dict[tuple[tuple[str, Any], ...], Dict[str, str]], columns: List[str], important_columns: List[str], all_key_columns: bool, all_metrics: bool, num_columns: int, metric_name_max_length: int) -> None

Prints table of evaluations.

Source code in nearai/evaluation.py
def print_evaluation_table(
    rows: Dict[tuple[tuple[str, Any], ...], Dict[str, str]],
    columns: List[str],
    important_columns: List[str],
    all_key_columns: bool,
    all_metrics: bool,
    num_columns: int,
    metric_name_max_length: int,
) -> None:
    """Prints table of evaluations."""
    metric_names = columns[5:] if all_metrics else important_columns[2:]
    _print_metrics_tables(rows, metric_names, num_columns, all_key_columns, metric_name_max_length)
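A minimal sketch of how these two functions compose (argument values below are illustrative, not defaults):

rows, columns, important_columns = evaluation_table(tags="benchmark")
print_evaluation_table(
    rows,
    columns,
    important_columns,
    all_key_columns=False,
    all_metrics=False,
    num_columns=6,
    metric_name_max_length=30,
)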

record_evaluation_metrics

record_evaluation_metrics(solver_strategy: SolverStrategy, metrics: Dict[str, Any], prepend_evaluation_name: bool = True) -> None

Uploads evaluation metrics into registry.

Source code in nearai/evaluation.py
def record_evaluation_metrics(
    solver_strategy: SolverStrategy, metrics: Dict[str, Any], prepend_evaluation_name: bool = True
) -> None:
    """Uploads evaluation metrics into registry."""
    evaluation_name = solver_strategy.evaluation_name()
    model = ""
    agent = ""
    version = ""

    if model_metadata := solver_strategy.model_metadata():
        model = model_metadata.get("name", "")
        version = model_metadata.get("version", "")

    if agent_metadata := solver_strategy.agent_metadata():
        agent = agent_metadata.get("name", "")
        version = agent_metadata.get("version", "")

    upload_evaluation(
        evaluation_name,
        metrics if not prepend_evaluation_name else _prepend_name_to_metrics(evaluation_name, metrics),
        model,
        agent,
        solver_strategy.evaluated_entry_namespace(),
        version,
        solver_strategy.model_provider(),
    )

record_single_score_evaluation

record_single_score_evaluation(solver_strategy: SolverStrategy, score: float) -> None

Uploads single score evaluation into registry.

Source code in nearai/evaluation.py
def record_single_score_evaluation(solver_strategy: SolverStrategy, score: float) -> None:
    """Uploads single score evaluation into registry."""
    evaluation_name = solver_strategy.evaluation_name()
    record_evaluation_metrics(solver_strategy, {evaluation_name: score}, False)

upload_evaluation

upload_evaluation(evaluation_name: str, metrics: Dict[str, Any], model: str = '', agent: str = '', namespace: str = '', version: str = '', provider: str = '') -> None

Uploads evaluation into registry.

evaluation_name: a unique name for a (benchmark, solver) tuple, e.g. "mbpp" or "live_bench" or "mmlu-5-shot".
metrics: metrics from evaluation.
model: model that was used.
agent: agent that was evaluated, if any.
namespace: namespace of evaluated agent or evaluated model.
version: version of evaluated agent or evaluated model.
provider: provider of model used; pass `local` if running locally.

Source code in nearai/evaluation.py
def upload_evaluation(
    evaluation_name: str,
    metrics: Dict[str, Any],
    model: str = "",
    agent: str = "",
    namespace: str = "",
    version: str = "",
    provider: str = "",
) -> None:
    """Uploads evaluation into registry.

    `evaluation_name`: a unique name for a (benchmark, solver) tuple, e.g. "mbpp" or "live_bench" or "mmlu-5-shot".
    `metrics`: metrics from evaluation.
    `model`: model that was used.
    `agent`: agent that was evaluated, if any.
    `namespace`: namespace of evaluated agent or evaluated model.
    `version`: version of evaluated agent or evaluated model.
    `provider`: provider of model used; pass `local` if running locally.
    """
    key = f"evaluation_{evaluation_name}"
    metrics[EVALUATED_ENTRY_METADATA] = {}
    if agent != "":
        metrics[EVALUATED_ENTRY_METADATA]["agent"] = agent
        key += f"_agent_{agent}"
    if model != "":
        metrics[EVALUATED_ENTRY_METADATA]["model"] = model
        key += f"_model_{model}"
    if namespace != "":
        metrics[EVALUATED_ENTRY_METADATA]["namespace"] = namespace
        key += f"_namespace_{namespace}"
    if version != "":
        metrics[EVALUATED_ENTRY_METADATA]["version"] = version
        key += f"_version_{version}"
    if provider != "":
        metrics[EVALUATED_ENTRY_METADATA]["provider"] = provider
        key += f"_provider_{provider}"

    entry_path = get_registry_folder() / key
    # Create folder entry_path if not present
    entry_path.mkdir(parents=True, exist_ok=True)
    # Write file metrics.json inside
    metrics_file = entry_path / "metrics.json"
    with metrics_file.open("w") as f:
        json.dump(metrics, f, indent=2)

    metadata_path = entry_path / "metadata.json"
    # TODO(#273): Currently that will not update existing evaluation.
    with open(metadata_path, "w") as f:
        json.dump(
            {
                "name": key,
                "version": "0.0.1",
                "description": "",
                "category": "evaluation",
                "tags": [],
                "details": {},
                "show_entry": True,
            },
            f,
            indent=2,
        )

    registry.upload(Path(entry_path), show_progress=True)
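For illustration, uploading a hand-built single-metric evaluation (the model name, namespace, and score below are hypothetical):

upload_evaluation(
    "mmlu-5-shot",
    {"mmlu-5-shot": 0.71},             # hypothetical score
    model="llama-3.1-70b-instruct",
    namespace="example.near",          # hypothetical namespace
    version="1.0.0",
    provider="local",                  # pass `local` when running locally
)

Note that the function mutates `metrics` in place, adding the EVALUATED_ENTRY_METADATA key before writing metrics.json.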

finetune

FinetuneCli

Source code in nearai/finetune/__init__.py
class FinetuneCli:
    def start(
        self,
        model: str,
        tokenizer: str,
        dataset: str,
        num_procs: int,
        format: str,
        upload_checkpoint: bool = True,
        num_nodes: int = 1,
        job_id: Optional[str] = None,
        checkpoint: Optional[str] = None,
        **dataset_kwargs: Any,
    ) -> None:
        """Start a finetuning job on the current node.

        Args:
        ----
            model: Name of a model in the registry. Base model to finetune.
            tokenizer: Name of a tokenizer in the registry. Using tokenizer.model format.
            dataset: Name of a dataset in the registry.
            num_procs: Number of GPUs to use for training.
            format: Name of the configuration file to use. For example llama3-70b, llama3-8b. Valid options are in etc/finetune.
            upload_checkpoint: Whether to upload the checkpoint to the registry. Default is True.
            num_nodes: Number of nodes to use for training. Default is 1.
            job_id: Unique identifier for the job. Default is None.
            checkpoint: Name of the model checkpoint to start from. Default is None.
            dataset_kwargs: Additional keyword arguments to pass to the dataset constructor.

        """  # noqa: E501
        from nearai.dataset import get_dataset

        assert num_nodes >= 1

        # Prepare job id folder
        if job_id is None:
            job_id = "job"
        job_id = f"{job_id}-{timestamp()}-{randint(10**8, 10**9 - 1)}"
        job_folder = DATA_FOLDER / "finetune" / job_id
        job_folder.mkdir(parents=True, exist_ok=True)

        # Either use the provided config file template or load one predefined one
        if Path(format).exists():
            config_template_path = Path(format)
        else:
            configs = ETC_FOLDER / "finetune"
            config_template_path = configs / f"{format}.yml"

        if not config_template_path.exists():
            raise FileNotFoundError(f"Config file not found: {config_template_path}")

        CONFIG_TEMPLATE = config_template_path.read_text()  # noqa: N806

        # Download model
        model_path = get_model(model)

        # Download tokenizer
        tokenizer_path = registry.download(tokenizer) / "tokenizer.model"
        assert tokenizer_path.exists(), f"tokenizer.model not found in {tokenizer_path}"

        # Download checkpoint if any
        checkpoint_path = get_model(checkpoint) if checkpoint else "null"
        resume_checkpoint = checkpoint_path != "null"

        # Download dataset
        dataset_path = get_dataset(dataset)

        # Set up output directories
        checkpoint_output_dir = job_folder / "checkpoint_output"
        logging_output_dir = job_folder / "logs"
        logging_output_dir.mkdir(parents=True, exist_ok=True)

        # Prepare config file
        dataset_args_dict = deepcopy(dataset_kwargs)

        dataset_args_dict["_component_"] = dataset_args_dict.pop("method")
        dataset_args_dict["source"] = str(dataset_path.absolute())
        dataset_args = "\n".join(f"  {key}: {value}" for key, value in dataset_args_dict.items())

        config = job_folder / "config.yaml"
        with open(config, "w") as f:
            f.write(
                CONFIG_TEMPLATE.format(
                    TOKENIZER=str(tokenizer_path),
                    MODEL=str(model_path),
                    RECIPE_CHECKPOINT=checkpoint_path,
                    RESUME_FROM_CHECKPOINT=resume_checkpoint,
                    CHECKPOINT_OUTPUT_DIR=str(checkpoint_output_dir),
                    DATASET_ARGS=dataset_args,
                    LOGGING_OUTPUT_DIR=str(logging_output_dir),
                )
            )

        # Spawn background thread to read logs and push to database
        threading.Thread(target=find_new_logs_background, args=(logging_output_dir, job_id)).start()

        print("Starting job at", job_folder)
        if num_nodes == 1:
            run(
                [
                    "tune",
                    "run",
                    "--nproc_per_node",
                    str(num_procs),
                    "lora_finetune_distributed",
                    "--config",
                    str(config),
                ]
            )
        else:
            # Fetch rank and master addr from environment variables
            raise NotImplementedError()

        global BACKGROUND_PROCESS
        BACKGROUND_PROCESS = False

        if upload_checkpoint:
            registry.upload(
                job_folder,
                EntryMetadata.from_dict(
                    {
                        "name": f"finetune-{job_id}",
                        "version": "0.0.1",
                        "description": f"Finetuned checkpoint from base mode {model} using dataset {dataset}",
                        "category": "finetune",
                        "tags": ["finetune", f"base-model-{model}", f"base-dataset-{dataset}"],
                        "details": dict(
                            model=model,
                            tokenizer=tokenizer,
                            dataset=dataset,
                            num_procs=num_procs,
                            format=format,
                            num_nodes=num_nodes,
                            checkpoint=checkpoint,
                            **dataset_kwargs,
                        ),
                        "show_entry": True,
                    }
                ),
                show_progress=True,
            )

    def inspect(self, job_id: str) -> None:  # noqa: D102
        raise NotImplementedError()
start
start(model: str, tokenizer: str, dataset: str, num_procs: int, format: str, upload_checkpoint: bool = True, num_nodes: int = 1, job_id: Optional[str] = None, checkpoint: Optional[str] = None, **dataset_kwargs: Any) -> None

Start a finetuning job on the current node.


model: Name of a model in the registry. Base model to finetune.
tokenizer: Name of a tokenizer in the registry. Using tokenizer.model format.
dataset: Name of a dataset in the registry.
num_procs: Number of GPUs to use for training.
format: Name of the configuration file to use. For example llama3-70b, llama3-8b. Valid options are in etc/finetune.
upload_checkpoint: Whether to upload the checkpoint to the registry. Default is True.
num_nodes: Number of nodes to use for training. Default is 1.
job_id: Unique identifier for the job. Default is None.
checkpoint: Name of the model checkpoint to start from. Default is None.
dataset_kwargs: Additional keyword arguments to pass to the dataset constructor.
Source code in nearai/finetune/__init__.py
def start(
    self,
    model: str,
    tokenizer: str,
    dataset: str,
    num_procs: int,
    format: str,
    upload_checkpoint: bool = True,
    num_nodes: int = 1,
    job_id: Optional[str] = None,
    checkpoint: Optional[str] = None,
    **dataset_kwargs: Any,
) -> None:
    """Start a finetuning job on the current node.

    Args:
    ----
        model: Name of a model in the registry. Base model to finetune.
        tokenizer: Name of a tokenizer in the registry. Using tokenizer.model format.
        dataset: Name of a dataset in the registry.
        num_procs: Number of GPUs to use for training.
        format: Name of the configuration file to use. For example llama3-70b, llama3-8b. Valid options are in etc/finetune.
        upload_checkpoint: Whether to upload the checkpoint to the registry. Default is True.
        num_nodes: Number of nodes to use for training. Default is 1.
        job_id: Unique identifier for the job. Default is None.
        checkpoint: Name of the model checkpoint to start from. Default is None.
        dataset_kwargs: Additional keyword arguments to pass to the dataset constructor.

    """  # noqa: E501
    from nearai.dataset import get_dataset

    assert num_nodes >= 1

    # Prepare job id folder
    if job_id is None:
        job_id = "job"
    job_id = f"{job_id}-{timestamp()}-{randint(10**8, 10**9 - 1)}"
    job_folder = DATA_FOLDER / "finetune" / job_id
    job_folder.mkdir(parents=True, exist_ok=True)

    # Either use the provided config file template or load one predefined one
    if Path(format).exists():
        config_template_path = Path(format)
    else:
        configs = ETC_FOLDER / "finetune"
        config_template_path = configs / f"{format}.yml"

    if not config_template_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_template_path}")

    CONFIG_TEMPLATE = config_template_path.read_text()  # noqa: N806

    # Download model
    model_path = get_model(model)

    # Download tokenizer
    tokenizer_path = registry.download(tokenizer) / "tokenizer.model"
    assert tokenizer_path.exists(), f"tokenizer.model not found in {tokenizer_path}"

    # Download checkpoint if any
    checkpoint_path = get_model(checkpoint) if checkpoint else "null"
    resume_checkpoint = checkpoint_path != "null"

    # Download dataset
    dataset_path = get_dataset(dataset)

    # Set up output directories
    checkpoint_output_dir = job_folder / "checkpoint_output"
    logging_output_dir = job_folder / "logs"
    logging_output_dir.mkdir(parents=True, exist_ok=True)

    # Prepare config file
    dataset_args_dict = deepcopy(dataset_kwargs)

    dataset_args_dict["_component_"] = dataset_args_dict.pop("method")
    dataset_args_dict["source"] = str(dataset_path.absolute())
    dataset_args = "\n".join(f"  {key}: {value}" for key, value in dataset_args_dict.items())

    config = job_folder / "config.yaml"
    with open(config, "w") as f:
        f.write(
            CONFIG_TEMPLATE.format(
                TOKENIZER=str(tokenizer_path),
                MODEL=str(model_path),
                RECIPE_CHECKPOINT=checkpoint_path,
                RESUME_FROM_CHECKPOINT=resume_checkpoint,
                CHECKPOINT_OUTPUT_DIR=str(checkpoint_output_dir),
                DATASET_ARGS=dataset_args,
                LOGGING_OUTPUT_DIR=str(logging_output_dir),
            )
        )

    # Spawn background thread to read logs and push to database
    threading.Thread(target=find_new_logs_background, args=(logging_output_dir, job_id)).start()

    print("Starting job at", job_folder)
    if num_nodes == 1:
        run(
            [
                "tune",
                "run",
                "--nproc_per_node",
                str(num_procs),
                "lora_finetune_distributed",
                "--config",
                str(config),
            ]
        )
    else:
        # Fetch rank and master addr from environment variables
        raise NotImplementedError()

    global BACKGROUND_PROCESS
    BACKGROUND_PROCESS = False

    if upload_checkpoint:
        registry.upload(
            job_folder,
            EntryMetadata.from_dict(
                {
                    "name": f"finetune-{job_id}",
                    "version": "0.0.1",
                    "description": f"Finetuned checkpoint from base mode {model} using dataset {dataset}",
                    "category": "finetune",
                    "tags": ["finetune", f"base-model-{model}", f"base-dataset-{dataset}"],
                    "details": dict(
                        model=model,
                        tokenizer=tokenizer,
                        dataset=dataset,
                        num_procs=num_procs,
                        format=format,
                        num_nodes=num_nodes,
                        checkpoint=checkpoint,
                        **dataset_kwargs,
                    ),
                    "show_entry": True,
                }
            ),
            show_progress=True,
        )
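A hedged invocation sketch. The registry entry names are hypothetical, and note that the extra dataset kwargs must include a `method` key, since the code above pops it to fill the dataset `_component_`:

FinetuneCli().start(
    model="example.near/llama-3-8b/1.0.0",             # hypothetical registry entries
    tokenizer="example.near/llama-3-tokenizer/1.0.0",
    dataset="example.near/my-corpus/1.0.0",
    num_procs=8,
    format="llama3-8b",                                # resolves to etc/finetune/llama3-8b.yml
    method="nearai.finetune.text_completion.dataset",  # assumption: a torchtune-style dataset component path
)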

parse_line

parse_line(line: str) -> Tuple[int, dict[str, float]]

Parses a training-log line into a step number and a dict of metric values. Example of a line to be parsed:

Step 33 | loss:1.5400923490524292 lr:9.9e-05 tokens_per_second_per_gpu:101.22285588141214

Source code in nearai/finetune/__init__.py
def parse_line(line: str) -> Tuple[int, dict[str, float]]:
    """Example of line to be parsed.

    Step 33 | loss:1.5400923490524292 lr:9.9e-05 tokens_per_second_per_gpu:101.22285588141214
    """
    step_raw, metrics_raw = map(str.strip, line.strip(" \n").split("|"))
    step = int(step_raw.split(" ")[-1])
    metrics = {metric[0]: float(metric[1]) for metric in map(lambda metric: metric.split(":"), metrics_raw.split(" "))}
    return step, metrics
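Applied to the example line above:

step, metrics = parse_line(
    "Step 33 | loss:1.5400923490524292 lr:9.9e-05 tokens_per_second_per_gpu:101.22285588141214"
)
# step == 33
# metrics == {"loss": 1.5400923490524292, "lr": 9.9e-05,
#             "tokens_per_second_per_gpu": 101.22285588141214}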

text_completion

TextCompletionDataset

Bases: Dataset

Freeform dataset for any unstructured text corpus. Quickly load any dataset from Hugging Face or local disk and tokenize it for your model.


tokenizer (BaseTokenizer): Tokenizer used to encode data. Tokenizer must implement an ``encode`` and ``decode`` method.
source (str): path string of dataset, anything supported by Hugging Face's ``load_dataset``
    (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset.path)
column (str): name of column in the sample that contains the text data. This is typically required
    for Hugging Face datasets or tabular data. For local datasets with a single column, use the default "text",
    which is what is assigned by Hugging Face datasets when loaded into memory. Default is "text".
max_seq_len (Optional[int]): Maximum number of tokens in the returned input and label token id lists.
    Default is None, disabling truncation. We recommend setting this to the highest value that fits in memory
    and is supported by the model. For example, llama2-7B supports up to 4096 for sequence length.
**load_dataset_kwargs (Dict[str, Any]): additional keyword arguments to pass to ``load_dataset``.
Source code in nearai/finetune/text_completion.py
class TextCompletionDataset(Dataset):
    """Freeform dataset for any unstructured text corpus. Quickly load any dataset from Hugging Face or local disk and tokenize it for your model.

    Args:
    ----
        tokenizer (BaseTokenizer): Tokenizer used to encode data. Tokenizer must implement an ``encode`` and ``decode`` method.
        source (str): path string of dataset, anything supported by Hugging Face's ``load_dataset``
            (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset.path)
        column (str): name of column in the sample that contains the text data. This is typically required
            for Hugging Face datasets or tabular data. For local datasets with a single column, use the default "text",
            which is what is assigned by Hugging Face datasets when loaded into memory. Default is "text".
        max_seq_len (Optional[int]): Maximum number of tokens in the returned input and label token id lists.
            Default is None, disabling truncation. We recommend setting this to the highest value that fits in memory
            and is supported by the model. For example, llama2-7B supports up to 4096 for sequence length.
        **load_dataset_kwargs (Dict[str, Any]): additional keyword arguments to pass to ``load_dataset``.

    """  # noqa: E501

    def __init__(  # noqa: D107
        self,
        tokenizer: BaseTokenizer,
        source: str,
        column: str = "text",
        split: Optional[str] = None,
        max_seq_len: Optional[int] = None,
        **load_dataset_kwargs: Dict[str, Any],
    ) -> None:
        self._tokenizer = tokenizer
        self._data = load_from_disk(source, **load_dataset_kwargs)
        if split is not None:
            self._data = self._data[split]
        self.max_seq_len = max_seq_len
        self._column = column

    def __len__(self) -> int:  # noqa: D105
        return len(self._data)

    def __getitem__(self, index: int) -> Dict[str, List[int]]:  # noqa: D105
        sample = self._data[index]
        return self._prepare_sample(sample)

    def _prepare_sample(self, sample: Mapping[str, Any]) -> Dict[str, List[int]]:
        prompt = sample[self._column]
        tokens = self._tokenizer.encode(text=prompt, add_bos=True, add_eos=True)

        # Truncate if needed, but don't coerce EOS id
        if self.max_seq_len is not None:
            tokens = truncate(tokens, self.max_seq_len - 1)

        # No need to offset labels by 1 - happens in the recipe
        labels = tokens.copy()

        return {"tokens": tokens, "labels": labels}
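A minimal construction sketch. The tokenizer object and on-disk path here are assumptions; per the code above, `source` must be something `datasets.load_from_disk` accepts:

ds = TextCompletionDataset(
    tokenizer=my_tokenizer,      # assumption: any tokenizer with encode/decode methods
    source="/data/my_corpus",    # hypothetical dataset saved with Dataset.save_to_disk
    column="text",
    split="train",
    max_seq_len=4096,
)
sample = ds[0]                   # {"tokens": [...], "labels": [...]}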
truncate
truncate(tokens: List[Any], max_seq_len: int, eos_id: Optional[Any] = None) -> List[Any]

Truncate a list of tokens to a maximum length. If eos_id is provided, the last token will be replaced with eos_id.


tokens (List[Any]): list of tokens to truncate
max_seq_len (int): maximum length of the list
eos_id (Optional[Any]): token to replace the last token with. If None, the
    last token will not be replaced. Default is None.

Returns:

List[Any]: truncated list of tokens
Source code in nearai/finetune/text_completion.py
def truncate(
    tokens: List[Any],
    max_seq_len: int,
    eos_id: Optional[Any] = None,
) -> List[Any]:
    """Truncate a list of tokens to a maximum length. If eos_id is provided, the last token will be replaced with eos_id.

    Args:
    ----
        tokens (List[Any]): list of tokens to truncate
        max_seq_len (int): maximum length of the list
        eos_id (Optional[Any]): token to replace the last token with. If None, the
            last token will not be replaced. Default is None.

    Returns:
    -------
        List[Any]: truncated list of tokens

    """  # noqa: E501
    tokens_truncated = tokens[:max_seq_len]
    if eos_id is not None and tokens_truncated[-1] != eos_id:
        tokens_truncated[-1] = eos_id
    return tokens_truncated
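A quick usage sketch (the token ids are arbitrary):

tokens = [1, 5, 9, 4, 7]
truncate(tokens, max_seq_len=3)             # [1, 5, 9]
truncate(tokens, max_seq_len=3, eos_id=2)   # [1, 5, 2]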

hub

Hub

Bases: object

Source code in nearai/hub.py
class Hub(object):
    def __init__(self, config: Config) -> None:
        """Initializes the Hub class with the given configuration."""
        self.info = None
        self.provider = None
        self.model = None
        self.endpoint = None
        self.query = None
        self._config = config

    def parse_hub_chat_params(self, kwargs):
        """Parses and sets instance attributes from the given keyword arguments, using default values if needed."""
        if self._config.nearai_hub is None:
            self._config.nearai_hub = NearAiHubConfig()

        self.query = kwargs.get("query")
        self.endpoint = kwargs.get("endpoint", f"{self._config.nearai_hub.base_url}/chat/completions")
        self.model = kwargs.get("model", self._config.nearai_hub.default_model)
        self.provider = kwargs.get("provider", self._config.nearai_hub.default_provider)
        self.info = kwargs.get("info", False)

    def chat(self, kwargs):
        """Processes a chat request by sending parameters to the NearAI Hub and printing the response."""
        try:
            self.parse_hub_chat_params(kwargs)

            if not self.query:
                return print("Error: 'query' is required for the `hub chat` command.")

            if self._config.nearai_hub is None:
                self._config.nearai_hub = NearAiHubConfig()

            data = {
                "max_tokens": 256,
                "temperature": 1,
                "frequency_penalty": 0,
                "n": 1,
                "messages": [{"role": "user", "content": str(self.query)}],
                "model": self.model,
            }

            auth = self._config.auth

            if self._config.nearai_hub.login_with_near:
                bearer_token = auth.generate_bearer_token()
                headers = {"Content-Type": "application/json", "Authorization": f"Bearer {bearer_token}"}

                data["provider"] = self.provider
            elif self._config.nearai_hub.api_key:
                headers = {
                    "Content-Type": "application/json",
                    "Authorization": "Bearer {}".format(self._config.nearai_hub.api_key),
                }
            else:
                return print("Illegal NearAI Hub Config")

            if self.info:
                print(f"Requesting hub using NEAR Account {auth.account_id}")

            response = requests.post(self.endpoint, headers=headers, data=json.dumps(data))

            completion = response.json()

            print(completion["choices"][0]["message"]["content"])

        except Exception as e:
            print(f"Request failed: {e}")
__init__
__init__(config: Config) -> None

Initializes the Hub class with the given configuration.

Source code in nearai/hub.py
def __init__(self, config: Config) -> None:
    """Initializes the Hub class with the given configuration."""
    self.info = None
    self.provider = None
    self.model = None
    self.endpoint = None
    self.query = None
    self._config = config
chat
chat(kwargs)

Processes a chat request by sending parameters to the NearAI Hub and printing the response.

Source code in nearai/hub.py
def chat(self, kwargs):
    """Processes a chat request by sending parameters to the NearAI Hub and printing the response."""
    try:
        self.parse_hub_chat_params(kwargs)

        if not self.query:
            return print("Error: 'query' is required for the `hub chat` command.")

        if self._config.nearai_hub is None:
            self._config.nearai_hub = NearAiHubConfig()

        data = {
            "max_tokens": 256,
            "temperature": 1,
            "frequency_penalty": 0,
            "n": 1,
            "messages": [{"role": "user", "content": str(self.query)}],
            "model": self.model,
        }

        auth = self._config.auth

        if self._config.nearai_hub.login_with_near:
            bearer_token = auth.generate_bearer_token()
            headers = {"Content-Type": "application/json", "Authorization": f"Bearer {bearer_token}"}

            data["provider"] = self.provider
        elif self._config.nearai_hub.api_key:
            headers = {
                "Content-Type": "application/json",
                "Authorization": "Bearer {}".format(self._config.nearai_hub.api_key),
            }
        else:
            return print("Illegal NearAI Hub Config")

        if self.info:
            print(f"Requesting hub using NEAR Account {auth.account_id}")

        response = requests.post(self.endpoint, headers=headers, data=json.dumps(data))

        completion = response.json()

        print(completion["choices"][0]["message"]["content"])

    except Exception as e:
        print(f"Request failed: {e}")
parse_hub_chat_params
parse_hub_chat_params(kwargs)

Parses and sets instance attributes from the given keyword arguments, using default values if needed.

Source code in nearai/hub.py
def parse_hub_chat_params(self, kwargs):
    """Parses and sets instance attributes from the given keyword arguments, using default values if needed."""
    if self._config.nearai_hub is None:
        self._config.nearai_hub = NearAiHubConfig()

    self.query = kwargs.get("query")
    self.endpoint = kwargs.get("endpoint", f"{self._config.nearai_hub.base_url}/chat/completions")
    self.model = kwargs.get("model", self._config.nearai_hub.default_model)
    self.provider = kwargs.get("provider", self._config.nearai_hub.default_provider)
    self.info = kwargs.get("info", False)
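A minimal usage sketch (the import path for the loaded Config instance is an assumption):

from nearai.config import CONFIG   # assumption: module-level Config instance
from nearai.hub import Hub

hub = Hub(CONFIG)
hub.chat({"query": "Hello!", "model": "llama-3.1-70b-instruct"})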

lib

parse_location

parse_location(entry_location: str) -> EntryLocation

Create a EntryLocation from a string in the format namespace/name/version.

Source code in nearai/lib.py
def parse_location(entry_location: str) -> EntryLocation:
    """Create a EntryLocation from a string in the format namespace/name/version."""
    match = entry_location_pattern.match(entry_location)

    if match is None:
        raise ValueError(f"Invalid entry format: {entry_location}. Should have the format <namespace>/<name>/<version>")

    return EntryLocation(
        namespace=match.group("namespace"),
        name=match.group("name"),
        version=match.group("version"),
    )
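For example (namespace and name are hypothetical):

loc = parse_location("example.near/my-agent/0.0.1")
# EntryLocation(namespace="example.near", name="my-agent", version="0.0.1")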

login

AuthHandler

Bases: SimpleHTTPRequestHandler

Source code in nearai/login.py
class AuthHandler(http.server.SimpleHTTPRequestHandler):
    def log_message(self, format, *args):
        """Webserver logging method."""
        pass  # Override to suppress logging

    def do_GET(self):  # noqa: N802
        """Webserver GET method."""
        global NONCE, PORT

        script_path = Path(__file__).resolve()
        assets_folder = script_path.parent / "assets"

        if self.path.startswith("/capture"):
            with open(os.path.join(assets_folder, "auth_capture.html"), "r", encoding="utf-8") as file:
                content = file.read()
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(content.encode("utf-8"))

        if self.path.startswith("/auth"):
            parsed_url = urlparse.urlparse(self.path)
            fragment = parsed_url.query
            params = urlparse.parse_qs(fragment)

            required_params = ["accountId", "signature", "publicKey"]

            if all(param in params for param in required_params):
                update_auth_config(
                    params["accountId"][0],
                    params["signature"][0],
                    params["publicKey"][0],
                    callback_url=generate_callback_url(PORT),
                    nonce=NONCE,
                )
            else:
                print("Required parameters not found")

            with open(os.path.join(assets_folder, "auth_complete.html"), "r", encoding="utf-8") as file:
                content = file.read()
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(content.encode("utf-8"))

            # Give the server some time to read the response before shutting it down
            def shutdown_server():
                global httpd
                time.sleep(2)  # Wait 2 seconds before shutting down
                if httpd:
                    httpd.shutdown()

            threading.Thread(target=shutdown_server).start()
do_GET
do_GET()

Webserver GET method.

Source code in nearai/login.py
def do_GET(self):  # noqa: N802
    """Webserver GET method."""
    global NONCE, PORT

    script_path = Path(__file__).resolve()
    assets_folder = script_path.parent / "assets"

    if self.path.startswith("/capture"):
        with open(os.path.join(assets_folder, "auth_capture.html"), "r", encoding="utf-8") as file:
            content = file.read()
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(content.encode("utf-8"))

    if self.path.startswith("/auth"):
        parsed_url = urlparse.urlparse(self.path)
        fragment = parsed_url.query
        params = urlparse.parse_qs(fragment)

        required_params = ["accountId", "signature", "publicKey"]

        if all(param in params for param in required_params):
            update_auth_config(
                params["accountId"][0],
                params["signature"][0],
                params["publicKey"][0],
                callback_url=generate_callback_url(PORT),
                nonce=NONCE,
            )
        else:
            print("Required parameters not found")

        with open(os.path.join(assets_folder, "auth_complete.html"), "r", encoding="utf-8") as file:
            content = file.read()
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(content.encode("utf-8"))

        # Give the server some time to read the response before shutting it down
        def shutdown_server():
            global httpd
            time.sleep(2)  # Wait 2 seconds before shutting down
            if httpd:
                httpd.shutdown()

        threading.Thread(target=shutdown_server).start()
log_message
log_message(format, *args)

Webserver logging method.

Source code in nearai/login.py
def log_message(self, format, *args):
    """Webserver logging method."""
    pass  # Override to suppress logging

find_open_port

find_open_port() -> int

Finds and returns an open port number by binding to a free port on the local machine.

Source code in nearai/login.py
def find_open_port() -> int:
    """Finds and returns an open port number by binding to a free port on the local machine."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]

generate_and_save_signature

generate_and_save_signature(account_id, private_key)

Generates a signature for the given account ID and private key, then updates the auth configuration.

Source code in nearai/login.py
def generate_and_save_signature(account_id, private_key):
    """Generates a signature for the given account ID and private key, then updates the auth configuration."""
    nonce = generate_nonce()
    payload = near.Payload(MESSAGE, nonce, RECIPIENT, None)

    signature, public_key = near.create_signature(private_key, payload)

    if update_auth_config(account_id, signature, public_key, None, nonce):
        print_login_status()

generate_callback_url

generate_callback_url(port)

Generates a callback URL using the specified port number.

Source code in nearai/login.py
def generate_callback_url(port):
    """Generates a callback URL using the specified port number."""
    return f"http://localhost:{port}/capture"

generate_nonce

generate_nonce()

Generates a nonce based on the current time in milliseconds.

Source code in nearai/login.py
def generate_nonce():
    """Generates a nonce based on the current time in milliseconds."""
    return str(int(time.time() * 1000))

login_with_file_credentials

login_with_file_credentials(account_id)

Logs in using credentials from a file for the specified account ID, generating and saving a signature.

Source code in nearai/login.py
def login_with_file_credentials(account_id):
    """Logs in using credentials from a file for the specified account ID, generating and saving a signature."""
    file_path = os.path.expanduser(os.path.join("~/.near-credentials/", "mainnet", f"{account_id}.json"))

    if os.path.exists(file_path):
        with open(file_path, "r") as file:
            content = file.read()
            account_data = json.loads(content)
            private_key = account_data.get("private_key", None)
            if not private_key:
                return print(f"Private key is missing for {account_id} on mainnet")
            generate_and_save_signature(account_id, account_data["private_key"])

    else:
        return print(f"Account data is missing for {account_id}")

login_with_near_auth

login_with_near_auth(remote, auth_url)

Initiates the login process using NEAR authentication, either starting a local server to handle the callback or providing a URL for remote authentication.

Source code in nearai/login.py
def login_with_near_auth(remote, auth_url):
    """Initiates the login process using NEAR authentication, either starting a local server to handle the callback or providing a URL for remote authentication."""  # noqa: E501
    global NONCE, PORT
    NONCE = generate_nonce()

    params = {
        "message": MESSAGE,
        "nonce": NONCE,
        "recipient": RECIPIENT,
    }

    if not remote:
        PORT = find_open_port()

        global httpd
        with socketserver.TCPServer(("", PORT), AuthHandler) as httpd:
            params["callbackUrl"] = f"http://localhost:{PORT}/capture"

            encoded_params = urlparse.urlencode(params)

            print_url_message(f"{auth_url}?{encoded_params}")

            httpd.serve_forever()

    else:
        encoded_params = urlparse.urlencode(params)

        print_url_message(f"{auth_url}?{encoded_params}")
        print("After visiting the URL, follow the instructions to save your auth signature")

print_login_status

print_login_status()

Prints the current authentication status if available in the config file.

Source code in nearai/login.py
def print_login_status():
    """Prints the current authentication status if available in the config file."""
    config = load_config_file()
    if config.get("auth") and config["auth"].get("account_id"):
        print(f'Auth data for: {config["auth"]["account_id"]}')
        print(f'signature: {config["auth"]["signature"]}')
        print(f'public_key: {config["auth"]["public_key"]}')
        print(f'nonce: {config["auth"]["nonce"]}')
        print(f'message: {config["auth"]["message"]}')
        print(f'recipient: {config["auth"]["recipient"]}')
    else:
        print("Near auth details not found")

print_url_message

print_url_message(url)

Prints a message instructing the user to visit the given URL to complete the login process.

Source code in nearai/login.py
def print_url_message(url):
    """Prints a message instructing the user to visit the given URL to complete the login process."""
    print(f"Please visit the following URL to complete the login process: {url}")

update_auth_config

update_auth_config(account_id, signature, public_key, callback_url, nonce)

Update authentication configuration if the provided signature is valid.

Source code in nearai/login.py
def update_auth_config(account_id, signature, public_key, callback_url, nonce):
    """Update authentication configuration if the provided signature is valid."""
    if near.verify_signed_message(
        account_id,
        public_key,
        signature,
        MESSAGE,
        nonce,
        RECIPIENT,
        callback_url,
    ):
        config = load_config_file()

        auth = AuthData.model_validate(
            {
                "account_id": account_id,
                "signature": signature,
                "public_key": public_key,
                "callback_url": callback_url,
                "nonce": nonce,
                "recipient": RECIPIENT,
                "message": MESSAGE,
            }
        )

        config["auth"] = auth.model_dump()
        save_config_file(config)

        print(f"Auth data has been successfully saved! You are now logged in with account ID: {account_id}")
        return True
    else:
        print("Signature verification failed. Abort")
        return False

model

get_model

get_model(name: str) -> Path

Fetch the model from the registry, downloading it locally if it hasn't been downloaded yet.

:param name: The name of the model entry to download. The format should be namespace/name/version.
:return: The path to the downloaded model

Source code in nearai/model.py
def get_model(name: str) -> Path:
    """Download the model from the registry and download it locally if it hasn't been downloaded yet.

    :param name: The name of the model entry to download. The format should be namespace/name/version.
    :return: The path to the downloaded model
    """
    return registry.download(name)
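For example (the entry name is hypothetical):

model_path = get_model("example.near/llama-3-8b/1.0.0")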

naming

get_canonical_name

get_canonical_name(name: str) -> str

Returns a name that can be used for matching entities.

Applies the following transformations:

1. All letters lowercase.
2. Convert '.' between digits to 'p'.
3. Convert '<not letter>v<digit>' -> '<not letter><digit>'.
4. Remove all non-alphanumeric characters except between digits. Use '_' between digits.
5. Convert 'metallama' -> 'llama'.

e.g. "llama-3.1-70b-instruct" -> "llama3p1_70binstruct"

Source code in nearai/naming.py
def get_canonical_name(name: str) -> str:
    """Returns a name that can be used for matching entities.

    Applies the following transformations:
    1. All letters lowercase.
    2. Convert '.' between digits to 'p'.
    3. Convert '<not letter>v<digit>' -> '<not letter><digit>'
    4. Remove all non-alphanumeric characters except between digits.
        Use '_' between digits.
    5. Convert 'metallama' -> 'llama'.

    e.g. "llama-3.1-70b-instruct" -> "llama3p1_70binstruct"
    """
    # Convert to lowercase
    name = name.lower()
    # Convert '.' between digits to 'p'
    name = re.sub(r"(\d)\.(\d)", r"\1p\2", name)
    # Convert '<digit>v<digit>' -> '<digit>_<digit>'
    name = re.sub(r"(\d)v(\d)", r"\1_\2", name)
    # Convert '<not letter>v<digit>' -> '<not letter><digit>'
    name = re.sub(r"(^|[^a-z])v(\d)", r"\1\2", name)
    # Replace non-alphanumeric characters between digits with '_'
    name = re.sub(r"(\d)[^a-z0-9]+(\d)", r"\1_\2", name)
    # Remove remaining non-alphanumeric characters, except '_'
    name = re.sub(r"[^a-z0-9_]", "", name)
    # Remove any remaining underscores that are not between digits
    name = re.sub(r"(?<!\d)_|_(?!\d)", "", name)
    # Convert 'metallama' to 'llama'
    name = name.replace("metallama", "llama")
    return name
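A few illustrative inputs, traced through the rules above:

get_canonical_name("llama-3.1-70b-instruct")  # 'llama3p1_70binstruct'
get_canonical_name("Meta-Llama-3-8B")         # 'llama3_8b'
get_canonical_name("model-v2")                # 'model2'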

registry

Registry

Source code in nearai/registry.py
class Registry:
    def __init__(self):
        """Create Registry object to interact with the registry programmatically."""
        self.download_folder = DATA_FOLDER / "registry"
        self.api = RegistryApi()

        if not self.download_folder.exists():
            self.download_folder.mkdir(parents=True, exist_ok=True)

    def update(self, entry_location: EntryLocation, metadata: EntryMetadataInput) -> Dict[str, Any]:
        """Update metadata of a entry in the registry."""
        result = self.api.upload_metadata_v1_registry_upload_metadata_post(
            BodyUploadMetadataV1RegistryUploadMetadataPost(metadata=metadata, entry_location=entry_location)
        )
        return result

    def info(self, entry_location: EntryLocation) -> Optional[EntryMetadata]:
        """Get metadata of a entry in the registry."""
        try:
            return self.api.download_metadata_v1_registry_download_metadata_post(
                BodyDownloadMetadataV1RegistryDownloadMetadataPost.from_dict(dict(entry_location=entry_location))
            )
        except NotFoundException:
            return None

    def upload_file(self, entry_location: EntryLocation, local_path: Path, path: Path) -> bool:
        """Upload a file to the registry."""
        with open(local_path, "rb") as file:
            data = file.read()

            try:
                self.api.upload_file_v1_registry_upload_file_post(
                    path=str(path),
                    file=data,
                    namespace=entry_location.namespace,
                    name=entry_location.name,
                    version=entry_location.version,
                )
                return True
            except BadRequestException as e:
                if isinstance(e.body, str) and "already exists" in e.body:
                    return False

                raise e

    def download_file(self, entry_location: EntryLocation, path: Path, local_path: Path):
        """Download a file from the registry."""
        result = self.api.download_file_v1_registry_download_file_post_without_preload_content(
            BodyDownloadFileV1RegistryDownloadFilePost.from_dict(
                dict(
                    entry_location=entry_location,
                    path=str(path),
                )
            )
        )

        local_path.parent.mkdir(parents=True, exist_ok=True)

        with open(local_path, "wb") as f:
            copyfileobj(result, f)

    def download(
        self,
        entry_location: Union[str, EntryLocation],
        force: bool = False,
        show_progress: bool = False,
        verbose: bool = True,
    ) -> Path:
        """Download entry from the registry locally."""
        if isinstance(entry_location, str):
            entry_location = parse_location(entry_location)

        download_path = get_registry_folder() / entry_location.namespace / entry_location.name / entry_location.version

        if download_path.exists():
            if not force:
                if verbose:
                    print(
                        f"Entry {entry_location} already exists at {download_path}. Use --force to overwrite the entry."
                    )
                return download_path

        files = registry.list_files(entry_location)

        download_path.mkdir(parents=True, exist_ok=True)

        metadata = registry.info(entry_location)

        if metadata is None:
            raise ValueError(f"Entry {entry_location} not found.")

        metadata_path = download_path / "metadata.json"
        with open(metadata_path, "w") as f:
            f.write(metadata.model_dump_json(indent=2))

        for file in (pbar := tqdm(files, disable=not show_progress)):
            pbar.set_description(file)
            registry.download_file(entry_location, file, download_path / file)

        return download_path

    def upload(
        self,
        local_path: Path,
        metadata: Optional[EntryMetadata] = None,
        show_progress: bool = False,
    ) -> EntryLocation:
        """Upload entry to the registry.

        If metadata is provided it will overwrite the metadata in the directory,
        otherwise it will use the metadata.json found on the root of the directory.
        """
        path = Path(local_path).absolute()

        if not path.exists():
            # try path in local registry if the original path does not exist
            path = get_registry_folder() / local_path

        if CONFIG.auth is None:
            print("Please login with `nearai login`")
            exit(1)

        metadata_path = path / "metadata.json"

        if metadata is not None:
            with open(metadata_path, "w") as f:
                f.write(metadata.model_dump_json(indent=2))

        check_metadata(metadata_path)

        with open(metadata_path) as f:
            plain_metadata: Dict[str, Any] = json.load(f)

        namespace = get_namespace(local_path)
        name = plain_metadata.pop("name")

        entry_location = EntryLocation.model_validate(
            dict(
                namespace=namespace,
                name=name,
                version=plain_metadata.pop("version"),
            )
        )

        entry_metadata = EntryMetadataInput.model_validate(plain_metadata)
        source = entry_metadata.details.get("_source", None)

        if source is not None:
            print(f"Only default source is allowed, found: {source}. Remove details._source from metadata.")
            exit(1)

        if self.info(entry_location) is None:
            # New entry location. Check for similar names in registry.
            entries = self.list_all_visible()
            canonical_namespace = get_canonical_name(namespace)
            canonical_name = get_canonical_name(name)

            for entry in entries:
                if entry.name == name and entry.namespace == namespace:
                    break
                if (
                    get_canonical_name(entry.name) == canonical_name
                    and get_canonical_name(entry.namespace) == canonical_namespace
                ):
                    print(f"A registry item with a similar name already exists: {entry.namespace}/{entry.name}")
                    exit(1)

        registry.update(entry_location, entry_metadata)

        all_files = []
        total_size = 0

        # Traverse all files in the directory `path`
        for file in path.rglob("*"):
            if not file.is_file():
                continue

            relative = file.relative_to(path)

            # Don't upload metadata file.
            if file == metadata_path:
                continue

            # Don't upload backup files.
            if file.name.endswith("~"):
                continue

            # Don't upload configuration files.
            if relative.parts[0] == ".nearai":
                continue

            size = file.stat().st_size
            total_size += size

            all_files.append((file, relative, size))

        pbar = tqdm(total=total_size, unit="B", unit_scale=True, disable=not show_progress)
        for file, relative, size in all_files:
            registry.upload_file(entry_location, file, relative)
            pbar.update(size)

        return entry_location

    def list_files(self, entry_location: EntryLocation) -> List[str]:
        """List files in from an entry in the registry.

        Return the relative paths to all files with respect to the root of the entry.
        """
        result = self.api.list_files_v1_registry_list_files_post(
            BodyListFilesV1RegistryListFilesPost.from_dict(dict(entry_location=entry_location))
        )
        return [file.filename for file in result]

    def list(
        self,
        namespace: str,
        category: str,
        tags: str,
        total: int,
        offset: int,
        show_all: bool,
        show_latest_version: bool,
        starred_by: str = "",
    ) -> List[EntryInformation]:
        """List and filter entries in the registry."""
        return self.api.list_entries_v1_registry_list_entries_post(
            namespace=namespace,
            category=category,
            tags=tags,
            total=total,
            offset=offset,
            show_hidden=show_all,
            show_latest_version=show_latest_version,
            starred_by=starred_by,
        )

    def list_all_visible(self) -> List[EntryInformation]:
        """List all visible entries."""
        total = 10000
        entries = self.list("", "", "", total, 0, False, True)
        assert len(entries) < total
        return entries
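A minimal end-to-end sketch (the entry name is hypothetical; `registry` is the module-level instance the source above calls into):

from nearai.lib import parse_location
from nearai.registry import registry

path = registry.download("example.near/my-agent/0.0.1", show_progress=True)
files = registry.list_files(parse_location("example.near/my-agent/0.0.1"))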
__init__
__init__()

Create Registry object to interact with the registry programmatically.

Source code in nearai/registry.py
def __init__(self):
    """Create Registry object to interact with the registry programmatically."""
    self.download_folder = DATA_FOLDER / "registry"
    self.api = RegistryApi()

    if not self.download_folder.exists():
        self.download_folder.mkdir(parents=True, exist_ok=True)
download
download(entry_location: Union[str, EntryLocation], force: bool = False, show_progress: bool = False, verbose: bool = True) -> Path

Download entry from the registry locally.

Source code in nearai/registry.py
def download(
    self,
    entry_location: Union[str, EntryLocation],
    force: bool = False,
    show_progress: bool = False,
    verbose: bool = True,
) -> Path:
    """Download entry from the registry locally."""
    if isinstance(entry_location, str):
        entry_location = parse_location(entry_location)

    download_path = get_registry_folder() / entry_location.namespace / entry_location.name / entry_location.version

    if download_path.exists():
        if not force:
            if verbose:
                print(
                    f"Entry {entry_location} already exists at {download_path}. Use --force to overwrite the entry."
                )
            return download_path

    files = registry.list_files(entry_location)

    download_path.mkdir(parents=True, exist_ok=True)

    metadata = registry.info(entry_location)

    if metadata is None:
        raise ValueError(f"Entry {entry_location} not found.")

    metadata_path = download_path / "metadata.json"
    with open(metadata_path, "w") as f:
        f.write(metadata.model_dump_json(indent=2))

    for file in (pbar := tqdm(files, disable=not show_progress)):
        pbar.set_description(file)
        registry.download_file(entry_location, file, download_path / file)

    return download_path
download_file
download_file(entry_location: EntryLocation, path: Path, local_path: Path)

Download a file from the registry.

Source code in nearai/registry.py
def download_file(self, entry_location: EntryLocation, path: Path, local_path: Path):
    """Download a file from the registry."""
    result = self.api.download_file_v1_registry_download_file_post_without_preload_content(
        BodyDownloadFileV1RegistryDownloadFilePost.from_dict(
            dict(
                entry_location=entry_location,
                path=str(path),
            )
        )
    )

    local_path.parent.mkdir(parents=True, exist_ok=True)

    with open(local_path, "wb") as f:
        copyfileobj(result, f)
info
info(entry_location: EntryLocation) -> Optional[EntryMetadata]

Get metadata of an entry in the registry.

Source code in nearai/registry.py
def info(self, entry_location: EntryLocation) -> Optional[EntryMetadata]:
    """Get metadata of a entry in the registry."""
    try:
        return self.api.download_metadata_v1_registry_download_metadata_post(
            BodyDownloadMetadataV1RegistryDownloadMetadataPost.from_dict(dict(entry_location=entry_location))
        )
    except NotFoundException:
        return None
list
list(namespace: str, category: str, tags: str, total: int, offset: int, show_all: bool, show_latest_version: bool, starred_by: str = '') -> List[EntryInformation]

List and filter entries in the registry.

Source code in nearai/registry.py
def list(
    self,
    namespace: str,
    category: str,
    tags: str,
    total: int,
    offset: int,
    show_all: bool,
    show_latest_version: bool,
    starred_by: str = "",
) -> List[EntryInformation]:
    """List and filter entries in the registry."""
    return self.api.list_entries_v1_registry_list_entries_post(
        namespace=namespace,
        category=category,
        tags=tags,
        total=total,
        offset=offset,
        show_hidden=show_all,
        show_latest_version=show_latest_version,
        starred_by=starred_by,
    )
list_all_visible
list_all_visible() -> List[EntryInformation]

List all visible entries.

Source code in nearai/registry.py
def list_all_visible(self) -> List[EntryInformation]:
    """List all visible entries."""
    total = 10000
    entries = self.list("", "", "", total, 0, False, True)
    assert len(entries) < total
    return entries
list_files
list_files(entry_location: EntryLocation) -> List[str]

List files from an entry in the registry.

Return the relative paths to all files with respect to the root of the entry.

Source code in nearai/registry.py
def list_files(self, entry_location: EntryLocation) -> List[str]:
    """List files in from an entry in the registry.

    Return the relative paths to all files with respect to the root of the entry.
    """
    result = self.api.list_files_v1_registry_list_files_post(
        BodyListFilesV1RegistryListFilesPost.from_dict(dict(entry_location=entry_location))
    )
    return [file.filename for file in result]
update
update(entry_location: EntryLocation, metadata: EntryMetadataInput) -> Dict[str, Any]

Update metadata of an entry in the registry.

Source code in nearai/registry.py
def update(self, entry_location: EntryLocation, metadata: EntryMetadataInput) -> Dict[str, Any]:
    """Update metadata of a entry in the registry."""
    result = self.api.upload_metadata_v1_registry_upload_metadata_post(
        BodyUploadMetadataV1RegistryUploadMetadataPost(metadata=metadata, entry_location=entry_location)
    )
    return result
upload
upload(local_path: Path, metadata: Optional[EntryMetadata] = None, show_progress: bool = False) -> EntryLocation

Upload entry to the registry.

If metadata is provided it will overwrite the metadata in the directory, otherwise it will use the metadata.json found on the root of the directory.

Source code in nearai/registry.py
def upload(
    self,
    local_path: Path,
    metadata: Optional[EntryMetadata] = None,
    show_progress: bool = False,
) -> EntryLocation:
    """Upload entry to the registry.

    If metadata is provided it will overwrite the metadata in the directory,
    otherwise it will use the metadata.json found on the root of the directory.
    """
    path = Path(local_path).absolute()

    if not path.exists():
        # try path in local registry if the original path does not exist
        path = get_registry_folder() / local_path

    if CONFIG.auth is None:
        print("Please login with `nearai login`")
        exit(1)

    metadata_path = path / "metadata.json"

    if metadata is not None:
        with open(metadata_path, "w") as f:
            f.write(metadata.model_dump_json(indent=2))

    check_metadata(metadata_path)

    with open(metadata_path) as f:
        plain_metadata: Dict[str, Any] = json.load(f)

    namespace = get_namespace(local_path)
    name = plain_metadata.pop("name")

    entry_location = EntryLocation.model_validate(
        dict(
            namespace=namespace,
            name=name,
            version=plain_metadata.pop("version"),
        )
    )

    entry_metadata = EntryMetadataInput.model_validate(plain_metadata)
    source = entry_metadata.details.get("_source", None)

    if source is not None:
        print(f"Only default source is allowed, found: {source}. Remove details._source from metadata.")
        exit(1)

    if self.info(entry_location) is None:
        # New entry location. Check for similar names in registry.
        entries = self.list_all_visible()
        canonical_namespace = get_canonical_name(namespace)
        canonical_name = get_canonical_name(name)

        for entry in entries:
            if entry.name == name and entry.namespace == namespace:
                break
            if (
                get_canonical_name(entry.name) == canonical_name
                and get_canonical_name(entry.namespace) == canonical_namespace
            ):
                print(f"A registry item with a similar name already exists: {entry.namespace}/{entry.name}")
                exit(1)

    registry.update(entry_location, entry_metadata)

    all_files = []
    total_size = 0

    # Traverse all files in the directory `path`
    for file in path.rglob("*"):
        if not file.is_file():
            continue

        relative = file.relative_to(path)

        # Don't upload metadata file.
        if file == metadata_path:
            continue

        # Don't upload backup files.
        if file.name.endswith("~"):
            continue

        # Don't upload configuration files.
        if relative.parts[0] == ".nearai":
            continue

        size = file.stat().st_size
        total_size += size

        all_files.append((file, relative, size))

    pbar = tqdm(total=total_size, unit="B", unit_scale=True, disable=not show_progress)
    for file, relative, size in all_files:
        registry.upload_file(entry_location, file, relative)
        pbar.update(size)

    return entry_location
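
A hedged usage sketch (requires a prior `nearai login`; the directory must contain a metadata.json at its root):

from pathlib import Path

entry_location = registry.upload(Path("./my-agent"), show_progress=True)
print(f"Uploaded {entry_location.namespace}/{entry_location.name}/{entry_location.version}")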
upload_file
upload_file(entry_location: EntryLocation, local_path: Path, path: Path) -> bool

Upload a file to the registry.

Source code in nearai/registry.py
def upload_file(self, entry_location: EntryLocation, local_path: Path, path: Path) -> bool:
    """Upload a file to the registry."""
    with open(local_path, "rb") as file:
        data = file.read()

        try:
            self.api.upload_file_v1_registry_upload_file_post(
                path=str(path),
                file=data,
                namespace=entry_location.namespace,
                name=entry_location.name,
                version=entry_location.version,
            )
            return True
        except BadRequestException as e:
            if isinstance(e.body, str) and "already exists" in e.body:
                return False

            raise e
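
Note that re-uploading an existing file is not an error: the method returns False when the registry reports that the file already exists. A hedged sketch (entry_location is an EntryLocation, e.g. the one returned by registry.upload above):

uploaded = registry.upload_file(
    entry_location,
    Path("./my-agent/agent.py"),   # local file to read
    Path("agent.py"),              # destination path relative to the entry root
)
if not uploaded:
    print("File already present in the registry; skipped.")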

get_namespace

get_namespace(local_path: Path) -> str

Returns the namespace of an item, or the user's namespace as a fallback.

Source code in nearai/registry.py
def get_namespace(local_path: Path) -> str:
    """Returns namespace of an item or user namespace."""
    registry_folder = get_registry_folder()

    try:
        # Check if the path matches the expected structure
        relative_path = local_path.relative_to(registry_folder)
        parts = relative_path.parts

        # If the path has 3 parts (namespace, item_name, version),
        # return the first part as the namespace
        if len(parts) == 3:
            return str(parts[0])
    except ValueError:
        # relative_to() raises ValueError if local_path is not relative to registry_folder
        pass

    # If we couldn't extract a namespace from the path, return the default
    if CONFIG.auth is None:
        raise ValueError("AuthData is None")
    return CONFIG.auth.account_id
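
A hedged sketch of both resolution paths (the paths and namespace are illustrative):

from pathlib import Path

# A path shaped like <registry>/<namespace>/<name>/<version> yields the namespace:
ns = get_namespace(get_registry_folder() / "example.near" / "my-agent" / "0.0.1")
assert ns == "example.near"

# Any other path falls back to the logged-in account id (CONFIG.auth.account_id):
ns = get_namespace(Path("/tmp/scratch"))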

get_registry_folder

get_registry_folder() -> Path

Path to local registry.

Source code in nearai/registry.py
def get_registry_folder() -> Path:
    """Path to local registry."""
    return DATA_FOLDER / REGISTRY_FOLDER

solvers

DDOTSV0Solver

Bases: SolverStrategy

Solver strategy for competitive programming problems live on DDOTS.

This solver runs agents in a previously prepared Agent environment.

workspace/
    .id             -- Id of the problem
    PROBLEM.txt     -- Description of the problem

The agent should call env.submit_python(code) to submit the code to the DDOTS server.

Source code in nearai/solvers/ddot_v0_solver.py
class DDOTSV0Solver(SolverStrategy):
    """Solver strategy for competitive programming problems live on DDOTS.

    This solver runs agents in a previously prepared Agent environment.

    workspace/
        .id             -- Id of the problem
        PROBLEM.txt     -- Description of the problem

    The agent should call env.submit_python(code) to submit the code to the DDOTS server.

    """

    def __init__(self, dataset_ref: Dataset, agents: str, max_iterations: int, save_snapshots: bool = False):  # noqa: D107
        self.agents = [load_agent(agent) for agent in agents.split(",")]
        self.max_iterations = max_iterations

        date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        rnd_id = random.randint(10**8, 10**9 - 1)
        self._saved_trajectories = DATA_FOLDER / "data" / "ddots_v0_trajectories" / f"{date}_{rnd_id}"
        self._saved_trajectories.mkdir(parents=True, exist_ok=True)

        self.save_snapshots = save_snapshots
        print("Saving trajectories to", self._saved_trajectories)

    def evaluation_name(self) -> str:  # noqa: D102
        return "ddots"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["ddots_codeforces_small/v0", "datasets/ddots_codeforces_medium_A_B/v0"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        # TODO: we may want to return the model used by an agent here.
        return None

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return self.agents[0].metadata

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        return self.agents[0].namespace

    def model_provider(self) -> str:  # noqa: D102
        # TODO: we may want to return the provider used by an agent here.
        return DEFAULT_PROVIDER

    def solve(self, datum: dict) -> bool:  # noqa: D102
        problem_id = datum["problem_id"]
        description = datum["description"]

        config = deepcopy(CONFIG)
        config.confirm_commands = False

        env = DDOTSEnvironment(self.agents, problem_id, description, config)
        env.write_file(".solved", str(False))

        try:
            env.run_task(description, max_iterations=self.max_iterations)
            env.write_file(".solved", str(env.solved))

        except Exception as e:
            print(f"Error running task: {e}")

        finally:
            if self.save_snapshots:
                snapshot = env.create_snapshot()
                with open(self._saved_trajectories / f"{problem_id}.tar.gz", "wb") as f:
                    f.write(snapshot)

        return env.solved
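
A hedged construction sketch (the agent reference and iteration budget are illustrative; agents takes a comma-separated list of agent references):

solver = DDOTSV0Solver(
    dataset_ref,                              # a previously loaded Dataset
    agents="example.near/ddots-agent/0.0.1",  # hypothetical agent reference
    max_iterations=20,
    save_snapshots=True,                      # keep a .tar.gz trajectory per problem
)
solved = solver.solve({"problem_id": "1234A", "description": "..."})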

GSM8KSolverStrategy

Bases: SolverStrategy

Solver strategy for the GSM8K dataset.

Source code in nearai/solvers/gsm8k_solver.py
class GSM8KSolverStrategy(SolverStrategy):
    """Solver strategy for the GSM8K dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "gsm8k"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["gsm8k"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        parsed_datum: GSM8KDatum = GSM8KDatum(**datum)

        problem_shots_indices = list(range(0, self.SHOTS))
        problem_shots = list(
            map(
                lambda i: GSM8KDatum(**self.dataset_ref["train"][i]).model_dump(),
                problem_shots_indices,
            )
        )
        res: ModelResponse = cast(
            ModelResponse,
            self.completion_fn(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": dedent(
                            """
                    You are a helpful assistant. Your goal is to answer word-based math questions.
                    """
                            + "\n\n"
                            + "Here are some examples of math questions and their answers:"
                            + "\n\n".join(
                                [f"Question: {shot['question']}\nAnswer: {shot['answer']}" for shot in problem_shots]
                            )
                            + "\n\n"
                            + "Now, answer the next question provided in the user prompt. "
                            + "Think step by step about how to solve the problem. "
                            + "Then, provide the answer."
                        ),
                    },
                    {"role": "user", "content": parsed_datum.question},
                ],
            ),
        )
        res_output = str(cast(List[Choices], res.choices)[0].message.content).strip()
        res_refined: ModelResponse = cast(
            ModelResponse,
            self.completion_fn(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": dedent(
                            f"""
                    You are a helpful assistant. Your goal is to answer math questions.

                    You have just answered a math question with the following response:

                    --- BEGIN RESPONSE ---
                    {res_output}
                    --- END RESPONSE ---

                    Please refine your answer.

                    Only output the final number *without units* as your answer. Nothing else.
                    """
                        ),
                    },
                ],
            ),
        )

        ## Clean up the output
        res_refined_output = str(cast(List[Choices], res_refined.choices)[0].message.content).strip()
        res_refined_output = res_refined_output.replace("$", "").replace(",", "")
        if " " in res_refined_output:
            res_refined_output = res_refined_output.split(" ")[0]
        try:
            res_refined_output = str(int(res_refined_output))
        except Exception:
            pass
        try:
            res_refined_output = str(int(float(res_refined_output)))
        except Exception:
            pass

        refined_answer = parsed_datum.answer.replace("$", "").replace(",", "")
        print(res_refined_output, refined_answer)
        return res_refined_output == refined_answer
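
A hedged invocation sketch (the Hugging Face dataset id is real; the model id is hypothetical):

from datasets import load_dataset

dataset = load_dataset("gsm8k", "main")  # DatasetDict with "train" and "test" splits
solver = GSM8KSolverStrategy(dataset_ref=dataset, model="llama-v3-70b-instruct")  # hypothetical model id
print(solver.solve(dataset["test"][0]))  # True if the refined answer matches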

HellaswagSolverStrategy

Bases: SolverStrategy

Solver strategy for the HellaSwag dataset.

Source code in nearai/solvers/hellaswag_solver.py
class HellaswagSolverStrategy(SolverStrategy):
    """Solver strategy for the MMLU dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "hellaswag"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["hellaswag"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = HellaswagDatum(**datum).model_dump()

        choices = ["A", "B", "C", "D"]
        example_problems_indices = list(range(0, 5 * self.SHOTS, 5))
        example_problems = list(
            map(
                lambda d: HellaswagDatum(**d).model_dump(),
                [self.dataset_ref["validation"][i] for i in example_problems_indices],
            )
        )
        base_prompt = Template(
            open(PROMPTS_FOLDER / "hellaswag_verbose_answer.j2").read(),
            trim_blocks=True,
        ).render(
            example_problems=example_problems,
            challenge_problem=datum,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "hellaswag_extract_answer.j2").read(),
            trim_blocks=True,
        ).render(
            challenge_problem=datum,
            answer_text=response,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        try:
            answer = choices.index(response)
            return bool(answer == int(datum["label"]))
        except Exception:
            print("Failed to parse answer")
            return False

LiveBenchSolverStrategy

Bases: SolverStrategy

Solver strategy for the LiveBench dataset.

Source code in nearai/solvers/livebench_solver.py
class LiveBenchSolverStrategy(SolverStrategy):
    """Solver strategy for the live bench dataset."""

    def __init__(  # noqa: D107
        self, dataset_ref: str, model: str, step: str = "all"
    ) -> None:
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        assert "/" not in model
        self.model = model
        self.step = step

    def evaluation_name(self) -> str:  # noqa: D102
        return "live_bench"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["near.ai/live_bench/1.0.0"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def get_custom_tasks(self) -> List[dict]:  # noqa: D102
        return [{"summary": "all"}]

    @SolverStrategyClassProperty
    def scoring_method(self) -> SolverScoringMethod:  # noqa: D102
        return SolverScoringMethod.Custom

    def solve(self, _datum: dict) -> Tuple[bool, dict]:  # noqa: D102
        if self.step == "gen_model_answer":
            self.gen_model_answer()
            return True, {}
        if self.step == "gen_ground_truth_judgement":
            return self.gen_ground_truth_judgement(), {}
        if self.step == "show_livebench_results":
            return self.show_livebench_results()
        if self.step == "all":
            self.gen_model_answer()
            if not self.gen_ground_truth_judgement():
                return False, {}
            return self.show_livebench_results()
        return False, {}

    def gen_model_answer(self) -> None:  # noqa: D102
        print("")
        print("----------- Step gen_model_answer -----------")
        print("")
        list_of_question_files = glob.glob(f"{self.dataset_ref}/**/question.jsonl", recursive=True)
        for question_file in list_of_question_files:
            questions = load_questions_jsonl(question_file)
            bench_name = os.path.dirname(question_file).split(str(self.dataset_ref))[-1]
            answer_file = f"~/.nearai/live_bench_answers/{bench_name}/model_answer/{self.model}.jsonl"
            print(f"Questions from {question_file}")
            print(f"Output to {answer_file}")
            self.run_eval(questions, answer_file)

    def run_eval(self, questions, answer_file) -> None:  # noqa: D102
        answer_file = os.path.expanduser(answer_file)

        # Load existing answers
        existing_answers = set()
        if os.path.exists(answer_file):
            print(
                f"Answer file {answer_file} exists. Will skip already answered questions. Delete this file if that is not intended."  # noqa: E501
            )
            with open(answer_file, "r") as fin:
                for line in fin:
                    answer = json.loads(line)
                    existing_answers.add(answer["question_id"])

        for question in tqdm(questions):
            if question["question_id"] in existing_answers:
                continue
            choices = self.answer_question(question)

            ans_json = {
                "question_id": question["question_id"],
                "answer_id": shortuuid.uuid(),
                "model_id": self.model,
                "choices": choices,
                "tstamp": time.time(),
            }

            os.makedirs(os.path.dirname(answer_file), exist_ok=True)
            with open(answer_file, "a") as fout:
                fout.write(json.dumps(ans_json) + "\n")

    def answer_question(self, question) -> List[dict]:  # noqa: D102
        conv = []
        # Append system prompt here if needed.
        turns = []
        for qs in question["turns"]:
            conv.append({"role": "user", "content": qs})

            completion_response = cast(
                ModelResponse,
                self.completion_fn(
                    self.model,
                    messages=[convert_message(msg) for msg in conv],
                    temperature=0.0,
                ),
            )
            output = str(cast(List[Choices], completion_response.choices)[0].message.content)

            conv.append({"role": "assistant", "content": output})
            turns.append(output)

        return [{"index": 0, "turns": turns}]

    def gen_ground_truth_judgement(self) -> bool:  # noqa: D102
        print("")
        print("----------- Step gen_ground_truth_judgement -----------")
        print("")
        script_path = "nearai/projects/live_bench/gen_ground_truth_judgement.sh"

        try:
            # Run the script without capturing output
            subprocess.run(["/bin/bash", script_path, self.model, self.dataset_ref], check=True)
            return True

        except subprocess.CalledProcessError as e:
            print(f"An error occurred while running the script: {e}")
            return False

    def show_livebench_results(self) -> Tuple[bool, dict]:  # noqa: D102
        print("")
        print("----------- Step show_livebench_results -----------")
        print("")
        script_path = "nearai/projects/live_bench/show_livebench_results.sh"

        try:
            # Run the script without capturing output
            subprocess.run(["/bin/bash", script_path, self.model], check=True)

        except subprocess.CalledProcessError as e:
            print(f"An error occurred while running the script: {e}")
            return False, {}

        return self.create_result_dict()

    def read_csv_to_dict(self, file_path) -> dict:  # noqa: D102
        file_path = os.path.expanduser(file_path)
        with open(file_path, "r") as f:
            reader = csv.DictReader(f)
            matching_rows = [row for row in reader if row["model"] == self.model]
            return matching_rows[-1] if matching_rows else {}  # Get the last matching row

    def create_result_dict(self) -> Tuple[bool, dict]:  # noqa: D102
        tasks_data = self.read_csv_to_dict("~/.nearai/LiveBench/livebench/all_tasks.csv")
        groups_data = self.read_csv_to_dict("~/.nearai/LiveBench/livebench/all_groups.csv")

        if not tasks_data or not groups_data:
        return False, {}  # Return failure if the model is not found in either file

        result: dict = {"tasks": {}, "groups": {}}

        for key, value in tasks_data.items():
            if key != "model":
                result["tasks"][key] = float(value)

        for key, value in groups_data.items():
            if key != "model":
                result["groups"][key] = float(value)

        return True, result

    def get_evaluation_metrics(self, tasks_results: List[Tuple[bool, Any]]) -> Dict[str, Any]:  # noqa: D102
        results: Dict[str, Dict[str, Any]] = tasks_results[-1][1]
        metrics: Dict[str, Any] = {"average": results["groups"]["average"]}

        for group, score in results["groups"].items():
            if group == "average":
                continue
            metrics[f"group/{group}"] = score

        for task, score in results["tasks"].items():
            metrics[f"task/{task}"] = score

        return metrics
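
A hedged sketch of running the pipeline one step at a time instead of step="all" (the dataset path and model id are illustrative):

solver = LiveBenchSolverStrategy(
    dataset_ref="/data/live_bench",   # hypothetical local dataset path
    model="llama-v3-70b-instruct",    # hypothetical model id (no "/" allowed)
    step="gen_model_answer",
)
ok, metrics = solver.solve({})        # the datum is ignored for Custom scoring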

MBPPSolverAgent

Bases: SolverStrategy

Solver strategy for the MBPP dataset.

Source code in nearai/solvers/mbpp_agent_solver.py
class MBPPSolverAgent(SolverStrategy):
    """Solver strategy for the MBPP dataset."""

    def __init__(  # noqa: D107
        self, dataset_ref: Union[Dataset, DatasetDict], agent: str, num_iterations: int = 16, verbose: bool = False
    ) -> None:
        super().__init__()
        self.dataset_ref = dataset_ref
        self.agent = load_agent(agent)
        self.verbose = verbose
        self.num_iterations = num_iterations

    def evaluation_name(self) -> str:  # noqa: D102
        return "mbpp"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mbpp"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        # TODO: we may want to return the model used by an agent here.
        return None

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return self.agent.metadata

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        return self.agent.namespace

    def model_provider(self) -> str:  # noqa: D102
        # TODO: we may want to return the provider used by an agent here.
        return DEFAULT_PROVIDER

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MBPPDatum(**datum).model_dump()
        function_name = get_function_name(datum["code"])

        path = os.path.join(
            "/tmp",
            "mbpp",
            str(datum["task_id"]),
            str(int(time.time() * 1000)),
            str(random.randint(0, 1000)),
        )
        CONFIG.confirm_commands = False
        env = Environment(path, [self.agent], CONFIG)

        new_line = "\n"
        task = f"""{datum["text"]}
Write a single file with a Python function named `{function_name}` that solves the above problem and satisfies the following tests:
```python\n{new_line.join(datum["test_list"])}\n```"""  # noqa: E501
        if self.verbose:
            print(task)
            print(path)
        env.run_task(task, max_iterations=self.num_iterations)

        code = ""
        for filename in env.list_files("."):
            if filename.endswith(".py"):
                code += env.read_file(filename) + "\n"

        try:
            for test in datum["test_list"] + datum["challenge_test_list"]:
                test_code = code + "\n" + test
                exec(test_code, {}, {})
            return True
        except Exception as e:
            if self.verbose:
                print(e)
            return False
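
A hedged construction sketch (the dataset id is real; the agent reference is hypothetical):

from datasets import load_dataset

solver = MBPPSolverAgent(
    dataset_ref=load_dataset("mbpp"),
    agent="example.near/mbpp-agent/0.0.1",  # hypothetical agent reference
    num_iterations=8,
    verbose=True,
)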

MBPPSolverStrategy

Bases: SolverStrategy

Solver strategy for the MBPP dataset.

Source code in nearai/solvers/mbpp_solver.py
class MBPPSolverStrategy(SolverStrategy):
    """Solver strategy for the MBPP dataset."""

    SHOTS = 3

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "mbpp"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mbpp"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MBPPDatum(**datum).model_dump()

        ## Allow the LLM to think "out loud" before giving its answer
        function_name = get_function_name(datum["code"])
        example_problems = list(islice(self.dataset_ref["prompt"], self.SHOTS))
        base_prompt = Template(open(PROMPTS_FOLDER / "mbpp_verbose_answer.j2").read(), trim_blocks=True).render(
            function_name=function_name,
            example_problems=example_problems,
            challenge_problem=datum,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "mbpp_extract_answer.j2").read(), trim_blocks=True
        ).render(
            function_name=function_name,
            answer_text=response,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Parse the python code
        python_code_blocks = parse_python_code_block(response) + parse_code_block(response)
        code = ""
        if len(python_code_blocks) == 0:
            code = response
        else:
            code = python_code_blocks[0]

        ## Evaluate the code
        try:
            for test in datum["test_list"] + datum["challenge_test_list"]:
                test_code = code + "\n" + test
                exec(test_code)
            return True
        except Exception:
            return False

MMLUSolverStrategy

Bases: SolverStrategy

Solver strategy for the MMLU dataset.

Source code in nearai/solvers/mmlu_solver.py
class MMLUSolverStrategy(SolverStrategy):
    """Solver strategy for the MMLU dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "mmlu"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mmlu"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MMLUDatum(**datum).model_dump()

        choices = ["A", "B", "C", "D"]
        example_problems_indices = list(range(0, 5 * self.SHOTS, 5))
        example_problems = list(
            map(
                lambda d: MMLUDatum(**d).model_dump(),
                [self.dataset_ref["dev"][i] for i in example_problems_indices],
            )
        )
        base_prompt = Template(open(PROMPTS_FOLDER / "mmlu_verbose_answer.j2").read(), trim_blocks=True).render(
            example_problems=example_problems,
            challenge_problem=datum,
            choices=choices,
        )

        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.2,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "mmlu_extract_answer.j2").read(), trim_blocks=True
        ).render(
            challenge_problem=datum,
            answer_text=response,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        try:
            answer = choices.index(response)
            return bool(answer == datum["answer"])
        except Exception:
            print("Failed to parse answer")
            return False

SolverStrategy

Bases: ABC

Abstract class for solver strategies.

Source code in nearai/solvers/__init__.py
class SolverStrategy(ABC, metaclass=SolverStrategyMeta):
    """Abstract class for solver strategies."""

    def __init__(self) -> None:
        pass

    @property
    def name(self) -> str:
        """Returns the name of the solver strategy."""
        return type(self).__name__

    @SolverStrategyClassProperty
    def scoring_method(self) -> SolverScoringMethod:
        return SolverScoringMethod.TrueOrFalseList

    @abstractmethod
    def evaluation_name(self) -> str:
        """Returns a unique name for (benchmark, solver) tuple, e.g. 'mbpp' or 'live_bench' or 'mmlu-5-shot'."""
        ...

    @abstractmethod
    def compatible_datasets(self) -> List[str]:
        """Returns the list of datasets that the solver strategy is compatible with."""
        ...

    @abstractmethod
    def model_metadata(self) -> Optional[Dict[str, Any]]:
        """Returns model metadata that is evaluated or used by an agent."""
        ...

    @abstractmethod
    def agent_metadata(self) -> Optional[Dict[str, Any]]:
        """Returns agent metadata that is evaluated."""
        ...

    @abstractmethod
    def evaluated_entry_namespace(self) -> str:
        """Returns namespace of a model or agent to be evaluated."""
        ...

    @abstractmethod
    def model_provider(self) -> str:
        """Returns model provider."""
        ...

    @abstractmethod
    def solve(self, datum: dict) -> Union[bool, Tuple[bool, Any]]:
        """Solves the task for the given datum."""
        ...

    def get_custom_tasks(self) -> List[dict]:
        """Custom tasks for custom benchmark."""
        if self.scoring_method == SolverScoringMethod.Custom:
            raise NotImplementedError("get_custom_tasks must be implemented for Custom scoring method")
        else:
            raise AttributeError("get_custom_tasks is only applicable for Custom scoring method")

    def get_evaluation_metrics(self, tasks_results: List[Tuple[bool, Any]]) -> Dict[str, Any]:
        """Given results for all datums, returns evaluation metrics.

        Not used by TrueOrFalseList scoring method.
        Do not prepend with evaluation_name. If hierarchical, use slashes /.
        Expected metrics is a dict of scores, e.g.: {"average": <val>, "group/coding": <val>}.
        """
        raise NotImplementedError("get_evaluation_metrics not implemented")
name property
name: str

Returns the name of the solver strategy.

agent_metadata abstractmethod
agent_metadata() -> Optional[Dict[str, Any]]

Returns agent metadata that is evaluated.

Source code in nearai/solvers/__init__.py
@abstractmethod
def agent_metadata(self) -> Optional[Dict[str, Any]]:
    """Returns agent metadata that is evaluated."""
    ...
compatible_datasets abstractmethod
compatible_datasets() -> List[str]

Returns the list of datasets that the solver strategy is compatible with.

Source code in nearai/solvers/__init__.py
@abstractmethod
def compatible_datasets(self) -> List[str]:
    """Returns the list of datasets that the solver strategy is compatible with."""
    ...
evaluated_entry_namespace abstractmethod
evaluated_entry_namespace() -> str

Returns namespace of a model or agent to be evaluated.

Source code in nearai/solvers/__init__.py
@abstractmethod
def evaluated_entry_namespace(self) -> str:
    """Returns namespace of a model or agent to be evaluated."""
    ...
evaluation_name abstractmethod
evaluation_name() -> str

Returns a unique name for a (benchmark, solver) tuple, e.g. 'mbpp' or 'live_bench' or 'mmlu-5-shot'.

Source code in nearai/solvers/__init__.py
@abstractmethod
def evaluation_name(self) -> str:
    """Returns a unique name for (benchmark, solver) tuple, e.g. 'mbpp' or 'live_bench' or 'mmlu-5-shot'."""
    ...
get_custom_tasks
get_custom_tasks() -> List[dict]

Custom tasks for custom benchmark.

Source code in nearai/solvers/__init__.py
def get_custom_tasks(self) -> List[dict]:
    """Custom tasks for custom benchmark."""
    if self.scoring_method == SolverScoringMethod.Custom:
        raise NotImplementedError("get_custom_tasks must be implemented for Custom scoring method")
    else:
        raise AttributeError("get_custom_tasks is only applicable for Custom scoring method")
get_evaluation_metrics
get_evaluation_metrics(tasks_results: List[Tuple[bool, Any]]) -> Dict[str, Any]

Given results for all datums, returns evaluation metrics.

Not used by TrueOrFalseList scoring method. Do not prepend with evaluation_name. If hierarchical, use slashes /. Expected metrics is a dict of scores, e.g.: {"average": <val>, "group/coding": <val>}.

Source code in nearai/solvers/__init__.py
def get_evaluation_metrics(self, tasks_results: List[Tuple[bool, Any]]) -> Dict[str, Any]:
    """Given results for all datums, returns evaluation metrics.

    Not used by TrueOrFalseList scoring method.
    Do not prepend with evaluation_name. If hierarchical, use slashes /.
    Expected metrics is a dict of scores, e.g.: {"average": <val>, "group/coding": <val>}.
    """
    raise NotImplementedError("get_evaluation_metrics not implemented")
model_metadata abstractmethod
model_metadata() -> Optional[Dict[str, Any]]

Returns model metadata that is evaluated or used by an agent.

Source code in nearai/solvers/__init__.py
@abstractmethod
def model_metadata(self) -> Optional[Dict[str, Any]]:
    """Returns model metadata that is evaluated or used by an agent."""
    ...
model_provider abstractmethod
model_provider() -> str

Returns model provider.

Source code in nearai/solvers/__init__.py
@abstractmethod
def model_provider(self) -> str:
    """Returns model provider."""
    ...
solve abstractmethod
solve(datum: dict) -> Union[bool, Tuple[bool, Any]]

Solves the task for the given datum.

Source code in nearai/solvers/__init__.py
@abstractmethod
def solve(self, datum: dict) -> Union[bool, Tuple[bool, Any]]:
    """Solves the task for the given datum."""
    ...
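
For illustration, a minimal concrete subclass sketch (the dataset name is hypothetical; the typing imports are the ones already used throughout this module). Every abstract method must be implemented:

class EchoSolverStrategy(SolverStrategy):
    """Toy solver that marks a datum solved when it already carries its answer."""

    def evaluation_name(self) -> str:
        return "echo"

    def compatible_datasets(self) -> List[str]:
        return ["echo_dataset"]  # hypothetical dataset name

    def model_metadata(self) -> Optional[Dict[str, Any]]:
        return None

    def agent_metadata(self) -> Optional[Dict[str, Any]]:
        return None

    def evaluated_entry_namespace(self) -> str:
        return ""

    def model_provider(self) -> str:
        return ""

    def solve(self, datum: dict) -> bool:
        return datum.get("question") == datum.get("answer")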

SolverStrategyMeta

Bases: ABCMeta

Metaclass that automatically registers subclasses in the SolverStrategyRegistry.

Source code in nearai/solvers/__init__.py
class SolverStrategyMeta(ABCMeta):
    """Metaclass that automatically registers subclasses in the SolverStrategyRegistry."""

    def __new__(cls, name: str, bases: tuple, namespace: dict) -> Any:
        new_class = super().__new__(cls, name, bases, namespace)
        if bases != (ABC,):  # Avoid registering the abstract base class itself
            SolverStrategyRegistry[new_class.__name__] = new_class  # type: ignore
        return new_class
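
Because registration happens in __new__, defining a concrete subclass is enough to make it discoverable. Assuming the EchoSolverStrategy sketch above has been defined:

assert "EchoSolverStrategy" in SolverStrategyRegistry
solver_cls = SolverStrategyRegistry["EchoSolverStrategy"]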

ddot_v0_solver

DDOTSEnvironment

Bases: Environment

Source code in nearai/solvers/ddot_v0_solver.py
class DDOTSEnvironment(Environment):
    def __init__(self, agents: List[Agent], problem_id: str, description: str, config: Config):  # noqa: D107
        self.tdir = TemporaryDirectory()
        super().__init__(self.tdir.name, agents, config)

        self.problem_id = problem_id
        self.solved = False

        files = {
            ".id": problem_id,
            "PROBLEM.txt": description,
            "solution.py": "",
            "test.in": "",
            "test.sh": "#!/bin/bash\npython3 solution.py < test.in",
        }
        for fname, content in files.items():
            with open(self.tdir.name + "/" + fname, "w") as f:
                f.write(content)

    async def async_submit(self, code: str) -> Tuple[bool, str]:  # noqa: D102
        submission_id = await submit_problem(self.problem_id, code, Extensions.PYTHON)

        try:
            await is_output_ready(submission_id)
        except Exception:
            print("WARNING: Submission took too long to execute on DDOTS")
            self.mark_done()
            return False, "Submission took too long to execute on the platform"

        ok = await submission_accepted(submission_id)

        if ok:
            self.solved = True
            self.mark_done()
            return True, ""

        output = await get_output(submission_id)

        return False, output

    def submit_python(self, code: str) -> Tuple[bool, str]:
        """Returns True if the submission was accepted, False otherwise.

        The second element of the tuple is the output of the checker if the submission was rejected.
        """
        return asyncio.run(self.async_submit(code))
submit_python
submit_python(code: str) -> Tuple[bool, str]

Returns True if the submission was accepted, False otherwise.

The second element of the tuple is the output of the checker if the submission was rejected.

Source code in nearai/solvers/ddot_v0_solver.py
def submit_python(self, code: str) -> Tuple[bool, str]:
    """Returns True if the submission was accepted, False otherwise.

    The second element of the tuple is the output of the checker if the submission was rejected.
    """
    return asyncio.run(self.async_submit(code))
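
From inside an agent, a hedged sketch of the submission loop (code is a candidate solution string):

ok, checker_output = env.submit_python(code)
if not ok:
    print("Rejected by the checker:", checker_output)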
DDOTSV0Solver

Bases: SolverStrategy

Solver strategy for competitive programming problems live on DDOTS.

This solver runs agents in a previously prepared Agent environment.

workspace/
    .id             -- Id of the problem
    PROBLEM.txt     -- Description of the problem

The agent should call env.submit_python(code) to submit the code to the DDOTS server.

Source code in nearai/solvers/ddot_v0_solver.py
class DDOTSV0Solver(SolverStrategy):
    """Solver strategy for competitive programming problems live on DDOTS.

    This solver runs agents in a previously prepared Agent environment.

    workspace/
        .id             -- Id of the problem
        PROBLEM.txt     -- Description of the problem

    The agent should call env.submit_python(code) to submit the code to the DDOTS server.

    """

    def __init__(self, dataset_ref: Dataset, agents: str, max_iterations: int, save_snapshots: bool = False):  # noqa: D107
        self.agents = [load_agent(agent) for agent in agents.split(",")]
        self.max_iterations = max_iterations

        date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        rnd_id = random.randint(10**8, 10**9 - 1)
        self._saved_trajectories = DATA_FOLDER / "data" / "ddots_v0_trajectories" / f"{date}_{rnd_id}"
        self._saved_trajectories.mkdir(parents=True, exist_ok=True)

        self.save_snapshots = save_snapshots
        print("Saving trajectories to", self._saved_trajectories)

    def evaluation_name(self) -> str:  # noqa: D102
        return "ddots"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["ddots_codeforces_small/v0", "datasets/ddots_codeforces_medium_A_B/v0"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        # TODO: we may want to return the model used by an agent here.
        return None

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return self.agents[0].metadata

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        return self.agents[0].namespace

    def model_provider(self) -> str:  # noqa: D102
        # TODO: we may want to return the provider used by an agent here.
        return DEFAULT_PROVIDER

    def solve(self, datum: dict) -> bool:  # noqa: D102
        problem_id = datum["problem_id"]
        description = datum["description"]

        config = deepcopy(CONFIG)
        config.confirm_commands = False

        env = DDOTSEnvironment(self.agents, problem_id, description, config)
        env.write_file(".solved", str(False))

        try:
            env.run_task(description, max_iterations=self.max_iterations)
            env.write_file(".solved", str(env.solved))

        except Exception as e:
            print(f"Error running task: {e}")

        finally:
            if self.save_snapshots:
                snapshot = env.create_snapshot()
                with open(self._saved_trajectories / f"{problem_id}.tar.gz", "wb") as f:
                    f.write(snapshot)

        return env.solved

gsm8k_solver

GSM8KSolverStrategy

Bases: SolverStrategy

Solver strategy for the GSM8K dataset.

Source code in nearai/solvers/gsm8k_solver.py
class GSM8KSolverStrategy(SolverStrategy):
    """Solver strategy for the GSM8K dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "gsm8k"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["gsm8k"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        parsed_datum: GSM8KDatum = GSM8KDatum(**datum)

        problem_shots_indices = list(range(0, self.SHOTS))
        problem_shots = list(
            map(
                lambda i: GSM8KDatum(**self.dataset_ref["train"][i]).model_dump(),
                problem_shots_indices,
            )
        )
        res: ModelResponse = cast(
            ModelResponse,
            self.completion_fn(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": dedent(
                            """
                    You are a helpful assistant. Your goal is to answer word-based math questions.
                    """
                            + "\n\n"
                            + "Here are some examples of math questions and their answers:"
                            + "\n\n".join(
                                [f"Question: {shot['question']}\nAnswer: {shot['answer']}" for shot in problem_shots]
                            )
                            + "\n\n"
                            + "Now, answer the next question provided in the user prompt. "
                            + "Think step by step about how to solve the problem. "
                            + "Then, provide the answer."
                        ),
                    },
                    {"role": "user", "content": parsed_datum.question},
                ],
            ),
        )
        res_output = str(cast(List[Choices], res.choices)[0].message.content).strip()
        res_refined: ModelResponse = cast(
            ModelResponse,
            self.completion_fn(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": dedent(
                            f"""
                    You are a helpful assistant. Your goal is to answer math questions.

                    You have just answered a math question with the following response:

                    --- BEGIN RESPONSE ---
                    {res_output}
                    --- END RESPONSE ---

                    Please refine your answer.

                    Only output the final number *without units* as your answer. Nothing else.
                    """
                        ),
                    },
                ],
            ),
        )

        ## Clean up the output
        res_refined_output = str(cast(List[Choices], res_refined.choices)[0].message.content).strip()
        res_refined_output = res_refined_output.replace("$", "").replace(",", "")
        if " " in res_refined_output:
            res_refined_output = res_refined_output.split(" ")[0]
        try:
            res_refined_output = str(int(res_refined_output))
        except Exception:
            pass
        try:
            res_refined_output = str(int(float(res_refined_output)))
        except Exception:
            pass

        refined_answer = parsed_datum.answer.replace("$", "").replace(",", "")
        print(res_refined_output, refined_answer)
        return res_refined_output == refined_answer

hellaswag_solver

HellaswagSolverStrategy

Bases: SolverStrategy

Solver strategy for the HellaSwag dataset.

Source code in nearai/solvers/hellaswag_solver.py
class HellaswagSolverStrategy(SolverStrategy):
    """Solver strategy for the MMLU dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "hellaswag"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["hellaswag"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = HellaswagDatum(**datum).model_dump()

        choices = ["A", "B", "C", "D"]
        example_problems_indices = list(range(0, 5 * self.SHOTS, 5))
        example_problems = list(
            map(
                lambda d: HellaswagDatum(**d).model_dump(),
                [self.dataset_ref["validation"][i] for i in example_problems_indices],
            )
        )
        base_prompt = Template(
            open(PROMPTS_FOLDER / "hellaswag_verbose_answer.j2").read(),
            trim_blocks=True,
        ).render(
            example_problems=example_problems,
            challenge_problem=datum,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "hellaswag_extract_answer.j2").read(),
            trim_blocks=True,
        ).render(
            challenge_problem=datum,
            answer_text=response,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        try:
            answer = choices.index(response)
            return bool(answer == int(datum["label"]))
        except Exception:
            print("Failed to parse answer")
            return False

livebench_solver

LiveBenchSolverStrategy

Bases: SolverStrategy

Solver strategy for the LiveBench dataset.

Source code in nearai/solvers/livebench_solver.py
class LiveBenchSolverStrategy(SolverStrategy):
    """Solver strategy for the live bench dataset."""

    def __init__(  # noqa: D107
        self, dataset_ref: str, model: str, step: str = "all"
    ) -> None:
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        assert "/" not in model
        self.model = model
        self.step = step

    def evaluation_name(self) -> str:  # noqa: D102
        return "live_bench"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["near.ai/live_bench/1.0.0"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def get_custom_tasks(self) -> List[dict]:  # noqa: D102
        return [{"summary": "all"}]

    @SolverStrategyClassProperty
    def scoring_method(self) -> SolverScoringMethod:  # noqa: D102
        return SolverScoringMethod.Custom

    def solve(self, _datum: dict) -> Tuple[bool, dict]:  # noqa: D102
        if self.step == "gen_model_answer":
            self.gen_model_answer()
            return True, {}
        if self.step == "gen_ground_truth_judgement":
            return self.gen_ground_truth_judgement(), {}
        if self.step == "show_livebench_results":
            return self.show_livebench_results()
        if self.step == "all":
            self.gen_model_answer()
            if not self.gen_ground_truth_judgement():
                return False, {}
            return self.show_livebench_results()
        return False, {}

    def gen_model_answer(self) -> None:  # noqa: D102
        print("")
        print("----------- Step gen_model_answer -----------")
        print("")
        list_of_question_files = glob.glob(f"{self.dataset_ref}/**/question.jsonl", recursive=True)
        for question_file in list_of_question_files:
            questions = load_questions_jsonl(question_file)
            bench_name = os.path.dirname(question_file).split(str(self.dataset_ref))[-1]
            answer_file = f"~/.nearai/live_bench_answers/{bench_name}/model_answer/{self.model}.jsonl"
            print(f"Questions from {question_file}")
            print(f"Output to {answer_file}")
            self.run_eval(questions, answer_file)

    def run_eval(self, questions, answer_file) -> None:  # noqa: D102
        answer_file = os.path.expanduser(answer_file)

        # Load existing answers
        existing_answers = set()
        if os.path.exists(answer_file):
            print(
                f"Answer file {answer_file} exists. Will skip already answered questions. Delete this file if that is not intended."  # noqa: E501
            )
            with open(answer_file, "r") as fin:
                for line in fin:
                    answer = json.loads(line)
                    existing_answers.add(answer["question_id"])

        for question in tqdm(questions):
            if question["question_id"] in existing_answers:
                continue
            choices = self.answer_question(question)

            ans_json = {
                "question_id": question["question_id"],
                "answer_id": shortuuid.uuid(),
                "model_id": self.model,
                "choices": choices,
                "tstamp": time.time(),
            }

            os.makedirs(os.path.dirname(answer_file), exist_ok=True)
            with open(answer_file, "a") as fout:
                fout.write(json.dumps(ans_json) + "\n")

    def answer_question(self, question) -> List[dict]:  # noqa: D102
        conv = []
        # Append system prompt here if needed.
        turns = []
        for qs in question["turns"]:
            conv.append({"role": "user", "content": qs})

            completion_response = cast(
                ModelResponse,
                self.completion_fn(
                    self.model,
                    messages=[convert_message(msg) for msg in conv],
                    temperature=0.0,
                ),
            )
            output = str(cast(List[Choices], completion_response.choices)[0].message.content)

            conv.append({"role": "assistant", "content": output})
            turns.append(output)

        return [{"index": 0, "turns": turns}]

    def gen_ground_truth_judgement(self) -> bool:  # noqa: D102
        print("")
        print("----------- Step gen_ground_truth_judgement -----------")
        print("")
        script_path = "nearai/projects/live_bench/gen_ground_truth_judgement.sh"

        try:
            # Run the script without capturing output
            subprocess.run(["/bin/bash", script_path, self.model, self.dataset_ref], check=True)
            return True

        except subprocess.CalledProcessError as e:
            print(f"An error occurred while running the script: {e}")
            return False

    def show_livebench_results(self) -> Tuple[bool, dict]:  # noqa: D102
        print("")
        print("----------- Step show_livebench_results -----------")
        print("")
        script_path = "nearai/projects/live_bench/show_livebench_results.sh"

        try:
            # Run the script without capturing output
            subprocess.run(["/bin/bash", script_path, self.model], check=True)

        except subprocess.CalledProcessError as e:
            print(f"An error occurred while running the script: {e}")
            return False, {}

        return self.create_result_dict()

    def read_csv_to_dict(self, file_path) -> dict:  # noqa: D102
        file_path = os.path.expanduser(file_path)
        with open(file_path, "r") as f:
            reader = csv.DictReader(f)
            matching_rows = [row for row in reader if row["model"] == self.model]
            return matching_rows[-1] if matching_rows else {}  # Get the last matching row

    def create_result_dict(self) -> Tuple[bool, dict]:  # noqa: D102
        tasks_data = self.read_csv_to_dict("~/.nearai/LiveBench/livebench/all_tasks.csv")
        groups_data = self.read_csv_to_dict("~/.nearai/LiveBench/livebench/all_groups.csv")

        if not tasks_data or not groups_data:
            return False, {}  # Model not found in either file

        result: dict = {"tasks": {}, "groups": {}}

        for key, value in tasks_data.items():
            if key != "model":
                result["tasks"][key] = float(value)

        for key, value in groups_data.items():
            if key != "model":
                result["groups"][key] = float(value)

        return True, result

    def get_evaluation_metrics(self, tasks_results: List[Tuple[bool, Any]]) -> Dict[str, Any]:  # noqa: D102
        results: Dict[str, Dict[str, Any]] = tasks_results[-1][1]
        metrics: Dict[str, Any] = {"average": results["groups"]["average"]}

        for group, score in results["groups"].items():
            if group == "average":
                continue
            metrics[f"group/{group}"] = score

        for task, score in results["tasks"].items():
            metrics[f"task/{task}"] = score

        return metrics
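
Each line appended to the answer file is a self-contained JSON record, which is what lets run_eval resume by skipping already-answered question_ids. A minimal sketch of reading a run back, assuming an illustrative file path:

import json

answer_file = "/tmp/livebench_answers.jsonl"  # hypothetical path

# Each line was written by run_eval as one record, e.g.
# {"question_id": ..., "answer_id": ..., "model_id": ...,
#  "choices": [{"index": 0, "turns": [...]}], "tstamp": ...}
with open(answer_file) as fin:
    answered = {json.loads(line)["question_id"] for line in fin}
print(f"{len(answered)} questions already answered")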

mbpp_agent_solver

MBPPSolverAgent

Bases: SolverStrategy

Solver strategy for the MBPP dataset.

Source code in nearai/solvers/mbpp_agent_solver.py
class MBPPSolverAgent(SolverStrategy):
    """Solver strategy for the MBPP dataset."""

    def __init__(  # noqa: D107
        self, dataset_ref: Union[Dataset, DatasetDict], agent: str, num_iterations: int = 16, verbose: bool = False
    ) -> None:
        super().__init__()
        self.dataset_ref = dataset_ref
        self.agent = load_agent(agent)
        self.verbose = verbose
        self.num_iterations = num_iterations

    def evaluation_name(self) -> str:  # noqa: D102
        return "mbpp"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mbpp"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        # TODO: we may want to return the model used by an agent here.
        return None

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return self.agent.metadata

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        return self.agent.namespace

    def model_provider(self) -> str:  # noqa: D102
        # TODO: we may want to return the provider used by an agent here.
        return DEFAULT_PROVIDER

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MBPPDatum(**datum).model_dump()
        function_name = get_function_name(datum["code"])

        path = os.path.join(
            "/tmp",
            "mbpp",
            str(datum["task_id"]),
            str(int(time.time() * 1000)),
            str(random.randint(0, 1000)),
        )
        CONFIG.confirm_commands = False
        env = Environment(path, [self.agent], CONFIG)

        new_line = "\n"
        task = f"""{datum["text"]}
Write a single file with a python function named `{function_name}` that solves the above problem and satisfies the following tests:
```python\n{new_line.join(datum["test_list"])}\n```"""  # noqa: E501
        if self.verbose:
            print(task)
            print(path)
        env.run_task(task, max_iterations=self.num_iterations)

        code = ""
        for filename in env.list_files("."):
            if filename.endswith(".py"):
                code += env.read_file(filename) + "\n"

        try:
            for test in datum["test_list"] + datum["challenge_test_list"]:
                test_code = code + "\n" + test
                exec(test_code, {}, {})
            return True
        except Exception as e:
            if self.verbose:
                print(e)
            return False
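
A minimal usage sketch; the HuggingFace "mbpp" dataset and the agent registry entry are assumptions. solve runs the agent in a scratch Environment and then exec()s every generated .py file against the tests:

from datasets import load_dataset
from nearai.solvers.mbpp_agent_solver import MBPPSolverAgent

dataset = load_dataset("mbpp")  # dataset choice is an assumption
# Agent reference is illustrative; any entry accepted by load_agent works.
solver = MBPPSolverAgent(dataset, agent="example.near/mbpp-agent/0.0.1", verbose=True)

passed = solver.solve(dataset["test"][0])
print("passed" if passed else "failed")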

mbpp_solver

MBPPSolverStrategy

Bases: SolverStrategy

Solver strategy for the MBPP dataset.

Source code in nearai/solvers/mbpp_solver.py
class MBPPSolverStrategy(SolverStrategy):
    """Solver strategy for the MBPP dataset."""

    SHOTS = 3

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "mbpp"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mbpp"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MBPPDatum(**datum).model_dump()

        ## Allow the LLM to think "out loud" for its answer
        function_name = get_function_name(datum["code"])
        example_problems = list(islice(self.dataset_ref["prompt"], self.SHOTS))
        base_prompt = Template(open(PROMPTS_FOLDER / "mbpp_verbose_answer.j2").read(), trim_blocks=True).render(
            function_name=function_name,
            example_problems=example_problems,
            challenge_problem=datum,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "mbpp_extract_answer.j2").read(), trim_blocks=True
        ).render(
            function_name=function_name,
            answer_text=response,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Parse the python code
        python_code_blocks = parse_python_code_block(response) + parse_code_block(response)
        code = ""
        if len(python_code_blocks) == 0:
            code = response
        else:
            code = python_code_blocks[0]

        ## Evaluate the code
        try:
            for test in datum["test_list"] + datum["challenge_test_list"]:
                test_code = code + "\n" + test
                exec(test_code)
            return True
        except Exception:
            return False
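
A usage sketch under the same dataset assumption; each solve call makes two completions, a verbose reasoning pass followed by an answer-extraction pass, with few-shot examples drawn from the dataset's "prompt" split:

from datasets import load_dataset
from nearai.solvers.mbpp_solver import MBPPSolverStrategy

dataset = load_dataset("mbpp")  # dataset choice is an assumption
solver = MBPPSolverStrategy(dataset, model="llama-v3p1-70b-instruct")  # model name is illustrative

print(solver.solve(dataset["test"][0]))  # True if all tests pass against the extracted code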

mmlu_solver

MMLUSolverStrategy

Bases: SolverStrategy

Solver strategy for the MMLU dataset.

Source code in nearai/solvers/mmlu_solver.py
class MMLUSolverStrategy(SolverStrategy):
    """Solver strategy for the MMLU dataset."""

    SHOTS = 8

    def __init__(self, dataset_ref: Union[Dataset, DatasetDict], model: str) -> None:  # noqa: D107
        super().__init__()
        self.dataset_ref = dataset_ref
        self.completion_fn = InferenceRouter(CONFIG).completions
        self.model = model

    def evaluation_name(self) -> str:  # noqa: D102
        return "mmlu"

    def compatible_datasets(self) -> List[str]:  # noqa: D102
        return ["mmlu"]

    def model_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return {"name": self.model}

    def agent_metadata(self) -> Optional[Dict[str, Any]]:  # noqa: D102
        return None

    def evaluated_entry_namespace(self) -> str:  # noqa: D102
        # Only provider models are supported.
        return ""

    def model_provider(self) -> str:  # noqa: D102
        provider, _ = get_provider_model(DEFAULT_PROVIDER, self.model)
        return provider

    def solve(self, datum: dict) -> bool:  # noqa: D102
        datum = MMLUDatum(**datum).model_dump()

        choices = ["A", "B", "C", "D"]
        example_problems_indices = list(range(0, 5 * self.SHOTS, 5))
        example_problems = list(
            map(
                lambda d: MMLUDatum(**d).model_dump(),
                [self.dataset_ref["dev"][i] for i in example_problems_indices],
            )
        )
        base_prompt = Template(open(PROMPTS_FOLDER / "mmlu_verbose_answer.j2").read(), trim_blocks=True).render(
            example_problems=example_problems,
            challenge_problem=datum,
            choices=choices,
        )

        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": base_prompt},
                ],
                temperature=0.2,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        ## Extract the answer from the response
        extract_answer_prompt = Template(
            open(PROMPTS_FOLDER / "mmlu_extract_answer.j2").read(), trim_blocks=True
        ).render(
            challenge_problem=datum,
            answer_text=response,
            choices=choices,
        )
        completion_response = cast(
            ModelResponse,
            self.completion_fn(
                self.model,
                messages=[
                    {"role": "system", "content": extract_answer_prompt},
                ],
                temperature=0.0,
            ),
        )
        response = str(cast(List[Choices], completion_response.choices)[0].message.content)

        try:
            answer = choices.index(response)
            return bool(answer == datum["answer"])
        except Exception:
            print("Failed to parse answer")
            return False
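
A usage sketch; "cais/mmlu" with the "all" config is one dataset layout that provides the "dev" split solve() samples for its 8 few-shot examples (both the dataset and model names are assumptions):

from datasets import load_dataset
from nearai.solvers.mmlu_solver import MMLUSolverStrategy

dataset = load_dataset("cais/mmlu", "all")  # dataset choice is an assumption
solver = MMLUSolverStrategy(dataset, model="llama-v3p1-70b-instruct")  # model name is illustrative

print(solver.solve(dataset["test"][0]))  # True if the extracted letter matches datum["answer"]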

tool_registry

ToolRegistry

A registry for tools that can be called by the agent.

Source code in nearai/tool_registry.py
class ToolRegistry:
    """A registry for tools that can be called by the agent."""

    def __init__(self) -> None:  # noqa: D107
        self.tools: Dict[str, Callable] = {}

    def register_tool(self, tool: Callable) -> None:  # noqa: D102
        """Register a tool."""
        self.tools[tool.__name__] = tool

    def get_tool(self, name: str) -> Optional[Callable]:  # noqa: D102
        """Get a tool by name."""
        return self.tools.get(name)

    def get_all_tools(self) -> Dict[str, Callable]:  # noqa: D102
        """Get all tools."""
        return self.tools

    def call_tool(self, name: str, **kwargs: Any) -> Any:  # noqa: D102
        """Call a tool by name."""
        tool = self.get_tool(name)
        if tool is None:
            raise ValueError(f"Tool '{name}' not found.")
        return tool(**kwargs)

    def get_tool_definition(self, name: str) -> Optional[Dict]:  # noqa: D102
        """Get the definition of a tool by name."""
        tool = self.get_tool(name)
        if tool is None:
            return None

        assert tool.__doc__ is not None, f"Docstring missing for tool '{name}'."
        docstring = tool.__doc__.strip().split("\n")

        # The first line of the docstring is the function description
        function_description = docstring[0].strip()

        # The rest of the lines contain parameter descriptions
        param_descriptions = docstring[1:]

        # Extract parameter names and types
        signature = inspect.signature(tool)
        type_hints = get_type_hints(tool)

        parameters: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}

        # Iterate through function parameters
        for param in signature.parameters.values():
            param_name = param.name
            param_type = type_hints.get(param_name, str)  # Default to str if type hint is missing
            param_description = ""

            # Find the parameter description in the docstring
            for line in param_descriptions:
                if line.strip().startswith(param_name):
                    param_description = line.strip().split(":", 1)[1].strip()
                    break

            # Convert type hint to JSON Schema type
            if isinstance(param_type, _GenericAlias) and param_type.__origin__ is Literal:
                json_type = "string"
            else:
                json_type = param_type.__name__.lower()

            json_type = {"int": "integer", "float": "number", "str": "string", "bool": "boolean"}.get(
                json_type, json_type
            )

            # Add parameter to the definition
            parameters["properties"][param_name] = {"description": param_description, "type": json_type}

            # Params without default values are required params
            if param.default == inspect.Parameter.empty:
                parameters["required"].append(param_name)

        return {
            "type": "function",
            "function": {"name": tool.__name__, "description": function_description, "parameters": parameters},
        }

    def get_all_tool_definitions(self) -> list[Dict]:  # noqa: D102
        definitions = []
        for tool_name, _tool in self.tools.items():
            definition = self.get_tool_definition(tool_name)
            if definition is not None:
                definitions.append(definition)
        return definitions
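
A minimal usage sketch with a hypothetical add tool. The docstring layout matters: get_tool_definition treats the first line as the function description and expects one "name: description" line per parameter:

from nearai.tool_registry import ToolRegistry

def add(a: int, b: int = 0) -> int:
    """Add two integers.

    a: the first operand
    b: the second operand, defaults to 0
    """
    return a + b

registry = ToolRegistry()
registry.register_tool(add)

print(registry.call_tool("add", a=2, b=3))  # 5
print(registry.get_tool_definition("add"))
# {"type": "function", "function": {"name": "add", "description": "Add two integers.",
#  "parameters": {"type": "object", "properties": {...}, "required": ["a"]}}}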

call_tool

call_tool(name: str, **kwargs: Any) -> Any

Call a tool by name.

Source code in nearai/tool_registry.py
def call_tool(self, name: str, **kwargs: Any) -> Any:  # noqa: D102
    """Call a tool by name."""
    tool = self.get_tool(name)
    if tool is None:
        raise ValueError(f"Tool '{name}' not found.")
    return tool(**kwargs)

get_all_tools

get_all_tools() -> Dict[str, Callable]

Get all tools.

Source code in nearai/tool_registry.py
def get_all_tools(self) -> Dict[str, Callable]:  # noqa: D102
    """Get all tools."""
    return self.tools

get_tool

get_tool(name: str) -> Optional[Callable]

Get a tool by name.

Source code in nearai/tool_registry.py
def get_tool(self, name: str) -> Optional[Callable]:  # noqa: D102
    """Get a tool by name."""
    return self.tools.get(name)

get_tool_definition

get_tool_definition(name: str) -> Optional[Dict]

Get the definition of a tool by name.

Source code in nearai/tool_registry.py
def get_tool_definition(self, name: str) -> Optional[Dict]:  # noqa: D102
    """Get the definition of a tool by name."""
    tool = self.get_tool(name)
    if tool is None:
        return None

    assert tool.__doc__ is not None, f"Docstring missing for tool '{name}'."
    docstring = tool.__doc__.strip().split("\n")

    # The first line of the docstring is the function description
    function_description = docstring[0].strip()

    # The rest of the lines contain parameter descriptions
    param_descriptions = docstring[1:]

    # Extract parameter names and types
    signature = inspect.signature(tool)
    type_hints = get_type_hints(tool)

    parameters: Dict[str, Any] = {"type": "object", "properties": {}, "required": []}

    # Iterate through function parameters
    for param in signature.parameters.values():
        param_name = param.name
        param_type = type_hints.get(param_name, str)  # Default to str if type hint is missing
        param_description = ""

        # Find the parameter description in the docstring
        for line in param_descriptions:
            if line.strip().startswith(param_name):
                param_description = line.strip().split(":", 1)[1].strip()
                break

        # Convert type hint to JSON Schema type
        if isinstance(param_type, _GenericAlias) and param_type.__origin__ is Literal:
            json_type = "string"
        else:
            json_type = param_type.__name__.lower()

        json_type = {"int": "integer", "float": "number", "str": "string", "bool": "boolean"}.get(
            json_type, json_type
        )

        # Add parameter to the definition
        parameters["properties"][param_name] = {"description": param_description, "type": json_type}

        # Params without default values are required params
        if param.default == inspect.Parameter.empty:
            parameters["required"].append(param_name)

    return {
        "type": "function",
        "function": {"name": tool.__name__, "description": function_description, "parameters": parameters},
    }
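
One consequence of the Literal branch above, sketched with a hypothetical tool: Literal-typed parameters are reported as JSON type "string", and only parameters without defaults end up in "required":

from typing import Literal

from nearai.tool_registry import ToolRegistry

def set_mode(mode: Literal["fast", "slow"], retries: int = 3) -> str:
    """Set the execution mode.

    mode: either "fast" or "slow"
    retries: how many attempts to make, defaults to 3
    """
    return mode

registry = ToolRegistry()
registry.register_tool(set_mode)
definition = registry.get_tool_definition("set_mode")
assert definition is not None
assert definition["function"]["parameters"]["properties"]["mode"]["type"] == "string"
assert definition["function"]["parameters"]["required"] == ["mode"]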

register_tool

register_tool(tool: Callable) -> None

Register a tool.

Source code in nearai/tool_registry.py
def register_tool(self, tool: Callable) -> None:  # noqa: D102
    """Register a tool."""
    self.tools[tool.__name__] = tool