Skip to content

Module API - Parser

Alt Text

clak.parser

Clak ParserNode Module

This module provides an enhanced command-line argument parsing system built on top of argparse. It supports hierarchical command structures, subcommands, and argument injection.

Key Features:

  • Hierarchical command structure support via subparsers
  • Argument injection capabilities
  • Enhanced help formatting
  • Debug logging support
  • Exception handling for clean program termination

The module provides several key classes:

  • ParserNode: Main parser class extending argparse functionality
  • SubParser: For creating nested command structures
  • Command: Alias for SubParser for compatibility

Usage can be in either argparse-style:

ArgumentParser()
Argument() 
SubParser()

Or Clak-style:

ClakParser()
Opt()
Arg() 
Cmd()

Debug logging can be enabled by setting CLAK_DEBUG=1 environment variable.

ArgParseItem

Bases: Fn

Base class for argument parser items.

This class represents a generic argument parser item that can be added to an argument parser. It provides common functionality for handling destinations and building parameter dictionaries.

Attributes:

Name Type Description
_destination str

The destination name for the argument value

Source code in clak/parser.py
class ArgParseItem(Fn):
    """Base class for argument parser items.

    This class represents a generic argument parser item that can be added to an argument parser.
    It provides common functionality for handling destinations and building parameter dictionaries.

    Attributes:
        _destination (str): The destination name for the argument value
    """

    _destination: str = None

    @property
    def destination(self) -> Optional[str]:
        """Get the destination name for this argument.

        Returns:
            str: The destination name, derived from the argument name if not explicitly set
            None: If no destination can be determined
        """
        return self._get_best_dest()

    @destination.setter
    def destination(self, value):
        self._destination = value

    def _get_best_dest(self) -> str:
        "Get the best destination name for this argument"
        if self._destination is not None:
            return self._destination

        # If no arguments, return None
        if not self.args:
            return None

        # Get first argument which should be the flag name
        arg = self.args[0]

        # Remove leading dashes and convert remaining dashes to underscores
        if arg.startswith("--"):
            key = arg[2:].replace("-", "_")
        elif arg.startswith("-"):
            # For short flags like -v, use the longer version if available
            if len(self.args) > 1 and self.args[1].startswith("--"):
                key = self.args[1][2:].replace("-", "_")
            else:
                key = arg[1:]
        else:
            key = arg.replace("-", "_")

        return key

    def build_params(self, dest: str) -> Tuple[tuple, dict]:
        """Build parameter dictionary for argument parser.

        Args:
            dest (str): Destination name for the argument

        Returns:
            tuple: A tuple containing (args, kwargs) for argument parser

        Raises:
            ValueError: If no arguments are found
        """
        # Create parser arguments
        kwargs = self.kwargs

        # kind = "option"
        if len(self.args) > 0:
            if len(self.args) > 2:
                raise ValueError(
                    f"Too many arguments found for {self.__class__.__name__}: {self.args}"
                )

            args = self.args

            arg1 = args[0]
            if not arg1.startswith("-"):
                # Remove first position arg to avoid argparse error:
                # ValueError: dest supplied twice for positional argument
                kwargs["metavar"] = args[0]
                args = ()
                # kind = "argument"

        elif dest:
            if len(dest) <= 2:
                args = (f"-{dest}",)
            else:
                args = (f"--{dest}",)
        else:
            raise ValueError(
                f"No arguments found for {self.__class__.__name__}: {self.__dict__}"
            )

        # Update dest if forced
        if dest:
            kwargs["dest"] = dest

        # if kind == "argument":
        #     if "dest" in kwargs:
        #         if len(args) == 1:
        #             # Remove first position arg to avoid argparse error:
        #             # ValueError: dest supplied twice for positional argument
        #             kwargs["metavar"] = args[0]
        #             args = ()
        #         else:
        #             raise ValueError(
        #                 f"Too many arguments found for {self.__class__.__name__}: {self.__dict__}"
        #             )

        return args, kwargs

destination property writable

Get the destination name for this argument.

Returns:

Name Type Description
str Optional[str]

The destination name, derived from the argument name if not explicitly set

None Optional[str]

If no destination can be determined

build_params(dest)

Build parameter dictionary for argument parser.

Parameters:

Name Type Description Default
dest str

Destination name for the argument

required

Returns:

Name Type Description
tuple Tuple[tuple, dict]

A tuple containing (args, kwargs) for argument parser

Raises:

Type Description
ValueError

If no arguments are found

Source code in clak/parser.py
def build_params(self, dest: str) -> Tuple[tuple, dict]:
    """Build parameter dictionary for argument parser.

    Args:
        dest (str): Destination name for the argument

    Returns:
        tuple: A tuple containing (args, kwargs) for argument parser

    Raises:
        ValueError: If no arguments are found
    """
    # Create parser arguments
    kwargs = self.kwargs

    # kind = "option"
    if len(self.args) > 0:
        if len(self.args) > 2:
            raise ValueError(
                f"Too many arguments found for {self.__class__.__name__}: {self.args}"
            )

        args = self.args

        arg1 = args[0]
        if not arg1.startswith("-"):
            # Remove first position arg to avoid argparse error:
            # ValueError: dest supplied twice for positional argument
            kwargs["metavar"] = args[0]
            args = ()
            # kind = "argument"

    elif dest:
        if len(dest) <= 2:
            args = (f"-{dest}",)
        else:
            args = (f"--{dest}",)
    else:
        raise ValueError(
            f"No arguments found for {self.__class__.__name__}: {self.__dict__}"
        )

    # Update dest if forced
    if dest:
        kwargs["dest"] = dest

    # if kind == "argument":
    #     if "dest" in kwargs:
    #         if len(args) == 1:
    #             # Remove first position arg to avoid argparse error:
    #             # ValueError: dest supplied twice for positional argument
    #             kwargs["metavar"] = args[0]
    #             args = ()
    #         else:
    #             raise ValueError(
    #                 f"Too many arguments found for {self.__class__.__name__}: {self.__dict__}"
    #             )

    return args, kwargs

Argument

Bases: ArgParseItem

Represents an argument that can be added to an argument parser.

This class handles both positional arguments and optional flags, automatically determining the appropriate type based on the argument format.

Source code in clak/parser.py
class Argument(ArgParseItem):
    """Represents an argument that can be added to an argument parser.

    This class handles both positional arguments and optional flags, automatically determining
    the appropriate type based on the argument format.
    """

    def attach_arg_to_parser(self, key: str, config: "ParserNode") -> argparse.Action:
        """Create and add an argument to the parser.

        Args:
            key (str): The argument key/name
            config (ParserNode): The parser configuration object

        Returns:
            argparse.Action: The created argument parser action
        """
        parser = config.parser
        args, kwargs = self.build_params(key)
        assert isinstance(
            args, tuple
        ), f"Args must be a list for {self.__class__.__name__}: {type(args)}"

        # Create argument
        logger.debug(
            "Create new argument %s.%s: %s",
            config.get_fname(attr="key"),
            key,
            self.kwargs,
        )

        parser.add_argument(*args, **kwargs)

        return parser

attach_arg_to_parser(key, config)

Create and add an argument to the parser.

Parameters:

Name Type Description Default
key str

The argument key/name

required
config ParserNode

The parser configuration object

required

Returns:

Type Description
Action

argparse.Action: The created argument parser action

Source code in clak/parser.py
def attach_arg_to_parser(self, key: str, config: "ParserNode") -> argparse.Action:
    """Create and add an argument to the parser.

    Args:
        key (str): The argument key/name
        config (ParserNode): The parser configuration object

    Returns:
        argparse.Action: The created argument parser action
    """
    parser = config.parser
    args, kwargs = self.build_params(key)
    assert isinstance(
        args, tuple
    ), f"Args must be a list for {self.__class__.__name__}: {type(args)}"

    # Create argument
    logger.debug(
        "Create new argument %s.%s: %s",
        config.get_fname(attr="key"),
        key,
        self.kwargs,
    )

    parser.add_argument(*args, **kwargs)

    return parser

FormatEnv

Bases: dict

Format env

Source code in clak/parser.py
class FormatEnv(dict):
    "Format env"

    _default = {
        "type": "type FUNC",
    }

    def __init__(self, variables=None):
        self._variables = variables or {}

    # def __str__(self):
    #     return self.value.format(**self.variables)

    def get(self):
        "Get dict of vars"
        out = {}
        out.update(self._default)
        out.update(self._variables)
        return out

    def __dict__(self):
        return dict(self.get())

get()

Get dict of vars

Source code in clak/parser.py
def get(self):
    "Get dict of vars"
    out = {}
    out.update(self._default)
    out.update(self._variables)
    return out

MetaSetting

Bases: Fn

A setting that is used to configure a node

Source code in clak/parser.py
class MetaSetting(Fn):  # pylint: disable=too-few-public-methods
    "A setting that is used to configure a node"

Parser

Bases: ParserNode

A simplified parser class that extends ParserNode.

This class provides a more streamlined interface to ParserNode by: - Automatically parsing arguments on initialization - Maintaining compatibility with legacy argument parser names - Providing simpler command/argument creation methods

Parameters:

Name Type Description Default
*args list

Positional arguments passed to ParserNode

()
parse bool

Whether to automatically parse arguments on init, only on root nodes

True
**kwargs dict

Keyword arguments passed to ParserNode

{}
Source code in clak/parser.py
class Parser(ParserNode):
    """A simplified parser class that extends ParserNode.

    This class provides a more streamlined interface to ParserNode by:
    - Automatically parsing arguments on initialization
    - Maintaining compatibility with legacy argument parser names
    - Providing simpler command/argument creation methods

    Args:
        *args: Positional arguments passed to ParserNode
        parse (bool): Whether to automatically parse arguments on init,
            only on root nodes
        **kwargs: Keyword arguments passed to ParserNode
    """

    def __init__(self, *args: list, parse: bool = True, **kwargs: dict):
        super().__init__(*args, **kwargs)

        if not self.parent and parse is True:
            logger.debug("Starting automatig arg_parse")
            self.dispatch(*args)

ParserNode

Bases: Node

An extensible argument parser that can be inherited to create custom CLIs.

This class provides a framework for building complex command-line interfaces with: - Hierarchical subcommands - Automatic help generation - Plugin support - Custom argument types - Exception handling

The parser can be extended by: 1. Subclassing and adding Argument instances as class attributes 2. Adding SubParser instances to create command hierarchies 3. Implementing cli_run() for command execution 4. Implementing cli_group() for command group behavior

Attributes:

Name Type Description
arguments_dict dict

Dictionary of argument name to ArgParseItem

children dict

Dictionary of subcommand name to subcommand class

inject_as_subparser bool

Whether to inject as subparser vs direct

meta__name str

ParserNode name

Source code in clak/parser.py
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
class ParserNode(Node):  # pylint: disable=too-many-instance-attributes
    """An extensible argument parser that can be inherited to create custom CLIs.

    This class provides a framework for building complex command-line interfaces with:
    - Hierarchical subcommands
    - Automatic help generation
    - Plugin support
    - Custom argument types
    - Exception handling

    The parser can be extended by:
    1. Subclassing and adding Argument instances as class attributes
    2. Adding SubParser instances to create command hierarchies
    3. Implementing cli_run() for command execution
    4. Implementing cli_group() for command group behavior

    Attributes:
        arguments_dict (dict): Dictionary of argument name to ArgParseItem
        children (dict): Dictionary of subcommand name to subcommand class
        inject_as_subparser (bool): Whether to inject as subparser vs direct
        meta__name (str): ParserNode name
    """

    arguments_dict: dict[str, ArgParseItem] = {}
    children: dict[str, type] = {}  # Dictionary of subcommand name to subcommand class
    inject_as_subparser: bool = True

    meta__name: str = NOT_SET

    meta__subcommands_dict: dict[str, SubParser] = {}
    meta__arguments_dict: dict[str, Argument] = {}

    meta__cli_view: ClakView = None

    # Meta settings
    meta__config__name = MetaSetting(
        help="Name of the parser",
    )
    meta__config__app_name = MetaSetting(
        help="Name of the application",
    )
    meta__config__app_proc_name = MetaSetting(
        help="Name of the application processus",
    )
    meta__config__help_usage = MetaSetting(
        help="Message to display in help usage",
    )
    meta__config__help_description = MetaSetting(
        help="Message to display in help description",
    )
    meta__config__help_epilog = MetaSetting(
        help="Message to display in help epilog",
    )
    meta__config__known_exceptions = MetaSetting(
        help="List of known exceptions to handle",
    )

    # Views support
    meta__config__cli_view = MetaSetting(
        help="class of the view to use",
    )

    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
        self,
        add_help: bool = True,
        parent: "ParserNode" = None,
        name: str = None,
        key: str = None,
        parser: argparse.ArgumentParser = None,
        inject_as_subparser: bool = True,
        proc_name: str = None,
    ):
        """Initialize the parser.

        Args:
            add_help (bool): Whether to add help flags
            parent (ParserNode): Parent parser instance
            name (str): ParserNode name
            key (str): ParserNode key
            parser (ArgumentParser): Existing parser to use
            inject_as_subparser (bool): Whether to inject as subparser
            proc_name (str): Process name
        """

        self.logger = logger

        super().__init__(parent=parent)

        self.name = self.query_cfg_parents("name", default=self.__class__.__name__)
        self.key = key
        self.fkey = self.get_fname(attr="key")
        self.inject_as_subparser = inject_as_subparser
        self.proc_name = proc_name
        self.add_help = add_help

        # Add children link
        self.children = {}
        self.registry = {}
        if parent:
            parent.children[self.key] = self
            self.registry = parent.registry
        self.registry[self.fkey] = self  # RegistryEntry(config=self)

        # Create or reuse parent parser
        if parser is None:
            self.parser = self.create_parser()
            self.proc_name = self.parser.prog
        else:
            self.parser = parser
            self.proc_name = self.parent.proc_name

        # Init _subparsers
        self._subparsers = None

        # Add arguments and subcommands
        # meta__arguments_dict = {}
        # meta__subcommands_dict = {}
        self.add_arguments()
        self.add_subcommands()

    def __repr__(self):
        return f"<{self.__class__.__module__}.{self.__class__.__name__}>"

    def create_parser(self):
        "Create a new parser"
        usage = self.query_cfg_parents("help_usage", default=None)
        desc = self.query_cfg_parents("help_description", default=self.__doc__)
        epilog = self.query_cfg_parents("help_epilog", default=None)

        fenv = FormatEnv({"self": self})
        usage = prepare_docstring(usage, variables=fenv.get())
        desc = prepare_docstring(desc, variables=fenv.get())
        epilog = prepare_docstring(epilog, variables=fenv.get())
        parser = argparse.ArgumentParser(
            prog=self.proc_name,
            usage=usage,
            description=desc,
            epilog=epilog,
            formatter_class=RecursiveHelpFormatter,
            add_help=self.add_help,
            exit_on_error=False,
        )
        return parser

    def __getitem__(self, key):
        return self.children[key]

    def get_fname(self, attr="key"):
        "Get full name of the parser, use key instead of name by default"
        return super().get_fname(attr=attr)

    @property
    def subparsers(self):
        """Lazily create and return the subparsers object."""
        # if not self.inject_as_subparser:
        #     return self.parser

        if self._subparsers is None:
            level = len(self.get_hierarchy())
            self._subparsers = self.parser.add_subparsers(
                dest=f"__cli_cmd__{level}", help="Available commands"
            )
        return self._subparsers

    # Argument management
    # ========================

    def add_arguments(self, arguments: dict = None):
        """Initialize all argument options defined for this parser.

        This method:
        1. Collects arguments from arguments_dict
        2. Collects arguments defined as class attributes
        3. Adds internal arguments like __cli_self__
        4. Creates all argument parser entries
        """
        arguments = arguments or getattr(self, "meta__arguments_dict", {}) or {}
        assert isinstance(arguments, dict), f"Got {type(arguments)} instead of dict"

        # Add arguments from class attributes including inherited ones
        for cls in self.__class__.__mro__:
            for name, value in vars(cls).items():
                if isinstance(value, Argument) and name not in arguments:
                    value.destination = name
                    arguments[name] = value

        # Add __cli_self__ argument
        arguments["__cli_self__"] = Argument(help=argparse.SUPPRESS, default=self)

        # Create all options
        for key, arg in arguments.items():
            self.add_argument(key, arg)
            # arg.attach_arg_to_parser(key, self)

    def add_argument(
        self, key: str, arg: Optional[Argument] = None, **kwargs: Any
    ) -> None:
        """Add an argument to this parser.

        Args:
            key (str): The key/name for the argument
            arg (Argument): The argument object to add
            **kwargs (Any): Additional keyword arguments to pass to add_argument()

        This method adds a new argument to the parser. The argument can be either a
        positional argument or an optional flag, determined by the Argument object.
        """

        if arg is None:
            arg = Argument(**kwargs)

        arg.attach_arg_to_parser(key, self)

    # Subcommand management
    # ========================

    def add_subcommands(self, subcommands: dict = None):
        """Initialize all subcommands defined for this parser.

        This method:
        1. Collects subcommands from children dictionary
        2. Collects Command instances defined as class attributes
        3. Creates parser entries for all subcommands
        """

        subcommands = subcommands or getattr(self, "meta__subcommands_dict", {}) or {}
        assert isinstance(subcommands, dict), f"Got {type(subcommands)} instead of dict"

        # Add arguments from class attributes that are Command instances
        for cls in self.__class__.__mro__:
            for attr_name, attr_value in cls.__dict__.items():
                if isinstance(attr_value, Command):
                    # Store the attribute name as the key in the Fn instance
                    attr_value.destination = attr_name
                    subcommands[attr_name] = attr_value

        for key, arg in subcommands.items():
            # arg.attach_sub_to_parser(key, self)
            self.add_subcommand(key, arg)

    def add_subcommand(self, key: str, arg=None, **kwargs) -> None:
        "Add a subcommand to this parser"
        if arg is None:
            arg = Command(**kwargs)

        arg.attach_sub_to_parser(key, self)

    # Help methods
    # ========================

    def show_help(self):
        """Display the help message for this parser."""
        self.parser.print_help()

    def show_usage(self):
        """Display the usage message for this parser."""
        self.parser.print_usage()

    def show_epilog(self):
        """Display the epilog message for this parser."""
        self.parser.print_epilog()

    # Execution helpers
    # ========================

    def cli_exit(self, status=0, message=None):
        """Exit the CLI application with given status and message.

        Args:
            status (int): Exit status code
            message (str): Optional message to display
        """
        self.parser.exit(status=status, message=message)

    def cli_exit_error(self, message):
        """Exit the CLI application with an error message.

        Args:
            message (str): Error message to display
        """
        self.parser.error(message)

    def cli_run(self, **kwargs: Any) -> None:  # pylint: disable=unused-argument
        """Execute the command implementation.

        This method should be overridden by subclasses to implement command behavior.
        The base implementation shows help for non-leaf nodes.

        Args:
            **kwargs: Additional keyword arguments from command line

        Raises:
            ClakNotImplementedError: If leaf node has no implementation
        """

        ctx = kwargs["ctx"]

        # Check if class is a leaf or not
        if len(ctx.cli_children) > 0:
            self.show_help()
        else:
            raise exception.ClakNotImplementedError(
                f"No 'cli_run' method found for {self}"
            )

    def cli_group(self, ctx: SimpleNamespace, **_: Any) -> None:
        """Execute group-level command behavior.

        Args:
            ctx: Command context object
            **_: Unused keyword arguments
        """

    def find_closest_subcommand(self, args: Optional[List[str]] = None) -> "ParserNode":
        """Find the deepest valid subcommand from given arguments.

        Args:
            args (list): Command line arguments, defaults to sys.argv[1:]

        Returns:
            ParserNode: The deepest valid subcommand parser
        """

        # Get the current command line from sys.argv
        current_cmd = sys.argv[1:] if args is None else args
        last_child = self

        # Loop through each argument to find the deepest valid subcommand
        for arg in current_cmd:
            # Skip options (starting with -)
            if arg.startswith("-"):
                break

            # Check if argument exists as a subcommand
            if arg in last_child.children:
                last_child = last_child.children[arg]
            else:
                break

        return last_child

    def clean_terminate(self, err, known_exceptions=None):
        """Handle program termination based on exception type.

        Args:
            err (Exception): The exception that triggered termination
            known_exceptions (list): List of exception types to handle specially
        """

        # def default_exception_handler(node, exc):
        #     print(f"Default exception handler: {exc} on {node}")
        #     sys.exit(1)

        # # Prepare known exceptions list
        # known_exceptions = known_exceptions or []
        # known_exceptions_conf = {}
        # for _exception in known_exceptions:
        #     exception_fn = default_exception_handler
        #     if isinstance(_exception, Sequence):
        #         exception_cls = _exception[0]
        #         if len(_exception) > 1:
        #             exception_fn = _exception[1]
        #     else:
        #         exception_cls = _exception

        #     exception_name = str(exception_cls)
        #     known_exceptions_conf[exception_name] = {
        #         "fn": exception_fn,
        #         "exception": exception_cls,
        #     }
        # known_exceptions_list = tuple(
        #     val["exception"] for val in known_exceptions_conf.values()
        # )
        # # Check user overrides
        # if known_exceptions_list and isinstance(err, known_exceptions_list):
        #     print("DEBUG", type(err), str(type(err)), err)
        #     pprint(known_exceptions_conf)
        #     get_handler = known_exceptions_conf[str(type(err))]["fn"]
        #     get_handler(self, err)
        #     # If handler did not exited, ensure we do
        #     sys.exit(1)

        # Check user overrides
        known_exceptions = tuple(known_exceptions)
        if known_exceptions and isinstance(err, known_exceptions):
            logger.fatal(err)
            sys.exit(1)

        # If user made an error on command line, show usage before leaving
        if isinstance(err, exception.ClakParseError):
            # Must go to stdout
            self.show_usage()
            print(f"{err}")
            sys.exit(err.rc)

        # Choose dead end way generic user error
        if isinstance(err, exception.ClakUserError):
            if isinstance(err.advice, str):
                logger.warning(err.advice)

            print(f"{err}")
            sys.exit(err.rc)

        # Internal clak errors
        if isinstance(err, exception.ClakError):
            err_name = err.__class__.__name__
            if isinstance(err.advice, str):
                logger.warning(err.advice)

            err_message = err.message
            if not err_message:
                err_message = err.__doc__

            print(f"{err}")
            logger.critical(
                "Program exited with bug %s(%s): %s",
                err_name,
                err.rc,
                err_message,
            )
            sys.exit(err.rc)

        oserrors = [
            PermissionError,
            FileExistsError,
            FileNotFoundError,
            InterruptedError,
            IsADirectoryError,
            NotADirectoryError,
            TimeoutError,
        ]

        if err.__class__ in oserrors:

            # Decode OS errors
            # errno = os.strerror(err.errno)
            # errint = str(err.errno)

            logger.critical("Program exited with OS error: %s", err)
            sys.exit(err.errno)

    def parse_args(
        self, args: Optional[Union[str, List[str], Dict[str, Any]]] = None
    ) -> argparse.Namespace:
        """Parse command line arguments.

        Args:
            args: Arguments to parse, can be:
                - None: Use sys.argv[1:]
                - str: Split on spaces
                - list: Use directly
                - dict: Return as-is

        Returns:
            Namespace: Parsed argument namespace

        Raises:
            ValueError: If args is invalid type
        """
        parser = self.parser
        # argcomplete.autocomplete(parser)

        # args = args[0] if len(args) > 0 else sys.argv[1:]

        if args is None:
            args = sys.argv[1:]
        elif isinstance(args, str):
            args = args.split(" ")
        elif isinstance(args, list):
            pass
        elif isinstance(args, dict):
            return args
        else:
            raise ValueError(f"Invalid args type: {type(args)}")

        return parser.parse_args(args)

    def dispatch(
        self,
        args: Optional[Dict[str, Any]] = None,
        trace: Optional[bool] = False,
        **_: Any,
    ) -> Any:
        """Main dispatch function for command execution.

        Args:
            args: Arguments to parse
            **_: Unused keyword arguments
        """

        # Process or reuse args
        # if args is None:
        error = None
        try:
            args = self.parse_args(args)
            args = args.__dict__
        except argparse.ArgumentError as err:
            msg = f"Could not parse command line: {err.argument_name} {err.message}"
            error = exception.ClakParseError(msg)
            # raise exception.ClakParseError(msg) from err

        if not error:
            assert isinstance(args, dict)

            # Check for trace mode
            if "app_trace_mode" in args:
                trace = args["app_trace_mode"]
            if CLAK_DEBUG:
                trace = True

            # Run app command
            try:
                # Process commands
                data = self.cli_execute(args=args)

            except Exception as err:  # pylint: disable=broad-exception-caught
                error = err

        if not error:
            # Prepare viewer output
            viewer = None
            if isinstance(data, ClakView):
                viewer = data
            else:
                viewer = self.query_cfg_parents("cli_view", default=None)
                if viewer:
                    viewer.payload = data

            # Render output or return data
            if viewer:
                viewer.render()
            return data

        if trace is True:
            # print("TRACE")
            # Show traceback if debug mode is enabled
            logger.error("".join(traceback.format_exception(error)))
            # print("TRACE")

        # Process exception handling
        known_exceptions = self.query_cfg_parents("known_exceptions", default=[])
        self.clean_terminate(error, known_exceptions)

        # Developer catchall, when an exception is not handled
        if trace is False:
            # print("TRACE")
            # Show traceback if not already shown
            logger.error("".join(traceback.format_exception(error)))
        logger.critical(
            "Uncaught error %s, this may be a bug! Error: %s", error.__class__, error
        )
        # logger.critical("Exit 1 with bugs")
        sys.exit(1)

    def cli_execute(  # pylint: disable=too-many-locals,too-many-statements
        self, args: Optional[Dict[str, Any]] = None
    ) -> Any:
        """Execute the command with given arguments.

        Args:
            args: Arguments to parse

        Raises:
            ClakParseError: If argument parsing fails
            NotImplementedError: If command has no implementation
        """
        assert isinstance(args, dict)

        # Prepare args and context
        hook_list = {}

        # args = args.__dict__
        cli_command_hier = [
            value
            for key, value in sorted(args.items())
            if key.startswith("__cli_cmd__")
        ]
        args = {
            key: value
            for key, value in args.items()
            if not key.startswith("__cli_cmd__")
        }

        cli_self = self
        if "__cli_self__" in args:
            cli_self = args.pop("__cli_self__")

        # Prepare data
        fn_group_name = "cli_group"
        fn_exec_name = "cli_run"
        fn_hook_prefix = "cli_hook__"
        name = self.name
        hierarchy = cli_self.get_hierarchy()
        node_count = len(hierarchy)

        logger.debug("Run instance %s", cli_self)

        ctx = {}
        ctx["registry"] = self.registry

        # Fetch settings
        ctx["name"] = name
        ctx["app_name"] = self.query_cfg_parents("app_name", default=name)
        ctx["app_proc_name"] = self.query_cfg_parents(
            "app_proc_name", default=self.proc_name
        )
        # ctx["app_env_prefix"] = self.query_cfg_parents(
        #     "app_env_prefix", default=name.upper()
        # )

        # Loop constant
        ctx["cli_self"] = cli_self
        ctx["cli_root"] = self
        ctx["cli_depth"] = node_count
        ctx["cli_commands"] = cli_command_hier
        ctx["args"] = ObjectNamespace(**args)

        # Shared data
        ctx["data"] = {}
        ctx["plugins"] = {}

        # Loop var init
        ctx["cli_first"] = True
        ctx["cli_state"] = None
        ctx["cli_methods"] = None

        # Execute all nodes in hierarchy
        ret = None
        # pylint: disable=attribute-defined-outside-init
        for idx, node in enumerate(hierarchy):
            last_node = idx == (node_count - 1)

            logger.info("Processing node %d:%s.%s", idx, node, fn_group_name)
            # print(f"Node {idx}:{node}")

            # Prepare hooks list
            cls_hooks = [
                method for method in dir(self) if method.startswith(fn_hook_prefix)
            ]
            for hook_name in cls_hooks:
                if not hook_name in hook_list:
                    hook_fn = getattr(self, hook_name, None)
                    if hook_fn is not None:
                        # Hooks order should be preserved with dict
                        hook_list[hook_name] = hook_fn

            # Update ctx with node attributes
            ctx["cli_parent"] = hierarchy[-2] if len(hierarchy) > 1 else None
            ctx["cli_parents"] = hierarchy[:idx]
            ctx["cli_children"] = dict(node.children)
            ctx["cli_last"] = last_node
            ctx["cli_hooks"] = hook_list
            ctx["cli_index"] = idx

            # Sort ctx dict by keys before creating namespace
            sorted_ctx = dict(sorted(ctx.items()))
            _ctx = ObjectNamespace(**sorted_ctx)
            _ctx.cli_state = "run_hooks"

            # Process hooks
            for name, hook_fn in hook_list.items():
                # hook_fn = getattr(self, hook, None)
                # if hook_fn is not None:
                logger.info("Run hook %d:%s.%s", idx, node, name)
                hook_fn(node, _ctx)

            # Store the list of available plugins methods
            _ctx.cli_methods = getattr(node, "cli_methods", {})

            # Run group_run
            _ctx.cli_state = "run_groups"

            group_fn = getattr(node, fn_group_name, None)
            # print ("GROUP FN", group_fn)
            if group_fn is not None:
                logger.info(
                    "Group function execute: %d:%s.%s", idx, node, fn_group_name
                )
                group_fn(ctx=_ctx, **_ctx.__dict__)

            # Run leaf only if last node
            _ctx.cli_state = "run_exec"
            if last_node is True:
                run_fn = getattr(node, fn_exec_name, None)

                logger.info("Run function execute: %d:%s.%s", idx, node, fn_exec_name)
                ret = run_fn(ctx=_ctx, **_ctx.args.__dict__)

            # Change status
            ctx["cli_first"] = False

        return ret

subparsers property

Lazily create and return the subparsers object.

__init__(add_help=True, parent=None, name=None, key=None, parser=None, inject_as_subparser=True, proc_name=None)

Initialize the parser.

Parameters:

Name Type Description Default
add_help bool

Whether to add help flags

True
parent ParserNode

Parent parser instance

None
name str

ParserNode name

None
key str

ParserNode key

None
parser ArgumentParser

Existing parser to use

None
inject_as_subparser bool

Whether to inject as subparser

True
proc_name str

Process name

None
Source code in clak/parser.py
def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
    self,
    add_help: bool = True,
    parent: "ParserNode" = None,
    name: str = None,
    key: str = None,
    parser: argparse.ArgumentParser = None,
    inject_as_subparser: bool = True,
    proc_name: str = None,
):
    """Initialize the parser.

    Args:
        add_help (bool): Whether to add help flags
        parent (ParserNode): Parent parser instance
        name (str): ParserNode name
        key (str): ParserNode key
        parser (ArgumentParser): Existing parser to use
        inject_as_subparser (bool): Whether to inject as subparser
        proc_name (str): Process name
    """

    self.logger = logger

    super().__init__(parent=parent)

    self.name = self.query_cfg_parents("name", default=self.__class__.__name__)
    self.key = key
    self.fkey = self.get_fname(attr="key")
    self.inject_as_subparser = inject_as_subparser
    self.proc_name = proc_name
    self.add_help = add_help

    # Add children link
    self.children = {}
    self.registry = {}
    if parent:
        parent.children[self.key] = self
        self.registry = parent.registry
    self.registry[self.fkey] = self  # RegistryEntry(config=self)

    # Create or reuse parent parser
    if parser is None:
        self.parser = self.create_parser()
        self.proc_name = self.parser.prog
    else:
        self.parser = parser
        self.proc_name = self.parent.proc_name

    # Init _subparsers
    self._subparsers = None

    # Add arguments and subcommands
    # meta__arguments_dict = {}
    # meta__subcommands_dict = {}
    self.add_arguments()
    self.add_subcommands()

add_argument(key, arg=None, **kwargs)

Add an argument to this parser.

Parameters:

Name Type Description Default
key str

The key/name for the argument

required
arg Argument

The argument object to add

None
**kwargs Any

Additional keyword arguments to pass to add_argument()

{}

This method adds a new argument to the parser. The argument can be either a positional argument or an optional flag, determined by the Argument object.

Source code in clak/parser.py
def add_argument(
    self, key: str, arg: Optional[Argument] = None, **kwargs: Any
) -> None:
    """Add an argument to this parser.

    Args:
        key (str): The key/name for the argument
        arg (Argument): The argument object to add
        **kwargs (Any): Additional keyword arguments to pass to add_argument()

    This method adds a new argument to the parser. The argument can be either a
    positional argument or an optional flag, determined by the Argument object.
    """

    if arg is None:
        arg = Argument(**kwargs)

    arg.attach_arg_to_parser(key, self)

add_arguments(arguments=None)

Initialize all argument options defined for this parser.

This method: 1. Collects arguments from arguments_dict 2. Collects arguments defined as class attributes 3. Adds internal arguments like cli_self 4. Creates all argument parser entries

Source code in clak/parser.py
def add_arguments(self, arguments: dict = None):
    """Initialize all argument options defined for this parser.

    This method:
    1. Collects arguments from arguments_dict
    2. Collects arguments defined as class attributes
    3. Adds internal arguments like __cli_self__
    4. Creates all argument parser entries
    """
    arguments = arguments or getattr(self, "meta__arguments_dict", {}) or {}
    assert isinstance(arguments, dict), f"Got {type(arguments)} instead of dict"

    # Add arguments from class attributes including inherited ones
    for cls in self.__class__.__mro__:
        for name, value in vars(cls).items():
            if isinstance(value, Argument) and name not in arguments:
                value.destination = name
                arguments[name] = value

    # Add __cli_self__ argument
    arguments["__cli_self__"] = Argument(help=argparse.SUPPRESS, default=self)

    # Create all options
    for key, arg in arguments.items():
        self.add_argument(key, arg)

add_subcommand(key, arg=None, **kwargs)

Add a subcommand to this parser

Source code in clak/parser.py
def add_subcommand(self, key: str, arg=None, **kwargs) -> None:
    "Add a subcommand to this parser"
    if arg is None:
        arg = Command(**kwargs)

    arg.attach_sub_to_parser(key, self)

add_subcommands(subcommands=None)

Initialize all subcommands defined for this parser.

This method: 1. Collects subcommands from children dictionary 2. Collects Command instances defined as class attributes 3. Creates parser entries for all subcommands

Source code in clak/parser.py
def add_subcommands(self, subcommands: dict = None):
    """Initialize all subcommands defined for this parser.

    This method:
    1. Collects subcommands from children dictionary
    2. Collects Command instances defined as class attributes
    3. Creates parser entries for all subcommands
    """

    subcommands = subcommands or getattr(self, "meta__subcommands_dict", {}) or {}
    assert isinstance(subcommands, dict), f"Got {type(subcommands)} instead of dict"

    # Add arguments from class attributes that are Command instances
    for cls in self.__class__.__mro__:
        for attr_name, attr_value in cls.__dict__.items():
            if isinstance(attr_value, Command):
                # Store the attribute name as the key in the Fn instance
                attr_value.destination = attr_name
                subcommands[attr_name] = attr_value

    for key, arg in subcommands.items():
        # arg.attach_sub_to_parser(key, self)
        self.add_subcommand(key, arg)

clean_terminate(err, known_exceptions=None)

Handle program termination based on exception type.

Parameters:

Name Type Description Default
err Exception

The exception that triggered termination

required
known_exceptions list

List of exception types to handle specially

None
Source code in clak/parser.py
def clean_terminate(self, err, known_exceptions=None):
    """Handle program termination based on exception type.

    Args:
        err (Exception): The exception that triggered termination
        known_exceptions (list): List of exception types to handle specially
    """

    # def default_exception_handler(node, exc):
    #     print(f"Default exception handler: {exc} on {node}")
    #     sys.exit(1)

    # # Prepare known exceptions list
    # known_exceptions = known_exceptions or []
    # known_exceptions_conf = {}
    # for _exception in known_exceptions:
    #     exception_fn = default_exception_handler
    #     if isinstance(_exception, Sequence):
    #         exception_cls = _exception[0]
    #         if len(_exception) > 1:
    #             exception_fn = _exception[1]
    #     else:
    #         exception_cls = _exception

    #     exception_name = str(exception_cls)
    #     known_exceptions_conf[exception_name] = {
    #         "fn": exception_fn,
    #         "exception": exception_cls,
    #     }
    # known_exceptions_list = tuple(
    #     val["exception"] for val in known_exceptions_conf.values()
    # )
    # # Check user overrides
    # if known_exceptions_list and isinstance(err, known_exceptions_list):
    #     print("DEBUG", type(err), str(type(err)), err)
    #     pprint(known_exceptions_conf)
    #     get_handler = known_exceptions_conf[str(type(err))]["fn"]
    #     get_handler(self, err)
    #     # If handler did not exited, ensure we do
    #     sys.exit(1)

    # Check user overrides
    known_exceptions = tuple(known_exceptions)
    if known_exceptions and isinstance(err, known_exceptions):
        logger.fatal(err)
        sys.exit(1)

    # If user made an error on command line, show usage before leaving
    if isinstance(err, exception.ClakParseError):
        # Must go to stdout
        self.show_usage()
        print(f"{err}")
        sys.exit(err.rc)

    # Choose dead end way generic user error
    if isinstance(err, exception.ClakUserError):
        if isinstance(err.advice, str):
            logger.warning(err.advice)

        print(f"{err}")
        sys.exit(err.rc)

    # Internal clak errors
    if isinstance(err, exception.ClakError):
        err_name = err.__class__.__name__
        if isinstance(err.advice, str):
            logger.warning(err.advice)

        err_message = err.message
        if not err_message:
            err_message = err.__doc__

        print(f"{err}")
        logger.critical(
            "Program exited with bug %s(%s): %s",
            err_name,
            err.rc,
            err_message,
        )
        sys.exit(err.rc)

    oserrors = [
        PermissionError,
        FileExistsError,
        FileNotFoundError,
        InterruptedError,
        IsADirectoryError,
        NotADirectoryError,
        TimeoutError,
    ]

    if err.__class__ in oserrors:

        # Decode OS errors
        # errno = os.strerror(err.errno)
        # errint = str(err.errno)

        logger.critical("Program exited with OS error: %s", err)
        sys.exit(err.errno)

cli_execute(args=None)

Execute the command with given arguments.

Parameters:

Name Type Description Default
args Optional[Dict[str, Any]]

Arguments to parse

None

Raises:

Type Description
ClakParseError

If argument parsing fails

NotImplementedError

If command has no implementation

Source code in clak/parser.py
def cli_execute(  # pylint: disable=too-many-locals,too-many-statements
    self, args: Optional[Dict[str, Any]] = None
) -> Any:
    """Execute the command with given arguments.

    Args:
        args: Arguments to parse

    Raises:
        ClakParseError: If argument parsing fails
        NotImplementedError: If command has no implementation
    """
    assert isinstance(args, dict)

    # Prepare args and context
    hook_list = {}

    # args = args.__dict__
    cli_command_hier = [
        value
        for key, value in sorted(args.items())
        if key.startswith("__cli_cmd__")
    ]
    args = {
        key: value
        for key, value in args.items()
        if not key.startswith("__cli_cmd__")
    }

    cli_self = self
    if "__cli_self__" in args:
        cli_self = args.pop("__cli_self__")

    # Prepare data
    fn_group_name = "cli_group"
    fn_exec_name = "cli_run"
    fn_hook_prefix = "cli_hook__"
    name = self.name
    hierarchy = cli_self.get_hierarchy()
    node_count = len(hierarchy)

    logger.debug("Run instance %s", cli_self)

    ctx = {}
    ctx["registry"] = self.registry

    # Fetch settings
    ctx["name"] = name
    ctx["app_name"] = self.query_cfg_parents("app_name", default=name)
    ctx["app_proc_name"] = self.query_cfg_parents(
        "app_proc_name", default=self.proc_name
    )
    # ctx["app_env_prefix"] = self.query_cfg_parents(
    #     "app_env_prefix", default=name.upper()
    # )

    # Loop constant
    ctx["cli_self"] = cli_self
    ctx["cli_root"] = self
    ctx["cli_depth"] = node_count
    ctx["cli_commands"] = cli_command_hier
    ctx["args"] = ObjectNamespace(**args)

    # Shared data
    ctx["data"] = {}
    ctx["plugins"] = {}

    # Loop var init
    ctx["cli_first"] = True
    ctx["cli_state"] = None
    ctx["cli_methods"] = None

    # Execute all nodes in hierarchy
    ret = None
    # pylint: disable=attribute-defined-outside-init
    for idx, node in enumerate(hierarchy):
        last_node = idx == (node_count - 1)

        logger.info("Processing node %d:%s.%s", idx, node, fn_group_name)
        # print(f"Node {idx}:{node}")

        # Prepare hooks list
        cls_hooks = [
            method for method in dir(self) if method.startswith(fn_hook_prefix)
        ]
        for hook_name in cls_hooks:
            if not hook_name in hook_list:
                hook_fn = getattr(self, hook_name, None)
                if hook_fn is not None:
                    # Hooks order should be preserved with dict
                    hook_list[hook_name] = hook_fn

        # Update ctx with node attributes
        ctx["cli_parent"] = hierarchy[-2] if len(hierarchy) > 1 else None
        ctx["cli_parents"] = hierarchy[:idx]
        ctx["cli_children"] = dict(node.children)
        ctx["cli_last"] = last_node
        ctx["cli_hooks"] = hook_list
        ctx["cli_index"] = idx

        # Sort ctx dict by keys before creating namespace
        sorted_ctx = dict(sorted(ctx.items()))
        _ctx = ObjectNamespace(**sorted_ctx)
        _ctx.cli_state = "run_hooks"

        # Process hooks
        for name, hook_fn in hook_list.items():
            # hook_fn = getattr(self, hook, None)
            # if hook_fn is not None:
            logger.info("Run hook %d:%s.%s", idx, node, name)
            hook_fn(node, _ctx)

        # Store the list of available plugins methods
        _ctx.cli_methods = getattr(node, "cli_methods", {})

        # Run group_run
        _ctx.cli_state = "run_groups"

        group_fn = getattr(node, fn_group_name, None)
        # print ("GROUP FN", group_fn)
        if group_fn is not None:
            logger.info(
                "Group function execute: %d:%s.%s", idx, node, fn_group_name
            )
            group_fn(ctx=_ctx, **_ctx.__dict__)

        # Run leaf only if last node
        _ctx.cli_state = "run_exec"
        if last_node is True:
            run_fn = getattr(node, fn_exec_name, None)

            logger.info("Run function execute: %d:%s.%s", idx, node, fn_exec_name)
            ret = run_fn(ctx=_ctx, **_ctx.args.__dict__)

        # Change status
        ctx["cli_first"] = False

    return ret

cli_exit(status=0, message=None)

Exit the CLI application with given status and message.

Parameters:

Name Type Description Default
status int

Exit status code

0
message str

Optional message to display

None
Source code in clak/parser.py
def cli_exit(self, status=0, message=None):
    """Exit the CLI application with given status and message.

    Args:
        status (int): Exit status code
        message (str): Optional message to display
    """
    self.parser.exit(status=status, message=message)

cli_exit_error(message)

Exit the CLI application with an error message.

Parameters:

Name Type Description Default
message str

Error message to display

required
Source code in clak/parser.py
def cli_exit_error(self, message):
    """Exit the CLI application with an error message.

    Args:
        message (str): Error message to display
    """
    self.parser.error(message)

cli_group(ctx, **_)

Execute group-level command behavior.

Parameters:

Name Type Description Default
ctx SimpleNamespace

Command context object

required
**_ Any

Unused keyword arguments

{}
Source code in clak/parser.py
def cli_group(self, ctx: SimpleNamespace, **_: Any) -> None:
    """Execute group-level command behavior.

    Args:
        ctx: Command context object
        **_: Unused keyword arguments
    """

cli_run(**kwargs)

Execute the command implementation.

This method should be overridden by subclasses to implement command behavior. The base implementation shows help for non-leaf nodes.

Parameters:

Name Type Description Default
**kwargs Any

Additional keyword arguments from command line

{}

Raises:

Type Description
ClakNotImplementedError

If leaf node has no implementation

Source code in clak/parser.py
def cli_run(self, **kwargs: Any) -> None:  # pylint: disable=unused-argument
    """Execute the command implementation.

    This method should be overridden by subclasses to implement command behavior.
    The base implementation shows help for non-leaf nodes.

    Args:
        **kwargs: Additional keyword arguments from command line

    Raises:
        ClakNotImplementedError: If leaf node has no implementation
    """

    ctx = kwargs["ctx"]

    # Check if class is a leaf or not
    if len(ctx.cli_children) > 0:
        self.show_help()
    else:
        raise exception.ClakNotImplementedError(
            f"No 'cli_run' method found for {self}"
        )

create_parser()

Create a new parser

Source code in clak/parser.py
def create_parser(self):
    "Create a new parser"
    usage = self.query_cfg_parents("help_usage", default=None)
    desc = self.query_cfg_parents("help_description", default=self.__doc__)
    epilog = self.query_cfg_parents("help_epilog", default=None)

    fenv = FormatEnv({"self": self})
    usage = prepare_docstring(usage, variables=fenv.get())
    desc = prepare_docstring(desc, variables=fenv.get())
    epilog = prepare_docstring(epilog, variables=fenv.get())
    parser = argparse.ArgumentParser(
        prog=self.proc_name,
        usage=usage,
        description=desc,
        epilog=epilog,
        formatter_class=RecursiveHelpFormatter,
        add_help=self.add_help,
        exit_on_error=False,
    )
    return parser

dispatch(args=None, trace=False, **_)

Main dispatch function for command execution.

Parameters:

Name Type Description Default
args Optional[Dict[str, Any]]

Arguments to parse

None
**_ Any

Unused keyword arguments

{}
Source code in clak/parser.py
def dispatch(
    self,
    args: Optional[Dict[str, Any]] = None,
    trace: Optional[bool] = False,
    **_: Any,
) -> Any:
    """Main dispatch function for command execution.

    Args:
        args: Arguments to parse
        **_: Unused keyword arguments
    """

    # Process or reuse args
    # if args is None:
    error = None
    try:
        args = self.parse_args(args)
        args = args.__dict__
    except argparse.ArgumentError as err:
        msg = f"Could not parse command line: {err.argument_name} {err.message}"
        error = exception.ClakParseError(msg)
        # raise exception.ClakParseError(msg) from err

    if not error:
        assert isinstance(args, dict)

        # Check for trace mode
        if "app_trace_mode" in args:
            trace = args["app_trace_mode"]
        if CLAK_DEBUG:
            trace = True

        # Run app command
        try:
            # Process commands
            data = self.cli_execute(args=args)

        except Exception as err:  # pylint: disable=broad-exception-caught
            error = err

    if not error:
        # Prepare viewer output
        viewer = None
        if isinstance(data, ClakView):
            viewer = data
        else:
            viewer = self.query_cfg_parents("cli_view", default=None)
            if viewer:
                viewer.payload = data

        # Render output or return data
        if viewer:
            viewer.render()
        return data

    if trace is True:
        # print("TRACE")
        # Show traceback if debug mode is enabled
        logger.error("".join(traceback.format_exception(error)))
        # print("TRACE")

    # Process exception handling
    known_exceptions = self.query_cfg_parents("known_exceptions", default=[])
    self.clean_terminate(error, known_exceptions)

    # Developer catchall, when an exception is not handled
    if trace is False:
        # print("TRACE")
        # Show traceback if not already shown
        logger.error("".join(traceback.format_exception(error)))
    logger.critical(
        "Uncaught error %s, this may be a bug! Error: %s", error.__class__, error
    )
    # logger.critical("Exit 1 with bugs")
    sys.exit(1)

find_closest_subcommand(args=None)

Find the deepest valid subcommand from given arguments.

Parameters:

Name Type Description Default
args list

Command line arguments, defaults to sys.argv[1:]

None

Returns:

Name Type Description
ParserNode ParserNode

The deepest valid subcommand parser

Source code in clak/parser.py
def find_closest_subcommand(self, args: Optional[List[str]] = None) -> "ParserNode":
    """Find the deepest valid subcommand from given arguments.

    Args:
        args (list): Command line arguments, defaults to sys.argv[1:]

    Returns:
        ParserNode: The deepest valid subcommand parser
    """

    # Get the current command line from sys.argv
    current_cmd = sys.argv[1:] if args is None else args
    last_child = self

    # Loop through each argument to find the deepest valid subcommand
    for arg in current_cmd:
        # Skip options (starting with -)
        if arg.startswith("-"):
            break

        # Check if argument exists as a subcommand
        if arg in last_child.children:
            last_child = last_child.children[arg]
        else:
            break

    return last_child

get_fname(attr='key')

Get full name of the parser, use key instead of name by default

Source code in clak/parser.py
def get_fname(self, attr="key"):
    "Get full name of the parser, use key instead of name by default"
    return super().get_fname(attr=attr)

parse_args(args=None)

Parse command line arguments.

Parameters:

Name Type Description Default
args Optional[Union[str, List[str], Dict[str, Any]]]

Arguments to parse, can be: - None: Use sys.argv[1:] - str: Split on spaces - list: Use directly - dict: Return as-is

None

Returns:

Name Type Description
Namespace Namespace

Parsed argument namespace

Raises:

Type Description
ValueError

If args is invalid type

Source code in clak/parser.py
def parse_args(
    self, args: Optional[Union[str, List[str], Dict[str, Any]]] = None
) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        args: Arguments to parse, can be:
            - None: Use sys.argv[1:]
            - str: Split on spaces
            - list: Use directly
            - dict: Return as-is

    Returns:
        Namespace: Parsed argument namespace

    Raises:
        ValueError: If args is invalid type
    """
    parser = self.parser
    # argcomplete.autocomplete(parser)

    # args = args[0] if len(args) > 0 else sys.argv[1:]

    if args is None:
        args = sys.argv[1:]
    elif isinstance(args, str):
        args = args.split(" ")
    elif isinstance(args, list):
        pass
    elif isinstance(args, dict):
        return args
    else:
        raise ValueError(f"Invalid args type: {type(args)}")

    return parser.parse_args(args)

show_epilog()

Display the epilog message for this parser.

Source code in clak/parser.py
def show_epilog(self):
    """Display the epilog message for this parser."""
    self.parser.print_epilog()

show_help()

Display the help message for this parser.

Source code in clak/parser.py
def show_help(self):
    """Display the help message for this parser."""
    self.parser.print_help()

show_usage()

Display the usage message for this parser.

Source code in clak/parser.py
def show_usage(self):
    """Display the usage message for this parser."""
    self.parser.print_usage()

RegistryEntry

Registry entry

Source code in clak/parser.py
class RegistryEntry:
    "Registry entry"

    def __init__(self, config):
        # super().__init__(*args, **kwargs)
        # self.parser = None
        self._config = config
        self._entries = {}

    def add_entry(self, key: str, value: Any) -> None:
        """Add a new entry to the registry.

        Args:
            key: Key to store the entry under
            value: Value to store in the registry
        """
        self._entries[key] = value

    def __repr__(self):
        return f"RegistryEntry({self._config})"

add_entry(key, value)

Add a new entry to the registry.

Parameters:

Name Type Description Default
key str

Key to store the entry under

required
value Any

Value to store in the registry

required
Source code in clak/parser.py
def add_entry(self, key: str, value: Any) -> None:
    """Add a new entry to the registry.

    Args:
        key: Key to store the entry under
        value: Value to store in the registry
    """
    self._entries[key] = value

SubParser

Bases: ArgParseItem

Represents a subcommand parser that can be added to a parent parser.

This class handles creation of nested command structures, allowing for hierarchical command-line interfaces. It supports both subparser and injection modes.

Attributes:

Name Type Description
meta__help_flags bool

Whether to enable -h and --help support

meta__usage str

Custom usage message

meta__description str

Custom description message

meta__epilog str

Custom epilog message

Source code in clak/parser.py
class SubParser(ArgParseItem):
    """Represents a subcommand parser that can be added to a parent parser.

    This class handles creation of nested command structures, allowing for hierarchical
    command-line interfaces. It supports both subparser and injection modes.

    Attributes:
        meta__help_flags (bool): Whether to enable -h and --help support
        meta__usage (str): Custom usage message
        meta__description (str): Custom description message
        meta__epilog (str): Custom epilog message
    """

    # If true, enable -h and --help support
    meta__help_flags = True

    meta__usage = None
    meta__description = None
    meta__epilog = None

    def __init__(self, cls, *args, use_subparsers: bool = USE_SUBPARSERS, **kwargs):
        super().__init__(*args, **kwargs)
        self.cls = cls
        self.use_subparsers = use_subparsers

    def attach_sub_to_parser(self, key: str, config: "ParserNode") -> "ParserNode":
        """Create a subcommand parser for this command.

        Creates a new subparser for the command and configures it with the appropriate
        help text and options. Validates that the command name is valid.

        Args:
            key (str): Name of the subcommand
            config (ParserNode): Parent parser configuration object

        Raises:
            ValueError: If command name contains spaces

        Returns:
            ParserNode: The created child parser instance
        """

        if " " in key:
            raise ValueError(
                f"Command name '{key}' contains spaces. Command names must not contain spaces."
            )

        if self.use_subparsers:

            logger.debug(
                "Create new subparser %s.%s",
                config.get_fname(attr="key"),
                key,
            )  # , self.kwargs)

            # Fetch help from class
            parser_help = self.kwargs.get(
                "help",
                self.cls.query_cfg_inst(
                    self.cls, "help_description", default=self.cls.__doc__
                ),
            )
            parser_help_enabled = self.kwargs.get(
                "help_flags",
                self.cls.query_cfg_inst(self.cls, "help_flags", default=True),
            )

            ctx_vars = {"key": key, "self": config}

            # Create a new subparser for this command (flat structure)
            parser_help = prepare_docstring(
                first_doc_line(parser_help), variables=ctx_vars
            )
            parser_kwargs = {
                "formatter_class": RecursiveHelpFormatter,
                "add_help": parser_help_enabled,  # Add support for --help
                "exit_on_error": False,
                "help": parser_help,
            }
            # if parser_help is not None:
            #     parser_kwargs["help"] = parser_help

            # Create parser
            subparser = config.subparsers.add_parser(
                key,
                **parser_kwargs,
            )

            # Create an instance of the command class with the subparser
            child = self.cls(parent=config, parser=subparser, key=key)
            ctx_vars["self"] = child

            # logger.debug(
            #     "Create new SUBPARSER %s %s %s",
            #     child.get_fname(attr="key"),
            #     key,
            #     self.kwargs,
            # )

            child_usage = child.query_cfg_inst("help_usage", default=None)
            child_desc = first_doc_line(
                child.query_cfg_inst("help_description", default=child.__doc__)
            )
            child_epilog = child.query_cfg_inst("help_epilog", default=None)
            # print(f"DESC: |{desc}|")

            # Reconfigure subparser
            child_usage = prepare_docstring(child_usage, variables=ctx_vars)
            child_desc = prepare_docstring(child_desc, variables=ctx_vars)
            child_epilog = prepare_docstring(child_epilog, variables=ctx_vars)

            subparser.add_help = (
                False  # child.query_cfg_inst("help_enable", default=True)
            )
            subparser.usage = child_usage
            subparser.description = child_desc
            subparser.epilog = child_epilog

            # pprint (subparser.__dict__)

        else:
            # This part is in BETA

            # Create nested structure
            child = self.cls(parent=config)
            # Pass help text from Command class kwargs
            child.parser.help = self.kwargs.get("help", child.__doc__)
            argparse_inject_as_subparser(config.parser, key, child.parser)

        return child

attach_sub_to_parser(key, config)

Create a subcommand parser for this command.

Creates a new subparser for the command and configures it with the appropriate help text and options. Validates that the command name is valid.

Parameters:

Name Type Description Default
key str

Name of the subcommand

required
config ParserNode

Parent parser configuration object

required

Raises:

Type Description
ValueError

If command name contains spaces

Returns:

Name Type Description
ParserNode ParserNode

The created child parser instance

Source code in clak/parser.py
def attach_sub_to_parser(self, key: str, config: "ParserNode") -> "ParserNode":
    """Create a subcommand parser for this command.

    Creates a new subparser for the command and configures it with the appropriate
    help text and options. Validates that the command name is valid.

    Args:
        key (str): Name of the subcommand
        config (ParserNode): Parent parser configuration object

    Raises:
        ValueError: If command name contains spaces

    Returns:
        ParserNode: The created child parser instance
    """

    if " " in key:
        raise ValueError(
            f"Command name '{key}' contains spaces. Command names must not contain spaces."
        )

    if self.use_subparsers:

        logger.debug(
            "Create new subparser %s.%s",
            config.get_fname(attr="key"),
            key,
        )  # , self.kwargs)

        # Fetch help from class
        parser_help = self.kwargs.get(
            "help",
            self.cls.query_cfg_inst(
                self.cls, "help_description", default=self.cls.__doc__
            ),
        )
        parser_help_enabled = self.kwargs.get(
            "help_flags",
            self.cls.query_cfg_inst(self.cls, "help_flags", default=True),
        )

        ctx_vars = {"key": key, "self": config}

        # Create a new subparser for this command (flat structure)
        parser_help = prepare_docstring(
            first_doc_line(parser_help), variables=ctx_vars
        )
        parser_kwargs = {
            "formatter_class": RecursiveHelpFormatter,
            "add_help": parser_help_enabled,  # Add support for --help
            "exit_on_error": False,
            "help": parser_help,
        }
        # if parser_help is not None:
        #     parser_kwargs["help"] = parser_help

        # Create parser
        subparser = config.subparsers.add_parser(
            key,
            **parser_kwargs,
        )

        # Create an instance of the command class with the subparser
        child = self.cls(parent=config, parser=subparser, key=key)
        ctx_vars["self"] = child

        # logger.debug(
        #     "Create new SUBPARSER %s %s %s",
        #     child.get_fname(attr="key"),
        #     key,
        #     self.kwargs,
        # )

        child_usage = child.query_cfg_inst("help_usage", default=None)
        child_desc = first_doc_line(
            child.query_cfg_inst("help_description", default=child.__doc__)
        )
        child_epilog = child.query_cfg_inst("help_epilog", default=None)
        # print(f"DESC: |{desc}|")

        # Reconfigure subparser
        child_usage = prepare_docstring(child_usage, variables=ctx_vars)
        child_desc = prepare_docstring(child_desc, variables=ctx_vars)
        child_epilog = prepare_docstring(child_epilog, variables=ctx_vars)

        subparser.add_help = (
            False  # child.query_cfg_inst("help_enable", default=True)
        )
        subparser.usage = child_usage
        subparser.description = child_desc
        subparser.epilog = child_epilog

        # pprint (subparser.__dict__)

    else:
        # This part is in BETA

        # Create nested structure
        child = self.cls(parent=config)
        # Pass help text from Command class kwargs
        child.parser.help = self.kwargs.get("help", child.__doc__)
        argparse_inject_as_subparser(config.parser, key, child.parser)

    return child

first_doc_line(text)

Get the first non-empty line from a text string.

Parameters:

Name Type Description Default
text str

The text to extract the first line from

required

Returns:

Name Type Description
str str

The first non-empty line, or empty string if no non-empty lines found

Raises:

Type Description
AssertionError

If first non-empty line starts with spaces

Source code in clak/parser.py
def first_doc_line(text: str) -> str:
    """Get the first non-empty line from a text string.

    Args:
        text (str): The text to extract the first line from

    Returns:
        str: The first non-empty line, or empty string if no non-empty lines found

    Raises:
        AssertionError: If first non-empty line starts with spaces
    """
    lines = text.split("\n")
    for line in lines:
        if line.strip():
            assert not line.startswith(
                " "
            ), f"First line of docstring should not start with 2 spaces: {line}"
            return line
    return ""

prepare_docstring(text, variables=None, reindent='')

Prepare a docstring by deindenting and formatting with variables.

Parameters:

Name Type Description Default
text str

The docstring text to prepare

required
variables dict

Variables to format into the docstring

None
reindent str

String to use for reindenting

''

Returns:

Name Type Description
str Optional[str]

The prepared docstring, or None/SUPPRESS if input was None/SUPPRESS

Raises:

Type Description
KeyError

If formatting fails due to missing variables

AssertionError

If variables arg is not a dict

Source code in clak/parser.py
def prepare_docstring(
    text: Optional[str], variables: Optional[Dict[str, Any]] = None, reindent: str = ""
) -> Optional[str]:
    """Prepare a docstring by deindenting and formatting with variables.

    Args:
        text (str): The docstring text to prepare
        variables (dict, optional): Variables to format into the docstring
        reindent (str, optional): String to use for reindenting

    Returns:
        str: The prepared docstring, or None/SUPPRESS if input was None/SUPPRESS

    Raises:
        KeyError: If formatting fails due to missing variables
        AssertionError: If variables arg is not a dict
    """

    variables = variables or {}
    assert isinstance(variables, dict), f"Got {type(variables)} instead of dict"

    if text is None:
        return None
    if text == SUPPRESS:
        return SUPPRESS

    text = deindent_docstring(text, reindent=reindent)
    try:
        text = text.format(**variables)
    except KeyError as err:
        print(f"Error formatting docstring: {err}")
        print(f"Variables: {variables}")
        print(f"Text: {text}")
        raise err

    return text