馃悕馃悕馃悕
1
2import os
3import sys
4
5from pathlib import Path
6
7from pyt.core.terminal.ansi import codes as ac
8from pyt.core import AttrDict, lsnap
9from pyt.core.commands import registrar_attr, register_builtins
10
11def _find_pytrc():
12 config_home = os.getenv("XDG_CONFIG_HOME")
13 if not config_home:
14 config_home = Path.home() / ".config"
15 else:
16 config_home = Path(config_home)
17
18 snakepyt_dir = config_home / "snakepyt"
19 snakepyt_dir.mkdir(parents=True, exist_ok=True)
20
21 return snakepyt_dir / "pytrc.py"
22
23class PytSession:
24 def define_cli_args(parser):
25 parse_str_list = lambda s: s.split(",")
26
27 parser.add_argument("--out", dest="pyt_out", type=Path,
28 default=None,
29 help="Where to place sketch outputs")
30 parser.add_argument("--in", dest="pyt_in", type=Path,
31 default=".",
32 help="Where to find input data for sketches (default: current working directory)")
33 parser.add_argument("--sketches", dest="pyt_sketch", type=Path,
34 default=None,
35 help="Where to find sketches")
36 parser.add_argument("--write", dest="write_flags", type=parse_str_list,
37 default=["pytfile","sketch","outputs"],
38 help="What to include in output directories",
39 metavar="pytfile,sketch,outputs,...")
40 parser.add_argument("--pytrc", dest="pytrc", type=Path,
41 default=None,
42 help="Path of pytrc.py config file")
43 parser.add_argument("--python", dest="python_path", type=Path,
44 default=None,
45 help="Path of preferred python interpreter")
46
47 def __init__(self, cli_args):
48 self.cli_args = cli_args
49 self.snakepyt_version = (0, 2)
50 self.repl_continue = True
51
52 self.prefix = None
53
54 self.favorite_dirs = {}
55
56 self.persistent_state = {}
57 self.persistent_hashes = {}
58
59 from pyt.core.terminal import Logger
60 self.log = Logger().mode("ok").tag("snakepyt")
61
62 self.commands = AttrDict()
63
64 self.env = AttrDict()
65
66 self.load_pytrc()
67
68 self._get_paths()
69
70 from pyt.core.terminal import persona
71 self.persona = persona.Persona.from_config(persona.default) # TODO configurable
72
73 self.commands.builtin = []
74
75 register_builtins(self.commands.builtin)
76
77 self.commands.all_available = self.commands.user + self.commands.builtin
78
79 # TODO store username
80 try:
81 username = os.getlogin()
82 except:
83 username = ""
84
85 self.log(f"{self.persona.hello()} {username}! {self.persona.smile()}" if username else f"{self.persona.hello()}! {self.persona.smile()}")
86 self.log.blank()
87
88 def _get_paths(self):
89 # priority order: cli > pytrc > env > default
90
91 # TODO next time a new env var is added, do the obvious refactor here
92 self.env.OUT = self.cli_args.pyt_out or self.env.get("OUT") or os.getenv("PYT_OUT") or None
93 self.env.IN = self.cli_args.pyt_in or self.env.get("IN") or os.getenv("PYT_IN") or Path(".")
94 self.env.SKETCH = self.cli_args.pyt_sketch or self.env.get("SKETCH") or os.getenv("PYT_SKETCH") or Path(".")
95 self.env.TEMPLATE = self.env.get("TEMPLATE") or os.getenv("PYT_TEMPLATE") or "verbose"
96 self.env.PYTHON_PATH = self.cli_args.python_path or self.env.get("PYTHON_PATH") or os.getenv("PYTHON_PATH") or sys.executable
97
98 if isinstance(self.env.OUT, str): self.env.OUT = Path(self.env.OUT)
99 if isinstance(self.env.IN, str): self.env.IN = Path(self.env.IN)
100 if isinstance(self.env.SKETCH, str): self.env.SKETCH = Path(self.env.SKETCH)
101
102 if isinstance(self.env.PYTHON_PATH, str): self.env.PYTHON_PATH = Path(self.env.PYTHON_PATH)
103
104 def load_pytrc(self):
105 self.commands.user = []
106
107 pytrc = self.cli_args.pytrc if self.cli_args.pytrc else _find_pytrc()
108
109 log = self.log.tag(ac.file_link(pytrc))
110 long_link = ac.file_link(pytrc, full=True)
111
112 if not pytrc.exists():
113 # TODO in the case where no pytrc exists but env vars have been passed as CLI args,
114 # it would be preferable to generate a pytrc that sets those args as its defaults.
115
116 self.log("no pytrc.py found. generating default configuration", mode="info")
117 from importlib.resources import files
118 import shutil
119
120 template = files("pyt.core.templates").joinpath("pytrc.py")
121
122 shutil.copy(template, pytrc)
123
124 log.blank().log(f"generated pytrc.py at {long_link}. you can edit it to customize snakepyt. if you'd prefer to keep your configuration elsewhere, use the --pytrc flag to specify its location, or set the XDG_CONFIG_HOME environment variable.", mode="ok").blank()
125
126 if pytrc.exists():
127 namespace = {
128 "command": registrar_attr(self.commands.user),
129 "session": self,
130 "print": log
131 }
132 try:
133 with open(pytrc) as rcfile:
134 code = compile(rcfile.read(), filename=str(pytrc), mode="exec")
135 exec(code, namespace)
136 log("loaded successfully", mode="ok")
137 except (KeyboardInterrupt, SystemExit):
138 raise
139 except:
140 log("encountered error in user configuration", mode="error")
141 log.indented().trace()
142 log("some configuration settings may not be loaded", mode="warning")
143
144 def try_handle_command(self, command, remainder):
145 for aliases, behavior in self.commands.all_available:
146 if command in aliases:
147 self.log = self.log.tag(aliases[0])
148 behavior(self, remainder)
149 return True
150 return False
151
152 def handle_message(self, message):
153 log = self.log
154 try:
155 if self.prefix:
156 if message in ["un", "unpre", "unprefix"]:
157 self.prefix = None
158 return
159 message = " ".join([self.prefix, message])
160
161 if message.startswith("."):
162 if message.rstrip() == ".":
163 state_dump = "\n".join([f" {k}: {type(v).__name__}" for k, v in self.persistent_state.items()])
164 log(f"base:\n{state_dump}")
165 else:
166 segments = [segment.strip() for segment in message.split(".")][1:]
167 selection = ("base scope", self.persistent_state)
168 for segment in segments:
169 if segment == "":
170 log("repeated dots (..) are redundant", mode="warning")
171 return
172 try:
173 selection = (segment, selection[1][segment])
174 log(f"{selection[0]}: {selection[1]}")
175 except KeyError:
176 log(f"no \"{segment}\" in {selection[0]}", mode="error")
177 return
178 except TypeError:
179 log(f"{selection[0]} is not a scope", mode="error")
180 log.indented()(f"{selection[0]}: {selection[1]}", mode="info")
181
182 return
183
184 (command, remainder) = lsnap(message)
185
186 if not self.try_handle_command(command, remainder):
187 log(f"unknown command: {command}", mode="info")
188 except:
189 log.indented().trace()
190 finally:
191 self.log = log.tag("snakepyt")
192
193 def update_class(self, new_class):
194 try:
195 new_instance = new_class(self.cli_args)
196 except (KeyboardInterrupt, SystemExit):
197 raise
198 except:
199 self.log.trace()
200 self.log("New session constructor failed. Session will not be updated.", mode="error")
201 return
202
203 state = self.persistent_state
204 hashes = self.persistent_hashes
205 prefix = self.prefix
206
207 self.__class__ = new_instance.__class__
208 self.__dict__.clear()
209 self.__dict__.update(new_instance.__dict__)
210
211 self.persistent_state = state
212 self.persistent_hashes = hashes
213 self.prefix = prefix
214