This document explains the security features implemented in the Bug Bounty Platform, the threats they mitigate, and best practices for secure development.
- Security Overview
- Authentication
- Authorization
- Password Security
- Token Security
- Input Validation
- Rate Limiting
- Common Vulnerabilities Prevented
- Security Best Practices
Security is implemented in multiple layers:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Layer 1: Input Validation (Pydantic) β
β - Type checking β
β - String length limits β
β - Email/URL format validation β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β Layer 2: Authentication (JWT) β
β - Token-based auth β
β - Short-lived access tokens (15 min) β
β - Token rotation β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β Layer 3: Authorization (RBAC) β
β - Role-based access control β
β - Resource ownership checks β
β - Admin-only endpoints β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β Layer 4: Rate Limiting β
β - 100 req/min default β
β - 20 req/min for auth endpoints β
β - Per-IP tracking β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β Layer 5: Secure Storage β
β - Argon2id password hashing β
β - Hashed refresh tokens β
β - No plaintext secrets β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Defense in Depth: If one layer fails, others still protect.
Why JWT?
- Stateless (no session storage on server)
- Scalable (works across multiple servers)
- Contains user info (no database lookup on every request)
Token Structure:
{
"sub": "018d3f54-8c3a-7000-a234-56789abcdef0", // user_id
"role": "user",
"token_version": 1,
"exp": 1704123456, // expires in 15 minutes
"iat": 1704122556 // issued at
}Header:
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Access Token:
- Short-lived (15 minutes)
- Sent with every API request
- Cannot be revoked (stateless)
- If stolen, attacker only has 15 minutes
Refresh Token:
- Long-lived (7 days)
- Used only to get new access token
- Stored in database (can be revoked)
- Single-use (rotated on refresh)
Why two tokens?
- Compromise: Security vs UX
- Access token: Fast (no DB lookup), but can't revoke
- Refresh token: Slow (DB lookup), but can revoke
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β 1. User submits email + password β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 2. Verify password (Argon2id) β
β - Timing-safe comparison β
β - No early returns β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 3. Generate access token (JWT) β
β - Signed with SECRET_KEY β
β - Expires in 15 minutes β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 4. Generate refresh token β
β - Random 32-byte string β
β - SHA-256 hash stored in DB β
β - Original token sent to client β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 5. Return both tokens β
β { access_token, refresh_token } β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Code:
from pwdlib import PasswordHash
import jwt
from datetime import datetime, timedelta
import secrets
async def login(email: str, password: str) -> tuple[str, str]:
user = await user_repo.find_by_email(email)
if not user:
raise InvalidCredentialsError()
if not verify_password(password, user.hashed_password):
raise InvalidCredentialsError()
access_token = create_access_token(user)
refresh_token = create_refresh_token()
await refresh_token_repo.create(
user_id=user.id,
token_hash=hash_token(refresh_token),
expires_at=datetime.utcnow() + timedelta(days=7),
)
return access_token, refresh_tokenββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β 1. Access token expires (after 15 min) β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 2. Client sends refresh token β
β POST /api/v1/auth/refresh β
β { refresh_token } β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 3. Verify refresh token β
β - Hash token and lookup in DB β
β - Check not expired β
β - Check not revoked β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 4. Token rotation β
β - DELETE old refresh token β
β - CREATE new refresh token β
β - Store new hash in DB β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 5. Generate new access token β
β - Increment token_version if needed β
ββββββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββ
β
ββββββββββββββββββββββββββΌββββββββββββββββββββββββββββββββββ
β 6. Return new tokens β
β { access_token, refresh_token } β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Why rotate refresh tokens?
- Each refresh token is single-use
- If stolen, attacker can only use it once
- Original token becomes invalid
- User notices they're logged out
Scenario: Attacker steals refresh token
Time 0: Legitimate user has token A
Time 1: Attacker steals token A
Time 2: User refreshes β gets token B (token A deleted)
Time 3: Attacker tries to use token A β DETECTED!
Detection mechanism:
async def refresh_token(token: str) -> tuple[str, str]:
token_hash = hash_token(token)
stored_token = await refresh_token_repo.find_by_hash(token_hash)
if not stored_token:
family_tokens = await refresh_token_repo.find_by_family_id(
extract_family_id(token)
)
if family_tokens:
await refresh_token_repo.delete_all(family_tokens)
raise TokenReuseDetectedError()
raise InvalidTokenError()
# ...If a deleted token is used, we:
- Find all tokens in the same family
- Delete them all (logout all devices)
- Notify user of suspicious activity
Problem: How to invalidate all tokens instantly?
Solution: Token version field
class User(Base):
token_version: Mapped[int] = mapped_column(default=0)
def create_access_token(user: User) -> str:
payload = {
"sub": str(user.id),
"token_version": user.token_version, # β Include version
"exp": datetime.utcnow() + timedelta(minutes=15),
}
return jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
async def verify_access_token(token: str) -> User:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
user = await user_repo.get_by_id(payload["sub"])
if user.token_version != payload["token_version"]: # β Check version
raise InvalidTokenError("Token invalidated")
return userWhen to increment token_version:
- User changes password
- User reports token theft
- Admin resets user's sessions
def change_password(user: User, new_password: str):
user.hashed_password = hash_password(new_password)
user.token_version += 1 # β Invalidate all tokens
await session.commit()Roles:
USER- Regular researcher (can submit reports)COMPANY- Program owner (can create programs, triage reports)ADMIN- Platform admin (can access admin panel)
Implementation:
from enum import StrEnum
class UserRole(StrEnum):
USER = "user"
COMPANY = "company"
ADMIN = "admin"
UNKNOWN = "unknown" # For guest users
class RequireRole:
def __init__(self, *allowed_roles: UserRole):
self.allowed_roles = allowed_roles
async def __call__(self, user: CurrentUser) -> User:
if user.role not in self.allowed_roles:
raise ForbiddenError()
return user
# Usage
AdminOnly = Annotated[User, Depends(RequireRole(UserRole.ADMIN))]
@router.get("/admin/stats")
async def get_stats(user: AdminOnly) -> StatsResponse:
# Only admins can access
...Example: Only program owner can update program
@router.patch("/programs/{slug}")
async def update_program(
slug: str,
data: ProgramUpdate,
user: CurrentUser,
session: DatabaseSession,
) -> ProgramResponse:
program = await program_repo.get_by_slug(slug)
if not program:
raise NotFoundError()
if program.company_id != user.id and user.role != UserRole.ADMIN:
raise ForbiddenError("You don't own this program")
await program_repo.update(program, data)
await session.commit()
return ProgramResponse.from_orm(program)Authorization matrix:
| Action | USER | COMPANY | ADMIN |
|---|---|---|---|
| Submit report | β (to any program) | β | β |
| Create program | β | β | β |
| Triage report | β | β (own programs) | β |
| Delete program | β | β (own programs) | β |
| View admin panel | β | β | β |
Why Argon2id?
- Winner of Password Hashing Competition (2015)
- Memory-hard (resists GPU/ASIC attacks)
- Configurable (can increase cost over time)
- Side-channel resistant
Comparison:
| Algorithm | Security | Speed | Memory | Adoption |
|---|---|---|---|---|
| MD5 | βββ | Fast | Low | Deprecated |
| SHA-256 | β | Fast | Low | Don't use for passwords |
| bcrypt | ββ | Slow | Medium | Good |
| Argon2id | βββ | Slower | High | Best |
Implementation:
from pwdlib import PasswordHash
from pwdlib.hashers.argon2 import Argon2Hasher
password_hash = PasswordHash((Argon2Hasher(),))
def hash_password(password: str) -> str:
return password_hash.hash(password)
# β $argon2id$v=19$m=65536,t=3,p=4$...
def verify_password(password: str, hash: str) -> bool:
try:
return password_hash.verify(password, hash)
except Exception:
return False # Invalid hash or passwordHash format:
$argon2id$v=19$m=65536,t=3,p=4$salt$hash
β β β β β β
β β β β β ββ Parallelism (4 threads)
β β β β ββ Time cost (3 iterations)
β β β ββ Memory cost (64 MiB)
β β ββ Version 19
β ββ Variant (argon2id)
ββ Algorithm
Vulnerable code:
def verify_password_BAD(password: str, hash: str) -> bool:
if hash_password(password) == hash:
return True
else:
return False # Early return reveals info!Timing attack:
- Attacker measures response time
- Correct password takes longer (database lookup, session creation)
- Incorrect password returns immediately
- Attacker can brute-force faster
Secure code:
def verify_password_GOOD(password: str, hash: str) -> bool:
result = password_hash.verify(password, hash)
# Always takes same time (Argon2id is constant-time)
return resultEnforced via Pydantic:
from pydantic import BaseModel, Field, field_validator
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
if not any(c.isupper() for c in v):
raise ValueError("Password must contain uppercase letter")
if not any(c.islower() for c in v):
raise ValueError("Password must contain lowercase letter")
if not any(c.isdigit() for c in v):
raise ValueError("Password must contain digit")
return vRequirements:
- Minimum 8 characters
- Maximum 128 characters (prevent DoS via long passwords)
- At least one uppercase letter
- At least one lowercase letter
- At least one digit
Never store plaintext tokens!
# BAD - plaintext token in database
refresh_token = secrets.token_urlsafe(32)
await db.execute(
"INSERT INTO refresh_tokens (token) VALUES (?)",
(refresh_token,)
)
# GOOD - hash token before storing
refresh_token = secrets.token_urlsafe(32)
token_hash = hashlib.sha256(refresh_token.encode()).hexdigest()
await db.execute(
"INSERT INTO refresh_tokens (token_hash) VALUES (?)",
(token_hash,)
)Why hash?
- If database is compromised, attacker can't use tokens
- Must have original token to authenticate
- Similar to password hashing
Generate strong secret:
python -c "import secrets; print(secrets.token_urlsafe(32))"
# β x8J9kL2mN4pQ5rS7tU8vW1xY3zA5bC7dE9fG1hI3jK5mRequirements:
- Minimum 32 bytes (256 bits)
- Random (use
secretsmodule, notrandom) - Never commit to Git
- Rotate periodically
Environment variable:
# .env
SECRET_KEY=x8J9kL2mN4pQ5rS7tU8vW1xY3zA5bC7dE9fG1hI3jK5mIf key is compromised:
- Attacker can forge JWT tokens
- Attacker can impersonate any user
- Must rotate key and invalidate all tokens
Access token: 15 minutes
- Balances security (short window if stolen) and UX (not too frequent refresh)
- Can be shorter (5 min) for high-security apps
- Can be longer (1 hour) for low-risk apps
Refresh token: 7 days
- Prevents indefinite access
- User must login again after 7 days
- Can be shorter (1 day) for high-security apps
- Can be longer (30 days) with "remember me" option
All input validated before reaching business logic:
from pydantic import BaseModel, EmailStr, Field, HttpUrl
class ProgramCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
slug: str = Field(..., min_length=1, max_length=255, pattern=r'^[a-z0-9-]+$')
description: str | None = Field(None, max_length=10000)
website: HttpUrl | None = None
# Usage
@router.post("/programs")
async def create_program(data: ProgramCreate):
# At this point, data is validated:
# - name is 1-255 chars
# - slug matches pattern (lowercase, digits, hyphens)
# - description is max 10,000 chars
# - website is valid URL (if provided)
...Benefits:
- Prevents SQL injection (parameterized queries)
- Prevents XSS (HTML is escaped on frontend)
- Prevents buffer overflow (string length limits)
- Clear error messages
Why limit lengths?
- Prevent DoS attacks (1GB string in database)
- Database column constraints
- UX (no 10,000-character names)
Constants:
# config.py
EMAIL_MAX_LENGTH = 255
PASSWORD_HASH_MAX_LENGTH = 255
FULL_NAME_MAX_LENGTH = 255
PROGRAM_NAME_MAX_LENGTH = 255
PROGRAM_SLUG_MAX_LENGTH = 255
REPORT_TITLE_MAX_LENGTH = 255
CWE_ID_MAX_LENGTH = 20Applied to models:
class User(Base):
email: Mapped[str] = mapped_column(String(EMAIL_MAX_LENGTH))
full_name: Mapped[str | None] = mapped_column(String(FULL_NAME_MAX_LENGTH))Attacks prevented:
- Brute-force password guessing
- API abuse (scraping, spam)
- DoS attacks
Implementation:
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(
key_func=get_remote_address, # Rate limit by IP
default_limits=["100 per minute"], # Global default
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Different limits for different endpoints
@router.post("/auth/login")
@limiter.limit("20/minute") # Stricter for auth
async def login(...):
...
@router.get("/programs")
@limiter.limit("100/minute") # Relaxed for public endpoints
async def list_programs(...):
...Configuration:
# .env
RATE_LIMIT_DEFAULT=100/minute
RATE_LIMIT_AUTH=20/minuteResponse when rate limited:
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 20
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1704123456
Retry-After: 60
{
"error": "Rate limit exceeded. Try again in 60 seconds."
}More sophisticated: Limit by authenticated user
def get_rate_limit_key(request: Request) -> str:
if request.state.user:
return f"user:{request.state.user.id}"
return f"ip:{request.client.host}"
limiter = Limiter(key_func=get_rate_limit_key)Benefits:
- Authenticated users get separate quota
- Can't share rate limit across IPs
- Can offer premium tiers (higher limits)
Vulnerable code:
# BAD - concatenating user input
email = request.args.get("email")
query = f"SELECT * FROM users WHERE email = '{email}'"
result = await db.execute(query)Attack:
email=admin@example.com' OR '1'='1
β SELECT * FROM users WHERE email = 'admin@example.com' OR '1'='1'
β Returns all users!
Secure code:
# GOOD - parameterized query
stmt = select(User).where(User.email == email)
result = await session.execute(stmt)SQLAlchemy automatically escapes parameters.
Vulnerable code:
<!-- BAD - raw HTML injection -->
<div>{user.bio}</div>Attack:
bio=<script>alert('XSS')</script>
β <div><script>alert('XSS')</script></div>
β Script executes!
Secure code:
// React automatically escapes
<div>{user.bio}</div>
// β <div><script>alert('XSS')</script></div>For markdown:
import markdown
from bleach import clean
def render_markdown(text: str) -> str:
html = markdown.markdown(text)
return clean(
html,
tags=['p', 'br', 'strong', 'em', 'code', 'pre'],
strip=True,
)Not vulnerable because we use JWT tokens:
- CSRF requires cookies (sent automatically)
- JWT tokens are in
Authorizationheader (must be sent explicitly) - Attacker can't read
Authorizationheader from another domain
If using cookies, would need CSRF tokens:
from fastapi_csrf import CsrfProtect
@app.post("/login")
async def login(csrf_protect: CsrfProtect = Depends()):
await csrf_protect.validate_csrf(request)
...Vulnerable code:
# BAD - no authorization check
@router.get("/reports/{report_id}")
async def get_report(report_id: UUID):
report = await report_repo.get_by_id(report_id)
return report # Anyone can access any report!Secure code:
# GOOD - check ownership
@router.get("/reports/{report_id}")
async def get_report(report_id: UUID, user: CurrentUser):
report = await report_repo.get_by_id(report_id)
if not report:
raise NotFoundError()
if (report.researcher_id != user.id and
report.program.company_id != user.id and
user.role != UserRole.ADMIN):
raise ForbiddenError()
return reportGive users only the permissions they need:
# BAD - everyone is admin
user.role = UserRole.ADMIN
# GOOD - specific roles
user.role = UserRole.USER # Can only submit reportsMultiple layers of security:
- Input validation (Pydantic)
- Authentication (JWT)
- Authorization (RBAC)
- Rate limiting
- Encryption in transit (HTTPS)
- Encryption at rest (encrypted database)
Default to secure, opt-in to insecure:
# BAD - default allows everything
CORS_ORIGINS = ["*"]
# GOOD - default is restrictive
CORS_ORIGINS = ["https://example.com"]Add security headers via Nginx:
add_header X-Frame-Options "SAMEORIGIN";
add_header X-Content-Type-Options "nosniff";
add_header X-XSS-Protection "1; mode=block";
add_header Referrer-Policy "strict-origin-when-cross-origin";
add_header Content-Security-Policy "default-src 'self'";Don't leak information in errors:
# BAD - reveals whether email exists
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not verify_password(password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid password")
# GOOD - generic error
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid credentials")Log security events:
import structlog
logger = structlog.get_logger()
@router.post("/auth/login")
async def login(email: str, password: str):
user = await user_repo.find_by_email(email)
if not user:
logger.warning("login_failed", email=email, reason="user_not_found")
raise InvalidCredentialsError()
if not verify_password(password, user.hashed_password):
logger.warning("login_failed", email=email, reason="invalid_password")
raise InvalidCredentialsError()
logger.info("login_success", user_id=user.id, email=email)
# ...Monitor for:
- Failed login attempts
- Rate limit violations
- Token reuse attempts
- Unusual activity patterns
Regularly update dependencies:
# Check for vulnerabilities
pip-audit
# Update dependencies
pip install --upgrade -r requirements.txtUse Dependabot or Renovate:
- Automatic PR for dependency updates
- Includes security patches
Never commit secrets to Git:
# .gitignore
.env
*.pem
*.key
secrets/Use environment variables:
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
SECRET_KEY: str # Read from .env or environment
DATABASE_URL: strFor production, use secret managers:
- AWS Secrets Manager
- HashiCorp Vault
- Azure Key Vault
Security is not an afterthought, it's designed into every layer:
- Authentication - JWT with token rotation
- Authorization - RBAC with resource ownership
- Password Security - Argon2id hashing
- Token Security - Hashed storage, token versioning
- Input Validation - Pydantic schemas
- Rate Limiting - Per-IP and per-user
- Vulnerability Prevention - SQL injection, XSS, CSRF, IDOR
For more information:
- System architecture: ARCHITECTURE.md
- Design patterns: PATTERNS.md
- Database design: DATABASE.md
- Hands-on tutorial: GETTING-STARTED.md
Remember: Security is a journey, not a destination. Stay updated on best practices and common vulnerabilities.