tt-server/apps/core/api.py
jpt 87b8c3894e
Some checks are pending
Lint / lint (push) Waiting to run
sync api prototype
2025-05-03 21:23:59 -05:00

71 lines
1.9 KiB
Python

import datetime

from ninja import NinjaAPI
from ninja import Schema
from ninja.errors import HttpError
from ninja.security import APIKeyQuery

from ..accounts.models import User
from .models import ThingEdit, Thing as ThingModel
class ApiKey(APIKeyQuery):
    """Query-string authentication that reads the ``api_key`` parameter."""

    param_name = "api_key"

    def authenticate(self, request, key):
        """Return the ``User`` whose username equals ``key``, else ``None``.

        A ``None`` result is what the original fall-through produced when no
        user matched.
        """
        # TODO: replace with secret keys
        try:
            return User.objects.get(username=key)
        except User.DoesNotExist:
            return None
# API instance for this module; ``ApiKey`` is passed as the API-wide ``auth``,
# and the handlers below read the authenticated user from ``request.auth``.
api = NinjaAPI(auth=ApiKey())
# Request payload accepted by ``sync_thing``.
class Thing(Schema):
    # Stored as ``thing_id`` on the model and used for lookups.
    id: int
    # Payload compared against, and copied into, the model's ``data``.
    data: dict
    # Copied into the model's ``type`` when a new thing is created.
    type: str
@api.post("/thing/")
def sync_thing(request, thing: Thing):
    """Create or update the authenticated user's stored copy of a thing.

    If a thing with the submitted id already exists for the user and its data
    differs, the previous data is archived as a ``ThingEdit`` and then
    overwritten. An unknown id creates a new record. The stored ``type`` of an
    existing thing is left untouched.

    Returns:
        ``{"action": <str>}`` where the action is ``"created"``,
        ``"updated"``, or ``"none"`` (the thing exists and its data is
        unchanged).
    """
    # Timezone-aware replacement for the deprecated, naive ``utcnow()``.
    # NOTE(review): assumes the project runs with USE_TZ = True - confirm.
    now = datetime.datetime.now(datetime.timezone.utc)

    # Only the lookup can raise DoesNotExist, so only the lookup is guarded.
    try:
        stored = ThingModel.objects.get(user=request.auth, thing_id=thing.id)
    except ThingModel.DoesNotExist:
        print("new", thing)
        ThingModel.objects.create(
            user=request.auth,
            thing_id=thing.id,
            data=thing.data,
            type=thing.type,
            synced_at=now,
        )
        return {"action": "created"}

    print("update", thing)
    if stored.data == thing.data:
        return {"action": "none"}

    # Snapshot the outgoing version, stamped with the time it was last synced,
    # before replacing it.
    ThingEdit.objects.create(
        thing=stored,
        data=stored.data,
        timestamp=stored.synced_at,
    )
    stored.data = thing.data
    stored.synced_at = now
    stored.save()
    # TODO: deleted
    return {"action": "updated"}
# The ``int`` path converter keeps non-numeric segments such as
# ``/thing/latest_changes`` from being captured by this route, which is
# registered first.
# NOTE(review): Django's int converter only matches non-negative integers -
# confirm thing ids are never negative.
# NOTE(review): ``key`` is a required query parameter that the handler never
# reads; it is kept so existing callers keep working - confirm it can go.
@api.get("/thing/{int:id}")
def get_thing(request, id: int, key: str):
    """Return the stored data of one of the authenticated user's things.

    Responds with 404 when the user has no thing with the given id.
    """
    try:
        stored = ThingModel.objects.get(user=request.auth, thing_id=id)
    except ThingModel.DoesNotExist:
        # Report a missing thing as 404 rather than an unhandled 500.
        raise HttpError(404, "Thing not found") from None
    return stored.data
# NOTE(review): ``since`` is handed to the ORM as a raw string; a malformed
# value is not validated at the API layer - consider a datetime annotation.
# NOTE(review): this filters on ``updated_at`` while ``sync_thing`` writes
# ``synced_at`` - confirm the model maintains ``updated_at``.
@api.get("/thing/latest_changes")
def latest_changes(request, since: str):
    """List the authenticated user's things changed at or after ``since``.

    Each entry holds the thing's ``id`` and its ``updated_at`` timestamp.
    """
    changed = ThingModel.objects.filter(user=request.auth, updated_at__gte=since)
    # Expose ``thing_id``, the identifier the other endpoints accept, rather
    # than the database primary key.
    return [{"id": t.thing_id, "updated_at": t.updated_at} for t in changed]