Skip to content

Phrase DSL

This module is the workflow-definition entry point.

Key mental model:

  • A Phrase is lazy and does not execute until lang.say(...).
  • .then(...) builds linear chains.
  • Nested phrases in arguments form DAG structures.
  • .local(force=...) requests local-placement semantics in distributed mode.

lingo.phrase

Phrase construction for lazy-evaluated task DAGs.

Phrase

Represents a lazy-evaluated task node or DAG to be executed later.

Source code in lingo/phrase.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Phrase:
    """Represents a lazy-evaluated task node or DAG to be executed later."""

    def __init__(
        self,
        task_name: str,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        *,
        mock_reply: Any = None,
        mock_delay: float = 0.0,
    ):
        self.task_name = task_name
        self.args = args
        self.kwargs = kwargs or {}
        self.chain: list[tuple[str, tuple, dict]] = []
        self._is_local = False
        self._local_force = True
        self._mock_reply = mock_reply
        self._mock_delay = mock_delay

    def then(self, task_name: str, *args, **kwargs) -> Phrase:
        """Chain another task after this one."""
        self.chain.append((task_name, args, kwargs))
        return self

    def local(self, force: bool = True) -> Phrase:
        """Mark this phrase for local execution.

        Args:
            force: If False, orchestration raises when no compatible single worker is known.
        """
        self._is_local = True
        self._local_force = force
        return self

    def task_names(self) -> list[str]:
        """Return all task names involved in this phrase chain."""
        names = [self.task_name]
        for task_name, _, _ in self.chain:
            names.append(task_name)
        return names

local(force: bool = True) -> Phrase

Mark this phrase for local execution.

Parameters:

Name Type Description Default
force bool

If False, orchestration raises when no compatible single worker is known.

True
Source code in lingo/phrase.py
37
38
39
40
41
42
43
44
45
def local(self, force: bool = True) -> Phrase:
    """Mark this phrase for local execution.

    Args:
        force: If False, orchestration raises when no compatible single worker is known.
    """
    self._is_local = True
    self._local_force = force
    return self

task_names() -> list[str]

Return all task names involved in this phrase chain.

Source code in lingo/phrase.py
47
48
49
50
51
52
def task_names(self) -> list[str]:
    """Return all task names involved in this phrase chain."""
    names = [self.task_name]
    for task_name, _, _ in self.chain:
        names.append(task_name)
    return names

then(task_name: str, *args, **kwargs) -> Phrase

Chain another task after this one.

Source code in lingo/phrase.py
32
33
34
35
def then(self, task_name: str, *args, **kwargs) -> Phrase:
    """Chain another task after this one."""
    self.chain.append((task_name, args, kwargs))
    return self

phrase(task_name: str) -> Callable[..., Phrase]

Construct a lazy-evaluated task node (or DAG) to be executed later.

Parameters:

Name Type Description Default
task_name str

Name of the task to execute

required

Returns:

Type Description
Callable[..., Phrase]

A callable that constructs a Phrase object

Source code in lingo/phrase.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def phrase(task_name: str) -> Callable[..., Phrase]:
    """
    Construct a lazy-evaluated task node (or DAG) to be executed later.

    Args:
        task_name: Name of the task to execute

    Returns:
        A callable that constructs a Phrase object
    """
    def _phrase_builder(*args, **kwargs) -> Phrase:
        return Phrase(task_name, args, kwargs)

    return _phrase_builder