Skip to content

Core Runtime

Core abstractions define the stable contracts used across implementations.

Use this module when you need to understand or extend:

  • Language orchestration contracts (say, speak, cancellation, resubmit)
  • Journal persistence and status/progress retrieval contracts
  • Archive artifact/corpus abstraction contracts

lingo.core

Base classes for Language, Journal, and Archive.

Archive

Bases: ABC

Base class for bulk data storage and retrieval.

Source code in lingo/core.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class Archive(ABC):
    """Base class for bulk data storage and retrieval."""

    @abstractmethod
    async def corpus_from_url(
        self,
        url: str,
        content_type: Optional[str] = None,
    ) -> Corpus:
        """Load a corpus from a URL."""
        pass

    @abstractmethod
    async def get_reference(
        self,
        path: str,
        content_type: Optional[str] = None,
    ) -> Reference:
        """Get a reference to a storage location."""
        pass

    @abstractmethod
    async def get_public_url(self, reference: Reference[Any]) -> str:
        """Get a public URL for a reference."""
        pass

corpus_from_url(url: str, content_type: Optional[str] = None) -> Corpus abstractmethod async

Load a corpus from a URL.

Source code in lingo/core.py
105
106
107
108
109
110
111
112
@abstractmethod
async def corpus_from_url(
    self,
    url: str,
    content_type: Optional[str] = None,
) -> Corpus:
    """Load a corpus from a URL."""
    pass

get_public_url(reference: Reference[Any]) -> str abstractmethod async

Get a public URL for a reference.

Source code in lingo/core.py
123
124
125
126
@abstractmethod
async def get_public_url(self, reference: Reference[Any]) -> str:
    """Get a public URL for a reference."""
    pass

get_reference(path: str, content_type: Optional[str] = None) -> Reference abstractmethod async

Get a reference to a storage location.

Source code in lingo/core.py
114
115
116
117
118
119
120
121
@abstractmethod
async def get_reference(
    self,
    path: str,
    content_type: Optional[str] = None,
) -> Reference:
    """Get a reference to a storage location."""
    pass

Journal

Bases: ABC

Base class for job state and metadata storage.

Source code in lingo/core.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Journal(ABC):
    """Base class for job state and metadata storage."""

    @abstractmethod
    async def search_runs(
        self,
        created_after: Optional[str] = None,
        created_before: Optional[str] = None,
        status: Optional[str] = None,
        info_filter: Optional[Callable] = None,
        limit: int = 50,
        sort_by: Optional[str] = None,
    ) -> List[Any]:
        """Search for job runs with filters."""
        pass

    @abstractmethod
    async def recover(self, id: str) -> Any:
        """Recover a job by ID."""
        pass

    @abstractmethod
    async def recover_status(self, id: str) -> Any:
        """Recover the status of a job by ID."""
        pass

    @abstractmethod
    async def get_status(self, job: Any) -> Any:
        """Get the current status of a job."""
        pass

    @abstractmethod
    async def get_result(self, job: Any) -> Any:
        """Get the result of a completed job."""
        pass

    @abstractmethod
    async def get_progress(self, id: str) -> Any:
        """Return a timeline-rich progress view for a job."""
        pass

    @abstractmethod
    async def restart_recent(self, limit: int = 50) -> List[Any]:
        """Restart recent failed jobs."""
        pass

    @abstractmethod
    def pre_generate_id(self):
        """Context manager for pre-generating job IDs."""
        pass

get_progress(id: str) -> Any abstractmethod async

Return a timeline-rich progress view for a job.

Source code in lingo/core.py
86
87
88
89
@abstractmethod
async def get_progress(self, id: str) -> Any:
    """Return a timeline-rich progress view for a job."""
    pass

get_result(job: Any) -> Any abstractmethod async

Get the result of a completed job.

Source code in lingo/core.py
81
82
83
84
@abstractmethod
async def get_result(self, job: Any) -> Any:
    """Get the result of a completed job."""
    pass

get_status(job: Any) -> Any abstractmethod async

Get the current status of a job.

Source code in lingo/core.py
76
77
78
79
@abstractmethod
async def get_status(self, job: Any) -> Any:
    """Get the current status of a job."""
    pass

pre_generate_id() abstractmethod

Context manager for pre-generating job IDs.

Source code in lingo/core.py
96
97
98
99
@abstractmethod
def pre_generate_id(self):
    """Context manager for pre-generating job IDs."""
    pass

recover(id: str) -> Any abstractmethod async

Recover a job by ID.

Source code in lingo/core.py
66
67
68
69
@abstractmethod
async def recover(self, id: str) -> Any:
    """Recover a job by ID."""
    pass

recover_status(id: str) -> Any abstractmethod async

Recover the status of a job by ID.

Source code in lingo/core.py
71
72
73
74
@abstractmethod
async def recover_status(self, id: str) -> Any:
    """Recover the status of a job by ID."""
    pass

restart_recent(limit: int = 50) -> List[Any] abstractmethod async

Restart recent failed jobs.

Source code in lingo/core.py
91
92
93
94
@abstractmethod
async def restart_recent(self, limit: int = 50) -> List[Any]:
    """Restart recent failed jobs."""
    pass

search_runs(created_after: Optional[str] = None, created_before: Optional[str] = None, status: Optional[str] = None, info_filter: Optional[Callable] = None, limit: int = 50, sort_by: Optional[str] = None) -> List[Any] abstractmethod async

Search for job runs with filters.

Source code in lingo/core.py
53
54
55
56
57
58
59
60
61
62
63
64
@abstractmethod
async def search_runs(
    self,
    created_after: Optional[str] = None,
    created_before: Optional[str] = None,
    status: Optional[str] = None,
    info_filter: Optional[Callable] = None,
    limit: int = 50,
    sort_by: Optional[str] = None,
) -> List[Any]:
    """Search for job runs with filters."""
    pass

Language

Bases: ABC

Base class for orchestration languages (task dispatchers).

Source code in lingo/core.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Language(ABC):
    """Base class for orchestration languages (task dispatchers)."""

    @abstractmethod
    def grammar(self, task_name: str, reply: Any = None) -> Callable:
        """Register the expected input/output signatures of a task."""
        pass

    @abstractmethod
    def speak(self, block: bool = True) -> None:
        """Initialize the language service."""
        pass

    @abstractmethod
    async def say(
        self,
        phrase: Phrase,
        info: Optional[BaseModel] = None,
        policy: Optional[RestartPolicy] = None,
    ) -> Any:
        """Dispatch a phrase (DAG) to the message broker for execution."""
        pass

    @abstractmethod
    async def attempt_cancel(self, job: Any) -> bool:
        """Attempt to cancel a running job."""
        pass

    @abstractmethod
    async def resubmit(self, job_record: Any, policy: Optional[Any] = None) -> Any:
        """Resubmit a previously recorded job using stored dispatch payload."""
        pass

attempt_cancel(job: Any) -> bool abstractmethod async

Attempt to cancel a running job.

Source code in lingo/core.py
39
40
41
42
@abstractmethod
async def attempt_cancel(self, job: Any) -> bool:
    """Attempt to cancel a running job."""
    pass

grammar(task_name: str, reply: Any = None) -> Callable abstractmethod

Register the expected input/output signatures of a task.

Source code in lingo/core.py
19
20
21
22
@abstractmethod
def grammar(self, task_name: str, reply: Any = None) -> Callable:
    """Register the expected input/output signatures of a task."""
    pass

resubmit(job_record: Any, policy: Optional[Any] = None) -> Any abstractmethod async

Resubmit a previously recorded job using stored dispatch payload.

Source code in lingo/core.py
44
45
46
47
@abstractmethod
async def resubmit(self, job_record: Any, policy: Optional[Any] = None) -> Any:
    """Resubmit a previously recorded job using stored dispatch payload."""
    pass

say(phrase: Phrase, info: Optional[BaseModel] = None, policy: Optional[RestartPolicy] = None) -> Any abstractmethod async

Dispatch a phrase (DAG) to the message broker for execution.

Source code in lingo/core.py
29
30
31
32
33
34
35
36
37
@abstractmethod
async def say(
    self,
    phrase: Phrase,
    info: Optional[BaseModel] = None,
    policy: Optional[RestartPolicy] = None,
) -> Any:
    """Dispatch a phrase (DAG) to the message broker for execution."""
    pass

speak(block: bool = True) -> None abstractmethod

Initialize the language service.

Source code in lingo/core.py
24
25
26
27
@abstractmethod
def speak(self, block: bool = True) -> None:
    """Initialize the language service."""
    pass