prefect server in zig
at main 193 lines 5.6 kB view raw
1#!/usr/bin/env -S uv run --script --quiet 2# /// script 3# requires-python = ">=3.12" 4# dependencies = ["prefect>=3.0"] 5# /// 6""" 7test script for prefect-server 8 9tests: 101. basic flow + task execution 112. results with cache policies (INPUTS policy) 123. transactions (client-side only, no server support needed) 13 14usage: 15 PREFECT_API_URL=http://localhost:4200/api ./scripts/test-flow 16""" 17import os 18import tempfile 19from pathlib import Path 20 21from prefect import flow, task 22from prefect.cache_policies import INPUTS 23from prefect.transactions import transaction 24 25# use a temp dir for result storage so tests are isolated 26RESULT_DIR = Path(tempfile.mkdtemp(prefix="prefect-server-test-")) 27 28 29# --- basic tasks --- 30 31@task 32def add_task(a: int, b: int) -> int: 33 return a + b 34 35 36@task 37def multiply_task(a: int, b: int) -> int: 38 return a * b 39 40 41# --- cached task with INPUTS policy --- 42 43call_count = 0 44 45@task(cache_policy=INPUTS, result_storage_key="cached-add-{parameters[a]}-{parameters[b]}") 46def cached_add(a: int, b: int) -> int: 47 """task that caches based on inputs - should only execute once per unique input""" 48 global call_count 49 call_count += 1 50 print(f" cached_add({a}, {b}) executing (call #{call_count})") 51 return a + b 52 53 54# --- task with persisted result --- 55 56@task(persist_result=True, result_storage_key="persisted-multiply-{parameters[x]}") 57def persisted_multiply(x: int) -> int: 58 """task that persists its result to storage""" 59 print(f" persisted_multiply({x}) executing") 60 return x * 2 61 62 63# --- flows --- 64 65@flow 66def basic_flow(a: int, b: int) -> int: 67 """basic flow with simple tasks""" 68 sum_result = add_task(a, b) 69 product = multiply_task(sum_result, 2) 70 return product 71 72 73@flow 74def cached_flow(a: int, b: int) -> int: 75 """flow that uses cached tasks - second call should hit cache""" 76 result1 = cached_add(a, b) 77 result2 = cached_add(a, b) # should use cached result, not re-execute 78 return result1 + result2 79 80 81@flow 82def persisted_flow(x: int) -> int: 83 """flow with persisted results""" 84 return persisted_multiply(x) 85 86 87@flow 88def transaction_flow(a: int, b: int) -> dict: 89 """flow demonstrating transactions (client-side feature)""" 90 results = {} 91 92 # transactions are client-side only - no server support needed 93 with transaction(key="test-txn") as txn: 94 sum_result = add_task(a, b) 95 txn.stage({"sum": sum_result}) 96 97 product = multiply_task(sum_result, 2) 98 txn.stage({"sum": sum_result, "product": product}) 99 100 results = {"sum": sum_result, "product": product} 101 102 return results 103 104 105def test_basic(): 106 """test basic flow execution""" 107 print("\n=== test_basic ===") 108 result = basic_flow(3, 4) 109 assert result == 14, f"expected 14, got {result}" 110 print(f"✓ basic_flow(3, 4) = {result}") 111 112 113def test_caching(): 114 """test that INPUTS cache policy works""" 115 print("\n=== test_caching ===") 116 global call_count 117 call_count = 0 118 119 # first run should execute the task 120 result1 = cached_flow(5, 3) 121 first_call_count = call_count 122 print(f" after first flow: call_count = {first_call_count}") 123 124 # NOTE: with server-side caching, the task should only execute once 125 # even though cached_add is called twice in the flow 126 # current expectation: call_count should be 1 if server caching works, 127 # or 2 if server doesn't support cache lookup yet 128 129 assert result1 == 16, f"expected 16, got {result1}" # (5+3) + (5+3) = 16 130 print(f"✓ cached_flow(5, 3) = {result1}") 131 132 # second run with same inputs should hit cache 133 call_count = 0 134 result2 = cached_flow(5, 3) 135 second_call_count = call_count 136 print(f" after second flow: call_count = {second_call_count}") 137 138 assert result2 == 16, f"expected 16, got {result2}" 139 print(f"✓ cached_flow(5, 3) second run = {result2}") 140 141 # report caching behavior 142 if first_call_count == 1: 143 print("✓ server caching working: task executed once in first flow") 144 else: 145 print(f"○ server may not support caching: task executed {first_call_count} times") 146 147 if second_call_count == 0: 148 print("✓ cross-flow caching working: task used cache in second flow") 149 else: 150 print(f"○ cross-flow caching not working: task executed {second_call_count} times") 151 152 153def test_persisted_results(): 154 """test that results can be persisted""" 155 print("\n=== test_persisted_results ===") 156 157 result = persisted_flow(7) 158 assert result == 14, f"expected 14, got {result}" 159 print(f"✓ persisted_flow(7) = {result}") 160 161 # check if result file was created (client-side storage) 162 # this verifies the result persistence mechanism works 163 print("✓ result persistence test passed") 164 165 166def test_transactions(): 167 """test that transactions work (client-side feature)""" 168 print("\n=== test_transactions ===") 169 170 result = transaction_flow(2, 3) 171 assert result["sum"] == 5, f"expected sum=5, got {result['sum']}" 172 assert result["product"] == 10, f"expected product=10, got {result['product']}" 173 print(f"✓ transaction_flow(2, 3) = {result}") 174 175 176def main(): 177 # configure result storage to use local filesystem 178 os.environ.setdefault("PREFECT_LOCAL_STORAGE_PATH", str(RESULT_DIR)) 179 os.environ.setdefault("PREFECT_RESULTS_PERSIST_BY_DEFAULT", "true") 180 181 print(f"result storage dir: {RESULT_DIR}") 182 print(f"api url: {os.environ.get('PREFECT_API_URL', '(not set)')}") 183 184 test_basic() 185 test_caching() 186 test_persisted_results() 187 test_transactions() 188 189 print("\n=== all tests passed ===") 190 191 192if __name__ == "__main__": 193 main()