initial -> things migration

This commit is contained in:
jpt 2025-05-03 13:39:49 -05:00
parent 076f43b082
commit 5de9914341
4 changed files with 184 additions and 1 deletions

View File

@ -5,6 +5,7 @@ from typing_extensions import Annotated
from .controller.tasks import add_task
from .db import initialize_db
from .tui.tasks import run as tasks_tui
from .tui.things import run as things_tui
from .tui.overview import run as overview_tui
from .tui.recurring import run as recurring_tui
@ -48,6 +49,13 @@ def tasks(
initialize_db()
tasks_tui(view)
@app.command()
def things(
view: Annotated[str, typer.Option("-v", "--view", help="saved view")] = "default",
):
initialize_db()
things_tui(view)
@app.command()
def generators():

View File

@ -0,0 +1,90 @@
from datetime import datetime
from peewee import fn
from peewee import Case, Value
from ..db import db, Thing
from .. import config
def get_thing(item_id: int) -> Thing:
return Thing.get_by_id(item_id)
def add_thing(
type: str = "?",
**kwargs,
) -> Thing:
"""
Add a new Thing to the database.
Returns the created Thing instance.
"""
with db.atomic():
thing = Thing.create(
type=type, data=kwargs
)
return thing
def update_thing(
item_id: int,
**kwargs,
) -> Thing:
with db.atomic():
query = Thing.update(data=kwargs).where(Thing.id == item_id)
query.execute()
thing = Thing.get_by_id(item_id)
return thing
def _parse_sort_string(sort_string):
"""
Convert sort string like 'field1,-field2' to peewee order_by expressions.
"""
sort_expressions = []
if not sort_string:
return sort_expressions
for field_name in sort_string.split(","):
is_desc = field_name.startswith("-")
field_name = field_name[1:] if is_desc else field_name
# TODO: look up field type
field_type = "text"
if field_type == "enum":
# TODO: allow dynamic ordering
order = config.get_enum(field_name)
# CASE statement that maps each status to its position in the order
order_case = Case(
Thing.data[field_name],
[(s, Value(i)) for i, s in enumerate(order)],
)
sort_expressions.append(order_case.desc() if is_desc else order_case.asc())
elif field_type == "date":
expr = fn.COALESCE(Thing.data[field_name], datetime(3000, 12, 31))
sort_expressions.append(expr.desc() if is_desc else expr)
else:
field_expr = Thing.data[field_name]
sort_expressions.append(field_expr.desc() if is_desc else field_expr)
return sort_expressions
def get_things(
search_text: str | None = None,
filters: dict[str, str] | None = None,
sort: str = "",
) -> list[Thing]:
query = Thing.select().where(~Thing.deleted)
if search_text:
# TODO: which fields are searchable?
query = query.where(fn.Lower(Thing.data['text']).contains(search_text.lower()))
for param, val in filters.items():
query = query.where(Thing.data[param] == val)
sort_expressions = _parse_sort_string(sort)
query = query.order_by(*sort_expressions)
return list(query)

View File

@ -9,6 +9,7 @@ from peewee import (
SqliteDatabase,
TextField,
)
from playhouse.sqlite_ext import JSONField
# This module defines the core data types.
#
@ -28,7 +29,19 @@ class BaseModel(Model):
database = db
class Thing(BaseModel):
type = CharField()
data = JSONField()
created_at = DateTimeField(default=datetime.now)
updated_at = DateTimeField(default=datetime.now)
deleted = BooleanField(default=False)
def save(self, *args, **kwargs):
self.updated_at = datetime.now()
return super(Task, self).save(*args, **kwargs)
class Task(BaseModel):
text = TextField()
status = CharField()
@ -111,5 +124,5 @@ class TaskGenerator(BaseModel):
def initialize_db():
db.connect()
db.create_tables([Task, SavedSearch, TaskGenerator])
db.create_tables([Task, SavedSearch, TaskGenerator, Thing])
db.close()

72
src/tt/tui/things.py Normal file
View File

@ -0,0 +1,72 @@
from ..controller.things import (
get_thing,
get_things,
add_thing,
update_thing,
)
from .editor import TableEditor
class ThingTable(TableEditor):
# BINDINGS = [
# # saved views
# ("ctrl+s", "save_view", "save current view"),
# ("ctrl+o", "load_view", "load saved view"),
# ]
def __init__(self, default_view="default"):
super().__init__("things")
self.update_item_callback = update_thing
self.add_item_callback = add_thing
self.get_item_callback = get_thing
# def _load_db_view(self, name):
# try:
# saved = get_saved_view(name)
# self.filters = json.loads(saved.filters)
# self.sort_string = saved.sort_string
# except Exception:
# self.notify(f"Could not load {name}")
# def action_save_view(self):
# self._show_input("save-view", "default")
# def action_load_view(self):
# self._show_input("load-view", "")
# def on_input_submitted(self, event: Input.Submitted):
# # Override to add save/load view
# if self.mode == "save-view":
# save_view(event.value, filters=self.filters, sort_string=self.sort_string)
# self._hide_input()
# event.prevent_default()
# elif self.mode == "load-view":
# self._load_view(event.value)
# self.refresh_tasks(restore_cursor=False)
# self._hide_input()
# # if event isn't handled here it will bubble to parent
# event.prevent_default()
def refresh_items(self):
items = get_things(
self.search_query,
filters={},# key: self._filters_to_list(key) for key in self._filters},
sort=self.sort_string,
)
for item in items:
self.table.add_row(
*self._db_item_to_row(item),
key=str(item.id),
)
def _filters_to_list(self, key):
filters = self.filters.get(key)
if filters:
return filters.split(",")
else:
return None
def run(default_view):
app = ThingTable(default_view)
app.run()