purging tasks

This commit is contained in:
jpt 2025-05-03 15:27:36 -05:00
parent 9e3dd591d6
commit c5eef57aeb
7 changed files with 24 additions and 270 deletions

View File

@ -2,12 +2,11 @@ import typer
import httpx
import lxml.html
from typing_extensions import Annotated
from .controller.tasks import add_task
from .db import initialize_db
from .tui.tasks import run as tasks_tui
from .tui.things import run as things_tui
from .tui.overview import run as overview_tui
#from .tui.overview import run as overview_tui
from .tui.recurring import run as recurring_tui
from .controller.things import add_thing
app = typer.Typer()
@ -38,16 +37,8 @@ def new(
# due = typer.prompt("Due (YYYY-MM-DD):")
# TODO: validate/allow blank
add_task(name, category, due)
typer.echo("Created new task!")
@app.command()
def tasks(
view: Annotated[str, typer.Option("-v", "--view", help="saved view")] = "default",
):
initialize_db()
tasks_tui(view)
add_thing(name, category, due)
typer.echo("Created new thing!")
@app.command()
def things(

View File

@ -1,15 +1,15 @@
import json
from datetime import date, timedelta
from ..db import db, TaskGenerator
from .tasks import add_task
from ..db import db, ThingGenerator
from .things import add_thing
def get_generator(item_id: int) -> TaskGenerator:
return TaskGenerator.get_by_id(item_id)
def get_generator(item_id: int) -> ThingGenerator:
return ThingGenerator.get_by_id(item_id)
def get_generators() -> list[TaskGenerator]:
query = TaskGenerator.select().where(~TaskGenerator.deleted)
def get_generators() -> list[ThingGenerator]:
query = ThingGenerator.select().where(~ThingGenerator.deleted)
return query.order_by("type", "template")
@ -17,11 +17,11 @@ def add_generator(
template: str,
type: str,
val: str,
) -> TaskGenerator:
) -> ThingGenerator:
# JSON for future expansion
config = json.dumps({"val": val})
with db.atomic():
task = TaskGenerator.create(
task = ThingGenerator.create(
template=template,
type=type,
config=config,
@ -46,7 +46,7 @@ def generate_needed_tasks():
)
for c in to_create:
add_task(**c)
add_thing(**c)
return to_create
@ -54,13 +54,13 @@ def generate_needed_tasks():
def update_generator(
item_id: int,
**kwargs,
) -> TaskGenerator:
) -> ThingGenerator:
# replace "val" with JSON
if "val" in kwargs:
config = {"val": kwargs.pop("val")}
kwargs["config"] = json.dumps(config)
with db.atomic():
query = TaskGenerator.update(kwargs).where(TaskGenerator.id == item_id)
query = ThingGenerator.update(kwargs).where(ThingGenerator.id == item_id)
query.execute()
task = TaskGenerator.get_by_id(item_id)
task = ThingGenerator.get_by_id(item_id)
return task

View File

@ -1,107 +0,0 @@
from datetime import datetime, timedelta
from peewee import fn, JOIN
from ..db import Task
def get_category_summary(num: int = 5) -> list[dict]:
"""
Returns summary of top categories with task counts by status and due dates.
Args:
num: Number of categories to return, ordered by total task count
Returns:
List of dicts containing category name and task statistics
"""
return []
def get_recently_active(num: int = 5, category: str | None = None) -> list[dict]:
"""
Returns most recently active tasks, optionally filtered by category.
Args:
num: Number of tasks to return
category: Optional category name to filter by
Returns:
List of tasks ordered by last activity (updated_at)
"""
query = (
Task.select(Task, Category).join(Category, JOIN.LEFT_OUTER).where(~Task.deleted)
)
if category:
query = query.where(Category.name == category)
query = query.order_by(Task.updated_at.desc()).limit(num)
return [
{
"id": task.id,
"text": task.text,
"status": task.status,
"type": task.type,
"category": task.category.name if task.category else None,
"due": task.due,
"updated_at": task.updated_at,
}
for task in query
]
def get_due_soon(
num: int = 5, all_overdue: bool = True, category: str | None = None
) -> list[dict]:
"""
Returns tasks ordered by due date, optionally including all overdue tasks.
Args:
num: Number of non-overdue tasks to return
all_overdue: If True, returns all overdue tasks plus num more tasks
If False, includes overdue tasks in the count
category: Optional category name to filter by
Returns:
List of tasks ordered by due date
"""
now = datetime.now()
# unfinished tasks w/ due date
query = (
Task.select(Task)
.join(Category, JOIN.LEFT_OUTER)
.where(
(~Task.deleted)
& (Task.due.is_null(False))
& (Task.due != "")
& (Task.status != "done")
)
)
# filter by category
if category:
query = query.where(Category.name == category)
if all_overdue:
# grab all overdue & append a few more on
overdue_tasks = list(query.where(Task.due < now).order_by(Task.due))
upcoming_tasks = list(
query.where(Task.due >= now).order_by(Task.due).limit(num)
)
tasks = overdue_tasks + upcoming_tasks
else:
# just most due tasks
tasks = list(query.order_by(Task.due).limit(num))
return [
{
"id": task.id,
"text": task.text,
"status": task.status,
"type": task.type,
"category": task.category.name if task.category else None,
"due": task.due,
"days_until_due": (task.due - now).days if task.due else None,
}
for task in tasks
]

View File

@ -1,114 +0,0 @@
import json
from datetime import datetime
from peewee import fn
from peewee import Case, Value
from ..db import db, Task, SavedSearch
from .. import config
from ..constants import SPECIAL_DATES_PIECES
def get_task(item_id: int) -> Task:
return Task.get_by_id(item_id)
def add_task(
text: str,
status: str,
due: datetime | str = SPECIAL_DATES_PIECES["unclassified"],
type: str = "",
project: str = "",
) -> Task:
"""
Add a new task to the database.
Returns the created task instance.
"""
with db.atomic():
task = Task.create(
text=text, type=type, status=status, due=due, project=project
)
return task
def update_task(
item_id: int,
**kwargs,
) -> Task:
with db.atomic():
query = Task.update(kwargs).where(Task.id == item_id)
query.execute()
task = Task.get_by_id(item_id)
return task
def _parse_sort_string(sort_string, status_order):
"""
Convert sort string like 'field1,-field2' to peewee order_by expressions.
"""
sort_expressions = []
if not sort_string:
return sort_expressions
for field in sort_string.split(","):
is_desc = field.startswith("-")
field_name = field[1:] if is_desc else field
if field == "status":
if not status_order:
status_order = list(config.STATUSES.keys())
# CASE statement that maps each status to its position in the order
order_case = Case(
Task.status,
[(s, Value(i)) for i, s in enumerate(status_order)],
)
sort_expressions.append(order_case.desc() if is_desc else order_case.asc())
elif field_name == "due_date":
expr = fn.COALESCE(getattr(Task, field_name), datetime(3000, 12, 31))
sort_expressions.append(expr.desc() if is_desc else expr)
else:
field_expr = getattr(Task, field_name)
sort_expressions.append(field_expr.desc() if is_desc else field_expr)
return sort_expressions
def get_tasks(
search_text: str | None = None,
statuses: tuple[str] | None = None,
projects: tuple[str] | None = None,
sort: str = "",
) -> list[Task]:
query = Task.select().where(~Task.deleted)
if search_text:
query = query.where(fn.Lower(Task.text).contains(search_text.lower()))
if statuses:
query = query.where(Task.status.in_(statuses))
if projects:
query = query.where(Task.project.in_(projects))
sort_expressions = _parse_sort_string(sort, statuses)
query = query.order_by(*sort_expressions)
return list(query)
def save_view(name: str, *, filters: dict, sort_string: str) -> SavedSearch:
filters_json = json.dumps(filters)
with db.atomic():
if SavedSearch.select().where(SavedSearch.name == name).exists():
query = SavedSearch.update(
filters=filters_json, sort_string=sort_string
).where(SavedSearch.name == name)
query.execute()
else:
SavedSearch.create(name=name, filters=filters_json, sort_string=sort_string)
def get_saved_view_names() -> list[str]:
return [search.name for search in SavedSearch.select()]
def get_saved_view(name: str) -> SavedSearch:
return SavedSearch.get(SavedSearch.name == name)

View File

@ -40,25 +40,9 @@ class Thing(BaseModel):
self.updated_at = datetime.now()
return super(Thing, self).save(*args, **kwargs)
class Task(BaseModel):
text = TextField()
status = CharField()
due = DateTimeField(null=True)
type = CharField()
project = CharField()
created_at = DateTimeField(default=datetime.now)
updated_at = DateTimeField(default=datetime.now)
deleted = BooleanField(default=False)
def save(self, *args, **kwargs):
self.updated_at = datetime.now()
return super(Task, self).save(*args, **kwargs)
@property
def due_week(self):
return self.due.isocalendar()[1] - 12
# @property
# def due_week(self):
# return self.due.isocalendar()[1] - 12
class SavedSearch(BaseModel):
@ -70,7 +54,7 @@ class SavedSearch(BaseModel):
return self.name
class TaskGenerator(BaseModel):
class ThingGenerator(BaseModel):
template = CharField()
type = CharField()
config = TextField() # JSON
@ -124,5 +108,5 @@ class TaskGenerator(BaseModel):
def initialize_db():
db.connect()
db.create_tables([Task, SavedSearch, TaskGenerator, Thing])
db.create_tables([SavedSearch, ThingGenerator, Thing])
db.close()

View File

@ -10,7 +10,7 @@ from ..controller.generators import (
from .editor import ( TableEditor, )
class TaskGenEditor(TableEditor):
class GenEditor(TableEditor):
def __init__(self):
super().__init__()
self.update_item_callback = update_generator
@ -34,5 +34,5 @@ class TaskGenEditor(TableEditor):
def run():
app = TaskGenEditor()
app = GenEditor()
app.run()

View File

@ -43,7 +43,7 @@ class ThingTable(TableEditor):
# event.prevent_default()
# elif self.mode == "load-view":
# self._load_view(event.value)
# self.refresh_tasks(restore_cursor=False)
# self.refresh_things(restore_cursor=False)
# self._hide_input()
# # if event isn't handled here it will bubble to parent
# event.prevent_default()