import datetime
import functools
import json
import warnings
from decimal import Decimal
from enum import Enum, IntEnum
from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar, Union
from uuid import UUID, uuid4
from pypika import functions
from pypika.enums import SqlTypes
from pypika.terms import Term
from tortoise import timezone
from tortoise.exceptions import ConfigurationError, FieldError
from tortoise.fields.base import Field
from tortoise.timezone import get_default_timezone, get_timezone, get_use_tz, localtime
from tortoise.validators import MaxLengthValidator
try:
from ciso8601 import parse_datetime
except ImportError: # pragma: nocoverage
from iso8601 import parse_date
parse_datetime = functools.partial(parse_date, default_timezone=None)
if TYPE_CHECKING: # pragma: nocoverage
from tortoise.models import Model
# Names exported by a star-import of this module.
# ``TimeField`` is a public field class defined in this file alongside the
# other field types, so it is exported together with them.
__all__ = (
    "BigIntField",
    "BinaryField",
    "BooleanField",
    "CharEnumField",
    "CharField",
    "DateField",
    "DatetimeField",
    "DecimalField",
    "FloatField",
    "IntEnumField",
    "IntField",
    "JSONField",
    "SmallIntField",
    "TextField",
    "TimeDeltaField",
    "TimeField",
    "UUIDField",
)
T = TypeVar("T")

# The JSON encoder/decoder are module-level callables so that they can be
# swapped for a different implementation.
JsonDumpsFunc = Callable[[Any], str]
JsonLoadsFunc = Callable[[Union[str, bytes]], Any]

# Standard-library defaults: compact separators, plain ``json.loads``.
JSON_DUMPS: JsonDumpsFunc = functools.partial(json.dumps, separators=(",", ":"))
JSON_LOADS: JsonLoadsFunc = json.loads

try:
    import orjson
except ImportError:  # pragma: nocoverage
    # orjson is optional; keep the standard-library defaults.
    pass
else:
    # orjson is available: use it as an accelerator. ``orjson.dumps`` returns
    # bytes, hence the decode to ``str``.
    JSON_DUMPS = lambda x: orjson.dumps(x).decode()  # noqa: E731
    JSON_LOADS = orjson.loads
class IntField(Field[int], int):
    """
    Integer field. (32-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key. For a primary key, ``generated``
        defaults to True unless it is passed explicitly.
    """

    SQL_TYPE = "INT"
    allows_generated = True

    def __init__(self, primary_key: Optional[bool] = None, **kwargs: Any) -> None:
        is_pk = primary_key or kwargs.get("pk")
        if is_pk:
            # Normalise whatever was supplied (or the default True) to a bool.
            requested = kwargs.get("generated", True)
            kwargs["generated"] = bool(requested)
        super().__init__(primary_key=primary_key, **kwargs)

    @property
    def constraints(self) -> dict:
        """Inclusive value bounds of a signed 32-bit integer."""
        return {
            "ge": -(2**31),
            "le": 2**31 - 1,
        }

    # Per-database SQL used when the column is generated.
    class _db_postgres:
        GENERATED_SQL = "SERIAL NOT NULL PRIMARY KEY"

    class _db_sqlite:
        GENERATED_SQL = "INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL"

    class _db_mysql:
        GENERATED_SQL = "INT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "INT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        GENERATED_SQL = "INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"
class BigIntField(IntField):
    """
    Big integer field. (64-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key.
    """

    SQL_TYPE = "BIGINT"

    @property
    def constraints(self) -> dict:
        """Inclusive value bounds of a signed 64-bit integer."""
        return {
            "ge": -(2**63),
            "le": 2**63 - 1,
        }

    # Per-database SQL used when the column is generated.
    class _db_postgres:
        GENERATED_SQL = "BIGSERIAL NOT NULL PRIMARY KEY"

    class _db_mysql:
        GENERATED_SQL = "BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "BIGINT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        # The Oracle override declares the column as INT.
        SQL_TYPE = "INT"
        GENERATED_SQL = "INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"
class SmallIntField(IntField):
    """
    Small integer field. (16-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key.
    """

    SQL_TYPE = "SMALLINT"

    @property
    def constraints(self) -> dict:
        """Inclusive value bounds of a signed 16-bit integer."""
        return {
            "ge": -(2**15),
            "le": 2**15 - 1,
        }

    # Per-database SQL used when the column is generated.
    class _db_postgres:
        GENERATED_SQL = "SMALLSERIAL NOT NULL PRIMARY KEY"

    class _db_mysql:
        GENERATED_SQL = "SMALLINT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "SMALLINT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        GENERATED_SQL = "SMALLINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"
class CharField(Field[str]):
    """
    Character field.

    You must provide the following:

    ``max_length`` (int):
        Maximum length of the field in characters. Must be at least 1.
    """

    field_type = str

    def __init__(self, max_length: int, **kwargs: Any) -> None:
        length = int(max_length)
        if length < 1:
            raise ConfigurationError("'max_length' must be >= 1")
        # Set before the base initialiser runs, as in the original ordering.
        self.max_length = length
        super().__init__(**kwargs)
        # Enforce the length limit through the field's validator list.
        self.validators.append(MaxLengthValidator(self.max_length))

    @property
    def constraints(self) -> dict:
        """Expose the configured maximum length."""
        return {"max_length": self.max_length}

    @property
    def SQL_TYPE(self) -> str:  # type: ignore
        return f"VARCHAR({self.max_length})"

    class _db_oracle:
        # Oracle override; it needs the owning field to read ``max_length``.
        def __init__(self, field: "CharField") -> None:
            self.field = field

        @property
        def SQL_TYPE(self) -> str:
            return f"NVARCHAR2({self.field.max_length})"
class TextField(Field[str], str):  # type: ignore
    """
    Large Text field.

    Cannot be unique or indexed; using it as a primary key is deprecated.
    """

    indexable = False
    SQL_TYPE = "TEXT"

    def __init__(
        self,
        primary_key: Optional[bool] = None,
        unique: bool = False,
        db_index: bool = False,
        **kwargs: Any,
    ) -> None:
        used_as_pk = primary_key or kwargs.get("pk")
        if used_as_pk:
            warnings.warn(
                "TextField as a PrimaryKey is Deprecated, use CharField instead",
                DeprecationWarning,
                stacklevel=2,
            )

        if unique:
            raise ConfigurationError(
                "TextField doesn't support unique indexes, consider CharField or another strategy"
            )

        wants_index = db_index or kwargs.get("index")
        if wants_index:
            raise ConfigurationError("TextField can't be indexed, consider CharField")

        # ``unique`` and ``db_index`` are necessarily falsy here and are not forwarded.
        super().__init__(primary_key=primary_key, **kwargs)

    class _db_mysql:
        SQL_TYPE = "LONGTEXT"

    class _db_mssql:
        SQL_TYPE = "NVARCHAR(MAX)"

    class _db_oracle:
        SQL_TYPE = "NCLOB"
class BooleanField(Field[bool]):
    """
    Boolean field.
    """

    # ``bool`` cannot be subclassed, so the Python type is declared explicitly
    # instead of being added as a base class.
    field_type = bool
    SQL_TYPE = "BOOL"

    # Per-database column type overrides.
    class _db_sqlite:
        SQL_TYPE = "INT"

    class _db_mssql:
        SQL_TYPE = "BIT"

    class _db_oracle:
        SQL_TYPE = "NUMBER(1)"
class DecimalField(Field[Decimal], Decimal):
    """
    Accurate decimal field.

    You must provide the following:

    ``max_digits`` (int):
        Max digits of significance of the decimal field. Must be >= 1.
    ``decimal_places`` (int):
        How many of those significant digits is after the decimal point.
        Must be >= 0.

    Both arguments are coerced with ``int()`` once, and the coerced values
    are what gets validated, stored and used.
    """

    skip_to_python_if_native = True

    def __init__(self, max_digits: int, decimal_places: int, **kwargs: Any) -> None:
        # Coerce once up front so that the validated value is also the stored
        # one. Otherwise an int-like input such as "2" passes validation and
        # then breaks the string repetition used to build ``quant`` below.
        max_digits = int(max_digits)
        decimal_places = int(decimal_places)
        if max_digits < 1:
            raise ConfigurationError("'max_digits' must be >= 1")
        if decimal_places < 0:
            raise ConfigurationError("'decimal_places' must be >= 0")
        super().__init__(**kwargs)
        self.max_digits = max_digits
        self.decimal_places = decimal_places
        # Quantisation template: "1" for no decimals, otherwise "1.00…0" with
        # ``decimal_places`` zeros.
        self.quant = Decimal("1" if decimal_places == 0 else f"1.{'0' * decimal_places}")

    def to_python_value(self, value: Any) -> Optional[Decimal]:
        """Convert a non-None value to ``Decimal``, quantised to ``self.quant`` and normalised."""
        if value is not None:
            value = Decimal(value).quantize(self.quant).normalize()
        return value

    @property
    def SQL_TYPE(self) -> str:  # type: ignore
        return f"DECIMAL({self.max_digits},{self.decimal_places})"

    class _db_sqlite:
        SQL_TYPE = "VARCHAR(40)"

        def function_cast(self, term: Term) -> Term:
            # The SQLite column type is VARCHAR, so cast the term to NUMERIC.
            return functions.Cast(term, SqlTypes.NUMERIC)
# When a queryset filters on a part of a datetime (`__year`/`__month`/`__day`, ...),
# the value can be an int, float or str instead of a datetime. Examples:
#   `await MyModel.filter(created_at__year=2024)`
#   `await MyModel.filter(created_at__year=2024.0)`
#   `await MyModel.filter(created_at__year='2024')`
DatetimeFieldQueryValueType = TypeVar(
"DatetimeFieldQueryValueType", datetime.datetime, int, float, str
)
class DatetimeField(Field[datetime.datetime], datetime.datetime):
    """
    Datetime field.

    ``auto_now`` and ``auto_now_add`` is exclusive.
    You can opt to set neither or only ONE of them.

    ``auto_now`` (bool):
        Always set to ``timezone.now()`` when a model instance is converted
        for the database.
    ``auto_now_add`` (bool):
        Set to ``timezone.now()`` only while the instance's value is still None.
    """

    SQL_TYPE = "TIMESTAMP"

    # Per-database column type overrides.
    class _db_mysql:
        SQL_TYPE = "DATETIME(6)"

    class _db_postgres:
        SQL_TYPE = "TIMESTAMPTZ"

    class _db_mssql:
        SQL_TYPE = "DATETIME2"

    class _db_oracle:
        SQL_TYPE = "TIMESTAMP WITH TIME ZONE"

    def __init__(self, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any) -> None:
        both_requested = auto_now_add and auto_now
        if both_requested:
            raise ConfigurationError("You can choose only 'auto_now' or 'auto_now_add'")
        super().__init__(**kwargs)
        self.auto_now = auto_now
        # ``auto_now`` also switches ``auto_now_add`` on.
        self.auto_now_add = auto_now | auto_now_add

    def to_python_value(self, value: Any) -> Optional[datetime.datetime]:
        """
        Return ``value`` as a datetime: ints go through ``fromtimestamp``, other
        non-datetime values through ``parse_datetime``. Naive results are made
        aware with ``get_timezone()``; aware ones are passed through ``localtime``.
        """
        if value is None:
            return value

        if not isinstance(value, datetime.datetime):
            if isinstance(value, int):
                value = datetime.datetime.fromtimestamp(value)
            else:
                value = parse_datetime(value)

        if timezone.is_naive(value):
            return timezone.make_aware(value, get_timezone())
        return localtime(value)

    def to_db_value(
        self, value: Optional[DatetimeFieldQueryValueType], instance: "Union[Type[Model], Model]"
    ) -> Optional[DatetimeFieldQueryValueType]:
        """
        Return the value to store. For model instances with ``auto_now`` /
        ``auto_now_add`` in effect, the current time is written to the instance
        and returned; otherwise ``value`` is validated and returned.
        """
        # Only do this if it is a Model instance, not class. Test for guaranteed instance var
        if hasattr(instance, "_saved_in_db"):
            needs_stamp = self.auto_now or (
                self.auto_now_add and getattr(instance, self.model_field_name) is None
            )
            if needs_stamp:
                stamp = timezone.now()
                setattr(instance, self.model_field_name, stamp)
                return stamp  # type:ignore[return-value]

        # With time zone support on, a naive datetime triggers a warning and is
        # then made aware as UTC.
        if (
            isinstance(value, datetime.datetime)
            and get_use_tz()
            and timezone.is_naive(value)
        ):
            warnings.warn(
                "DateTimeField %s received a naive datetime (%s)"
                " while time zone support is active." % (self.model_field_name, value),
                RuntimeWarning,
            )
            value = timezone.make_aware(value, "UTC")
        self.validate(value)
        return value

    @property
    def constraints(self) -> dict:
        """Mark the field read-only whenever ``auto_now_add`` is in effect."""
        return {"readOnly": True} if self.auto_now_add else {}

    def describe(self, serializable: bool) -> dict:
        """Extend the base description with the two auto-timestamp flags."""
        desc = super().describe(serializable)
        desc.update(auto_now_add=self.auto_now_add, auto_now=self.auto_now)
        return desc
class DateField(Field[datetime.date], datetime.date):
    """
    Date field.
    """

    skip_to_python_if_native = True
    SQL_TYPE = "DATE"

    def to_python_value(self, value: Any) -> Optional[datetime.date]:
        """Return None and ``date`` instances unchanged; parse anything else and keep its date part."""
        if value is None or isinstance(value, datetime.date):
            return value
        return parse_datetime(value).date()

    def to_db_value(
        self, value: Optional[Union[datetime.date, str]], instance: "Union[Type[Model], Model]"
    ) -> Optional[datetime.date]:
        """Same conversion as ``to_python_value``, followed by validation."""
        needs_parsing = value is not None and not isinstance(value, datetime.date)
        if needs_parsing:
            value = parse_datetime(value).date()
        self.validate(value)
        return value
class TimeField(Field[datetime.time], datetime.time):
    """
    Time field.

    ``auto_now`` and ``auto_now_add`` are mutually exclusive. ``timedelta``
    values are passed through both conversions unchanged.
    """

    skip_to_python_if_native = True
    SQL_TYPE = "TIME"

    class _db_oracle:
        SQL_TYPE = "NVARCHAR2(8)"

    def __init__(self, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any) -> None:
        both_requested = auto_now_add and auto_now
        if both_requested:
            raise ConfigurationError("You can choose only 'auto_now' or 'auto_now_add'")
        super().__init__(**kwargs)
        self.auto_now = auto_now
        # ``auto_now`` also switches ``auto_now_add`` on.
        self.auto_now_add = auto_now | auto_now_add

    def to_python_value(self, value: Any) -> Optional[Union[datetime.time, datetime.timedelta]]:
        """Parse ISO strings into ``time`` and give naive values the default timezone."""
        if value is None:
            return value
        if isinstance(value, str):
            value = datetime.time.fromisoformat(value)
        elif isinstance(value, datetime.timedelta):
            return value
        if timezone.is_naive(value):
            value = value.replace(tzinfo=get_default_timezone())
        return value

    def to_db_value(
        self,
        value: Optional[Union[datetime.time, datetime.timedelta]],
        instance: "Union[Type[Model], Model]",
    ) -> Optional[Union[datetime.time, datetime.timedelta]]:
        """
        Return the value to store. For model instances with ``auto_now`` /
        ``auto_now_add`` in effect, the current time of day is written to the
        instance and returned; ``timedelta`` values are returned as-is;
        anything else is validated and returned.
        """
        # Only do this if it is a Model instance, not class. Test for guaranteed instance var
        if hasattr(instance, "_saved_in_db"):
            needs_stamp = self.auto_now or (
                self.auto_now_add and getattr(instance, self.model_field_name) is None
            )
            if needs_stamp:
                stamp = timezone.now().time()
                setattr(instance, self.model_field_name, stamp)
                return stamp

        if isinstance(value, datetime.timedelta):
            return value

        # With time zone support on, a naive time triggers a warning and then
        # receives the default timezone.
        if value is not None and get_use_tz() and timezone.is_naive(value):
            warnings.warn(
                "TimeField %s received a naive time (%s)"
                " while time zone support is active." % (self.model_field_name, value),
                RuntimeWarning,
            )
            value = value.replace(tzinfo=get_default_timezone())
        self.validate(value)
        return value

    class _db_mysql:
        SQL_TYPE = "TIME(6)"

    class _db_postgres:
        SQL_TYPE = "TIMETZ"
class TimeDeltaField(Field[datetime.timedelta]):
    """
    A field for storing time differences.

    The database value is the duration expressed as an integer number of
    microseconds.
    """

    field_type = datetime.timedelta
    SQL_TYPE = "BIGINT"

    class _db_oracle:
        SQL_TYPE = "NUMBER(19)"

    def to_python_value(self, value: Any) -> Optional[datetime.timedelta]:
        """Build a ``timedelta`` from a microsecond count; None and timedeltas pass through."""
        already_native = value is None or isinstance(value, datetime.timedelta)
        if already_native:
            return value
        return datetime.timedelta(microseconds=value)

    def to_db_value(
        self, value: Optional[datetime.timedelta], instance: "Union[Type[Model], Model]"
    ) -> Optional[int]:
        """Validate ``value`` and return its total length in microseconds (None stays None)."""
        self.validate(value)
        if value is None:
            return None
        micros_per_second = 1000000
        micros_per_day = 86400 * micros_per_second
        return (
            value.days * micros_per_day
            + value.seconds * micros_per_second
            + value.microseconds
        )
class FloatField(Field[float], float):
    """
    Float (double) field.
    """

    SQL_TYPE = "DOUBLE PRECISION"

    # Per-database column type overrides.
    class _db_sqlite:
        SQL_TYPE = "REAL"

    class _db_mysql:
        SQL_TYPE = "DOUBLE"
class JSONField(Field[T], dict, list):  # type: ignore
    """
    JSON field.

    This field can store dictionaries or lists of any JSON-compliant structure.

    You can use generics to make static checking more friendly. Example: ``JSONField[dict[str, str]]``

    You can specify your own custom JSON encoder/decoder, leaving at the default should work well.
    If you have ``orjson`` installed, we default to using that,
    else the default ``json`` module will be used.

    ``encoder``:
        The custom JSON encoder.
    ``decoder``:
        The custom JSON decoder.

    If you want to use Pydantic model as the field type for generating a better OpenAPI
    documentation, you can use ``field_type`` to specify the type of the field.

    ``field_type``:
        The Pydantic model class.
    """

    SQL_TYPE = "JSON"
    indexable = False

    # Per-database column type overrides.
    class _db_postgres:
        SQL_TYPE = "JSONB"

    class _db_mssql:
        SQL_TYPE = "NVARCHAR(MAX)"

    class _db_oracle:
        SQL_TYPE = "NCLOB"

    def __init__(
        self,
        encoder: JsonDumpsFunc = JSON_DUMPS,
        decoder: JsonLoadsFunc = JSON_LOADS,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Only override ``field_type`` when a truthy one was supplied.
        if field_type := kwargs.get("field_type", None):
            self.field_type = field_type

    def to_db_value(
        self,
        value: Optional[Union[T, dict, list, str, bytes]],
        instance: "Union[Type[Model], Model]",
    ) -> Optional[str]:
        """
        Serialise ``value`` to a JSON string.

        ``str``/``bytes`` input is treated as already-encoded JSON: it is
        checked with the decoder and returned as text. Pydantic models are
        dumped to plain data first. Everything else goes through the encoder.

        :raises FieldError: If a ``str``/``bytes`` value cannot be decoded.
        """
        self.validate(value)
        if value is None:
            return None

        if isinstance(value, (str, bytes)):
            try:
                self.decoder(value)
            except Exception as exc:
                # Chain the decoder's error so the root cause stays visible.
                raise FieldError(f"Value {value!r} is invalid json value.") from exc
            if isinstance(value, bytes):
                return value.decode()
            return value

        # pydantic is optional; guard only the import itself.
        try:
            from pydantic import BaseModel
        except ImportError:
            pass
        else:
            if isinstance(value, BaseModel):
                value = value.model_dump()
        return self.encoder(value)

    def to_python_value(
        self, value: Optional[Union[T, str, bytes, dict, list]]
    ) -> Optional[Union[T, dict, list]]:
        """
        Decode ``str``/``bytes`` JSON; any other value is returned unchanged.

        When ``field_type`` is a Pydantic model class and the decoded data is
        not a list, the model is instantiated from the data.

        :raises FieldError: If decoding (or model construction) fails.
        """
        if not isinstance(value, (str, bytes)):
            return value

        try:
            data = self.decoder(value)
            try:
                from pydantic._internal._model_construction import ModelMetaclass
            except ImportError:
                pass
            else:
                if isinstance(self.field_type, ModelMetaclass) and not isinstance(data, list):
                    return self.field_type(**data)
            return data
        except Exception as exc:
            # Bytes that failed to decode may not be valid UTF-8 either, so
            # decode leniently: a strict decode here would raise
            # UnicodeDecodeError and mask the FieldError.
            shown = value if isinstance(value, str) else value.decode(errors="replace")
            raise FieldError(f"Value {shown} is invalid json value.") from exc
class UUIDField(Field[UUID], UUID):
    """
    UUID Field

    This field can store uuid value.

    If used as a primary key, it will auto-generate a UUID4 by default.
    """

    SQL_TYPE = "CHAR(36)"

    class _db_postgres:
        SQL_TYPE = "UUID"

    def __init__(self, **kwargs: Any) -> None:
        is_pk = kwargs.get("primary_key") or kwargs.get("pk", False)
        if is_pk:
            # Primary keys get a UUID4 factory unless a default was supplied.
            kwargs.setdefault("default", uuid4)
        super().__init__(**kwargs)

    def to_db_value(self, value: Any, instance: "Union[Type[Model], Model]") -> Optional[str]:
        """Stringify truthy values; falsy values (e.g. None) are returned as they are."""
        if not value:
            return value
        return str(value)

    def to_python_value(self, value: Any) -> Optional[UUID]:
        """Return None and ``UUID`` instances unchanged; build a ``UUID`` from anything else."""
        already_native = value is None or isinstance(value, UUID)
        return value if already_native else UUID(value)
class BinaryField(Field[bytes], bytes):  # type: ignore
    """
    Binary field.

    This is for storing ``bytes`` objects.
    Note that filter or queryset-update operations are not supported.
    """

    indexable = False
    SQL_TYPE = "BLOB"

    # Per-database column type overrides.
    class _db_postgres:
        SQL_TYPE = "BYTEA"

    class _db_mysql:
        SQL_TYPE = "LONGBLOB"

    class _db_mssql:
        SQL_TYPE = "VARBINARY(MAX)"
class IntEnumFieldInstance(SmallIntField):
    """
    Small-integer field whose Python values are members of an ``IntEnum``.

    ``enum_type``:
        The enum class. Every member value must convert to an int within
        ``minimum..32767``.
    ``description``:
        Field description; when None it is built as one "name: value" line per
        member, truncated to 2048 characters.
    ``generated``:
        When true, the lowest accepted member value is 1 instead of -32768.
        The flag is consumed here and not passed on to the parent constructor.
    """

    def __init__(
        self,
        enum_type: Type[IntEnum],
        description: Optional[str] = None,
        generated: bool = False,
        **kwargs: Any,
    ) -> None:
        # Validate values
        minimum = 1 if generated else -32768
        for item in enum_type:
            try:
                value = int(item.value)
            except (TypeError, ValueError) as exc:
                # ``int()`` raises TypeError (not ValueError) for values such
                # as None or tuples; report both as a configuration problem.
                raise ConfigurationError("IntEnumField only supports integer enums!") from exc
            if not minimum <= value < 32768:
                raise ConfigurationError(
                    "The valid range of IntEnumField's values is {}..32767!".format(minimum)
                )

        # Automatic description for the field if not specified by the user
        if description is None:
            description = "\n".join(f"{e.name}: {int(e.value)}" for e in enum_type)[:2048]

        super().__init__(description=description, **kwargs)
        self.enum_type = enum_type

    def to_python_value(self, value: Union[int, None]) -> Union[IntEnum, None]:
        """Map a stored integer to its enum member; None stays None."""
        return self.enum_type(value) if value is not None else None

    def to_db_value(
        self, value: Union[IntEnum, None, int], instance: "Union[Type[Model], Model]"
    ) -> Union[int, None]:
        """
        Return the plain integer to store.

        Integers (including enum members) are round-tripped through
        ``enum_type`` so that a value with no matching member is rejected by
        the enum lookup before validation runs.
        """
        if isinstance(value, IntEnum):
            value = int(value.value)
        if isinstance(value, int):
            value = int(self.enum_type(value))
        self.validate(value)
        return value
IntEnumType = TypeVar("IntEnumType", bound=IntEnum)


def IntEnumField(
    enum_type: Type[IntEnumType],
    description: Optional[str] = None,
    **kwargs: Any,
) -> IntEnumType:
    """
    Enum Field

    A field representing an integer enumeration.

    The description of the field is set automatically if not specified to a multiline list of
    "name: value" pairs.

    **Note**: Valid int value of ``enum_type`` is acceptable.

    ``enum_type``:
        The enum class
    ``description``:
        The description of the field. It is set automatically if not specified to a multiline list
        of "name: value" pairs.
    """
    # The annotation advertises the enum type for static checkers; the object
    # actually returned is the field instance.
    field = IntEnumFieldInstance(enum_type=enum_type, description=description, **kwargs)
    return field  # type: ignore
class CharEnumFieldInstance(CharField):
    """
    Character field whose Python values are members of an ``Enum``.

    With ``max_length`` left at 0 the column length is taken from the longest
    stringified member value.
    """

    def __init__(
        self,
        enum_type: Type[Enum],
        description: Optional[str] = None,
        max_length: int = 0,
        **kwargs: Any,
    ) -> None:
        # Automatic description for the field if not specified by the user
        if description is None:
            lines = [f"{e.name}: {str(e.value)}" for e in enum_type]
            description = "\n".join(lines)[:2048]

        # Automatic CharField max_length
        if max_length == 0:
            max_length = max((len(str(item.value)) for item in enum_type), default=0)

        super().__init__(description=description, max_length=max_length, **kwargs)
        self.enum_type = enum_type

    def to_python_value(self, value: Union[str, None]) -> Union[Enum, None]:
        """Map a stored string to its enum member; None stays None."""
        if value is None:
            return None
        return self.enum_type(value)

    def to_db_value(
        self, value: Union[Enum, None, str], instance: "Union[Type[Model], Model]"
    ) -> Union[str, None]:
        """Validate ``value`` and return the string to store."""
        self.validate(value)
        if isinstance(value, Enum):
            member = value
        elif isinstance(value, str):
            # Plain strings are looked up in the enum first.
            member = self.enum_type(value)
        else:
            return value
        return str(member.value)
CharEnumType = TypeVar("CharEnumType", bound=Enum)


def CharEnumField(
    enum_type: Type[CharEnumType],
    description: Optional[str] = None,
    max_length: int = 0,
    **kwargs: Any,
) -> CharEnumType:
    """
    Char Enum Field

    A field representing a character enumeration.

    **Warning**: If ``max_length`` is not specified or equals to zero, the size of represented
    char fields is automatically detected. So if later you update the enum, you need to update your
    table schema as well.

    **Note**: Valid str value of ``enum_type`` is acceptable.

    ``enum_type``:
        The enum class
    ``description``:
        The description of the field. It is set automatically if not specified to a multiline list
        of "name: value" pairs.
    ``max_length``:
        The length of the created CharField. If it is zero it is automatically detected from
        enum_type.
    """
    # The annotation advertises the enum type for static checkers; the object
    # actually returned is the field instance.
    field = CharEnumFieldInstance(
        enum_type=enum_type,
        description=description,
        max_length=max_length,
        **kwargs,
    )
    return field  # type: ignore