Skip to content

Moderator

lingo.moderator

Testing harness and utilities for orchestration.

Job

Bases: Generic[T]

Represents a dispatched job with result tracking.

Source code in lingo/moderator.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Job(Generic[T]):
    """Represents a dispatched job with result tracking."""

    def __init__(self, job_id: str, info: Any = None):
        self.id = job_id
        self.code = JobStatusCode.PENDING
        self.info = info
        self.status_obj = JobStatus(JobStatusCode.PENDING)
        self._result: Optional[T] = None
        self._done = threading.Event()

    def status(self) -> JobStatus:
        """Get the current status of the job."""
        return self.status_obj

    def result(self, block: bool = False) -> Optional[T]:
        """Get the result of the job."""
        if block:
            self._done.wait()
        return self._result

    def set_running(self) -> None:
        """Mark the job as running."""
        self.code = JobStatusCode.RUNNING
        self.status_obj = JobStatus(JobStatusCode.RUNNING)

    def set_result(self, value: T) -> None:
        """Mark the job as completed with a result."""
        self._result = value
        self.code = JobStatusCode.COMPLETED
        self.status_obj = JobStatus(JobStatusCode.COMPLETED)
        self._done.set()

    def set_failed(self, message: str) -> None:
        """Mark the job as failed with an error message."""
        self.code = JobStatusCode.FAILED
        self.status_obj = JobStatus(JobStatusCode.FAILED, message=message)
        self._done.set()

    def set_cancelled(self) -> None:
        """Mark the job as cancelled."""
        self.code = JobStatusCode.CANCELLED
        self.status_obj = JobStatus(JobStatusCode.CANCELLED)
        self._done.set()

    def set_pending(self) -> None:
        """Mark the job as pending."""
        self.code = JobStatusCode.PENDING
        self.status_obj = JobStatus(JobStatusCode.PENDING)

result(block: bool = False) -> Optional[T]

Get the result of the job.

Source code in lingo/moderator.py
51
52
53
54
55
def result(self, block: bool = False) -> Optional[T]:
    """Get the result of the job."""
    if block:
        self._done.wait()
    return self._result

set_cancelled() -> None

Mark the job as cancelled.

Source code in lingo/moderator.py
75
76
77
78
79
def set_cancelled(self) -> None:
    """Mark the job as cancelled."""
    self.code = JobStatusCode.CANCELLED
    self.status_obj = JobStatus(JobStatusCode.CANCELLED)
    self._done.set()

set_failed(message: str) -> None

Mark the job as failed with an error message.

Source code in lingo/moderator.py
69
70
71
72
73
def set_failed(self, message: str) -> None:
    """Mark the job as failed with an error message."""
    self.code = JobStatusCode.FAILED
    self.status_obj = JobStatus(JobStatusCode.FAILED, message=message)
    self._done.set()

set_pending() -> None

Mark the job as pending.

Source code in lingo/moderator.py
81
82
83
84
def set_pending(self) -> None:
    """Mark the job as pending."""
    self.code = JobStatusCode.PENDING
    self.status_obj = JobStatus(JobStatusCode.PENDING)

set_result(value: T) -> None

Mark the job as completed with a result.

Source code in lingo/moderator.py
62
63
64
65
66
67
def set_result(self, value: T) -> None:
    """Mark the job as completed with a result."""
    self._result = value
    self.code = JobStatusCode.COMPLETED
    self.status_obj = JobStatus(JobStatusCode.COMPLETED)
    self._done.set()

set_running() -> None

Mark the job as running.

Source code in lingo/moderator.py
57
58
59
60
def set_running(self) -> None:
    """Mark the job as running."""
    self.code = JobStatusCode.RUNNING
    self.status_obj = JobStatus(JobStatusCode.RUNNING)

status() -> JobStatus

Get the current status of the job.

Source code in lingo/moderator.py
47
48
49
def status(self) -> JobStatus:
    """Get the current status of the job."""
    return self.status_obj

JobStatus

Represents the status of a job execution.

Source code in lingo/moderator.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class JobStatus:
    """Represents the status of a job execution."""

    def __init__(self, code: str = JobStatusCode.PENDING, message: Optional[str] = None):
        self.code = code
        self.message = message

    def is_ongoing(self) -> bool:
        """Check if the job is still running."""
        return self.code in ("pending", "running")

    def succeeded(self) -> bool:
        """Check if the job succeeded."""
        return self.code == "completed"

is_ongoing() -> bool

Check if the job is still running.

Source code in lingo/moderator.py
27
28
29
def is_ongoing(self) -> bool:
    """Check if the job is still running."""
    return self.code in ("pending", "running")

succeeded() -> bool

Check if the job succeeded.

Source code in lingo/moderator.py
31
32
33
def succeeded(self) -> bool:
    """Check if the job succeeded."""
    return self.code == "completed"

Moderator

Test harness for orchestrating test runs.

Source code in lingo/moderator.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class Moderator:
    """Test harness for orchestrating test runs."""

    def __init__(self, language: Any = None, force_local: bool = True):
        self.language = language
        self.force_local = force_local
        self.jobs: dict[str, Job] = {}

    def say(self, phrase: Any) -> Job:
        """Dispatch a phrase and return a job handle."""
        if self.language is not None:
            local_job = Job(f"test-job-{len(self.jobs)}")
            local_job.set_running()
            self.jobs[local_job.id] = local_job

            def _runner() -> None:
                original_mode = getattr(self.language, "execution_mode", None)
                if self.force_local and original_mode is not None:
                    self.language.execution_mode = "local"
                try:
                    dispatched = asyncio.run(self.language.say(phrase))
                    local_job.id = dispatched.id
                    local_job.info = dispatched.info
                    self.jobs[local_job.id] = local_job

                    if dispatched.code == JobStatusCode.COMPLETED:
                        local_job.set_result(dispatched.result(block=True))
                    elif dispatched.code == JobStatusCode.FAILED:
                        message = dispatched.status().message if dispatched.status() else "failed"
                        local_job.set_failed(message or "failed")
                    elif dispatched.code == JobStatusCode.CANCELLED:
                        local_job.set_cancelled()
                    else:
                        # If language.say() returns a non-terminal state, keep waiting
                        # for the dispatched job to reach a terminal status.
                        result = dispatched.result(block=True)
                        final_status = dispatched.status()
                        if final_status.succeeded():
                            local_job.set_result(result)
                        elif final_status.code == JobStatusCode.FAILED:
                            local_job.set_failed(final_status.message or "failed")
                        elif final_status.code == JobStatusCode.CANCELLED:
                            local_job.set_cancelled()
                        else:
                            local_job.set_pending()
                except Exception as exc:
                    details = traceback.format_exc()
                    local_job.set_failed(details or str(exc))
                finally:
                    if self.force_local and original_mode is not None:
                        self.language.execution_mode = original_mode

            thread = threading.Thread(target=_runner, daemon=True)
            thread.start()
            return local_job

        job = Job(f"test-job-{len(self.jobs)}")
        self.jobs[job.id] = job
        return job

say(phrase: Any) -> Job

Dispatch a phrase and return a job handle.

Source code in lingo/moderator.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def say(self, phrase: Any) -> Job:
    """Dispatch a phrase and return a job handle."""
    if self.language is not None:
        local_job = Job(f"test-job-{len(self.jobs)}")
        local_job.set_running()
        self.jobs[local_job.id] = local_job

        def _runner() -> None:
            original_mode = getattr(self.language, "execution_mode", None)
            if self.force_local and original_mode is not None:
                self.language.execution_mode = "local"
            try:
                dispatched = asyncio.run(self.language.say(phrase))
                local_job.id = dispatched.id
                local_job.info = dispatched.info
                self.jobs[local_job.id] = local_job

                if dispatched.code == JobStatusCode.COMPLETED:
                    local_job.set_result(dispatched.result(block=True))
                elif dispatched.code == JobStatusCode.FAILED:
                    message = dispatched.status().message if dispatched.status() else "failed"
                    local_job.set_failed(message or "failed")
                elif dispatched.code == JobStatusCode.CANCELLED:
                    local_job.set_cancelled()
                else:
                    # If language.say() returns a non-terminal state, keep waiting
                    # for the dispatched job to reach a terminal status.
                    result = dispatched.result(block=True)
                    final_status = dispatched.status()
                    if final_status.succeeded():
                        local_job.set_result(result)
                    elif final_status.code == JobStatusCode.FAILED:
                        local_job.set_failed(final_status.message or "failed")
                    elif final_status.code == JobStatusCode.CANCELLED:
                        local_job.set_cancelled()
                    else:
                        local_job.set_pending()
            except Exception as exc:
                details = traceback.format_exc()
                local_job.set_failed(details or str(exc))
            finally:
                if self.force_local and original_mode is not None:
                    self.language.execution_mode = original_mode

        thread = threading.Thread(target=_runner, daemon=True)
        thread.start()
        return local_job

    job = Job(f"test-job-{len(self.jobs)}")
    self.jobs[job.id] = job
    return job

conversation(func: Callable) -> Callable

Decorator that provides a Moderator to orchestrate test runs.

Usage

@lang.conversation def test_something(mod: Moderator): ...

Source code in lingo/moderator.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def conversation(func: Callable) -> Callable:
    """
    Decorator that provides a Moderator to orchestrate test runs.

    Usage:
        @lang.conversation
        def test_something(mod: Moderator):
            ...
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        moderator = Moderator()
        return func(moderator, *args, **kwargs)

    # Preserve pytest fixture injection for parameters after the moderator argument.
    sig = inspect.signature(func)
    params = list(sig.parameters.values())
    if params:
        wrapper.__signature__ = sig.replace(parameters=params[1:])
    wrapper.__lingo_conversation__ = True
    wrapper.__lingo_language_name__ = "moderator"
    wrapper.__lingo_execution_mode__ = "local"

    return wrapper

echo(task_name: str, reply: Any = None, delay: float = 0.0) -> Callable

Mock task for testing that injects a known result.

Parameters:

Name Type Description Default
task_name str

Name of the task to mock

required
reply Any

The result to return from this task

None
delay float

Delay in seconds before returning

0.0

Returns:

Type Description
Callable

A callable that constructs bypass phrases

Source code in lingo/moderator.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def echo(task_name: str, reply: Any = None, delay: float = 0.0) -> Callable:
    """
    Mock task for testing that injects a known result.

    Args:
        task_name: Name of the task to mock
        reply: The result to return from this task
        delay: Delay in seconds before returning

    Returns:
        A callable that constructs bypass phrases
    """
    def _echo_builder(*args, **kwargs) -> Phrase:
        return Phrase(
            task_name=task_name,
            args=args,
            kwargs=kwargs,
            mock_reply=reply,
            mock_delay=delay,
        )

    return _echo_builder

scribble(root: Optional[str] = None) -> str

Generate a temporary directory for task execution.

Parameters:

Name Type Description Default
root Optional[str]

Optional root directory for scribble storage

None

Returns:

Type Description
str

Path to temporary directory

Source code in lingo/moderator.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def scribble(root: Optional[str] = None) -> str:
    """
    Generate a temporary directory for task execution.

    Args:
        root: Optional root directory for scribble storage

    Returns:
        Path to temporary directory
    """
    import tempfile
    from pathlib import Path

    if root:
        base = Path(root)
        base.mkdir(parents=True, exist_ok=True)
        tmpdir = base / f"scribble_{uuid.uuid4().hex[:10]}"
        tmpdir.mkdir(parents=True, exist_ok=True)
        return tmpdir

    tmpdir = tempfile.mkdtemp(prefix="scribble_")
    return Path(tmpdir)