# -*- coding: utf-8 -*-
"""Login, first-login password change, PIN verification and password-reset
controllers layered on top of ``auth_signup``."""
import logging
import math
import re
import time
from collections import defaultdict
from urllib.parse import quote, urlsplit

import werkzeug

from odoo import http, _, SUPERUSER_ID, tools, fields
from odoo.exceptions import AccessDenied, UserError, ValidationError
from odoo.http import request, route, SessionExpiredException
from odoo.addons.auth_signup.models.res_users import SignupError
from odoo.addons.auth_signup.controllers.main import AuthSignupHome, ensure_db

_logger = logging.getLogger(__name__)

# ==============================================================================
# SECURITY CONFIGURATION
# ==============================================================================
SECURITY_CONFIG = {
    'max_login_attempts': 5,
    'lockout_duration_minutes': 15,
    'max_pin_attempts': 3,
    'pin_lockout_duration_minutes': 30,
    'password_min_length': 8,
}

# In-process attempt log: key -> list of attempt timestamps (seconds).
_rate_limit_cache = defaultdict(list)

# Keys embed user-supplied logins, so the cache is swept once it gets this big.
_RATE_LIMIT_MAX_KEYS = 10000

_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
_MAX_INPUT_LENGTH = 2000


def _prune_rate_limit_cache(now):
    """Drop every key whose newest attempt is older than the longest window."""
    longest_window = 60 * max(
        SECURITY_CONFIG['lockout_duration_minutes'],
        SECURITY_CONFIG['pin_lockout_duration_minutes'],
    )
    cutoff = now - longest_window
    # list(...) snapshots the items so concurrent inserts cannot break the loop.
    for key, stamps in list(_rate_limit_cache.items()):
        if not stamps or stamps[-1] <= cutoff:
            _rate_limit_cache.pop(key, None)


def _check_rate_limit(key, max_attempts, lockout_minutes):
    """Record an attempt for ``key`` unless it is currently locked out.

    :param key: identifier of the throttled action (e.g. ``"pin:<login>"``).
    :param max_attempts: attempts allowed inside the window.
    :param lockout_minutes: length of the sliding window, in minutes.
    :return: ``(is_locked, remaining_minutes)``. ``remaining_minutes`` is
        rounded up and is at least 1 while locked, 0 otherwise.
    """
    now = time.time()
    window_seconds = lockout_minutes * 60
    window_start = now - window_seconds
    recent = [t for t in _rate_limit_cache.get(key, ()) if t > window_start]

    if recent and len(recent) >= max_attempts:
        _rate_limit_cache[key] = recent
        remaining = window_seconds - (now - recent[0])
        # Round up: flooring made the user-facing message say "0 minutes".
        return True, max(1, math.ceil(remaining / 60))

    if len(_rate_limit_cache) >= _RATE_LIMIT_MAX_KEYS:
        _prune_rate_limit_cache(now)

    recent.append(now)
    _rate_limit_cache[key] = recent
    return False, 0


def _reset_rate_limit(key):
    """Forget all recorded attempts for ``key`` (used after a success)."""
    _rate_limit_cache.pop(key, None)


def _sanitize_input(value):
    """Strip control characters from a string and cap it at 2000 characters.

    Falsy values and non-string values are returned unchanged.
    """
    if not value or not isinstance(value, str):
        return value
    return _CONTROL_CHARS_RE.sub('', value)[:_MAX_INPUT_LENGTH]


def _validate_password_strength(password):
    """Return a list of translated error messages; empty when the password
    meets the length, upper-case, lower-case and digit requirements."""
    errors = []
    min_length = SECURITY_CONFIG['password_min_length']
    if len(password) < min_length:
        errors.append(_('ពាក្យសម្ងាត់ត្រូវមានយ៉ាងតិច %d តួអក្សរ') % min_length)
    if not re.search(r'[A-Z]', password):
        errors.append(_('ពាក្យសម្ងាត់ត្រូវមានអក្សរធំយ៉ាងតិច ១'))
    if not re.search(r'[a-z]', password):
        errors.append(_('ពាក្យសម្ងាត់ត្រូវមានអក្សរតូចយ៉ាងតិច ១'))
    if not re.search(r'\d', password):
        errors.append(_('ពាក្យសម្ងាត់ត្រូវមានលេខយ៉ាងតិច ១'))
    return errors


# ==============================================================================
# CONTROLLER CLASS
# ==============================================================================
class SignupVerifyEmail(AuthSignupHome):
    """Signup/login controller with rate limiting and PIN verification."""

    def _authenticate_session(self, login, password):
        """Authenticate the current session with a login/password pair.

        NOTE(review): upstream Odoo 19 declares
        ``Session.authenticate(env, credential)`` with ``credential`` a dict;
        the previous ``authenticate(login, password)`` call does not match
        that signature. Confirm against the deployed Odoo version.

        :return: whatever ``request.session.authenticate`` returns (truthy on
            success).
        :raises AccessDenied: propagated from the session on bad credentials.
        """
        credential = {'login': login, 'password': password, 'type': 'password'}
        return request.session.authenticate(request.env, credential)

    @route('/web/login', type='http', auth='public', website=True)
    def web_login(self, *args, **kw):
        """Render the login page and handle rate-limited credential POSTs.

        A successful POST redirects (or shows the first-login password form);
        everything else falls through to the parent ``web_login``.
        """
        ensure_db()

        client_ip = request.httprequest.remote_addr
        login_attempt = kw.get('login', '').lower().strip()

        if request.httprequest.method == 'POST' and login_attempt:
            # Throttle only real credential submissions; counting plain page
            # views locked visitors out after a few refreshes.
            rate_key = f"login:{client_ip}:{login_attempt}"
            is_locked, remaining_min = _check_rate_limit(
                rate_key,
                SECURITY_CONFIG['max_login_attempts'],
                SECURITY_CONFIG['lockout_duration_minutes'],
            )
            if is_locked:
                kw['error'] = _("សូមរង់ចាំ %d នាទីមុនពេលសាកល្បងម្តងទៀត (ការពារសុវត្ថិភាព)") % remaining_min
                return request.render('web.login', self._prepare_login_context(kw))

            try:
                auth_success = self._authenticate_session(
                    login_attempt, kw.get('password', '')
                )
                if auth_success:
                    # A legitimate login must not count towards the lockout.
                    _reset_rate_limit(rate_key)

                    if hasattr(request.session, 'rotate_session_token'):
                        request.session.rotate_session_token()

                    user = request.env['res.users'].sudo().search(
                        [('login', '=', login_attempt)], limit=1
                    )

                    # First login: force the password-change form.
                    if user and user.first_login:
                        qcontext = self.get_auth_signup_qcontext()
                        qcontext['user_id'] = user.id
                        return request.render('ck_signup.reset_password_direct', qcontext)

                    redirect = kw.get('redirect') or request.params.get('redirect')
                    if redirect and self._is_safe_redirect(redirect):
                        return request.redirect(redirect)
                    return request.redirect('/web')

            except AccessDenied:
                _logger.warning("Failed login attempt for: %s from %s", login_attempt, client_ip)
                kw['error'] = _("លេខកូដ ឬ ពាក្យសម្ងាត់ មិនត្រឹមត្រូវទេ")
            except Exception as e:
                _logger.exception("Login error for %s: %s", login_attempt, str(e))
                kw['error'] = _("មានបញ្ហាក្នុងការចូលប្រើប្រាស់")

        # NOTE(review): on a failed POST the parent handler presumably
        # re-submits the same credentials from request.params - confirm.
        response = super().web_login(*args, **kw)
        if hasattr(response, 'qcontext'):
            response.qcontext.update(self.get_auth_signup_config())
        return response

    def _prepare_login_context(self, kw):
        """Build the rendering context for the ``web.login`` template.

        ``debug`` is passed as a string (``'assets'`` or ``''``) rather than a
        boolean.
        """
        ctx = self.get_auth_signup_qcontext()
        debug_val = request.env.context.get('debug')
        ctx.update({
            'redirect': kw.get('redirect'),
            'error': kw.get('error'),
            'message': kw.get('message'),
            'debug': 'assets' if debug_val else '',
            'login': _sanitize_input(kw.get('login', '')),
        })
        return ctx

    def _is_safe_redirect(self, url):
        """Return True only for same-site redirect targets.

        Accepted: site-relative paths (``/web``), ``http(s)://localhost`` on
        any port, and absolute URLs whose host matches ``web.base.url``.
        The URL is parsed instead of prefix-matched so ``//evil.com``,
        ``/\\evil.com`` and ``http://localhost.evil.com`` are rejected.
        """
        if not url:
            return False

        # Browsers drop control characters and treat "\" like "/", so both
        # are neutralised before the URL is interpreted.
        if not re.search(r'[\x00-\x1f\x7f]', url):
            normalized = url.replace('\\', '/')
            try:
                target = urlsplit(normalized)
            except ValueError:
                target = None

            if target is not None:
                if not target.scheme and not target.netloc:
                    if normalized.startswith('/') and not normalized.startswith('//'):
                        return True
                elif target.scheme in ('http', 'https') and target.netloc:
                    if target.hostname == 'localhost':
                        return True
                    base_url = request.env['ir.config_parameter'].sudo().get_param('web.base.url')
                    if base_url and target.netloc.lower() == urlsplit(base_url).netloc.lower():
                        return True

        _logger.warning("Blocked unsafe redirect: %s", url)
        return False

    # ==============================================================================
    # PASSWORD CHANGE
    # ==============================================================================
    @route('/web/reset_password/submit', type='http', methods=['POST'], auth='public', website=True, csrf=True)
    def change_password(self, **kw):
        """Change name and password after re-checking the old password.

        Expects ``user_name``, ``old_password``, ``new_password``,
        ``confirm_new_password`` and ``name``. Renders the form again with an
        ``error`` on any failure, otherwise redirects to the login page.
        """
        values = {}
        template = 'ck_signup.reset_password_direct'
        try:
            required = ['user_name', 'old_password', 'new_password', 'confirm_new_password', 'name']
            if not all(kw.get(f) for f in required):
                values['error'] = _("សូមបំពេញព័ត៌មានទាំងអស់")
                return request.render(template, values)

            if kw['new_password'] != kw['confirm_new_password']:
                values['error'] = _("ពាក្យសម្ងាត់មិនត្រូវគ្នា! សូមបញ្ចូលវាម្តងទៀត")
                return request.render(template, values)

            strength_errors = _validate_password_strength(kw['new_password'])
            if strength_errors:
                values['error'] = '; '.join(strength_errors)
                return request.render(template, values)

            login = _sanitize_input(kw['user_name'])

            # Share the login throttle: this public endpoint checks a password
            # too, so it must not offer an unthrottled guessing oracle.
            rate_key = f"login:{request.httprequest.remote_addr}:{login.lower().strip()}"
            is_locked, remaining_min = _check_rate_limit(
                rate_key,
                SECURITY_CONFIG['max_login_attempts'],
                SECURITY_CONFIG['lockout_duration_minutes'],
            )
            if is_locked:
                values['error'] = _("សូមរង់ចាំ %d នាទីមុនពេលសាកល្បងម្តងទៀត (ការពារសុវត្ថិភាព)") % remaining_min
                return request.render(template, values)

            try:
                auth_success = self._authenticate_session(login, kw['old_password'])
            except AccessDenied:
                auth_success = False

            if not auth_success:
                _logger.warning("Password change auth failed for: %s", login)
                values['error'] = _("លេខកូដ ឬ ពាក្យសម្ងាត់ចាស់ មិនត្រឹមត្រូវទេ")
                return request.render(template, values)

            _reset_rate_limit(rate_key)

            # Read the user only after authentication succeeded.
            user = request.env.user
            if user.has_group('base.group_public'):
                values['error'] = _("អ្នកប្រើប្រាស់សាធារណៈមិនអាចផ្លាស់ប្តូរពាក្យសម្ងាត់បានទេ")
                return request.render(template, values)

            user.sudo().write({
                'name': _sanitize_input(kw['name']),
                'password': kw['new_password'],
                'first_login': False,
            })

            _logger.info("Password changed for user: %s", user.login)
            # Stdlib quote(): werkzeug.urls.url_quote is gone in Werkzeug 3.
            success_msg = quote(_('ការផ្លាស់ប្តូរពាក្យសម្ងាត់ទទួលបានជោគជ័យ'))
            return request.redirect(f'/web/login?message={success_msg}')

        except Exception as e:
            _logger.exception("Password change error: %s", str(e))
            values['error'] = _("មានបញ្ហាក្នុងការផ្លាស់ប្តូរពាក្យសម្ងាត់")
            return request.render(template, values)

    # ==============================================================================
    # SIGNUP (disabled override, kept for reference)
    # ==============================================================================
    # @route('/web/signup', type='http', auth='public', website=True)
    # def web_auth_signup(self, *args, **kw):
    #     """Secure signup flow - Odoo 19 compatible."""
    #     param_obj = request.env['ir.config_parameter'].sudo()
    #     qcontext = self.get_auth_signup_qcontext()
    #
    #     request.params['background_src'] = _sanitize_input(
    #         param_obj.get_param('login_form_header_register') or ''
    #     )
    #     request.params['background'] = _sanitize_input(
    #         param_obj.get_param('id_backgroud_mptc') or ''
    #     )
    #
    #     companies = request.env["res.groups"].sudo().search(
    #         [('name', '=', 'អ្នកសុំអាហារូបករណ៍')]
    #     )
    #     qcontext['groups'] = companies
    #
    #     if not qcontext.get('token') and not qcontext.get('signup_enabled'):
    #         raise werkzeug.exceptions.NotFound()
    #
    #     if 'error' not in qcontext and request.httprequest.method == 'POST':
    #         if kw.get('password') != kw.get('confirm_password'):
    #             qcontext['error'] = _("ពាក្យសម្ងាត់មិនត្រូវគ្នា! សូមបញ្ចូលវាម្តងទៀត")
    #             return request.render('ck_signup.signup', qcontext)
    #
    #         if kw.get('password'):
    #             strength_errors = _validate_password_strength(kw['password'])
    #             if strength_errors:
    #                 qcontext['error'] = '; '.join(strength_errors)
    #                 return request.render('ck_signup.signup', qcontext)
    #
    #         login = _sanitize_input(qcontext.get('login', '')).lower()
    #
    #         existing_user = request.env["res.users"].sudo().search(
    #             [("login", "=", login), ("active", "=", True)], limit=1
    #         )
    #         if existing_user:
    #             qcontext['error'] = _("អ៊ីម៉ែលនេះត្រូវបានចុះឈ្មោះរួចហើយ")
    #             return request.render('ck_signup.signup', qcontext)
    #
    #         try:
    #             self.do_signup(qcontext)
    #
    #             if login:
    #                 request.session['signup_login'] = login
    #                 user_sudo = request.env['res.users'].sudo().search([('login', '=', login)], limit=1)
    #
    #                 if user_sudo:
    #                     country_id = kw.get('country')
    #                     if country_id:
    #                         try:
    #                             country = request.env['res.country'].sudo().browse(int(country_id))
    #                             if country.exists():
    #                                 user_sudo.partner_id.sudo().write({'country_id': country.id})
    #                         except (ValueError, TypeError):
    #                             pass
    #                     user_sudo.sudo().log_ids.unlink()
    #
    #             qcontext['message'] = _("សូមបញ្ចូល Pin Code ដែលបានផ្ញើទៅក្នុងអ៊ីម៉ែលរបស់អ្នក ដើម្បីផ្ទៀងផ្ទាត់គណនី!")
    #             return request.render('ck_signup.verify_pin_code', qcontext)
    #
    #         except (SignupError, ValidationError, AssertionError) as e:
    #             _logger.error("Signup error for %s: %s", login, str(e))
    #
    #             pending_user = request.env["res.users"].sudo().search(
    #                 [("login", "=", login), ("active", "=", False)], limit=1
    #             )
    #
    #             if pending_user:
    #                 qcontext['message'] = _(
    #                     "សូមបញ្ចូល Pin Code ដែលបានផ្ញើទៅក្នុងអ៊ីម៉ែលរបស់អ្នក ដើម្បីផ្ទៀងផ្ទាត់គណនី!")
    #                 qcontext['error'] = ""
    #                 return request.render('ck_signup.verify_pin_code', qcontext)
    #             else:
    #                 qcontext['error'] = _("មិនអាចបង្កើតគណនីថ្មីបានទេ")
    #
    #     return request.render('ck_signup.signup', qcontext)

    # ==============================================================================
    # PIN CODE VERIFICATION
    # ==============================================================================
    @route('/web/verify_pin_code', type='http', auth='public', website=True, csrf=True)
    def verify_pin_code(self, *args, **kw):
        """Activate an inactive user whose ``login``/``pin_code`` pair matches.

        Attempts are throttled per login. On success the user (and partner)
        are activated, the PIN is cleared and the applicant group is added.
        """
        qcontext = self.get_auth_signup_qcontext()

        login = _sanitize_input(kw.get('login', '')).lower().strip()
        pin_code = _sanitize_input(kw.get('pin_code', '')).strip()

        if not login or not pin_code:
            qcontext['error'] = _("សូមបញ្ចូលអ៊ីម៉ែល និង PIN Code")
            return request.render('ck_signup.verify_pin_code', qcontext)

        rate_key = f"pin:{login}"
        is_locked, remaining_min = _check_rate_limit(
            rate_key,
            SECURITY_CONFIG['max_pin_attempts'],
            SECURITY_CONFIG['pin_lockout_duration_minutes'],
        )
        if is_locked:
            qcontext['error'] = _("សូមរង់ចាំ %d នាទី (ការពារសុវត្ថិភាព)") % remaining_min
            return request.render('ck_signup.verify_pin_code', qcontext)

        user = request.env["res.users"].sudo().search(
            [("login", "=", login), ("pin_code", "=", pin_code), ("active", "=", False)],
            limit=1,
        )

        if user:
            try:
                user.sudo().write({
                    'active': True,
                    'pin_code': False,
                })
                if user.partner_id:
                    user.partner_id.sudo().write({'active': True})

                companies = request.env["res.groups"].sudo().search(
                    [('name', '=', 'អ្នកសុំអាហារូបករណ៍')]
                )
                if companies:
                    # The users->groups field is named differently across
                    # Odoo versions; use whichever this model declares.
                    groups_field = 'group_ids' if 'group_ids' in user._fields else 'groups_id'
                    user.sudo().write({groups_field: [(4, g.id) for g in companies]})

                _reset_rate_limit(rate_key)
                _logger.info("Account verified: %s", login)
                qcontext['message'] = _("អបអរសាទរ! សូមចុចប៊ូតុង \"ចូល\" ដើម្បីបំពេញពាក្យចុះឈ្មោះ!")
                return request.render('web.login', qcontext)

            except Exception as e:
                _logger.exception("Error activating user %s: %s", login, str(e))
                qcontext['error'] = _("មានបញ្ហាក្នុងការផ្ទៀងផ្ទាត់")
        else:
            _logger.warning("Failed PIN verification: %s", login)
            qcontext['error'] = _("PIN Code ឬ អ៊ីម៉ែល មិនត្រឹមត្រូវទេ")

        return request.render('ck_signup.verify_pin_code', qcontext)

    # ==============================================================================
    # PIN REQUEST
    # ==============================================================================
    @route('/web/request_new_pin_code', type='http', auth='public', website=True, csrf=True)
    def sending_new_pin_code(self, *args, **kw):
        """Send a fresh PIN to a pending (inactive) account.

        Active accounts are told to log in; pending accounts that exceeded
        their request counter are refused.
        """
        qcontext = self.get_auth_signup_qcontext()
        template = 'ck_signup.view_request_new_pin_code'
        login = _sanitize_input(kw.get('login', '')).lower().strip()

        if not login:
            qcontext['error'] = _("សូមបញ្ចូលអ៊ីម៉ែល")
            return request.render(template, qcontext)

        users = request.env["res.users"].sudo()

        active_user = users.search(
            [("login", "=", login), ("active", "=", True)], limit=1
        )
        if active_user:
            qcontext['error'] = _("អ៊ីម៉ែលនេះបានចុះឈ្មោះរួចហើយ សូមចូលប្រើប្រាស់")
            return request.render(template, qcontext)

        pending_user = users.search(
            [("login", "=", login), ("active", "=", False)], limit=1
        )
        if not pending_user:
            qcontext['error'] = _("អ៊ីម៉ែលមិនត្រឹមត្រូវ ឬ មិនមានក្នុងប្រព័ន្ធ")
            return request.render(template, qcontext)

        request_count = pending_user.count_request_pin_code or 0
        if request_count > 5:
            qcontext['message'] = _("សូមអភ័យទោស អ្នកមិនអាចស្នើសុំ PIN Code បានទៀតទេ!")
            return request.render('ck_signup.verify_pin_code', qcontext)

        try:
            result = request.env['res.users'].sudo().generate_and_send_new_pin_code(login)
        except Exception as e:
            _logger.error("Error sending PIN to %s: %s", login, str(e))
            qcontext['error'] = _("មានបញ្ហាក្នុងការផ្ញើ PIN Code")
        else:
            if result:
                qcontext['message'] = _("PIN Code បានផ្ញើចូលអ៊ីម៉ែលរបស់លោកអ្នកដោយជោគជ័យ!")
            else:
                # A falsy result used to render the page with no feedback.
                _logger.error("PIN was not sent to %s", login)
                qcontext['error'] = _("មានបញ្ហាក្នុងការផ្ញើ PIN Code")

        return request.render(template, qcontext)

    # ==============================================================================
    # PASSWORD RESET
    # ==============================================================================
    @route('/web/reset_password', type='http', auth='public', website=True)
    def web_auth_reset_password(self, *args, **kw):
        """Handle the reset-password form.

        With a token, the new password is set and the login flow is run;
        without one, a reset e-mail is requested for the submitted login.

        :raises werkzeug.exceptions.NotFound: no token and reset disabled.
        """
        qcontext = self.get_auth_signup_qcontext()

        if not qcontext.get('token') and not qcontext.get('reset_password_enabled'):
            raise werkzeug.exceptions.NotFound()

        if 'error' not in qcontext and request.httprequest.method == 'POST':
            try:
                if qcontext.get('token'):
                    self.do_signup(qcontext)
                    return self.web_login(*args, **kw)
                else:
                    login = _sanitize_input(qcontext.get('login', '')).lower()
                    if not login:
                        raise UserError(_("សូមបញ្ចូលអ៊ីម៉ែល"))

                    _logger.info("Password reset requested: %s from %s",
                                 login, request.httprequest.remote_addr)
                    request.env['res.users'].sudo().reset_password(login)
                    qcontext['message'] = _("សារប្ដូរលេខសំងាត់ត្រូវបានផ្ញើចូលអ៊ីម៉ែលរបស់លោកអ្នក")

            except SignupError:
                users = request.env['res.users'].sudo().search(
                    [('login', '=', _sanitize_input(kw.get('login', '')).lower()),
                     ('active', '=', True)],
                    limit=1,
                )
                if not users:
                    qcontext['message'] = _("សូមពិនិត្យ Pin Code នៅក្នុងអ៊ីម៉ែលរបស់អ្នក")
                    qcontext['error'] = ""
                    return request.render('ck_signup.verify_pin_code', qcontext)
                else:
                    qcontext['message'] = _("ពាក្យសម្ងាត់ត្រូវបានផ្លាស់ប្តូរជោគជ័យ")
            except (UserError, ValidationError) as e:
                # These carry a message written for the user (missing login,
                # weak password, ...); show it instead of a generic failure.
                qcontext['error'] = e.args[0] if e.args else str(e)
            except Exception as e:
                _logger.exception("Password reset error: %s", str(e))
                qcontext['error'] = _("មានបញ្ហាក្នុងការប្ដូរពាក្យសម្ងាត់")

        return request.render('auth_signup.reset_password', qcontext)

    # ==============================================================================
    # HELPERS
    # ==============================================================================
    def do_signup(self, qcontext):
        """Validate the signup values in ``qcontext`` and create the user.

        :raises UserError: login or password missing, or the confirmation
            (when submitted) does not match.
        :raises ValidationError: password fails the strength rules.
        """
        values = {
            key: _sanitize_input(qcontext.get(key))
            for key in ('login', 'name')
            if qcontext.get(key)
        }
        # The password is stored exactly as typed: stripping or truncating it
        # here would make the stored value differ from what the user enters
        # at login.
        if qcontext.get('password'):
            values['password'] = qcontext['password']

        if not values.get('login') or not values.get('password'):
            raise UserError(_("សូមបំពេញព័ត៌មានដែលត្រូវការ"))

        # Only enforced when the form actually submitted a confirmation.
        confirmation = qcontext.get('confirm_password')
        if confirmation is not None and confirmation != values['password']:
            raise UserError(_("ពាក្យសម្ងាត់មិនត្រូវគ្នា! សូមបញ្ចូលវាម្តងទៀត"))

        strength_errors = _validate_password_strength(values['password'])
        if strength_errors:
            raise ValidationError('; '.join(strength_errors))

        # NOTE(review): 'group' comes from the request context; verify that
        # signup() cannot be used to self-assign a privileged group.
        group_id = qcontext.get('group')
        if group_id:
            values['groups'] = str(group_id)

        installed_codes = {
            code for code, _name in request.env['res.lang'].sudo().get_installed()
        }
        # Compare the full language code: the truncated prefix ("en") never
        # matches installed codes such as "en_US".
        lang = request.env.context.get('lang') or ''
        if lang in installed_codes:
            values['lang'] = lang

        self._signup_with_values(qcontext.get("token"), values)

    def _signup_with_values(self, token, values):
        """Create the user through ``res.users.signup`` and log the login."""
        result = request.env['res.users'].sudo().signup(values, token)
        # NOTE(review): upstream signup() has returned both
        # (db, login, password) and (login, password) depending on version;
        # the login is second-to-last in either shape.
        login = result[-2]
        _logger.info("New user created: %s", login)