Skip to main content
Version: Next 🚧

cubepi.checkpointer

Checkpointer​

class

source

CheckpointData​

class

CheckpointData(self, messages: list[Any] = list(), extra: dict[str, Any] = dict())

source

MemoryCheckpointer​

class

MemoryCheckpointer(self)

source

PostgresCheckpointer​

class

PostgresCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)

Checkpointer backed by PostgreSQL.

Usage

cp = PostgresCheckpointer(dsn="postgresql://...")
async with cp:
await cp.append(thread_id, [msg1, msg2])
data = await cp.load(thread_id)
await cp.save_extra(thread_id, {"k": "v"})

Raises CubepiSchemaUninitialized / CubepiSchemaMismatch at aenter if the DB schema isn't compatible with this cubepi version.

source

SQLiteCheckpointer​

class

SQLiteCheckpointer(self, db_path: str)

source