a digital person for bluesky
1from letta_client import Letta
2from typing import Optional
3
4def upsert_block(letta: Letta, label: str, value: str, **kwargs):
5 """
6 Ensures that a block by this label exists. If the block exists, it will
7 replace content provided by kwargs with the values in this function call.
8 """
9 # Get the list of blocks (v1.0: list returns page object)
10 blocks_page = letta.blocks.list(label=label)
11 blocks = blocks_page.items if hasattr(blocks_page, 'items') else blocks_page
12
13 # Check if we had any -- if not, create it
14 if len(blocks) == 0:
15 # Make the new block
16 new_block = letta.blocks.create(
17 label=label,
18 value=value,
19 **kwargs
20 )
21
22 return new_block
23
24 if len(blocks) > 1:
25 raise Exception(f"{len(blocks)} blocks by the label '{label}' retrieved, label must identify a unique block")
26
27 else:
28 existing_block = blocks[0]
29
30 if kwargs.get('update', False):
31 # Remove 'update' from kwargs before passing to update
32 kwargs_copy = kwargs.copy()
33 kwargs_copy.pop('update', None)
34
35 updated_block = letta.blocks.update(
36 block_id = existing_block.id,
37 label = label,
38 value = value,
39 **kwargs_copy
40 )
41
42 return updated_block
43 else:
44 return existing_block
45
46def upsert_agent(letta: Letta, name: str, **kwargs):
47 """
48 Ensures that an agent by this label exists. If the agent exists, it will
49 update the agent to match kwargs.
50 """
51 # Get the list of agents (v1.0: list returns page object)
52 agents_page = letta.agents.list(name=name)
53 agents = agents_page.items if hasattr(agents_page, 'items') else agents_page
54
55 # Check if we had any -- if not, create it
56 if len(agents) == 0:
57 # Make the new agent
58 new_agent = letta.agents.create(
59 name=name,
60 **kwargs
61 )
62
63 return new_agent
64
65 if len(agents) > 1:
66 raise Exception(f"{len(agents)} agents by the name '{name}' retrieved, name must identify a unique agent")
67
68 else:
69 existing_agent = agents[0]
70
71 if kwargs.get('update', False):
72 # Remove 'update' from kwargs before passing to update
73 kwargs_copy = kwargs.copy()
74 kwargs_copy.pop('update', None)
75
76 updated_agent = letta.agents.update(
77 agent_id = existing_agent.id,
78 **kwargs_copy
79 )
80
81 return updated_agent
82 else:
83 return existing_agent