Files

217 lines
11 KiB
Python
Raw Permalink Normal View History

2026-07-01 14:41:49 +07:00
# -*- coding: utf-8 -*-
from odoo import models, fields, api
from datetime import datetime, timedelta
from collections import defaultdict
class ProjectDashboard(models.Model):
    """Data provider for the project dashboard.

    Every public method is an ``@api.model`` entry point that returns plain
    lists/dicts built from ``project.project``, ``project.task`` and
    ``account.analytic.line`` searches.

    Common conventions for the public methods:

    * ``filters`` is an optional dict; the keys read are ``project_id``,
      ``manager_id`` and ``customer_id`` (each method documents which ones it
      honours). Any falsy value is treated as "no filters".
    * ``company_id`` selects the company to report on; when falsy the current
      user's company is used.
    * Users without the ``project.group_project_manager`` group only see
      tasks they are assigned to (``user_ids``).
    """

    _name = 'project.dashboard'
    _description = 'Project Dashboard Data'

    # Filter key -> task field it constrains.
    _TASK_FILTER_FIELDS = {
        'project_id': 'project_id',
        'manager_id': 'project_id.user_id',
        'customer_id': 'project_id.partner_id',
    }

    # Display labels for task priority values; unknown values fall back to
    # 'Normal' at the call sites.
    # NOTE(review): assumes the installed project version uses the keys
    # '0'..'3' for task priority -- confirm against the target Odoo version.
    _PRIORITY_LABELS = {'0': 'Low', '1': 'Medium', '2': 'High', '3': 'Urgent'}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @api.model
    def _dashboard_company(self, company_id=None):
        """Return the requested company, or the current user's company."""
        if company_id:
            return self.env['res.company'].browse(company_id)
        return self.env.user.company_id

    @api.model
    def _dashboard_task_domain(self, filters, company_id, filter_keys):
        """Build the base ``project.task`` domain shared by the dashboard.

        :param dict filters: filter values sent by the client
        :param company_id: company id, or a falsy value for the user's company
        :param filter_keys: iterable of keys of ``filters`` this caller honours
        :return: a new domain list (safe for the caller to extend)
        """
        domain = [('company_id', '=', self._dashboard_company(company_id).id)]
        for key in filter_keys:
            if filters.get(key):
                domain.append((self._TASK_FILTER_FIELDS[key], '=', filters[key]))
        # Non-managers are restricted to the tasks assigned to them.
        if not self.env.user.has_group('project.group_project_manager'):
            domain.append(('user_ids', 'in', [self.env.user.id]))
        return domain

    @api.model
    def _dashboard_closed_stage_ids(self):
        """Return the ids of the task stages flagged as folded."""
        return self.env['project.task.type'].search([('fold', '=', True)]).ids

    # ------------------------------------------------------------------
    # Public dashboard endpoints
    # ------------------------------------------------------------------
    @api.model
    def get_dashboard_kpis(self, filters=None, company_id=None):
        """Return the headline counters of the dashboard.

        Honours ``manager_id``, ``customer_id`` and ``project_id``
        (``project_id`` only affects the task counters).

        :return: dict with the keys ``total_projects``, ``active_tasks``,
            ``overdue_tasks``, ``today_tasks``, ``my_tasks`` and
            ``my_overdue_tasks``
        """
        filters = filters or {}
        user = self.env.user
        today = fields.Date.today()
        company = self._dashboard_company(company_id)

        project_domain = [('company_id', '=', company.id)]
        if filters.get('manager_id'):
            project_domain.append(('user_id', '=', filters['manager_id']))
        if filters.get('customer_id'):
            project_domain.append(('partner_id', '=', filters['customer_id']))

        task_domain = self._dashboard_task_domain(
            filters, company_id, ('manager_id', 'customer_id', 'project_id'))

        # Reusable domain leaves.
        not_closed = ('stage_id', 'not in', self._dashboard_closed_stage_ids())
        past_deadline = ('date_deadline', '<', today)
        assigned_to_me = ('user_ids', 'in', [user.id])

        Task = self.env['project.task']
        return {
            'total_projects': self.env['project.project'].search_count(project_domain),
            'active_tasks': Task.search_count(task_domain + [not_closed]),
            'overdue_tasks': Task.search_count(task_domain + [past_deadline, not_closed]),
            'today_tasks': Task.search_count(task_domain + [('date_deadline', '=', today)]),
            'my_tasks': Task.search_count(task_domain + [assigned_to_me]),
            'my_overdue_tasks': Task.search_count(
                task_domain + [assigned_to_me, past_deadline, not_closed]),
        }

    @api.model
    def get_task_deadline_chart(self, filters=None, company_id=None):
        """Count open tasks by deadline bucket (overdue / today / upcoming).

        Honours ``project_id``. Tasks in folded stages and tasks without a
        deadline are not counted, so the 'Overdue' bucket agrees with the
        ``overdue_tasks`` KPI.

        This class used to define this method twice; the later definition
        replaced the earlier one at class creation and did not exclude folded
        stages. Only this single definition remains.

        :return: list of three ``{'name': ..., 'value': ...}`` dicts in the
            order Overdue, Today, Upcoming
        """
        filters = filters or {}
        today = fields.Date.today()
        task_domain = self._dashboard_task_domain(filters, company_id, ('project_id',))
        task_domain.append(('stage_id', 'not in', self._dashboard_closed_stage_ids()))

        overdue = due_today = upcoming = 0
        for task in self.env['project.task'].search(task_domain):
            # to_date() accepts date or datetime values and returns None for
            # an empty deadline.
            deadline = fields.Date.to_date(task.date_deadline)
            if not deadline:
                continue
            if deadline < today:
                overdue += 1
            elif deadline == today:
                due_today += 1
            else:
                upcoming += 1

        return [
            {'name': 'Overdue', 'value': overdue},
            {'name': 'Today', 'value': due_today},
            {'name': 'Upcoming', 'value': upcoming},
        ]

    @api.model
    def get_tasks_by_project(self, filters=None, company_id=None):
        """Count tasks per project name.

        Honours ``manager_id`` and ``customer_id``. Tasks without a project
        (or whose project has no name) are grouped under 'Unassigned'.

        :return: list of ``{'name': project_name, 'value': count}`` dicts
        """
        filters = filters or {}
        task_domain = self._dashboard_task_domain(
            filters, company_id, ('manager_id', 'customer_id'))

        per_project = defaultdict(int)
        for task in self.env['project.task'].search(task_domain):
            per_project[task.project_id.name or 'Unassigned'] += 1
        return [{'name': name, 'value': count} for name, count in per_project.items()]

    @api.model
    def get_tasks_by_stage(self, filters=None, company_id=None):
        """Count tasks per stage name.

        Honours ``project_id``. Tasks without a stage are grouped under
        'No Stage'.

        :return: list of ``{'name': stage_name, 'value': count}`` dicts
        """
        filters = filters or {}
        task_domain = self._dashboard_task_domain(filters, company_id, ('project_id',))

        per_stage = defaultdict(int)
        for task in self.env['project.task'].search(task_domain):
            per_stage[task.stage_id.name or 'No Stage'] += 1
        return [{'name': name, 'value': count} for name, count in per_stage.items()]

    @api.model
    def get_all_tasks(self, filters=None, limit=10, offset=0, company_id=None):
        """Return one page of tasks ordered by deadline, plus the total count.

        Honours ``project_id``, ``manager_id`` and ``customer_id``.

        :param int limit: page size
        :param int offset: number of tasks to skip
        :return: ``{'tasks': [...], 'total': int}``; each task dict has the
            keys ``id``, ``name``, ``project``, ``deadline`` (``YYYY-MM-DD``
            or ``''``), ``days_diff``, ``status`` (``overdue`` / ``today`` /
            ``upcoming``), ``priority`` and ``priority_label``
        """
        filters = filters or {}
        task_domain = self._dashboard_task_domain(
            filters, company_id, ('project_id', 'manager_id', 'customer_id'))

        Task = self.env['project.task']
        # 'id' is a tie-breaker: without it, tasks sharing a deadline (or
        # having none) may be returned in a different order between calls and
        # so repeat or go missing across pages.
        tasks = Task.search(
            task_domain, limit=limit, offset=offset, order='date_deadline asc, id asc')
        total = Task.search_count(task_domain)
        today = fields.Date.today()

        rows = []
        for task in tasks:
            # Defaults for tasks without a deadline: sentinel distance and the
            # 'upcoming' status.
            days_diff = 999
            status = 'upcoming'
            deadline = fields.Date.to_date(task.date_deadline)
            if deadline:
                days_diff = (deadline - today).days
                if days_diff < 0:
                    status = 'overdue'
                elif days_diff == 0:
                    status = 'today'

            priority = str(task.priority)
            rows.append({
                'id': task.id,
                'name': task.name,
                'project': task.project_id.name if task.project_id else '',
                'deadline': task.date_deadline.strftime('%Y-%m-%d') if task.date_deadline else '',
                'days_diff': days_diff,
                'status': status,
                'priority': priority,
                'priority_label': self._PRIORITY_LABELS.get(priority, 'Normal'),
            })
        return {'tasks': rows, 'total': total}

    @api.model
    def get_timesheet_hours(self, filters=None, company_id=None):
        """Return the hours logged on each day of the current month.

        Honours ``project_id``. Sums ``unit_amount`` of the
        ``account.analytic.line`` records of the selected company whose
        ``date`` falls in the current calendar month.

        :return: list with one ``{'date': 'YYYY-MM-DD', 'hours': float}`` dict
            per day of the month, in chronological order (0.0 for days
            without entries)
        """
        filters = filters or {}
        today = fields.Date.today()
        first_day = today.replace(day=1)
        # Jumping 32 days from the 1st always lands in the next month; snap
        # back to its first day and step one day back to get the month end.
        last_day = (first_day + timedelta(days=32)).replace(day=1) - timedelta(days=1)

        company = self._dashboard_company(company_id)
        domain = [
            ('date', '>=', first_day),
            ('date', '<=', last_day),
            ('company_id', '=', company.id),
        ]
        if filters.get('project_id'):
            domain.append(('project_id', '=', filters['project_id']))

        hours_by_day = defaultdict(float)
        for line in self.env['account.analytic.line'].search(domain):
            hours_by_day[line.date.strftime('%Y-%m-%d')] += line.unit_amount

        result = []
        current_day = first_day
        while current_day <= last_day:
            day_key = current_day.strftime('%Y-%m-%d')
            result.append({'date': day_key, 'hours': hours_by_day.get(day_key, 0.0)})
            current_day += timedelta(days=1)
        return result

    @api.model
    def get_priority_wise_tasks(self, filters=None, company_id=None):
        """Count tasks per priority label.

        Honours ``project_id``. Priority values without an entry in
        ``_PRIORITY_LABELS`` are grouped under 'Normal'.

        :return: list of ``{'name': priority_label, 'value': count}`` dicts
        """
        filters = filters or {}
        task_domain = self._dashboard_task_domain(filters, company_id, ('project_id',))

        per_priority = defaultdict(int)
        for task in self.env['project.task'].search(task_domain):
            per_priority[self._PRIORITY_LABELS.get(str(task.priority), 'Normal')] += 1
        return [{'name': label, 'value': count} for label, count in per_priority.items()]

    @api.model
    def get_companies(self):
        """Return the companies the current user can pick in the dashboard.

        Users in ``base.group_multi_company`` get all of their
        ``company_ids``; everyone else only gets their current company.

        :return: list of ``{'id': company_id, 'name': company_name}`` dicts
        """
        user = self.env.user
        if user.has_group('base.group_multi_company'):
            companies = user.company_ids
        else:
            companies = user.company_id
        return [{'id': company.id, 'name': company.name} for company in companies]