Authentication and Authorization¶
Nori provides a complete, ready-to-use authentication system that includes session middlewares, route decorators for permissions, JSON Web Token (JWT) based authentication for APIs, and secure password handling using PBKDF2-SHA256 hashing.
Password Hashing (Security)¶
To create users or validate manual logins, you must always use the Security class provided by the framework in core.auth.security.
from core.auth.security import Security
# Hashing a plain text password
hashed_password = Security.hash_password('my_secret_password')
# 'pbkdf2_sha256$100000$salt$hash'
# Comparison / Verification
is_valid = Security.verify_password('my_secret_password', hashed_password) # True
Security also offers quick stochastic hexadecimal token generators:
token = Security.generate_token() # 64 hex characters
csrf = Security.generate_csrf_token() # 64 hex characters
Brute-Force Protection (Login Guard)¶
Nori includes per-account brute-force protection that locks accounts after repeated failed login attempts, with escalating lockout durations.
from core.auth import check_login_allowed, record_failed_login, clear_failed_logins
async def login(self, request: Request):
form = await request.form()
email = form['email']
# 1. Check if the account is locked
allowed, retry_after = await check_login_allowed(email)
if not allowed:
return JSONResponse(
{'error': f'Too many attempts. Try again in {retry_after}s.'},
status_code=429,
)
# 2. Validate credentials
user = await User.get_or_none(email=email)
if not user or not Security.verify_password(form['password'], user.password_hash):
await record_failed_login(email)
return JSONResponse({'error': 'Invalid credentials'}, status_code=401)
# 3. Success — clear attempts and start session
await clear_failed_logins(email)
request.session['user_id'] = str(user.id)
return RedirectResponse(url='/dashboard', status_code=302)
How it works:
- After 5 consecutive failures, the account is locked for 1 minute.
- Each subsequent lockout escalates: 1m → 5m → 15m → 30m → 1h.
- A successful login resets the counter entirely.
- Attempts made during a lockout are ignored (don't extend the lockout).
- Uses the cache backend (Memory or Redis), so it works with the same CACHE_BACKEND setting.
- Lockouts are logged via nori.auth logger for monitoring.
API: core.auth.login_guard
| Function | Purpose |
|----------|---------|
| check_login_allowed(identifier) | Returns (allowed: bool, retry_after: int) |
| record_failed_login(identifier) | Increments failures; triggers lockout at threshold |
| clear_failed_logins(identifier) | Resets all tracking (call on successful login) |
The identifier can be anything — email, username, phone number. It's the developer's choice.
Session Middlewares¶
Nori manages the logged-in user state in an encrypted session cookie using the SECRET_KEY from .env.
To log in after successfully validating the password, simply inject variables into the request.session dict:
async def login(self, request: Request):
# [Validation logic omitted]
# Start Session (Effective "Login")
request.session['user_id'] = str(user.id)
request.session['role'] = 'admin' if user.level > 0 else 'user'
return RedirectResponse(url='/dashboard', status_code=302)
async def logout(self, request: Request):
# Close Session
request.session.clear()
return RedirectResponse(url='/', status_code=302)
View Restriction (Decorators)¶
You can restrict entire controllers by applying decorators above the async def definition:
@login_required@require_role('my_role')@require_any_role('sales', 'management')
(Note: The role string 'admin' has a general bypass by default).
from core.auth.decorators import login_required, require_role, require_any_role
class DashboardController:
@login_required # Forces the session to contain an active 'user_id' dict key.
async def account(self, request: Request):
return ...
@require_role('editor')
async def change_password(self, request: Request):
return ...
Decorator Behavior:
Depending on what the client tried to access, they act smartly (Content Negotiation via Accept Header):
* If the client is a browser (Content-Type HTML), they transparently redirect to /login (302) or /forbidden (403).
* If the client is pure Fetch/AJAX HTTP (application/json), they throw standard JSON dictionaries {"error": "Unauthorized"} with HTTP codes 401 and 403.
APIs and JSON Web Tokens (JWT)¶
For creating Stateless RESTful APIs, the traditional session system (cookies) is not suitable. Nori has native HMAC-SHA256 signatures in core.auth.jwt.
Create a JWT (Login)¶
from core.auth.jwt import create_token
from starlette.responses import JSONResponse
async def api_login(self, request: Request):
# [Validate pwd...]
# General payload and JSON dispatch
jwt_str = create_token({'user_id': user.id}, expires_in=3600)
return JSONResponse({'token': jwt_str})
JWT_SECRET variable and automatically expire under Nori's Unix Epoch counters.
Protect API Routes (Decorator)¶
from core.auth.decorators import token_required
@token_required
async def api_profile(self, request: Request):
# If reached here, the token exists, is valid, and hasn't expired.
# The injected payload is recovered directly from the decorator:
user_id = request.state.token_payload['user_id']
return JSONResponse({'status': 'ok', 'id': user_id})
The decorator reads the Authorization: Bearer <token> header. If missing, malformed, or invalid, it returns 401 Unauthorized as JSON. The token is trimmed and limited to 4096 characters to prevent abuse.
Granular Permissions (ACL)¶
For fine-grained access control beyond simple roles, use the permission system. Permissions use dot-notation (e.g. articles.edit, users.delete) and are loaded from the database at login time.
Setup at Login¶
After authenticating, set the user's role_ids in the session and call load_permissions():
from core.auth.decorators import load_permissions
async def login(self, request: Request):
# [Validate credentials...]
request.session['user_id'] = str(user.id)
request.session['role'] = user.role
request.session['role_ids'] = [user.role_id] # Required for load_permissions
await load_permissions(request.session, user.id)
return RedirectResponse(url='/dashboard', status_code=302)
Important: load_permissions() reads role_ids from the session to query the Role→Permission M2M. If role_ids is missing or empty, a warning is logged and the user will have no permissions.
Protecting Routes¶
from core.auth.decorators import require_permission
class ArticleController:
@require_permission('articles.edit')
async def edit(self, request: Request):
...
The admin role bypasses all permission checks.
OAuth2 Social Login¶
Nori provides OAuth2 drivers for Google and GitHub in services/. Each driver exposes three functions — no abstraction layer, no registry. The developer calls them explicitly and handles user creation.
Flow Overview¶
1. User clicks "Login with Google"
2. Controller calls get_auth_url() → redirect to provider
3. Provider authenticates user → redirects to your callback URL
4. Controller calls handle_callback(code, state) → gets user profile
5. Developer creates/links user, populates session, redirects
Configuration (.env)¶
# Google — https://console.cloud.google.com/apis/credentials
GOOGLE_CLIENT_ID=your-client-id.apps.googleusercontent.com
GOOGLE_CLIENT_SECRET=your-client-secret
# GitHub — https://github.com/settings/developers
GITHUB_CLIENT_ID=your-client-id
GITHUB_CLIENT_SECRET=your-client-secret
Google Example¶
from starlette.requests import Request
from starlette.responses import RedirectResponse
from services.oauth_google import get_auth_url, handle_callback
from core.auth.decorators import load_permissions
class SocialAuthController:
async def google_login(self, request: Request):
url = get_auth_url(
request.session,
redirect_uri=str(request.url_for('auth.google.callback')),
)
return RedirectResponse(url)
async def google_callback(self, request: Request):
code = request.query_params.get('code', '')
state = request.query_params.get('state', '')
if not code:
return RedirectResponse('/login?error=oauth_denied')
try:
profile = await handle_callback(
request.session,
code=code,
redirect_uri=str(request.url_for('auth.google.callback')),
state=state,
)
except (ValueError, Exception):
return RedirectResponse('/login?error=oauth_failed')
# Developer handles user creation/linking
user = await User.get_or_none(email=profile['email'])
if not user:
user = await User.create(
email=profile['email'],
name=profile['name'],
password_hash='', # No password for OAuth users
)
request.session['user_id'] = str(user.id)
request.session['role'] = user.role
request.session['role_ids'] = [user.role_id]
await load_permissions(request.session, user.id)
return RedirectResponse('/', status_code=302)
GitHub Example¶
from services.oauth_github import get_auth_url, handle_callback
class SocialAuthController:
async def github_login(self, request: Request):
url = get_auth_url(
request.session,
redirect_uri=str(request.url_for('auth.github.callback')),
)
return RedirectResponse(url)
async def github_callback(self, request: Request):
# Same pattern as Google — handle_callback returns:
# {id, email, name, avatar_url, login, raw}
...
Routes¶
routes = [
Route('/auth/google', endpoint=social.google_login, methods=['GET'], name='auth.google.login'),
Route('/auth/google/callback', endpoint=social.google_callback, methods=['GET'], name='auth.google.callback'),
Route('/auth/github', endpoint=social.github_login, methods=['GET'], name='auth.github.login'),
Route('/auth/github/callback', endpoint=social.github_callback, methods=['GET'], name='auth.github.callback'),
]
Security¶
- State parameter (CSRF):
get_auth_url()generates a cryptographic state token stored in the session.handle_callback()validates and consumes it (single-use). Invalid state raisesValueError. - PKCE (Google only): Google uses Proof Key for Code Exchange (S256) to prevent authorization code interception. The code verifier is stored in the session and sent during token exchange.
- Private emails (GitHub): GitHub may return
nullforemailwhen the user has email privacy enabled. The driver automatically fetches/user/emailsand resolves the primary verified email.
Driver Interface¶
Both providers follow the same 3-function interface:
| Function | Args | Returns |
|---|---|---|
get_auth_url(session, redirect_uri, scopes?) |
Session dict, callback URL | Authorization URL string |
handle_callback(session, code, redirect_uri, state) |
Session dict, auth code, callback URL, state | Normalized profile dict |
get_user_profile(access_token) |
OAuth access token | Normalized profile dict |
Google profile: {id, email, name, picture, email_verified, raw}
GitHub profile: {id, email, name, avatar_url, login, raw}
Adding More Providers¶
Copy any existing driver as a template. The pattern is always:
get_auth_url()— builds the provider's authorization URL withgenerate_state()fromcore.auth.oauthhandle_callback()— validates state, exchanges code for token, fetches profileget_user_profile()— fetches user info with an access token
The core helpers (generate_state, validate_state, generate_pkce_verifier, get_pkce_verifier) are available from core.auth.oauth for any new provider.