restored lost code

This commit is contained in:
James Turk 2024-12-16 23:04:51 -06:00
commit fd78a86e20
37 changed files with 1624 additions and 0 deletions

28
README.rst Normal file
View File

@ -0,0 +1,28 @@
django-simplekeys
=================
django-simplekeys is a reusable Django app that provides a simple way to add
API keys to an existing Django project, regardless of API framework.
* GitHub: https://github.com/jamesturk/django-simplekeys
* Documentation: https://django-simplekeys.readthedocs.io/en/latest/
.. image:: https://travis-ci.org/jamesturk/django-simplekeys.svg?branch=master
:target: https://travis-ci.org/jamesturk/django-simplekeys
.. image:: https://img.shields.io/pypi/v/django-simplekeys.svg
:target: https://pypi.python.org/pypi/django-simplekeys
.. image:: https://readthedocs.org/projects/django-simplekeys/badge/?version=latest
:target: https://django-simplekeys.readthedocs.io/en/latest/
Features
--------
* `Token bucket <https://en.wikipedia.org/wiki/Token_bucket>`_ rate limiting, for limiting requests/second with optional bursting behavior.
* Quota-based rate limiting (e.g. requests/day)
* Ability to configure different usage tiers, to give different users different rates/quotas.
* Ability to configure different 'zones' so that different API methods can have different limits. (e.g. some particularly computationally expensive queries can have a much lower limit than cheap GET queries)
* Provided views for very simple email-based API key registration.

0
example/__init__.py Normal file
View File

75
example/settings.py Normal file
View File

@ -0,0 +1,75 @@
import os
import django
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SECRET_KEY = 'not-a-secret'
DEBUG = True
ALLOWED_HOSTS = []
SITE_ID = 1
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.staticfiles',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.contenttypes',
'simplekeys',
]
SIMPLEKEYS_RATE_LIMIT_BACKEND = 'simplekeys.backends.MemoryBackend'
SIMPLEKEYS_ZONE_PATHS = [('/via.*/', 'default')]
_MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'simplekeys.middleware.SimpleKeysMiddleware',
]
if django.VERSION >= (1, 11):
MIDDLEWARE = _MIDDLEWARE
else:
MIDDLEWARE_CLASSES = _MIDDLEWARE
ROOT_URLCONF = 'simplekeys.tests.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'example.wsgi.application'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
STATIC_URL = '/static/'

16
example/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for example project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.11/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "example.settings")
application = get_wsgi_application()

10
setup.cfg Normal file
View File

@ -0,0 +1,10 @@
[bdist_wheel]
universal = 1
[flake8]
exclude = simplekeys/migrations
[egg_info]
tag_build =
tag_date = 0

42
setup.py Normal file
View File

@ -0,0 +1,42 @@
from setuptools import setup, find_packages
setup(
name='django-simplekeys',
version="0.5.3",
packages=find_packages(),
include_package_data=True,
zip_safe=False,
description='Django API Key management & validation',
author='James Turk',
author_email='dev@jamesturk.net',
license='MIT License',
url='http://github.com/jamesturk/django-simplekeys/',
long_description=open('README.rst').read(),
platforms=["any"],
install_requires=[
"Django",
],
extras_require={
'dev': [
'freezegun',
'flake8',
'sphinx',
'sphinx-rtd-theme',
]
},
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Environment :: Web Environment',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
],
)

0
simplekeys/__init__.py Normal file
View File

31
simplekeys/admin.py Normal file
View File

@ -0,0 +1,31 @@
from django.contrib import admin
from .models import Tier, Zone, Limit, Key
@admin.register(Key)
class KeyAdmin(admin.ModelAdmin):
list_display = ('key', 'email', 'name', 'tier', 'status', 'created_at')
list_filter = ('tier', 'status')
search_fields = ('email', 'name', 'key')
list_select_related = ('tier',)
@admin.register(Zone)
class ZoneAdmin(admin.ModelAdmin):
fields = ('name', 'slug')
prepopulated_fields = {"slug": ("name",)}
class LimitInline(admin.TabularInline):
model = Limit
extra = 1
@admin.register(Tier)
class TierAdmin(admin.ModelAdmin):
fields = ('name', 'slug')
prepopulated_fields = {"slug": ("name",)}
inlines = [
LimitInline,
]

5
simplekeys/apps.py Normal file
View File

@ -0,0 +1,5 @@
from django.apps import AppConfig
class SimplekeysConfig(AppConfig):
name = 'simplekeys'

119
simplekeys/backends.py Normal file
View File

@ -0,0 +1,119 @@
import time
import itertools
import datetime
from collections import Counter
from django.conf import settings
from .models import Zone, Key
class AbstractBackend(object):
def get_tokens_and_timestamp(self, kz):
"""
returns token_count, timestamp for kz
if not found return (0, None)
"""
raise NotImplementedError()
def set_token_count(self, kz, tokens):
"""
set counter for kz & timestamp to current time
"""
raise NotImplementedError()
def get_and_inc_quota_value(self, key, zone, quota_range):
"""
increment & get quota value
(value will increase regardless of validity)
"""
raise NotImplementedError()
def get_usage(self, keys=None, days=7):
"""
get usage in a nested dictionary
{
api_key: {
date: {
zone: N
}
}
}
such that result['apikey']['20170501']['default'] is equal to the
number of requests made by 'apikey' to 'default' zone endpoints on
May 1, 2017.
"""
raise NotImplementedError()
class MemoryBackend(AbstractBackend):
def __init__(self):
self.reset()
def reset(self):
self._counter = {}
self._last_replenished = {}
self._quota = Counter()
def get_tokens_and_timestamp(self, key, zone):
kz = (key, zone)
return self._counter.get(kz, 0), self._last_replenished.get(kz)
def set_token_count(self, key, zone, tokens):
kz = (key, zone)
self._last_replenished[kz] = time.time()
self._counter[kz] = tokens
def get_and_inc_quota_value(self, key, zone, quota_range):
quota_key = '{}-{}-{}'.format(key, zone, quota_range)
self._quota[quota_key] += 1
return self._quota[quota_key]
class CacheBackend(AbstractBackend):
def __init__(self):
from django.core.cache import caches
self.cache = caches[getattr(settings, 'SIMPLEKEYS_CACHE', 'default')]
# 25 hour default, just longer than a day so that day limits are OK
self.timeout = getattr(settings, 'SIMPLEKEYS_CACHE_TIMEOUT', 25*60*60)
def get_tokens_and_timestamp(self, key, zone):
kz = '{}~{}'.format(key, zone)
# once we drop Django 1.8 support we can use
# return self.cache.get_or_set(kz, (0, None), self.timeout)
val = self.cache.get(kz)
if val is None:
val = self.cache.add(kz, (0, None), timeout=self.timeout)
if val:
return self.cache.get(kz)
return val
def set_token_count(self, key, zone, tokens):
kz = '{}~{}'.format(key, zone)
self.cache.set(kz, (tokens, time.time()), self.timeout)
def get_and_inc_quota_value(self, key, zone, quota_range):
quota_key = '{}~{}~{}'.format(key, zone, quota_range)
# once we drop Django 1.8 support we can use
# self.cache.get_or_set(quota_key, 0, timeout=self.timeout)
if quota_key not in self.cache:
self.cache.add(quota_key, 0, timeout=self.timeout)
return self.cache.incr(quota_key)
def get_usage(self, keys=None, days=7):
today = datetime.date.today()
dates = [(today - datetime.timedelta(days=d)).strftime('%Y%m%d')
for d in range(days)]
zones = Zone.objects.all().values_list('slug', flat=True)
if not keys:
keys = Key.objects.all().values_list('key', flat=True)
all_keys = ['{}~{}~{}'.format(key, zone, date) for (key, zone, date) in
itertools.product(keys, zones, dates)]
result = {k: {d: Counter() for d in dates} for k in keys}
for cache_key, cache_val in self.cache.get_many(all_keys).items():
key, zone, date = cache_key.split('~')
result[key][date][zone] = cache_val
return result

18
simplekeys/decorators.py Normal file
View File

@ -0,0 +1,18 @@
from django.conf import settings
from functools import wraps
from .verifier import verify_request
def key_required(zone=None):
if not zone:
zone = getattr(settings, 'SIMPLEKEYS_DEFAULT_ZONE', 'default')
def decorator(func):
@wraps(func)
def newfunc(request, *args, **kwargs):
resp = verify_request(request, zone)
return resp or func(request, *args, **kwargs)
return newfunc
return decorator

14
simplekeys/forms.py Normal file
View File

@ -0,0 +1,14 @@
from django import forms
from .models import Key
class KeyRegistrationForm(forms.ModelForm):
class Meta:
model = Key
fields = ['email', 'name', 'website', 'organization', 'usage']
class KeyConfirmationForm(forms.Form):
email = forms.CharField(widget=forms.HiddenInput)
key = forms.CharField(widget=forms.HiddenInput)
confirm_hash = forms.CharField(widget=forms.HiddenInput)

View File

View File

@ -0,0 +1,26 @@
import csv
from django.core.management.base import BaseCommand
from ...models import Key
class Command(BaseCommand):
help = 'Export API user information.'
def add_arguments(self, parser):
parser.add_argument('--since', dest='since', default=False,
help='only dump users since a given date')
def handle(self, *args, **options):
fields = ('key', 'status', 'tier', 'email', 'name', 'organization', 'usage', 'website',
'created_at', 'updated_at')
dw = csv.DictWriter(self.stdout, fields)
dw.writeheader()
qs = Key.objects.all()
if options['since']:
qs = qs.filter(created_at__gte=options['since'])
for key in qs.order_by('created_at'):
dw.writerow({f: getattr(key, f) for f in fields})

View File

@ -0,0 +1,25 @@
import csv
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils.module_loading import import_string
class Command(BaseCommand):
help = 'Export API usage report.'
def add_arguments(self, parser):
parser.add_argument('--days', dest='days', default=7, help='days of usage to export')
def handle(self, *args, **options):
backend = getattr(settings, 'SIMPLEKEYS_RATE_LIMIT_BACKEND',
'simplekeys.backends.CacheBackend')
backend = import_string(backend)()
dw = csv.writer(self.stdout)
dw.writerow(('key', 'zone', 'date', 'requests'))
for key, date_zone_num in backend.get_usage(days=int(options['days'])).items():
for date, zone_num in date_zone_num.items():
for zone, requests in zone_num.items():
dw.writerow((key, zone, date, requests))

33
simplekeys/middleware.py Normal file
View File

@ -0,0 +1,33 @@
import re
from .verifier import verify_request
try:
from django.utils.deprecation import MiddlewareMixin
except ImportError:
MiddlewareMixin = object
class SimpleKeysMiddleware(MiddlewareMixin):
def __init__(self, get_response=None):
self._zones = None
self.get_response = get_response
@property
def zones(self):
if not self._zones:
from django.conf import settings
zones = getattr(settings, 'SIMPLEKEYS_ZONE_PATHS',
[('.*', 'default')])
self._zones = [
((re.compile(path) if isinstance(path, str) else path), zone)
for (path, zone) in zones
]
return self._zones
def process_request(self, request):
for path, zone in self.zones:
if path.match(request.path):
return verify_request(request, zone)
# no paths matched, pass-through
return None

View File

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-04-19 01:01
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Key',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('key', models.CharField(default=uuid.uuid4, max_length=40, unique=True)),
('status', models.CharField(choices=[('u', 'Unactivated'), ('s', 'Suspended'), ('a', 'Active')], max_length=1)),
('email', models.EmailField(max_length=254, unique=True)),
('name', models.CharField(max_length=100)),
('organization', models.CharField(max_length=100)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
),
migrations.CreateModel(
name='Limit',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('quota_period', models.CharField(choices=[('d', 'daily'), ('m', 'monthly')], max_length=1)),
('quota_requests', models.PositiveIntegerField()),
('requests_per_second', models.PositiveIntegerField()),
('burst_size', models.PositiveIntegerField()),
],
),
migrations.CreateModel(
name='Tier',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('slug', models.SlugField(unique=True)),
('name', models.CharField(max_length=50)),
],
),
migrations.CreateModel(
name='Zone',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('slug', models.SlugField(unique=True)),
('name', models.CharField(max_length=50)),
],
),
migrations.AddField(
model_name='limit',
name='tier',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='limits', to='simplekeys.Tier'),
),
migrations.AddField(
model_name='limit',
name='zone',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='limits', to='simplekeys.Zone'),
),
migrations.AddField(
model_name='key',
name='tier',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='keys', to='simplekeys.Tier'),
),
migrations.AlterUniqueTogether(
name='limit',
unique_together=set([('tier', 'zone')]),
),
]

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-04-21 03:07
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('simplekeys', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='key',
name='usage',
field=models.TextField(blank=True, verbose_name='Intended Usage'),
),
migrations.AddField(
model_name='key',
name='website',
field=models.URLField(blank=True),
),
migrations.AlterField(
model_name='key',
name='organization',
field=models.CharField(blank=True, max_length=100),
),
]

View File

@ -0,0 +1,19 @@
# Generated by Django 2.0.9 on 2018-10-30 00:30
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('simplekeys', '0002_auto_20170421_0307'),
]
operations = [
migrations.AlterField(
model_name='key',
name='tier',
field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='keys', to='simplekeys.Tier'),
),
]

View File

63
simplekeys/models.py Normal file
View File

@ -0,0 +1,63 @@
import uuid
from django.db import models
QUOTA_PERIODS = (
('d', 'daily'),
('m', 'monthly'),
)
KEY_STATUSES = (
('u', 'Unactivated'),
('s', 'Suspended'),
('a', 'Active'),
)
class Tier(models.Model):
slug = models.SlugField(max_length=50, unique=True)
name = models.CharField(max_length=50)
def __str__(self):
return self.name
class Zone(models.Model):
slug = models.SlugField(max_length=50, unique=True)
name = models.CharField(max_length=50)
def __str__(self):
return self.name
class Limit(models.Model):
tier = models.ForeignKey(Tier, related_name='limits', on_delete=models.CASCADE)
zone = models.ForeignKey(Zone, related_name='limits', on_delete=models.CASCADE)
quota_period = models.CharField(max_length=1, choices=QUOTA_PERIODS)
quota_requests = models.PositiveIntegerField()
requests_per_second = models.PositiveIntegerField()
burst_size = models.PositiveIntegerField()
class Meta:
unique_together = (
('tier', 'zone'),
)
class Key(models.Model):
key = models.CharField(max_length=40, unique=True, default=uuid.uuid4)
status = models.CharField(max_length=1, choices=KEY_STATUSES)
tier = models.ForeignKey(Tier, related_name='keys', on_delete=models.PROTECT)
email = models.EmailField(unique=True)
name = models.CharField(max_length=100)
organization = models.CharField(max_length=100, blank=True)
usage = models.TextField('Intended Usage', blank=True)
website = models.URLField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __str__(self):
return '{} ({})'.format(self.email, self.key)

View File

@ -0,0 +1,16 @@
<html>
<head>
<title>API Key Confirmation</title>
</head>
<body>
<h1>API Key Confirmation</h1>
<p> Click Submit to confirm your key: </p>
<form action="." method="POST">
{{ form.as_ul }}
<input type="submit">
{% csrf_token %}
</form>
</body>
</html>

View File

@ -0,0 +1,13 @@
{{ key.name }},
Your API key registration is almost done!
Your new API key is: {{ key.key }}
Please visit:
{{ confirmation_url|safe }}
to activate your key.
Thanks!

View File

@ -0,0 +1,11 @@
<html>
<head>
<title>API Key Confirmed</title>
</head>
<body>
<h1>API Key Confirmed</h1>
<p> Your key has been activated: {{ key.key }} </p>
</body>
</html>

View File

@ -0,0 +1,13 @@
<html>
<head>
<title>API Key Registration</title>
</head>
<body>
<h1>API Key Registration</h1>
<form action="." method="POST">
{{ form.as_ul }}
<input type="submit">
{% csrf_token %}
</form>
</body>
</html>

View File

View File

@ -0,0 +1,103 @@
from django.test import TestCase
from freezegun import freeze_time
from ..models import Zone, Key, Tier
from ..backends import MemoryBackend, CacheBackend
class MemoryBackendTestCase(TestCase):
def get_backend(self):
return MemoryBackend()
def test_get_tokens_and_timestamp_initial(self):
b = self.get_backend()
self.assertEquals(b.get_tokens_and_timestamp('key', 'zone'), (0, None))
def test_token_set_and_retrieve(self):
b = self.get_backend()
with freeze_time('2017-04-17'):
b.set_token_count('key', 'zone', 100)
tokens, timestamp = b.get_tokens_and_timestamp('key', 'zone')
self.assertEquals(tokens, 100)
self.assertEquals(int(timestamp), 1492387200) # frozen time
def test_key_and_zone_independence(self):
b = self.get_backend()
b.set_token_count('key', 'zone', 100)
self.assertEquals(b.get_tokens_and_timestamp('key2', 'zone'),
(0, None))
self.assertEquals(b.get_tokens_and_timestamp('key', 'zone2'),
(0, None))
def test_get_and_inc_quota(self):
b = self.get_backend()
self.assertEquals(
b.get_and_inc_quota_value('key', 'zone', '20170411'), 1
)
self.assertEquals(
b.get_and_inc_quota_value('key', 'zone', '20170411'), 2
)
self.assertEquals(
b.get_and_inc_quota_value('key', 'zone', '20170412'), 1
)
def test_get_and_inc_quota_key_and_zone_independence(self):
b = self.get_backend()
self.assertEquals(
b.get_and_inc_quota_value('key', 'zone', '20170411'), 1
)
self.assertEquals(
b.get_and_inc_quota_value('key2', 'zone', '20170411'), 1
)
self.assertEquals(
b.get_and_inc_quota_value('key', 'zone2', '20170411'), 1
)
class CacheBackendTestCase(MemoryBackendTestCase):
""" do the same tests as MemoryBackendTestCase but w/ CacheBackend """
def get_backend(self):
# ensure we have a fresh cache backend each time
c = CacheBackend()
c.cache.clear()
return c
def test_get_usage(self):
Zone.objects.create(slug='default', name='Default')
Zone.objects.create(slug='special', name='Special')
tier = Tier.objects.create(slug='default')
Key.objects.create(key='key1', tier=tier, email='key1@example.com')
Key.objects.create(key='key2', tier=tier, email='key2@example.com')
b = self.get_backend()
# key, zone, day, num
usage = [
# Key 1 usage
('key1', 'default', '20170501', 20),
('key1', 'default', '20170502', 200),
('key1', 'default', '20170503', 20),
('key1', 'special', '20170502', 5),
# Key 2 usage
('key2', 'special', '20170501', 1),
('key2', 'special', '20170502', 1),
('key2', 'special', '20170503', 1),
]
for key, zone, day, num in usage:
for _ in range(num):
b.get_and_inc_quota_value(key, zone, day)
with freeze_time('2017-05-05'):
key1usage = b.get_usage()['key1']
assert key1usage['20170501']['default'] == 20
assert key1usage['20170502']['default'] == 200
assert key1usage['20170503']['default'] == 20
assert key1usage['20170502']['special'] == 5
assert key1usage['20170501']['special'] == 0
# ensure we only get requested data
with freeze_time('2017-05-15'):
usage = b.get_usage(keys=['key1'], days=2)
assert len(usage) == 1
assert len(usage['key1']) == 2

View File

@ -0,0 +1,43 @@
import csv
from django.test import TestCase
from django.test.utils import captured_stdout
from django.core.management import call_command
from ..models import Tier, Key
class ExportKeysTestCase(TestCase):
def setUp(self):
self.bronze = Tier.objects.create(slug='bronze', name='Bronze')
self.gold = Tier.objects.create(slug='gold', name='Gold')
Key.objects.create(key='one', status='a', tier=self.bronze,
email='one@example.com', name='User One')
Key.objects.create(key='two', status='a', tier=self.gold,
email='two@example.com', name='User Two')
Key.objects.create(key='three', status='u', tier=self.bronze,
email='three@example.com', name='User Three')
Key.objects.create(key='four', status='a', tier=self.gold,
email='four@example.com', name='User Four')
def test_basic_export(self):
with captured_stdout() as stdout:
call_command('exportkeys')
stdout.seek(0)
data = list(csv.DictReader(stdout))
self.assertEquals(len(data), 4)
self.assertEquals(data[0]['key'], 'one')
self.assertEquals(data[1]['tier'], 'Gold')
self.assertEquals(data[2]['status'], 'u')
self.assertEquals(data[3]['email'], 'four@example.com')
def test_export_flags(self):
offset = Key.objects.all().order_by('created_at')[1]
with captured_stdout() as stdout:
call_command('exportkeys', '--since=' + offset.created_at.isoformat())
stdout.seek(0)
data = list(csv.DictReader(stdout))
self.assertEquals(len(data), 3)

View File

@ -0,0 +1,83 @@
from django.test import TestCase
from ..models import Tier, Zone, Key
from ..verifier import backend
class ViewTestCase(TestCase):
def setUp(self):
backend.reset()
self.bronze = Tier.objects.create(slug='bronze', name='Bronze')
self.default_zone = Zone.objects.create(slug='default', name='Default')
self.bronze.limits.create(
zone=self.default_zone,
quota_requests=10,
quota_period='d',
requests_per_second=2,
burst_size=10,
)
self.gold = Tier.objects.create(slug='gold', name='Gold')
self.special = Zone.objects.create(slug='special', name='Special')
self.gold.limits.create(
zone=self.special,
quota_requests=10,
quota_period='d',
requests_per_second=2,
burst_size=10,
)
Key.objects.create(
key='bronze',
status='a',
tier=self.bronze,
email='bronze1@example.com',
)
Key.objects.create(
key='gold',
status='a',
tier=self.gold,
email='gold@example.com',
)
def test_view_no_key(self):
response = self.client.get('/example/')
self.assertEquals(response.status_code, 403)
def test_view_key_header(self):
response = self.client.get('/example/', HTTP_X_API_KEY='bronze')
self.assertEquals(response.status_code, 200)
def test_view_key_param(self):
response = self.client.get('/example/?apikey=bronze')
self.assertEquals(response.status_code, 200)
def test_view_zone(self):
# ensure that bronze can't get here, but gold can
response = self.client.get('/special/?apikey=bronze')
self.assertEquals(response.status_code, 403)
response = self.client.get('/special/?apikey=gold')
self.assertEquals(response.status_code, 200)
def test_view_key_429(self):
for x in range(10):
response = self.client.get('/example/?apikey=bronze')
self.assertEquals(response.status_code, 200)
# 11th request, exceeds burst
response = self.client.get('/example/?apikey=bronze')
self.assertEquals(response.status_code, 429)
# ... we won't test everything else, verifier tests take care of that
def test_view_protected_via_middleware(self):
# make sure middleware doesn't wind up protecting everything
response = self.client.get('/via_middleware/')
self.assertEquals(response.status_code, 403)
# and with key
response = self.client.get('/via_middleware/?apikey=bronze')
self.assertEquals(response.status_code, 200)
def test_view_unprotected(self):
# make sure middleware doesn't wind up protecting everything
response = self.client.get('/unprotected/')
self.assertEquals(response.status_code, 200)

View File

@ -0,0 +1,47 @@
import csv
from django.test import TestCase, override_settings
from django.test.utils import captured_stdout
from django.core.management import call_command
from freezegun import freeze_time
from ..models import Tier, Key, Zone
from ..backends import CacheBackend
class UsageReportTestCase(TestCase):
def setUp(self):
b = CacheBackend()
Zone.objects.create(slug='default', name='Default')
Zone.objects.create(slug='special', name='Special')
tier = Tier.objects.create(slug='default')
Key.objects.create(key='key1', tier=tier, email='key1@example.com')
Key.objects.create(key='key2', tier=tier, email='key2@example.com')
# key, zone, day, num
self.usage = (
# Key 1 usage
('key1', 'default', '20170501', '20'),
('key1', 'default', '20170502', '200'),
('key1', 'default', '20170503', '20'),
('key1', 'special', '20170502', '5'),
# Key 2 usage
('key2', 'special', '20170501', '1'),
('key2', 'special', '20170502', '1'),
('key2', 'special', '20170503', '1'),
)
for key, zone, day, num in self.usage:
for _ in range(int(num)):
b.get_and_inc_quota_value(key, zone, day)
@override_settings(SIMPLEKEYS_RATE_LIMIT_BACKEND='simplekeys.backends.CacheBackend')
def test_basic_report(self):
with freeze_time('2017-05-05'):
with captured_stdout() as stdout:
call_command('usagereport')
stdout.seek(0)
data = tuple(tuple(row) for row in csv.reader(stdout))[1:]
self.assertEquals(len(data), 7)
self.assertEquals(set(data), set(self.usage))

View File

@ -0,0 +1,218 @@
import datetime
from django.test import TestCase
from freezegun import freeze_time
from ..models import Tier, Zone, Key
from ..verifier import (verify, VerificationError, RateLimitError, QuotaError,
backend)
class UsageTestCase(TestCase):
def setUp(self):
self.bronze = Tier.objects.create(slug='bronze', name='Bronze')
self.gold = Tier.objects.create(slug='gold', name='Gold')
self.default_zone = Zone.objects.create(slug='default', name='Default')
self.premium_zone = Zone.objects.create(slug='premium', name='Premium')
self.secret_zone = Zone.objects.create(slug='secret', name='Secret')
# only available on memory backend
backend.reset()
self.bronze.limits.create(
zone=self.default_zone,
quota_requests=100,
quota_period='d',
requests_per_second=2,
burst_size=10,
)
self.bronze.limits.create(
zone=self.premium_zone,
quota_requests=10,
quota_period='d',
requests_per_second=1,
burst_size=2,
)
self.gold.limits.create(
zone=self.default_zone,
quota_requests=1000,
quota_period='d',
requests_per_second=5,
burst_size=10,
)
self.gold.limits.create(
zone=self.premium_zone,
quota_requests=10,
quota_period='d',
requests_per_second=5,
burst_size=10,
)
self.gold.limits.create(
zone=self.secret_zone,
quota_requests=10,
quota_period='m', # monthly limit for secret zone
requests_per_second=5,
burst_size=10,
)
Key.objects.create(
key='bronze1',
status='a',
tier=self.bronze,
email='bronze1@example.com',
)
Key.objects.create(
key='bronze2',
status='a',
tier=self.bronze,
email='bronze2@example.com',
)
Key.objects.create(
key='gold',
status='a',
tier=self.gold,
email='gold@example.com',
)
def test_verifier_bad_key(self):
self.assertRaises(VerificationError, verify, 'badkey', 'bronze')
def test_verifier_inactive_key(self):
Key.objects.create(
key='newkey',
status='u',
tier=self.gold,
email='new@example.com',
)
self.assertRaises(VerificationError, verify, 'newkey', 'bronze')
def test_verifier_suspended_key(self):
Key.objects.create(
key='badactor',
status='s',
tier=self.gold,
email='new@example.com',
)
self.assertRaises(VerificationError, verify, 'badactor', 'bronze')
def test_verifier_zone_access(self):
# gold has access, bronze doesn't
self.assert_(verify('gold', 'secret'))
self.assertRaises(VerificationError, verify, 'bronze1', 'secret')
def test_verifier_rate_limit(self):
with freeze_time() as frozen_dt:
# to start - we should have full capacity for a burst of 10
for x in range(10):
verify('bronze1', 'default')
# this next one should raise an exception
self.assertRaises(RateLimitError, verify, 'bronze1', 'default')
# let's go forward 1sec, this will let the bucket get 2 more tokens
frozen_dt.tick()
# two more, then limited
verify('bronze1', 'default')
verify('bronze1', 'default')
self.assertRaises(RateLimitError, verify, 'bronze1', 'default')
def test_verifier_rate_limit_full_refill(self):
with freeze_time() as frozen_dt:
# let's use the premium zone now - 1req/sec. & burst of 2
verify('bronze1', 'premium')
verify('bronze1', 'premium')
self.assertRaises(RateLimitError, verify, 'bronze1', 'premium')
# in 5 seconds - ensure we haven't let capacity surpass burst rate
frozen_dt.tick(delta=datetime.timedelta(seconds=5))
verify('bronze1', 'premium')
verify('bronze1', 'premium')
self.assertRaises(RateLimitError, verify, 'bronze1', 'premium')
def test_verifier_rate_limit_key_dependent(self):
# ensure that the rate limit is unique per-key
# each key is able to get both of its requests in, no waiting
verify('bronze1', 'premium')
verify('bronze1', 'premium')
verify('bronze2', 'premium')
verify('bronze2', 'premium')
self.assertRaises(RateLimitError, verify, 'bronze1', 'premium')
self.assertRaises(RateLimitError, verify, 'bronze2', 'premium')
def test_verifier_rate_limit_zone_dependent(self):
# ensure that the rate limit is unique per-zone
# key is able to get both of its requests in, no waiting
verify('bronze1', 'premium')
verify('bronze1', 'premium')
# and can hit another zone no problem
verify('bronze1', 'default')
# but premium is still exhausted
self.assertRaises(RateLimitError, verify, 'bronze1', 'premium')
def test_verifier_quota_day(self):
# let's pretend a day has passed, we can call again!
with freeze_time('2017-04-17') as frozen_dt:
# gold can hit premium only 10x/day (burst is also 10)
for x in range(10):
verify('gold', 'premium')
# after 1 second, should have another token
frozen_dt.tick()
# but still no good- we've hit our daily limit
self.assertRaises(QuotaError, verify, 'gold', 'premium')
frozen_dt.tick(delta=datetime.timedelta(days=1))
for x in range(10):
verify('gold', 'premium')
def test_verifier_quota_month(self):
# need to make sure we aren't on the last day of a month
with freeze_time('2017-04-17') as frozen_dt:
# gold can hit secret only 10x/month (burst is also 10)
for x in range(10):
verify('gold', 'secret')
# after 1 second, should have another token
frozen_dt.tick()
# but still no good- we've hit our monthly limit
self.assertRaises(QuotaError, verify, 'gold', 'secret')
# let's pretend a day has passed... still no good
frozen_dt.tick(delta=datetime.timedelta(days=1))
self.assertRaises(QuotaError, verify, 'gold', 'secret')
# but a month later? we're good!
frozen_dt.tick(delta=datetime.timedelta(days=30))
for x in range(10):
verify('gold', 'secret')
def test_verifier_quota_key_dependent(self):
with freeze_time('2017-04-17') as frozen_dt:
# 1 req/sec from bronze1 and bronze2
for x in range(10):
verify('bronze1', 'premium')
verify('bronze2', 'premium')
frozen_dt.tick()
# 11th in either should be a problem - day total is exhausted
self.assertRaises(QuotaError, verify, 'bronze1', 'premium')
self.assertRaises(QuotaError, verify, 'bronze2', 'premium')
def test_verifier_quota_zone_dependent(self):
with freeze_time('2017-04-17') as frozen_dt:
# should be able to do 10 in premium & secret without issue
for x in range(10):
verify('gold', 'premium')
verify('gold', 'secret')
frozen_dt.tick()
# 11th in either should be a problem
self.assertRaises(QuotaError, verify, 'gold', 'premium')
self.assertRaises(QuotaError, verify, 'gold', 'secret')

View File

@ -0,0 +1,176 @@
from django.test import TestCase
from django.core import mail
from ..models import Tier, Zone, Key
from ..verifier import backend
from ..views import _get_confirm_hash
class RegistrationViewTestCase(TestCase):
def setUp(self):
backend.reset()
default_zone = Zone.objects.create(slug='default', name='default')
default_tier = Tier.objects.create(slug='default', name='default')
default_tier.limits.create(
zone=default_zone,
quota_requests=10,
quota_period='d',
requests_per_second=2,
burst_size=10,
)
Tier.objects.create(slug='special', name='special')
def test_get(self):
# ensure form is present
response = self.client.get('/register/')
self.assertEquals(response.status_code, 200)
self.assertIn('form', response.context)
def test_valid_post(self):
email = 'amy@example.com'
response = self.client.post('/register/',
{'email': email,
'name': 'Amy',
'organization': 'ACME'
}
)
# ensure key is created
key = Key.objects.get(email=email)
self.assertEquals(key.email, email)
self.assertEquals(key.name, 'Amy')
self.assertEquals(key.organization, 'ACME')
self.assertEquals(key.status, 'u')
self.assertEquals(key.tier.slug, 'default')
# ensure email is sent and contains key
self.assertEquals(len(mail.outbox), 1)
self.assertEquals(len(key.key), 36) # default is UUID
self.assertIn(key.key, str(mail.outbox[0].message()))
# ensure redirect to / (OK that it is a 404)
self.assertRedirects(response, '/', target_status_code=404)
def test_invalid_post(self):
# invalid post - missing email
response = self.client.post('/register/',
{'name': 'Amy',
}
)
# response should be the page w/ errors on the form
self.assertEquals(response.status_code, 200)
self.assertEquals(len(response.context['form'].errors), 1)
# no email is sent
self.assertEqual(len(mail.outbox), 0)
self.assertEqual(Key.objects.count(), 0)
def test_custom_tier(self):
email = 'amy@example.com'
self.client.post('/register-special/', # tier overridden
{'email': email,
'name': 'Amy',
'organization': 'ACME'
}
)
# ensure key is created in right tier
key = Key.objects.get(email=email)
self.assertEquals(key.tier.slug, 'special')
def test_relative_confirmation_url(self):
self.client.post('/register/',
{'email': 'amy@example.com',
'name': 'Amy',
'organization': 'ACME'
}
)
# is built from Site URL and /confirm/
confirmation_url = 'https://example.com/confirm/?'
self.assertIn(confirmation_url, str(mail.outbox[0].message()))
def test_absolute_confirmation_url(self):
self.client.post('/register-special/',
{'email': 'amy@example.com',
'name': 'Amy',
'organization': 'ACME'
}
)
# absolute URL
confirmation_url = 'https://confirm.example.com/special-confirm/?'
self.assertIn(confirmation_url, str(mail.outbox[0].message()))
class ConfirmationViewTestCase(TestCase):
def setUp(self):
backend.reset()
default_zone = Zone.objects.create(slug='default', name='default')
default_tier = Tier.objects.create(slug='default', name='default')
default_tier.limits.create(
zone=default_zone,
quota_requests=10,
quota_period='d',
requests_per_second=2,
burst_size=10,
)
self.key = 'sample'
self.email = 'amy@example.com'
self.key_obj = Key.objects.create(status='u', key=self.key,
email=self.email,
tier=default_tier
)
self.hash = _get_confirm_hash(self.key, self.email)
def test_get(self):
response = self.client.get(
'/confirm/?key={}&email={}&confirm_hash={}'.format(
self.key, self.email, self.hash
)
)
# get a response pointing us towards a POST to the same data
self.assertEquals(response.status_code, 200)
self.assertIn('form', response.context)
self.assertIn(self.key, response.content.decode())
self.assertIn(self.email, response.content.decode())
self.assertIn(self.hash, response.content.decode())
def test_get_invalid(self):
response = self.client.get(
'/confirm/?key={}&email={}'.format(
self.key, self.email,
)
)
# hash isn't checked on GET, but still must be present
self.assertEquals(response.status_code, 400)
def test_post(self):
response = self.client.post('/confirm/',
{'key': self.key,
'email': self.email,
'confirm_hash': self.hash})
self.assertEquals(response.status_code, 200)
self.assertIn(self.key, response.content.decode())
# status is updated to active
self.assertEquals(Key.objects.get(key=self.key).status, 'a')
def test_post_invalid(self):
response = self.client.post('/confirm/',
{'key': self.key,
'email': self.email,
'confirm_hash': 'bad-hash'})
self.assertEquals(response.status_code, 400)
# status is unchanged
self.assertEquals(Key.objects.get(key=self.key).status, 'u')
def test_post_suspended_key(self):
self.key_obj.status = 's'
self.key_obj.save()
response = self.client.post('/confirm/',
{'key': self.key,
'email': self.email,
'confirm_hash': self.hash})
# don't let them update a suspended key
self.assertEquals(response.status_code, 400)
self.assertEquals(Key.objects.get(key=self.key).status, 's')

24
simplekeys/tests/urls.py Normal file
View File

@ -0,0 +1,24 @@
"""
URLs only for test purposes
"""
from django.conf.urls import url
from django.contrib import admin
from simplekeys.views import RegistrationView, ConfirmationView
from . import views
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^example/$', views.example),
url(r'^special/$', views.special),
url(r'^unprotected/$', views.unprotected),
url(r'^via_middleware/$', views.via_middleware),
url(r'^register/$', RegistrationView.as_view()),
url(r'^register-special/$', RegistrationView.as_view(
tier='special',
confirmation_url='https://confirm.example.com/special-confirm/',
)),
url(r'^confirm/$', ConfirmationView.as_view()),
]

23
simplekeys/tests/views.py Normal file
View File

@ -0,0 +1,23 @@
"""
views only for test purposes
"""
from django.http import JsonResponse
from simplekeys.decorators import key_required
@key_required()
def example(request):
return JsonResponse({'response': 'OK'})
@key_required(zone='special')
def special(request):
return JsonResponse({'response': 'special'})
def via_middleware(request):
return JsonResponse({'response': 'OK'})
def unprotected(request):
return JsonResponse({'response': 'OK'})

102
simplekeys/verifier.py Normal file
View File

@ -0,0 +1,102 @@
from __future__ import division
import time
import datetime
from django.conf import settings
from django.utils.module_loading import import_string
from django.http import JsonResponse
from .models import Key, Limit
class VerificationError(Exception):
pass
class RateLimitError(Exception):
pass
class QuotaError(Exception):
pass
# load backend from setting
backend = getattr(settings, 'SIMPLEKEYS_RATE_LIMIT_BACKEND',
'simplekeys.backends.CacheBackend')
backend = import_string(backend)()
def verify(key, zone):
# ensure we have a verified key w/ access to the zone
try:
# could also do this w/ new subquery expressions in 1.11
kobj = Key.objects.get(key=key, status='a')
limit = Limit.objects.get(tier=kobj.tier, zone__slug=zone)
except Key.DoesNotExist:
raise VerificationError('no valid key')
except Limit.DoesNotExist:
raise VerificationError('key does not have access to zone {}'.format(
zone
))
# enforce rate limiting - will raise RateLimitError if exhausted
# replenish first
tokens, last_time = backend.get_tokens_and_timestamp(key, zone)
if last_time is None:
# if this is the first time, fill the bucket
tokens = limit.burst_size
else:
# increment bucket, careful not to overfill
tokens = min(
tokens + (limit.requests_per_second * (time.time() - last_time)),
limit.burst_size
)
# now try to decrement count
if tokens >= 1:
tokens -= 1
backend.set_token_count(key, zone, tokens)
else:
raise RateLimitError('exhausted tokens: {} req/sec, burst {}'.format(
limit.requests_per_second, limit.burst_size
))
# enforce daily/monthly quotas
if limit.quota_period == 'd':
quota_range = datetime.datetime.utcnow().strftime('%Y%m%d')
elif limit.quota_period == 'm':
quota_range = datetime.datetime.utcnow().strftime('%Y%m')
if (backend.get_and_inc_quota_value(key, zone, quota_range) >
limit.quota_requests):
raise QuotaError('quota exceeded: {}/{}'.format(
limit.quota_requests, limit.get_quota_period_display()
))
return True
def verify_request(request, zone):
key = request.META.get(getattr(settings, 'SIMPLEKEYS_HEADER',
'HTTP_X_API_KEY'))
note = getattr(settings, 'SIMPLEKEYS_ERROR_NOTE', None)
if not key:
key = request.GET.get(getattr(settings, 'SIMPLEKEYS_QUERY_PARAM',
'apikey'))
try:
verify(key, zone)
except VerificationError as e:
return JsonResponse({'error': str(e), 'note': note},
status=403)
except RateLimitError as e:
return JsonResponse({'error': str(e), 'note': note},
status=429)
except QuotaError as e:
return JsonResponse({'error': str(e), 'note': note},
status=429)
# pass through
return None

122
simplekeys/views.py Normal file
View File

@ -0,0 +1,122 @@
import hashlib
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.mail import send_mail
from django.shortcuts import render, redirect
from django.template import loader
from django.views.generic import View
from django.http import HttpResponseBadRequest
from .forms import KeyRegistrationForm, KeyConfirmationForm
from .models import Tier, Key
def _get_confirm_hash(key, email):
value = '{}{}{}'.format(key, email, settings.SECRET_KEY)
return hashlib.sha256(value.encode()).hexdigest()
class RegistrationView(View):
"""
present user with a form to fill out to get a key
upon submission, send an email sending user to confirmation page
"""
template_name = "simplekeys/register.html"
email_subject = 'API Key Registration'
email_message_template = 'simplekeys/confirmation_email.txt'
from_email = settings.DEFAULT_FROM_EMAIL
tier = 'default'
redirect = '/'
confirmation_url = '/confirm/'
def get(self, request):
return render(request, self.template_name,
{'form': KeyRegistrationForm()})
def post(self, request):
form = KeyRegistrationForm(request.POST)
if not form.is_valid():
return render(request, self.template_name,
{'form': form})
# go ahead w/ creation
key = form.instance
key.tier = Tier.objects.get(slug=self.tier)
# TODO: option to override this and avoid sending email?
key.status = 'u'
key.save()
# send email & redirect user
confirm_hash = _get_confirm_hash(key.key, key.email)
# if URL is relative, make absolute
if not self.confirmation_url.startswith(('http:', 'https:')):
confirmation_url = '{protocol}://{site}{confirmation_url}'.format(
protocol='https' if request.is_secure else 'http',
site=Site.objects.get_current().domain,
confirmation_url=self.confirmation_url
)
else:
confirmation_url = self.confirmation_url
confirmation_url = (
'{base}?key={key}&email={email}&confirm_hash={confirm_hash}'
).format(
base=confirmation_url,
key=key.key,
email=key.email,
confirm_hash=confirm_hash
)
message = loader.render_to_string(
self.email_message_template,
{'key': key, 'confirmation_url': confirmation_url}
)
send_mail(self.email_subject,
message,
self.from_email,
[key.email])
return redirect(self.redirect)
class ConfirmationView(View):
"""
present user with a simple form that just needs to be submitted
to activate the key (don't do activation on GET to prevent
email clients from clicking link)
"""
confirmation_template_name = "simplekeys/confirmation.html"
confirmed_template_name = "simplekeys/confirmed.html"
def get(self, request):
form = KeyConfirmationForm(request.GET)
if form.is_valid():
return render(request, self.confirmation_template_name,
{'form': form})
else:
return HttpResponseBadRequest('invalid request')
def post(self, request):
form = KeyConfirmationForm(request.POST)
if form.is_valid():
hash = _get_confirm_hash(form.cleaned_data['key'],
form.cleaned_data['email'])
if hash != form.cleaned_data['confirm_hash']:
return HttpResponseBadRequest('invalid request - bad hash')
# update the key
try:
key = Key.objects.get(key=form.cleaned_data['key'],
status__in=('u', 'a'))
except Key.DoesNotExist:
return HttpResponseBadRequest('invalid request - no key')
key.status = 'a'
key.save()
return render(request, self.confirmed_template_name, {'key': key})
else:
return HttpResponseBadRequest('invalid request - invalid form')