🐍🐍🐍
at main 214 lines 8.5 kB view raw
import os
import sys

from pathlib import Path

from pyt.core.terminal.ansi import codes as ac
from pyt.core import AttrDict, lsnap
from pyt.core.commands import registrar_attr, register_builtins


def _find_pytrc():
    """Return the default location of pytrc.py, creating its directory.

    The directory is ``$XDG_CONFIG_HOME/snakepyt`` when XDG_CONFIG_HOME is set
    and non-empty, otherwise ``~/.config/snakepyt``. The directory is created
    if missing; the pytrc.py file itself is not.
    """
    config_home = os.getenv("XDG_CONFIG_HOME")
    config_home = Path(config_home) if config_home else Path.home() / ".config"

    snakepyt_dir = config_home / "snakepyt"
    snakepyt_dir.mkdir(parents=True, exist_ok=True)

    return snakepyt_dir / "pytrc.py"


class PytSession:
    """A snakepyt session: resolved paths, registered commands and the
    state that survives a live class reload (see ``update_class``)."""

    @staticmethod
    def define_cli_args(parser):
        """Register snakepyt's command-line options on *parser*.

        *parser* only needs an argparse-style ``add_argument`` method.
        Path-like options default to None so that ``_get_paths`` can tell
        "not given on the command line" apart from an explicit value.
        """
        def parse_str_list(s):
            return s.split(",")

        parser.add_argument("--out", dest="pyt_out", type=Path,
            default=None,
            help="Where to place sketch outputs")
        # The default used to be ".", which argparse converts to a (truthy)
        # Path; the CLI value then always won in _get_paths and neither pytrc
        # nor PYT_IN could ever take effect. The cwd fallback lives in
        # _get_paths instead.
        parser.add_argument("--in", dest="pyt_in", type=Path,
            default=None,
            help="Where to find input data for sketches (default: current working directory)")
        parser.add_argument("--sketches", dest="pyt_sketch", type=Path,
            default=None,
            help="Where to find sketches")
        parser.add_argument("--write", dest="write_flags", type=parse_str_list,
            default=["pytfile","sketch","outputs"],
            help="What to include in output directories",
            metavar="pytfile,sketch,outputs,...")
        parser.add_argument("--pytrc", dest="pytrc", type=Path,
            default=None,
            help="Path of pytrc.py config file")
        parser.add_argument("--python", dest="python_path", type=Path,
            default=None,
            help="Path of preferred python interpreter")

    def __init__(self, cli_args):
        """Build a session from parsed command-line arguments.

        Loads the user's pytrc, resolves the environment paths, registers the
        builtin commands and logs a greeting.
        """
        self.cli_args = cli_args
        self.snakepyt_version = (0, 2)
        self.repl_continue = True

        self.prefix = None

        self.favorite_dirs = {}

        # Carried over to the replacement instance by update_class().
        self.persistent_state = {}
        self.persistent_hashes = {}

        from pyt.core.terminal import Logger
        self.log = Logger().mode("ok").tag("snakepyt")

        self.commands = AttrDict()

        self.env = AttrDict()

        # pytrc runs before path resolution so that values it stores in
        # self.env take part in the priority chain of _get_paths().
        self.load_pytrc()

        self._get_paths()

        from pyt.core.terminal import persona
        self.persona = persona.Persona.from_config(persona.default) # TODO configurable

        self.commands.builtin = []

        register_builtins(self.commands.builtin)

        self.commands.all_available = self.commands.user + self.commands.builtin

        # TODO store username
        try:
            username = os.getlogin()
        except Exception:
            # Best effort only: os.getlogin() raises OSError when there is no
            # controlling terminal; the greeting simply omits the name.
            username = ""

        greeting = self.persona.hello()
        if username:
            greeting = f"{greeting} {username}"
        self.log(f"{greeting}! {self.persona.smile()}")
        self.log.blank()

    def _get_paths(self):
        """Resolve OUT, IN, SKETCH, TEMPLATE and PYTHON_PATH into self.env."""
        # priority order: cli > pytrc > env > default

        # TODO next time a new env var is added, do the obvious refactor here
        args = self.cli_args
        env = self.env

        env.OUT = args.pyt_out or env.get("OUT") or os.getenv("PYT_OUT") or None
        env.IN = args.pyt_in or env.get("IN") or os.getenv("PYT_IN") or Path(".")
        env.SKETCH = args.pyt_sketch or env.get("SKETCH") or os.getenv("PYT_SKETCH") or Path(".")
        env.TEMPLATE = env.get("TEMPLATE") or os.getenv("PYT_TEMPLATE") or "verbose"
        env.PYTHON_PATH = args.python_path or env.get("PYTHON_PATH") or os.getenv("PYTHON_PATH") or sys.executable

        # Values coming from pytrc or the process environment may be plain
        # strings; normalise them to Path. TEMPLATE is left as-is.
        for key in ("OUT", "IN", "SKETCH", "PYTHON_PATH"):
            value = getattr(env, key)
            if isinstance(value, str):
                setattr(env, key, Path(value))

    def load_pytrc(self):
        """Execute the user's pytrc.py, generating a default one if missing.

        Resets ``self.commands.user``. The file is run with ``command``,
        ``session`` and ``print`` pre-bound in its namespace. Errors raised by
        the file are logged, not propagated.
        """
        self.commands.user = []

        pytrc = self.cli_args.pytrc if self.cli_args.pytrc else _find_pytrc()

        log = self.log.tag(ac.file_link(pytrc))
        long_link = ac.file_link(pytrc, full=True)

        if not pytrc.exists():
            # TODO in the case where no pytrc exists but env vars have been passed as CLI args,
            # it would be preferable to generate a pytrc that sets those args as its defaults.

            self.log("no pytrc.py found. generating default configuration", mode="info")
            from importlib.resources import files

            template = files("pyt.core.templates").joinpath("pytrc.py")

            # A path given via --pytrc may point into a directory that does
            # not exist yet (the default location is created by _find_pytrc).
            pytrc.parent.mkdir(parents=True, exist_ok=True)
            # Copy via the Traversable API: the resource need not be a real
            # file on disk, and this does not carry over the template's
            # permission bits (e.g. a read-only package install).
            pytrc.write_bytes(template.read_bytes())

            log.blank().log(f"generated pytrc.py at {long_link}. you can edit it to customize snakepyt. if you'd prefer to keep your configuration elsewhere, use the --pytrc flag to specify its location, or set the XDG_CONFIG_HOME environment variable.", mode="ok").blank()

        if pytrc.exists():
            namespace = {
                "command": registrar_attr(self.commands.user),
                "session": self,
                "print": log
            }
            try:
                # Compile from bytes so Python's own source-encoding rules
                # (UTF-8 default, coding declarations) apply instead of the
                # platform's locale encoding.
                code = compile(pytrc.read_bytes(), filename=str(pytrc), mode="exec")
                # The pytrc is executed with full access to this process.
                exec(code, namespace)
                log("loaded successfully", mode="ok")
            except Exception:
                # KeyboardInterrupt / SystemExit are not Exception subclasses
                # and keep propagating.
                log("encountered error in user configuration", mode="error")
                log.indented().trace()
                log("some configuration settings may not be loaded", mode="warning")

    def try_handle_command(self, command, remainder):
        """Run the first registered command having *command* among its aliases.

        While the command runs, ``self.log`` is tagged with the command's
        first alias. Returns True if a command was found and run, else False.
        """
        for aliases, behavior in self.commands.all_available:
            if command in aliases:
                self.log = self.log.tag(aliases[0])
                behavior(self, remainder)
                return True
        return False

    def handle_message(self, message):
        """Process one line of REPL input.

        Applies the active prefix (if any), handles ``.``-paths that inspect
        ``persistent_state``, and otherwise dispatches to a command. Errors
        are logged as a trace; ``self.log`` is always reset to the
        "snakepyt" tag afterwards.
        """
        log = self.log
        try:
            if self.prefix:
                if message in ("un", "unpre", "unprefix"):
                    self.prefix = None
                    return
                message = " ".join([self.prefix, message])

            if message.startswith("."):
                if message.rstrip() == ".":
                    # A lone dot lists the top-level names and their types.
                    state_dump = "\n".join([f" {k}: {type(v).__name__}" for k, v in self.persistent_state.items()])
                    log(f"base:\n{state_dump}")
                else:
                    # ".a.b" walks persistent_state["a"]["b"], logging each step.
                    segments = [segment.strip() for segment in message.split(".")][1:]
                    selection = ("base scope", self.persistent_state)
                    for segment in segments:
                        if segment == "":
                            log("repeated dots (..) are redundant", mode="warning")
                            return
                        try:
                            selection = (segment, selection[1][segment])
                            log(f"{selection[0]}: {selection[1]}")
                        except KeyError:
                            log(f"no \"{segment}\" in {selection[0]}", mode="error")
                            return
                        except TypeError:
                            log(f"{selection[0]} is not a scope", mode="error")
                            log.indented()(f"{selection[0]}: {selection[1]}", mode="info")
                            # Stop here: every further segment would fail on
                            # the same value and repeat the same error.
                            return

                return

            (command, remainder) = lsnap(message)

            if not self.try_handle_command(command, remainder):
                log(f"unknown command: {command}", mode="info")
        except BaseException:
            # NOTE(review): unlike load_pytrc/update_class this also swallows
            # KeyboardInterrupt and SystemExit, exactly as the previous bare
            # `except:` did -- confirm that is intended (e.g. Ctrl-C aborting
            # a command without leaving the REPL).
            log.indented().trace()
        finally:
            # try_handle_command re-tags self.log; restore the session tag.
            self.log = log.tag("snakepyt")

    def update_class(self, new_class):
        """Swap this live session over to *new_class* in place.

        A fresh ``new_class(self.cli_args)`` is constructed; if that fails the
        error is logged and the session is left untouched. Otherwise this
        object takes over the new instance's class and attributes, keeping
        only ``persistent_state``, ``persistent_hashes`` and ``prefix``.
        """
        try:
            new_instance = new_class(self.cli_args)
        except Exception:
            # KeyboardInterrupt / SystemExit keep propagating.
            self.log.trace()
            self.log("New session constructor failed. Session will not be updated.", mode="error")
            return

        state = self.persistent_state
        hashes = self.persistent_hashes
        prefix = self.prefix

        self.__class__ = new_instance.__class__
        self.__dict__.clear()
        self.__dict__.update(new_instance.__dict__)

        self.persistent_state = state
        self.persistent_hashes = hashes
        self.prefix = prefix