#!/usr/bin/env python3 """ Ambil semua commit hari ini dari semua repo, distribusikan 8 jam pro-rata, lalu upload ke Odoo timesheet. Pro-rata per commit: bobot = gap waktu dari commit sebelumnya (atau dari 08:00 untuk commit pertama). Setiap commit → satu timesheet entry terpisah. Contoh (commit berurutan): commit 1 09:00 → span 08:00→09:00 = 60m commit 2 10:30 → span 09:00→10:30 = 90m commit 3 10:30 → gap 0 → minimum 30m Total = 180m → commit 1 = 8×60/180=2.67h, commit 2 = 4.00h, commit 3 = 1.33h """ import json import os import re import subprocess import urllib.request import urllib.error import argparse from datetime import date, datetime, timedelta from decimal import Decimal, ROUND_HALF_UP from pathlib import Path def load_env() -> dict: """Baca .env dari direktori script, return dict key=value.""" env_path = Path(__file__).parent / ".env" env = {} if env_path.exists(): for line in env_path.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: k, v = line.split("=", 1) env[k.strip()] = v.strip() return env ENV = load_env() BASE_URL = "https://odoo.aplikasi.web.id" TOTAL_HOURS = 8 PROJECT_MAP = { "CPONE": 123, "IBL": 186, "Support Pramita": 70, "SAS": 92, "Support Kedungdoro": 77, } REPOS = [ {"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"}, {"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"}, {"path": "/Users/fajrihardhitamurti/REPO_CPONE/LIVE_CPONE", "project": "CPONE"}, {"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"}, {"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/BE_IBL/one-api-lab", "project": "IBL"}, {"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/FE_IBL", "project": "IBL"}, {"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/MERGE_REPORT/ibl_merge_report_service", "project": "IBL"}, {"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"}, {"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"}, {"path": "/Users/fajrihardhitamurti/SAS_TASK", "project": "SAS"}, ] COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$") # ── HTTP ────────────────────────────────────────────────────────────────────── def build_headers(session_id: str) -> dict: return { "accept": "*/*", "content-type": "application/json", "origin": BASE_URL, "referer": f"{BASE_URL}/web", "user-agent": "Mozilla/5.0", "cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta", } def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict: url = f"{BASE_URL}{endpoint}" body = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST") try: with urllib.request.urlopen(req) as resp: response = json.loads(resp.read().decode("utf-8")) except urllib.error.HTTPError as e: raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e if "error" in response: err = response["error"] raise RuntimeError(f"Odoo error: {err.get('message')}") return response # ── Odoo helpers ────────────────────────────────────────────────────────────── def check_session(session_id: str) -> bool: """Return True jika session masih valid, False jika expired.""" payload = {"jsonrpc": "2.0", "method": "call", "id": 1, "params": {}} url = f"{BASE_URL}/web/session/get_session_info" body = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST") try: with urllib.request.urlopen(req) as resp: response = json.loads(resp.read().decode("utf-8")) uid = response.get("result", {}).get("uid") return bool(uid) except Exception: return False def notify_session_expired(): import os os.system( 'osascript -e \'display notification ' '"Session Odoo expired. Buka run_daily.sh dan ganti SESSION_ID." ' 'with title "⚠️ Timesheet Gagal" ' 'subtitle "Session ID perlu diperbarui" ' 'sound name "Basso"\'' ) print("SESSION EXPIRED — update SESSION_ID di run_daily.sh") def search_task_in_project(session_id: str, code: str, project_id: int) -> tuple[int, str] | None: """Cari task di satu project spesifik.""" payload = { "id": 1, "jsonrpc": "2.0", "method": "call", "params": { "model": "project.task", "method": "name_search", "args": [], "kwargs": { "name": f"[{code}]", "operator": "ilike", "args": [ "&", "&", "&", ["company_id", "=", 1], ["project_id.allow_timesheets", "=", True], ["stage_id.fold", "=", False], ["project_id", "=", project_id], ], "limit": 1, "context": { "lang": "en_US", "tz": "Asia/Jakarta", "uid": 41, "allowed_company_ids": [1], "is_timesheet": 1, "default_project_id": project_id, }, }, }, } result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload) items = result.get("result", []) return tuple(items[0]) if items else None def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str, int] | None: """ Cari task mulai dari project default repo. Kalau tidak ketemu, fallback cari di semua project lain. Return (task_id, task_name, actual_project_id). """ result = search_task_in_project(session_id, code, project_id) if result: return result[0], result[1], project_id for name, pid in PROJECT_MAP.items(): if pid == project_id: continue result = search_task_in_project(session_id, code, pid) if result: return result[0], result[1], pid return None def upload_timesheet(session_id: str, entry: dict, user_id: int) -> int: payload = { "id": 1, "jsonrpc": "2.0", "method": "call", "params": { "model": "account.analytic.line", "method": "create", "args": [entry], "kwargs": { "context": { "lang": "en_US", "tz": "Asia/Jakarta", "uid": user_id, "allowed_company_ids": [1], "is_timesheet": 1, } }, }, } result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload) return result.get("result") # ── Git ─────────────────────────────────────────────────────────────────────── def get_commits_today(repo_path: str, author: str, today: str) -> list[dict]: cmd = [ "git", "-C", repo_path, "log", f"--author={author}", f"--since={today} 00:00:00", f"--until={today} 23:59:59", "--format=%h|%ad|%s", "--date=format:%Y-%m-%d %H:%M", ] try: out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True) except subprocess.CalledProcessError: return [] commits = [] for line in out.strip().splitlines(): if not line: continue parts = line.split("|", 2) if len(parts) == 3: dt = datetime.strptime(parts[1], "%Y-%m-%d %H:%M") # Filter ketat: pastikan author date = today (bukan commit dari hari lain yg di-rebase) if dt.strftime("%Y-%m-%d") != today: continue commits.append({ "hash": parts[0], "date": dt.strftime("%Y-%m-%d"), "time": dt.strftime("%H:%M"), "dt": dt, "message": parts[2], }) return commits # ── Pro-rata ────────────────────────────────────────────────────────────────── SINGLE_COMMIT_MINUTES = 30 # bobot minimum untuk task dengan 1 commit WORK_START_HOUR = 8 # jam mulai kerja def commit_spans(sorted_commits: list[dict]) -> list[int]: """ Hitung span (menit) per commit, diurutkan secara kronologis. - Commit pertama: span dari jam 08:00 ke waktu commit (min 30 menit). - Commit berikutnya: gap dari commit sebelumnya (min 30 menit). """ result = [] for i, c in enumerate(sorted_commits): if i == 0: work_start = c["dt"].replace(hour=WORK_START_HOUR, minute=0, second=0, microsecond=0) span = max(int((c["dt"] - work_start).total_seconds() / 60), SINGLE_COMMIT_MINUTES) else: span = max(int((c["dt"] - sorted_commits[i - 1]["dt"]).total_seconds() / 60), SINGLE_COMMIT_MINUTES) result.append(span) return result def distribute_hours(weights: list[int], total: float = 8.0) -> list[float]: """Pro-rata berdasarkan bobot (menit span). Sum = total. Adjustment di entry terbesar.""" total_weight = sum(weights) raw = [Decimal(str(total)) * Decimal(w) / Decimal(total_weight) for w in weights] rounded = [float(r.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) for r in raw] diff = round(total - sum(rounded), 2) # Terapkan koreksi ke entry terbesar agar tidak bisa jadi 0 largest_idx = rounded.index(max(rounded)) rounded[largest_idx] = round(rounded[largest_idx] + diff, 2) return rounded # ── Main ────────────────────────────────────────────────────────────────────── def main(): today = date.today().isoformat() parser = argparse.ArgumentParser(description="Daily timesheet: commit → Odoo (8 jam pro-rata)") parser.add_argument("--session-id", default=ENV.get("SESSION_ID"), help="session_id cookie Odoo") parser.add_argument("--author", default=ENV.get("AUTHOR"), help="Git author filter") parser.add_argument("--user-id", default=ENV.get("USER_ID"), type=int, help="ID user Odoo") parser.add_argument("--employee-id", default=ENV.get("EMPLOYEE_ID"), type=int, help="ID employee Odoo") parser.add_argument("--date", default=today, help=f"Tanggal YYYY-MM-DD (default: {today})") parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload") parser.add_argument("--save-pending", action="store_true", help="Simpan ke pending.json + notifikasi macOS") parser.add_argument("--yes", action="store_true", help="Skip konfirmasi, langsung upload") args = parser.parse_args() print(f"Tanggal : {args.date}") print(f"Author : {args.author}") if args.dry_run: mode = "DRY RUN" elif args.save_pending: mode = "SAVE PENDING" else: mode = "UPLOAD" print(f"Mode : {mode}") print("=" * 65) # Cek session sebelum mulai if not check_session(args.session_id): notify_session_expired() return # 1. Kumpulkan semua commit hari ini dari semua repo raw_commits = [] for repo in REPOS: commits = get_commits_today(repo["path"], args.author, args.date) repo_name = repo["path"].split("/")[-1] for c in commits: m = COMMIT_RE.match(c["message"]) if m: raw_commits.append({ "code": m.group(1), "description": m.group(2), "date": c["date"], "time": c["time"], "dt": c["dt"], "hash": c["hash"], "repo": repo_name, "project": repo["project"], "project_id": PROJECT_MAP[repo["project"]], }) else: print(f" SKIP {c['hash']} ({repo_name}): {c['message']}") if not raw_commits: print("\nTidak ada commit dengan format TASKCODE - deskripsi hari ini.") return # 2. Cari task_id di Odoo untuk setiap commit print(f"\nMencari task di Odoo untuk {len(raw_commits)} commit...\n") task_cache = {} # (code, project_id) → (task_id, task_name, actual_pid) | None resolved = [] not_found = [] for c in raw_commits: cache_key = (c["code"], c["project_id"]) if cache_key not in task_cache: task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"]) task = task_cache[cache_key] if not task: not_found.append(c) print(f" NOT FOUND [{c['code']}] di semua project ({c['hash']})") continue task_id, task_name, actual_project_id = task if actual_project_id != c["project_id"]: actual_name = next(k for k, v in PROJECT_MAP.items() if v == actual_project_id) print(f" FALLBACK [{c['code']}] tidak ada di {c['project']}, ditemukan di {actual_name}") resolved.append({ "task_id": task_id, "task_name": task_name, "project_id": actual_project_id, "project": next(k for k, v in PROJECT_MAP.items() if v == actual_project_id), "description": c["description"], "date": c["date"], "time": c["time"], "dt": c["dt"], "hash": c["hash"], }) if not resolved: print("\nTidak ada task yang ditemukan di Odoo.") return # 3. Sort by time, hitung span per commit resolved.sort(key=lambda x: x["dt"]) spans = commit_spans(resolved) hours = distribute_hours(spans, total=float(TOTAL_HOURS)) # 4. Tampilkan preview (satu baris per commit) print(f"\n{'─' * 78}") print(f" {'PROJECT':<20} {'TASK':<10} {'WAKTU':<10} {'SPAN':>6} {'JAM':>5} DESKRIPSI") print(f"{'─' * 78}") entries = [] for i, (c, h, span) in enumerate(zip(resolved, hours, spans)): is_first = (i == 0) if is_first: time_label = f"08:00→{c['dt'].strftime('%H:%M')}" else: time_label = c["dt"].strftime("%H:%M") if span == SINGLE_COMMIT_MINUTES: span_label = f"{'30m↑' if is_first else '30m*'}" else: span_label = f"{span}m{'↑' if is_first else ''}" print(f" {c['project']:<20} {str(c['task_id']):<10} {time_label:<10} {span_label:>6} {h:>5.2f} {c['description'][:28]}") entries.append({ "odoo": { "name": c["description"], "date": c["date"], "unit_amount": h, "user_id": args.user_id, "employee_id": args.employee_id, "task_id": c["task_id"], "project_id": c["project_id"], }, "display": c, }) print(f"{'─' * 78}") total_span = sum(spans) print(f" {'TOTAL':<20} {'':<10} {'':<10} {total_span:>5}m {sum(hours):>5.2f}") print(f" ↑ span dari 08:00 | * gap ke commit sebelumnya → minimum {SINGLE_COMMIT_MINUTES} menit") if not_found: print(f"\n {len(not_found)} commit diabaikan (task tidak ditemukan di Odoo)") if args.dry_run: print("\n[DRY RUN] Tidak ada yang diupload.") return # 5. Simpan pending.json + notifikasi macOS if args.save_pending: import os pending_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), f"pending_{args.date}.json" ) pending = { "date": args.date, "user_id": args.user_id, "employee_id": args.employee_id, "entries": [e["odoo"] for e in entries], } with open(pending_path, "w") as f: json.dump(pending, f, indent=2) summary = ", ".join( f"{e['odoo']['unit_amount']}h {e['display']['project']}" for e in entries[:3] ) if len(entries) > 3: summary += f" +{len(entries) - 3} lagi" msg = f"{len(entries)} entry siap ({summary})" os.system( f'osascript -e \'display notification "{msg}" ' f'with title "Timesheet {args.date}" ' f'subtitle "Jalankan: python3 upload_pending.py --session-id XXX"\'' ) print(f"\nPending disimpan: {pending_path}") print("Notifikasi macOS dikirim.") return # 6. Konfirmasi & upload if not args.yes: confirm = input(f"\nUpload {len(entries)} timesheet entry? (y/N): ").strip().lower() if confirm != "y": print("Dibatalkan.") return print() ok, fail = 0, 0 for e in entries: try: new_id = upload_timesheet(args.session_id, e["odoo"], args.user_id) print(f" OK ID {new_id:<8} {e['display']['project']:<20} {e['odoo']['unit_amount']}h {e['odoo']['name'][:40]}") ok += 1 except RuntimeError as ex: print(f" ERR {e['odoo']['name'][:40]} → {ex}") fail += 1 print(f"\nSelesai: {ok} sukses, {fail} gagal.") if __name__ == "__main__": main()