Files

480 lines
23 KiB
Python
Raw Permalink Normal View History

2026-07-01 14:41:49 +07:00
# -*- coding: utf-8 -*-
import logging
import math
import re
import time
from collections import defaultdict
from urllib.parse import urlsplit

import werkzeug

from odoo import http, _, SUPERUSER_ID, tools, fields
from odoo.exceptions import AccessDenied, UserError, ValidationError
from odoo.http import request, route, SessionExpiredException
from odoo.addons.auth_signup.models.res_users import SignupError
from odoo.addons.auth_signup.controllers.main import AuthSignupHome, ensure_db
_logger = logging.getLogger(__name__)
# ==============================================================================
# SECURITY CONFIGURATION
# ==============================================================================
# Limits read by the login, PIN-verification and password checks in this module.
SECURITY_CONFIG = {
    # Login attempts allowed per rate-limit key inside the lockout window.
    'max_login_attempts': 5,
    'lockout_duration_minutes': 15,
    # PIN verification attempts allowed per login inside the PIN window.
    'max_pin_attempts': 3,
    'pin_lockout_duration_minutes': 30,
    # Minimum number of characters accepted for a new password.
    'password_min_length': 8,
}
_rate_limit_cache = defaultdict(list)
def _check_rate_limit(key, max_attempts, lockout_minutes):
"""Check if request is rate-limited."""
now = time.time()
window_start = now - (lockout_minutes * 60)
_rate_limit_cache[key] = [t for t in _rate_limit_cache[key] if t > window_start]
if len(_rate_limit_cache[key]) >= max_attempts:
remaining = lockout_minutes * 60 - (now - _rate_limit_cache[key][0])
return True, max(0, int(remaining / 60))
_rate_limit_cache[key].append(now)
return False, 0
def _sanitize_input(value):
"""Basic input sanitization."""
if not value:
return value
if isinstance(value, str):
value = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', value)
return value[:2000] if len(value) > 2000 else value
return value
def _validate_password_strength(password):
    """Return the list of password-policy violations for ``password``.

    The policy requires at least ``SECURITY_CONFIG['password_min_length']``
    characters, one ``A-Z`` letter, one ``a-z`` letter and one digit.  Each
    violated rule contributes one translated message; an empty list means
    the password is acceptable.
    """
    min_length = SECURITY_CONFIG['password_min_length']
    problems = []

    if len(password) < min_length:
        problems.append(_('ពាក្យសម្ងាត់ត្រូវមានយ៉ាងតិច %d តួអក្សរ') % min_length)
    if re.search(r'[A-Z]', password) is None:
        problems.append(_('ពាក្យសម្ងាត់ត្រូវមានអក្សរធំយ៉ាងតិច ១'))
    if re.search(r'[a-z]', password) is None:
        problems.append(_('ពាក្យសម្ងាត់ត្រូវមានអក្សរតូចយ៉ាងតិច ១'))
    if re.search(r'\d', password) is None:
        problems.append(_('ពាក្យសម្ងាត់ត្រូវមានលេខយ៉ាងតិច ១'))

    return problems
# ==============================================================================
# CONTROLLER CLASS - ODOO 19 COMPATIBLE
# ==============================================================================
class SignupVerifyEmail(AuthSignupHome):
"""High-security signup/login controller for Odoo 19"""
@route('/web/login', type='http', auth='public', website=True)
def web_login(self, *args, **kw):
    """Render the login page and handle rate-limited credential submissions.

    A POST that carries a ``login`` value is first checked against the
    per-IP/per-login attempt budget and then authenticated.  On success the
    user is sent to the first-login password form, to a validated
    ``redirect`` target, or to ``/web``.  Every other case (GET, empty
    login, failed or errored authentication) falls through to the parent
    controller's ``web_login``.
    """
    ensure_db()
    client_ip = request.httprequest.remote_addr
    login_attempt = (kw.get('login') or '').lower().strip()

    # Only real credential submissions consume the attempt budget.  Checking
    # the limiter on every request would lock the form after a handful of
    # plain page views, because each check records an attempt.
    if request.httprequest.method == 'POST' and login_attempt:
        rate_key = f"login:{client_ip}:{login_attempt}"
        is_locked, remaining_min = _check_rate_limit(
            rate_key,
            SECURITY_CONFIG['max_login_attempts'],
            SECURITY_CONFIG['lockout_duration_minutes'],
        )
        if is_locked:
            kw['error'] = _("សូមរង់ចាំ %d នាទីមុនពេលសាកល្បងម្តងទៀត (ការពារសុវត្ថិភាព)") % remaining_min
            return request.render('web.login', self._prepare_login_context(kw))

        try:
            # NOTE(review): this call assumes Session.authenticate(login,
            # password) returning a truthy value.  Upstream Odoo releases
            # expose other signatures (a database/env argument plus
            # credentials) - confirm against the deployed version.  If the
            # call raises, the generic handler below only sets an error and
            # the parent web_login at the end of this method still runs.
            auth_success = request.session.authenticate(
                login_attempt,
                kw.get('password', ''),
            )
            if auth_success:
                # Rotate the session token when the session object offers it.
                if hasattr(request.session, 'rotate_session_token'):
                    request.session.rotate_session_token()

                user = request.env['res.users'].sudo().search(
                    [('login', '=', login_attempt)], limit=1
                )
                # First login: force the user through the password form.
                if user and user.first_login:
                    qcontext = self.get_auth_signup_qcontext()
                    qcontext['user_id'] = user.id
                    return request.render('ck_signup.reset_password_direct', qcontext)

                # Only follow redirect targets that pass the safety check.
                redirect = kw.get('redirect') or request.params.get('redirect')
                if redirect and self._is_safe_redirect(redirect):
                    return request.redirect(redirect)
                return request.redirect('/web')
        except AccessDenied:
            _logger.warning("Failed login attempt for: %s from %s", login_attempt, client_ip)
            kw['error'] = _("លេខកូដ ឬ ពាក្យសម្ងាត់ មិនត្រឹមត្រូវទេ")
        except Exception as e:
            _logger.exception("Login error for %s: %s", login_attempt, str(e))
            kw['error'] = _("មានបញ្ហាក្នុងការចូលប្រើប្រាស់")

    # Let the parent controller render the page, then add the signup config.
    response = super().web_login(*args, **kw)
    if hasattr(response, 'qcontext'):
        response.qcontext.update(self.get_auth_signup_config())
    return response
def _prepare_login_context(self, kw):
    """Build the rendering context for the ``web.login`` template.

    Starts from the signup qcontext and overlays the ``redirect``,
    ``error`` and ``message`` values from ``kw``, the sanitized ``login``
    and a string-valued ``debug`` flag.
    """
    context = self.get_auth_signup_qcontext()
    # The template receives a string here: 'assets' when the environment
    # context carries a truthy debug value, otherwise the empty string.
    debug_flag = 'assets' if request.env.context.get('debug') else ''
    context.update(
        redirect=kw.get('redirect'),
        error=kw.get('error'),
        message=kw.get('message'),
        debug=debug_flag,
        login=_sanitize_input(kw.get('login', '')),
    )
    return context
def _is_safe_redirect(self, url):
    """Return True when ``url`` is an acceptable post-login redirect target.

    Accepted targets are:

    * site-relative paths that start with a single ``/``;
    * ``http``/``https`` URLs whose host is exactly ``localhost``;
    * ``http``/``https`` URLs whose scheme and network location equal those
      of the ``web.base.url`` system parameter.

    The URL is parsed instead of prefix-matched, so look-alikes such as
    ``//evil.com``, ``/\\evil.com``, ``http://localhost.evil.com`` or
    ``<base_url>.evil.com`` are rejected.  Rejected non-empty targets are
    logged.
    """
    if not url:
        return False

    # Browsers treat backslashes like slashes and drop control characters,
    # which can turn an innocent-looking path into an absolute URL.
    has_unsafe_chars = '\\' in url or any(ord(ch) < 0x20 for ch in url)
    try:
        target = None if has_unsafe_chars else urlsplit(url)
    except ValueError:
        target = None

    if target is not None:
        if not target.scheme and not target.netloc:
            # Relative target: a leading "//" would be resolved by browsers
            # as a scheme-relative URL pointing at another host.
            if url.startswith('/') and not url.startswith('//'):
                return True
        elif target.scheme in ('http', 'https') and target.hostname:
            if target.hostname == 'localhost':
                return True
            base_url = request.env['ir.config_parameter'].sudo().get_param('web.base.url')
            if base_url:
                base = urlsplit(base_url)
                same_origin = (
                    target.scheme == base.scheme
                    and target.netloc.lower() == base.netloc.lower()
                )
                if same_origin:
                    return True

    _logger.warning("Blocked unsafe redirect: %s", url)
    return False
# ==============================================================================
# PASSWORD CHANGE - ODOO 19 COMPATIBLE
# ==============================================================================
@route('/web/reset_password/submit', type='http', methods=['POST'],
       auth='public', website=True, csrf=True)
def change_password(self, **kw):
    """Change a user's password after re-checking the old one.

    Expects ``user_name``, ``old_password``, ``new_password``,
    ``confirm_new_password`` and ``name`` in the form data.  The new
    password must match its confirmation and satisfy the password policy.
    Old-password checks are rate-limited per client IP and login, with the
    same budget as the login form, so this public endpoint cannot be used
    to guess passwords unthrottled.  On success the user's name and
    password are written, ``first_login`` is cleared and the browser is
    redirected to the login page with a success message; on any failure the
    ``ck_signup.reset_password_direct`` template is rendered with an error.
    """
    template = 'ck_signup.reset_password_direct'
    values = {}
    try:
        required = ['user_name', 'old_password', 'new_password', 'confirm_new_password', 'name']
        if not all(kw.get(f) for f in required):
            values['error'] = _("សូមបំពេញព័ត៌មានទាំងអស់")
            return request.render(template, values)

        if kw['new_password'] != kw['confirm_new_password']:
            values['error'] = _("ពាក្យសម្ងាត់មិនត្រូវគ្នា! សូមបញ្ចូលវាម្តងទៀត")
            return request.render(template, values)

        strength_errors = _validate_password_strength(kw['new_password'])
        if strength_errors:
            values['error'] = '; '.join(strength_errors)
            return request.render(template, values)

        login = _sanitize_input(kw['user_name'])

        # Throttle old-password verification.  Form-validation failures
        # above never test a password, so they do not consume the budget.
        rate_key = f"pwchange:{request.httprequest.remote_addr}:{login.lower().strip()}"
        is_locked, remaining_min = _check_rate_limit(
            rate_key,
            SECURITY_CONFIG['max_login_attempts'],
            SECURITY_CONFIG['lockout_duration_minutes'],
        )
        if is_locked:
            values['error'] = _("សូមរង់ចាំ %d នាទីមុនពេលសាកល្បងម្តងទៀត (ការពារសុវត្ថិភាព)") % remaining_min
            return request.render(template, values)

        # NOTE(review): assumes Session.authenticate(login, password)
        # returning a truthy value - confirm against the deployed Odoo
        # version, whose signature may differ.
        try:
            auth_success = request.session.authenticate(login, kw['old_password'])
        except AccessDenied:
            auth_success = False

        if not auth_success:
            _logger.warning("Password change auth failed for: %s", kw.get('user_name'))
            values['error'] = _("លេខកូដ ឬ ពាក្យសម្ងាត់ចាស់ មិនត្រឹមត្រូវទេ")
            return request.render(template, values)

        # Read the user only after authentication succeeded.
        user = request.env.user
        if user.has_group('base.group_public'):
            values['error'] = _("អ្នកប្រើប្រាស់សាធារណៈមិនអាចផ្លាស់ប្តូរពាក្យសម្ងាត់បានទេ")
            return request.render(template, values)

        user.sudo().write({
            'name': _sanitize_input(kw['name']),
            'password': kw['new_password'],
            'first_login': False,
        })
        _logger.info("Password changed for user: %s", user.login)

        success_msg = werkzeug.urls.url_quote(_('ការផ្លាស់ប្តូរពាក្យសម្ងាត់ទទួលបានជោគជ័យ'))
        return request.redirect(f'/web/login?message={success_msg}')
    except Exception as e:
        _logger.exception("Password change error: %s", str(e))
        values['error'] = _("មានបញ្ហាក្នុងការផ្លាស់ប្តូរពាក្យសម្ងាត់")
        return request.render(template, values)
# ==============================================================================
# SIGNUP - ODOO 19 COMPATIBLE
# ==============================================================================
# @route('/web/signup', type='http', auth='public', website=True)
# def web_auth_signup(self, *args, **kw):
# """Secure signup flow - Odoo 19 compatible."""
# param_obj = request.env['ir.config_parameter'].sudo()
# qcontext = self.get_auth_signup_qcontext()
#
# request.params['background_src'] = _sanitize_input(
# param_obj.get_param('login_form_header_register') or ''
# )
# request.params['background'] = _sanitize_input(
# param_obj.get_param('id_backgroud_mptc') or ''
# )
#
# companies = request.env["res.groups"].sudo().search(
# [('name', '=', 'អ្នកសុំអាហារូបករណ៍')]
# )
# qcontext['groups'] = companies
#
# if not qcontext.get('token') and not qcontext.get('signup_enabled'):
# raise werkzeug.exceptions.NotFound()
#
# if 'error' not in qcontext and request.httprequest.method == 'POST':
# if kw.get('password') != kw.get('confirm_password'):
# qcontext['error'] = _("ពាក្យសម្ងាត់មិនត្រូវគ្នា! សូមបញ្ចូលវាម្តងទៀត")
# return request.render('ck_signup.signup', qcontext)
#
# if kw.get('password'):
# strength_errors = _validate_password_strength(kw['password'])
# if strength_errors:
# qcontext['error'] = '; '.join(strength_errors)
# return request.render('ck_signup.signup', qcontext)
#
# login = _sanitize_input(qcontext.get('login', '')).lower()
#
# existing_user = request.env["res.users"].sudo().search(
# [("login", "=", login), ("active", "=", True)], limit=1
# )
# if existing_user:
# qcontext['error'] = _("អ៊ីម៉ែលនេះត្រូវបានចុះឈ្មោះរួចហើយ")
# return request.render('ck_signup.signup', qcontext)
#
# try:
# self.do_signup(qcontext)
#
# if login:
# request.session['signup_login'] = login
# user_sudo = request.env['res.users'].sudo().search([('login', '=', login)], limit=1)
#
# if user_sudo:
# country_id = kw.get('country')
# if country_id:
# try:
# country = request.env['res.country'].sudo().browse(int(country_id))
# if country.exists():
# user_sudo.partner_id.sudo().write({'country_id': country.id})
# except (ValueError, TypeError):
# pass
# user_sudo.sudo().log_ids.unlink()
#
# qcontext['message'] = _("សូមបញ្ចូល Pin Code ដែលបានផ្ញើទៅក្នុងអ៊ីម៉ែលរបស់អ្នក ដើម្បីផ្ទៀងផ្ទាត់គណនី!")
# return request.render('ck_signup.verify_pin_code', qcontext)
#
# except (SignupError, ValidationError, AssertionError) as e:
# _logger.error("Signup error for %s: %s", login, str(e))
#
# pending_user = request.env["res.users"].sudo().search(
# [("login", "=", login), ("active", "=", False)], limit=1
# )
#
# if pending_user:
# qcontext['message'] = _(
# "សូមបញ្ចូល Pin Code ដែលបានផ្ញើទៅក្នុងអ៊ីម៉ែលរបស់អ្នក ដើម្បីផ្ទៀងផ្ទាត់គណនី!")
# qcontext['error'] = ""
# return request.render('ck_signup.verify_pin_code', qcontext)
# else:
# qcontext['error'] = _("មិនអាចបង្កើតគណនីថ្មីបានទេ")
#
# return request.render('ck_signup.signup', qcontext)
# ==============================================================================
# PIN CODE VERIFICATION - ODOO 19
# ==============================================================================
@route('/web/verify_pin_code', type='http', auth='public', website=True, csrf=True)
def verify_pin_code(self, *args, **kw):
    """Activate a pending account when the submitted PIN matches.

    Reads ``login`` and ``pin_code`` from the request.  Each submission
    with both values counts against the per-login PIN attempt budget.  When
    an inactive user with that login and PIN exists, the user and its
    partner are activated, the PIN is cleared and the user is added to the
    applicant group(s); these writes run inside one savepoint so a failure
    leaves the account untouched and the PIN still usable.  Renders the
    login page on success, otherwise the PIN form with an error.
    """
    qcontext = self.get_auth_signup_qcontext()
    login = _sanitize_input(kw.get('login', '')).lower().strip()
    pin_code = _sanitize_input(kw.get('pin_code', '')).strip()

    if not login or not pin_code:
        qcontext['error'] = _("សូមបញ្ចូលអ៊ីម៉ែល និង PIN Code")
        return request.render('ck_signup.verify_pin_code', qcontext)

    rate_key = f"pin:{login}"
    is_locked, remaining_min = _check_rate_limit(
        rate_key,
        SECURITY_CONFIG['max_pin_attempts'],
        SECURITY_CONFIG['pin_lockout_duration_minutes'],
    )
    if is_locked:
        qcontext['error'] = _("សូមរង់ចាំ %d នាទី (ការពារសុវត្ថិភាព)") % remaining_min
        return request.render('ck_signup.verify_pin_code', qcontext)

    # Pending accounts are stored inactive, hence the explicit active filter.
    user = request.env["res.users"].sudo().search(
        [("login", "=", login),
         ("pin_code", "=", pin_code),
         ("active", "=", False)],
        limit=1
    )

    if user:
        try:
            # The user/group relation is named ``group_ids`` on newer Odoo
            # releases and ``groups_id`` on older ones; use whichever the
            # running model defines.
            group_field = 'group_ids' if 'group_ids' in user._fields else 'groups_id'

            # All-or-nothing: a failure in any write rolls back the others,
            # so an error is never shown for a half-activated account.
            with request.env.cr.savepoint():
                user.sudo().write({
                    'active': True,
                    'pin_code': False,
                })
                if user.partner_id:
                    user.partner_id.sudo().write({'active': True})

                companies = request.env["res.groups"].sudo().search(
                    [('name', '=', 'អ្នកសុំអាហារូបករណ៍')]
                )
                if companies:
                    user.sudo().write({group_field: [(4, g.id) for g in companies]})

            _logger.info("Account verified: %s", login)
            qcontext['message'] = _("អបអរសាទរ! សូមចុចប៊ូតុង \"ចូល\" ដើម្បីបំពេញពាក្យចុះឈ្មោះ!")
            return request.render('web.login', qcontext)
        except Exception as e:
            _logger.exception("Error activating user %s: %s", login, str(e))
            qcontext['error'] = _("មានបញ្ហាក្នុងការផ្ទៀងផ្ទាត់")
    else:
        _logger.warning("Failed PIN verification: %s", login)
        qcontext['error'] = _("PIN Code ឬ អ៊ីម៉ែល មិនត្រឹមត្រូវទេ")

    return request.render('ck_signup.verify_pin_code', qcontext)
# ==============================================================================
# PIN REQUEST
# ==============================================================================
@route('/web/request_new_pin_code', type='http', auth='public', website=True, csrf=True)
def sending_new_pin_code(self, *args, **kw):
    """Send a fresh PIN code to a pending (inactive) account.

    Rejects empty logins, logins that already belong to an active user and
    logins with no pending user.  A pending user whose
    ``count_request_pin_code`` exceeds 5 is sent to the PIN form with a
    refusal message.  Otherwise ``generate_and_send_new_pin_code`` is
    called and the request form is rendered with the outcome.
    """
    request_template = 'ck_signup.view_request_new_pin_code'
    users_sudo = request.env["res.users"].sudo()
    qcontext = self.get_auth_signup_qcontext()
    login = _sanitize_input(kw.get('login', '')).lower().strip()

    if not login:
        qcontext['error'] = _("សូមបញ្ចូលអ៊ីម៉ែល")
        return request.render(request_template, qcontext)

    already_active = users_sudo.search(
        [("login", "=", login), ("active", "=", True)], limit=1
    )
    if already_active:
        qcontext['error'] = _("អ៊ីម៉ែលនេះបានចុះឈ្មោះរួចហើយ សូមចូលប្រើប្រាស់")
        return request.render(request_template, qcontext)

    pending_user = users_sudo.search(
        [("login", "=", login), ("active", "=", False)], limit=1
    )
    if not pending_user:
        qcontext['error'] = _("អ៊ីម៉ែលមិនត្រឹមត្រូវ ឬ មិនមានក្នុងប្រព័ន្ធ")
        return request.render(request_template, qcontext)

    # A missing/empty counter is treated as zero requests so far.
    requests_so_far = pending_user.count_request_pin_code or 0
    if requests_so_far > 5:
        qcontext['message'] = _("សូមអភ័យទោស អ្នកមិនអាចស្នើសុំ PIN Code បានទៀតទេ!")
        return request.render('ck_signup.verify_pin_code', qcontext)

    try:
        sent = request.env['res.users'].sudo().generate_and_send_new_pin_code(login)
        if sent:
            qcontext['message'] = _("PIN Code បានផ្ញើចូលអ៊ីម៉ែលរបស់លោកអ្នកដោយជោគជ័យ!")
            return request.render(request_template, qcontext)
    except Exception as e:
        _logger.error("Error sending PIN to %s: %s", login, str(e))
        qcontext['error'] = _("មានបញ្ហាក្នុងការផ្ញើ PIN Code")

    # Reached after a handled error or a falsy result from the helper.
    return request.render(request_template, qcontext)
# ==============================================================================
# PASSWORD RESET
# ==============================================================================
@route('/web/reset_password', type='http', auth='public', website=True)
def web_auth_reset_password(self, *args, **kw):
    """Handle the password-reset page and its form submission.

    Raises ``NotFound`` when there is neither a token nor reset-password
    enabled in the signup qcontext.  On POST with a token the signup is
    completed and the login flow takes over; without a token a reset is
    requested for the submitted login.  User-facing validation errors
    (``UserError``/``ValidationError``, e.g. missing login or password
    policy violations) are shown to the user as raised; unexpected errors
    are logged and replaced by a generic message.
    """
    qcontext = self.get_auth_signup_qcontext()

    if not qcontext.get('token') and not qcontext.get('reset_password_enabled'):
        raise werkzeug.exceptions.NotFound()

    if 'error' not in qcontext and request.httprequest.method == 'POST':
        try:
            if qcontext.get('token'):
                self.do_signup(qcontext)
                return self.web_login(*args, **kw)
            else:
                login = _sanitize_input(qcontext.get('login', '')).lower()
                if not login:
                    raise UserError(_("សូមបញ្ចូលអ៊ីម៉ែល"))
                _logger.info("Password reset requested: %s from %s", login, request.httprequest.remote_addr)
                request.env['res.users'].sudo().reset_password(login)
                qcontext['message'] = _("សារប្ដូរលេខសំងាត់ត្រូវបានផ្ញើចូលអ៊ីម៉ែលរបស់លោកអ្នក")
        except SignupError:
            users = request.env['res.users'].sudo().search(
                [('login', '=', _sanitize_input(kw.get('login', '')).lower()),
                 ('active', '=', True)], limit=1
            )
            if not users:
                # No active user with this login: send them to PIN verification.
                qcontext['message'] = _("សូមពិនិត្យ Pin Code នៅក្នុងអ៊ីម៉ែលរបស់អ្នក")
                qcontext['error'] = ""
                return request.render('ck_signup.verify_pin_code', qcontext)
            else:
                qcontext['message'] = _("ពាក្យសម្ងាត់ត្រូវបានផ្លាស់ប្តូរជោគជ័យ")
        except (UserError, ValidationError) as e:
            # These carry a message written for the end user; show it rather
            # than hiding it behind the generic error below.
            qcontext['error'] = e.args[0] if e.args else str(e)
        except Exception as e:
            _logger.exception("Password reset error: %s", str(e))
            qcontext['error'] = _("មានបញ្ហាក្នុងការប្ដូរពាក្យសម្ងាត់")

    return request.render('auth_signup.reset_password', qcontext)
# ==============================================================================
# HELPERS
# ==============================================================================
def do_signup(self, qcontext):
    """Validate the signup values in ``qcontext`` and create the user.

    Collects the sanitized ``login``, ``name`` and ``password``, enforces
    the password policy, optionally forwards the requested ``group`` and
    the request language, then delegates to ``_signup_with_values``.

    :raises UserError: when login or password is missing
    :raises ValidationError: when the password violates the policy
    """
    values = {
        key: _sanitize_input(qcontext.get(key))
        for key in ('login', 'name', 'password')
        if qcontext.get(key)
    }

    if not values.get('login') or not values.get('password'):
        raise UserError(_("សូមបំពេញព័ត៌មានដែលត្រូវការ"))

    strength_errors = _validate_password_strength(values['password'])
    if strength_errors:
        raise ValidationError('; '.join(strength_errors))

    group_id = qcontext.get('group')
    if group_id:
        # Forwarded as a string under 'groups'; presumably consumed by a
        # project-specific signup override - verify against the model.
        values['groups'] = str(group_id)

    # Loop variable is named ``_name`` so it cannot be confused with the
    # translation function ``_`` used above.
    installed_codes = {
        code for code, _name in request.env['res.lang'].sudo().get_installed()
    }
    lang = request.context.get('lang') or ''
    short_lang = lang.split('_')[0]
    if lang in installed_codes:
        # Installed codes are full locale codes (e.g. "km_KH"), so the
        # request language has to be compared unshortened to ever match.
        values['lang'] = lang
    elif short_lang in installed_codes:
        # Previous behaviour: accept a bare language prefix if installed.
        values['lang'] = short_lang

    self._signup_with_values(qcontext.get("token"), values)
def _signup_with_values(self, token, values):
    """Create the user through ``res.users.signup`` and log the new login.

    Unlike the stock controller this does not authenticate the new user.
    """
    result = request.env['res.users'].sudo().signup(values, token)
    # NOTE(review): depending on the Odoo version ``signup`` returns either
    # ``(db, login, password)`` or ``(login, password)``; the login is the
    # second-to-last item in both shapes, so index from the end instead of
    # unpacking a fixed number of values.  Confirm against the deployed
    # ``res.users.signup``.
    login = result[-2]
    _logger.info("New user created: %s", login)