Context Reference

Cendry

Bases: _BaseCendry

Synchronous Firestore ODM context.

Parameters:

Name           Type                 Description                                                               Default
client         Any                  Optional Firestore Client. Uses default credentials if not provided.      None
backend        Backend | None       Optional Backend instance. Mutually exclusive with client.                None
type_registry  TypeRegistry | None  Optional TypeRegistry override. Uses the global default if not provided.  None

get

get(
    model_class: type[T],
    document_id: str,
    *,
    parent: Model | None = None,
) -> T

Fetch a single document by ID.

Parameters:

Name         Type          Description                                 Default
model_class  type[T]       The Model class to deserialize into.        required
document_id  str           Firestore document ID.                      required
parent       Model | None  Parent document for subcollection queries.  None

Returns:

Type  Description
T     The deserialized model instance.

Raises:

Type                   Description
DocumentNotFoundError  If the document does not exist.

find

find(
    model_class: type[T],
    document_id: str,
    *,
    parent: Model | None = None,
) -> T | None

Fetch a single document by ID, returning None if not found.

Parameters:

Name         Type          Description                                 Default
model_class  type[T]       The Model class to deserialize into.        required
document_id  str           Firestore document ID.                      required
parent       Model | None  Parent document for subcollection queries.  None

Returns:

Type      Description
T | None  The deserialized model instance, or None.
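
The difference between get and find is only in how a missing document is reported. The sketch below illustrates that contract with an in-memory stand-in; InMemoryContext and City are hypothetical names invented for this example and are not part of the library, and DocumentNotFoundError is redefined locally so the snippet runs on its own.

```python
from dataclasses import dataclass


class DocumentNotFoundError(Exception):
    """Local stand-in for the library's exception of the same name."""


@dataclass
class City:
    name: str


class InMemoryContext:
    """Hypothetical stand-in; it mimics only the documented get/find contract."""

    def __init__(self, documents: dict) -> None:
        self._documents = documents

    def find(self, model_class, document_id: str):
        # find: a missing document yields None.
        data = self._documents.get(document_id)
        return None if data is None else model_class(**data)

    def get(self, model_class, document_id: str):
        # get: a missing document raises instead of returning None.
        instance = self.find(model_class, document_id)
        if instance is None:
            raise DocumentNotFoundError(document_id)
        return instance


ctx = InMemoryContext({"SF": {"name": "San Francisco"}})
city = ctx.get(City, "SF")       # expected to exist: use get
missing = ctx.find(City, "NYC")  # may be absent: use find and check for None
```

As a rule of thumb, get suits lookups where absence is a bug, while find suits lookups where absence is a normal outcome.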

get_many

get_many(
    model_class: type[T],
    document_ids: list[str],
    *,
    parent: Model | None = None,
) -> list[T]

Batch fetch multiple documents by ID in a single round trip.

Parameters:

Name          Type          Description                                 Default
model_class   type[T]       The Model class to deserialize into.        required
document_ids  list[str]     List of Firestore document IDs.             required
parent        Model | None  Parent document for subcollection queries.  None

Returns:

Type     Description
list[T]  List of deserialized model instances.

Raises:

Type                   Description
DocumentNotFoundError  If any documents are missing.

select

select(
    model_class: type[T],
    *filters: Any,
    order_by: list[Any] | None = None,
    limit: int | None = None,
    start_at: dict[str, Any] | Model | None = None,
    start_after: dict[str, Any] | Model | None = None,
    end_at: dict[str, Any] | Model | None = None,
    end_before: dict[str, Any] | Model | None = None,
    parent: Model | None = None,
) -> Query[T]

Query documents. Returns a Query with convenience methods.

select_group

select_group(
    model_class: type[T],
    *filters: Any,
    order_by: list[Any] | None = None,
    limit: int | None = None,
    start_at: dict[str, Any] | Model | None = None,
    start_after: dict[str, Any] | Model | None = None,
    end_at: dict[str, Any] | Model | None = None,
    end_before: dict[str, Any] | Model | None = None,
) -> Query[T]

Query across all subcollections with the given collection name.

save

save(instance: T, *, parent: Model | None = None) -> str

Save (upsert) a document. Returns the document ID.

Parameters:

Name      Type          Description                                Default
instance  T             Model instance to save.                    required
parent    Model | None  Parent document for subcollection writes.  None

Returns:

Type  Description
str   The document ID (auto-generated if instance.id was None).

create

create(instance: T, *, parent: Model | None = None) -> str

Create a document. Raises if it already exists. Returns the document ID.

Parameters:

Name      Type          Description                                Default
instance  T             Model instance to create.                  required
parent    Model | None  Parent document for subcollection writes.  None

Returns:

Type  Description
str   The document ID (auto-generated if instance.id was None).

Raises:

Type                        Description
DocumentAlreadyExistsError  If the document already exists.
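
To make the save (upsert) versus create (insert-only) distinction concrete, here is a minimal in-memory sketch. InMemoryContext and City are hypothetical stand-ins written for this example, the exception is redefined locally, and ID generation via uuid is an assumption; none of this is the library's implementation.

```python
import uuid
from dataclasses import asdict, dataclass
from typing import Optional


class DocumentAlreadyExistsError(Exception):
    """Local stand-in for the library's exception of the same name."""


@dataclass
class City:
    name: str
    id: Optional[str] = None


class InMemoryContext:
    """Hypothetical stand-in; it mimics only the documented save/create contract."""

    def __init__(self) -> None:
        self.documents: dict = {}

    def save(self, instance: City) -> str:
        # Upsert: overwrite if the ID is present, insert otherwise.
        doc_id = instance.id or uuid.uuid4().hex  # assumed ID scheme
        self.documents[doc_id] = asdict(instance)
        return doc_id

    def create(self, instance: City) -> str:
        # Insert only: refuse to overwrite an existing document.
        if instance.id is not None and instance.id in self.documents:
            raise DocumentAlreadyExistsError(instance.id)
        return self.save(instance)


ctx = InMemoryContext()
first_id = ctx.create(City(name="SF", id="sf"))
second_id = ctx.save(City(name="San Francisco", id="sf"))  # overwrites "sf"
generated_id = ctx.save(City(name="Oakland"))              # no ID given
```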

update

update(
    instance: Model,
    field_updates: dict[str, Any],
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
update(
    model_class: type[T],
    doc_id: str,
    field_updates: dict[str, Any],
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
update(
    instance_or_class: Model | type[T],
    field_updates_or_doc_id: dict[str, Any] | str,
    field_updates_or_none: dict[str, Any] | None = None,
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None

Partially update a document's fields.

Parameters:

Name                     Type                   Description                                                                      Default
instance_or_class        Model | type[T]        A Model instance, or a Model class.                                              required
field_updates_or_doc_id  dict[str, Any] | str   Field updates dict (instance form) or doc ID (class form).                       required
field_updates_or_none    dict[str, Any] | None  Field updates dict (class form only).                                            None
parent                   Model | None           Parent document for subcollection updates.                                       None
if_unchanged             bool | datetime        If True, update only if unchanged since the last read. Also accepts a datetime.  False

Raises:

Type                   Description
DocumentNotFoundError  If the document does not exist.
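
The three signatures above describe two call forms sharing one implementation: update(instance, field_updates) and update(model_class, doc_id, field_updates). The sketch below shows one way such positional arguments could be normalized. resolve_update_args and City are hypothetical names for this example, and reading the ID from instance.id is an assumption; the library may dispatch differently.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class City:
    id: str
    name: str


def resolve_update_args(
    instance_or_class: Any,
    field_updates_or_doc_id: Any,
    field_updates_or_none: Any = None,
) -> tuple:
    """Hypothetical helper: normalize both call forms to (doc_id, field_updates)."""
    if isinstance(instance_or_class, type):
        # Class form: update(City, "sf", {...})
        if not isinstance(field_updates_or_doc_id, str) or field_updates_or_none is None:
            raise TypeError("class form needs a doc ID and a field-updates dict")
        return field_updates_or_doc_id, field_updates_or_none
    # Instance form: update(city, {...}); the ID is taken from the instance.
    if not isinstance(field_updates_or_doc_id, dict):
        raise TypeError("instance form needs a field-updates dict")
    return instance_or_class.id, field_updates_or_doc_id


city = City(id="sf", name="San Francisco")
by_instance = resolve_update_args(city, {"name": "SF"})
by_class = resolve_update_args(City, "sf", {"name": "SF"})
```

Both calls resolve to the same document ID and the same field updates, which is why the class form is convenient when only the ID is at hand.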

delete

delete(
    instance: Model,
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
delete(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
    must_exist: bool = False,
    if_unchanged: bool | datetime = False,
) -> None
delete(
    instance_or_class: Model | type[T],
    doc_id: str | None = None,
    *,
    parent: Model | None = None,
    must_exist: bool = False,
    if_unchanged: bool | datetime = False,
) -> None

Delete a document by instance or by class + ID.

Parameters:

Name               Type             Description                                                                      Default
instance_or_class  Model | type[T]  A Model instance, or a Model class.                                              required
doc_id             str | None       Document ID (required when passing a class).                                     None
parent             Model | None     Parent document for subcollection deletes.                                       None
must_exist         bool             If True, raise DocumentNotFoundError when the document doesn't exist.            False
if_unchanged       bool | datetime  If True, delete only if unchanged since the last read. Also accepts a datetime.  False

refresh

refresh(
    instance: T, *, parent: Model | None = None
) -> None

Re-fetch a document and update the instance in-place.

Parameters:

Name      Type          Description                                 Default
instance  T             Model instance to refresh.                  required
parent    Model | None  Parent document for subcollection queries.  None

Raises:

Type                   Description
DocumentNotFoundError  If the document does not exist.

batch

batch() -> Batch

Create a batch writer for atomic multi-document operations.

save_many

save_many(
    instances: list[T], *, parent: Model | None = None
) -> None

Save multiple documents atomically. Max 500 items.

Parameters:

Name       Type          Description                                Default
instances  list[T]       List of Model instances to save.           required
parent     Model | None  Parent document for subcollection writes.  None

Raises:

Type         Description
CendryError  If more than 500 items are passed or validation fails.
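
Because a single call accepts at most 500 items, larger lists have to be split by the caller. The helper below is a plain-Python sketch of that splitting; chunked and MAX_BATCH_SIZE are names made up for this example and are not provided by the library.

```python
from collections.abc import Iterator

MAX_BATCH_SIZE = 500  # the documented per-call limit


def chunked(items: list, size: int = MAX_BATCH_SIZE) -> Iterator:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start : start + size]


chunk_sizes = [len(chunk) for chunk in chunked(list(range(1200)))]

# Intended use, assuming ctx is a context and instances is a list of models:
#     for chunk in chunked(instances):
#         ctx.save_many(chunk)
```

Note the trade-off: each chunk would be saved atomically on its own, but the list as a whole would not be, so a failure partway through leaves earlier chunks written.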

delete_many

delete_many(
    instances: list[Model], *, parent: Model | None = None
) -> None
delete_many(
    model_class: type[T],
    doc_ids: list[str],
    *,
    parent: Model | None = None,
) -> None
delete_many(
    instances_or_class: list[Model] | type[T],
    doc_ids: list[str] | None = None,
    *,
    parent: Model | None = None,
) -> None

Delete multiple documents atomically. Max 500 items.

transaction

transaction(
    fn: Callable[[Txn], Any],
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> Any
transaction(
    fn: None = None,
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> Txn
transaction(
    fn: Callable[[Txn], Any] | None = None,
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> Any

Run a transaction with a callback (auto-retry) or as a context manager (single attempt).

Parameters:

Name          Type                         Description                                                    Default
fn            Callable[[Txn], Any] | None  Callback receiving a Txn. If None, returns a context manager.  None
max_attempts  int                          Max retry attempts on contention (callback form only).         5
read_only     bool                         If True, no writes are allowed.                                False
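
The callback form retries on contention, which means the callback may run more than once. The following sketch shows the general shape of such a retry loop. run_transaction, FakeTxn, and ContentionError are hypothetical stand-ins written for illustration; the library's actual retry logic and error types are not shown in this reference.

```python
class ContentionError(Exception):
    """Hypothetical error standing in for a transaction conflict."""


class FakeTxn:
    """Hypothetical stand-in for Txn; it records only the attempt number."""

    def __init__(self, attempt: int) -> None:
        self.attempt = attempt


def run_transaction(fn, *, max_attempts: int = 5):
    """Call fn with a fresh FakeTxn until it succeeds or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(FakeTxn(attempt))
        except ContentionError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the conflict to the caller


calls = []


def flaky_transfer(txn: FakeTxn) -> str:
    # The callback may run several times, so it should be safe to repeat.
    calls.append(txn.attempt)
    if txn.attempt < 3:
        raise ContentionError("simulated conflict")
    return f"committed on attempt {txn.attempt}"


result = run_transaction(flaky_transfer)
```

The practical consequence is that side effects outside the transaction (sending email, appending to a list) should be kept out of the callback or made idempotent.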

AsyncCendry

Bases: _BaseCendry

Asynchronous Firestore ODM context.

Works with anyio (asyncio + trio).

Parameters:

Name           Type                 Description                                                      Default
client         Any                  Optional async Firestore Client.                                 None
backend        AsyncBackend | None  Optional AsyncBackend instance. Mutually exclusive with client.  None
type_registry  TypeRegistry | None  Optional TypeRegistry override.                                  None

Note

select() and select_group() are regular def methods: they return an AsyncQuery synchronously, so they are not awaited. Among the read methods, only get, find, and get_many are async def.
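
The split between sync and async methods can be illustrated with a small asyncio stand-in. FakeAsyncContext and FakeAsyncQuery are hypothetical classes for this example, and the collect() method is invented purely to have something awaitable on the query; the real AsyncQuery convenience methods are not listed in this reference.

```python
import asyncio


class FakeAsyncQuery:
    """Hypothetical stand-in for AsyncQuery."""

    def __init__(self, rows) -> None:
        self._rows = list(rows)

    async def collect(self) -> list:
        # Invented method: the I/O would happen here, not in select().
        return list(self._rows)


class FakeAsyncContext:
    """Hypothetical stand-in that mirrors the def / async def split."""

    def __init__(self, docs: dict) -> None:
        self._docs = docs

    def select(self, model_class) -> FakeAsyncQuery:
        # Regular def: builds and returns a query object, no await needed.
        return FakeAsyncQuery(self._docs.values())

    async def get(self, model_class, document_id: str):
        # async def: must be awaited.
        return self._docs[document_id]


async def main():
    ctx = FakeAsyncContext({"sf": "San Francisco"})
    query = ctx.select(str)          # no await
    city = await ctx.get(str, "sf")  # await
    rows = await query.collect()     # awaiting happens on the query
    return city, rows


result = asyncio.run(main())
```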

get async

get(
    model_class: type[T],
    document_id: str,
    *,
    parent: Model | None = None,
) -> T

Get a document by ID. Raises DocumentNotFoundError if it doesn't exist.

find async

find(
    model_class: type[T],
    document_id: str,
    *,
    parent: Model | None = None,
) -> T | None

Get a document by ID. Returns None if it doesn't exist.

get_many async

get_many(
    model_class: type[T],
    document_ids: list[str],
    *,
    parent: Model | None = None,
) -> list[T]

Batch fetch multiple documents by ID in a single round trip.

Parameters:

Name          Type          Description                                 Default
model_class   type[T]       The Model class to deserialize into.        required
document_ids  list[str]     List of Firestore document IDs.             required
parent        Model | None  Parent document for subcollection queries.  None

Returns:

Type     Description
list[T]  List of deserialized model instances.

Raises:

Type                   Description
DocumentNotFoundError  If any documents are missing.

select

select(
    model_class: type[T],
    *filters: Any,
    order_by: list[Any] | None = None,
    limit: int | None = None,
    start_at: dict[str, Any] | Model | None = None,
    start_after: dict[str, Any] | Model | None = None,
    end_at: dict[str, Any] | Model | None = None,
    end_before: dict[str, Any] | Model | None = None,
    parent: Model | None = None,
) -> AsyncQuery[T]

Query documents. Returns an AsyncQuery with convenience methods.

select_group

select_group(
    model_class: type[T],
    *filters: Any,
    order_by: list[Any] | None = None,
    limit: int | None = None,
    start_at: dict[str, Any] | Model | None = None,
    start_after: dict[str, Any] | Model | None = None,
    end_at: dict[str, Any] | Model | None = None,
    end_before: dict[str, Any] | Model | None = None,
) -> AsyncQuery[T]

Query across all subcollections with the given collection name.

save async

save(instance: T, *, parent: Model | None = None) -> str

Save (upsert) a document. Returns the document ID.

create async

create(instance: T, *, parent: Model | None = None) -> str

Create a document. Raises if it already exists. Returns the document ID.

update async

update(
    instance: Model,
    field_updates: dict[str, Any],
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
update(
    model_class: type[T],
    doc_id: str,
    field_updates: dict[str, Any],
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
update(
    instance_or_class: Model | type[T],
    field_updates_or_doc_id: dict[str, Any] | str,
    field_updates_or_none: dict[str, Any] | None = None,
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None

Partially update a document's fields.

delete async

delete(
    instance: Model,
    *,
    parent: Model | None = None,
    if_unchanged: bool | datetime = False,
) -> None
delete(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
    must_exist: bool = False,
    if_unchanged: bool | datetime = False,
) -> None
delete(
    instance_or_class: Model | type[T],
    doc_id: str | None = None,
    *,
    parent: Model | None = None,
    must_exist: bool = False,
    if_unchanged: bool | datetime = False,
) -> None

Delete a document by instance or by class + ID.

refresh async

refresh(
    instance: T, *, parent: Model | None = None
) -> None

Re-fetch a document and update the instance in-place.

batch

batch() -> AsyncBatch

Create an async batch writer for atomic multi-document operations.

save_many async

save_many(
    instances: list[T], *, parent: Model | None = None
) -> None

Save multiple documents atomically. Max 500 items.

delete_many async

delete_many(
    instances: list[Model], *, parent: Model | None = None
) -> None
delete_many(
    model_class: type[T],
    doc_ids: list[str],
    *,
    parent: Model | None = None,
) -> None
delete_many(
    instances_or_class: list[Model] | type[T],
    doc_ids: list[str] | None = None,
    *,
    parent: Model | None = None,
) -> None

Delete multiple documents atomically. Max 500 items.

transaction

transaction(
    fn: Callable[[AsyncTxn], Any],
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> Any
transaction(
    fn: None = None,
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> AsyncTxn
transaction(
    fn: Callable[[AsyncTxn], Any] | None = None,
    *,
    max_attempts: int = 5,
    read_only: bool = False,
) -> Any

Run a transaction with a callback (auto-retry) or as a context manager (single attempt).

Parameters:

Name          Type                              Description                                                                Default
fn            Callable[[AsyncTxn], Any] | None  Async callback receiving an AsyncTxn. If None, returns a context manager.  None
max_attempts  int                               Max retry attempts on contention (callback form only).                     5
read_only     bool                              If True, no writes are allowed.                                            False

Batch

Bases: WritesMixin

Synchronous batch writer with model-aware methods.

Use as a context manager — auto-commits on successful exit.
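
The commit-on-successful-exit behaviour follows the usual context-manager pattern, sketched below with an in-memory stand-in. FakeBatch and its save(doc_id, data) method are hypothetical and exist only for this example; the real Batch takes model instances. Dropping queued writes when the block raises is an assumption, since this reference only describes the successful case.

```python
class FakeBatch:
    """Hypothetical stand-in; the real Batch exposes model-aware write methods."""

    def __init__(self, store: dict) -> None:
        self._store = store
        self._pending: list = []

    def save(self, doc_id: str, data: dict) -> None:
        # Writes are only queued here; nothing reaches the store yet.
        self._pending.append((doc_id, data))

    def __enter__(self) -> "FakeBatch":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            # Clean exit: apply every queued write.
            self._store.update(self._pending)
        # Assumption: on an exception the queued writes are dropped.
        self._pending.clear()
        return False  # never swallow the exception


store: dict = {}

with FakeBatch(store) as batch:
    batch.save("sf", {"name": "San Francisco"})
    batch.save("la", {"name": "Los Angeles"})
committed = dict(store)

try:
    with FakeBatch(store) as batch:
        batch.save("nyc", {"name": "New York"})
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
```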


AsyncBatch

Bases: WritesMixin

Asynchronous batch writer with model-aware methods.

Use as an async context manager — auto-commits on successful exit.


Txn

Bases: WritesMixin

Synchronous transaction wrapper with model-aware read and write methods.

Use as a context manager for single-attempt transactions, or pass a callback to ctx.transaction(fn) for auto-retry.

get

get(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
) -> T

Read a document within the transaction.

Raises:

Type                   Description
DocumentNotFoundError  If the document does not exist.

find

find(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
) -> T | None

Read a document within the transaction, returning None if not found.


AsyncTxn

Bases: WritesMixin

Asynchronous transaction wrapper with model-aware read and write methods.

Use as an async context manager for single-attempt transactions, or pass a callback to await ctx.transaction(fn) for auto-retry. Write methods are synchronous: they queue operations rather than executing them.
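
The point about synchronous writes can be seen in a small sketch: reads are awaited, writes are plain calls that only queue work, and the queue is applied when the block exits. FakeAsyncTxn and its get/save signatures are hypothetical stand-ins for this example, and applying the queue in __aexit__ is an assumption about where the commit happens.

```python
import asyncio


class FakeAsyncTxn:
    """Hypothetical stand-in for AsyncTxn backed by a plain dict."""

    def __init__(self, store: dict) -> None:
        self._store = store
        self.queued: list = []

    async def __aenter__(self) -> "FakeAsyncTxn":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            # Assumed commit point: apply the queued writes on clean exit.
            self._store.update(self.queued)
        return False

    async def get(self, doc_id: str) -> dict:
        # Reads are async and must be awaited.
        return self._store[doc_id]

    def save(self, doc_id: str, data: dict) -> None:
        # Writes are sync: they queue the operation and return immediately.
        self.queued.append((doc_id, data))


async def main():
    store = {"counter": {"value": 1}}
    async with FakeAsyncTxn(store) as txn:
        current = await txn.get("counter")
        txn.save("counter", {"value": current["value"] + 1})  # no await
        seen_inside = store["counter"]["value"]  # write not applied yet
    return seen_inside, store["counter"]["value"]


result = asyncio.run(main())
```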

get async

get(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
) -> T

Read a document within the transaction.

find async

find(
    model_class: type[T],
    doc_id: str,
    *,
    parent: Model | None = None,
) -> T | None

Read a document within the transaction, returning None if not found.