Files
daily_odoo_timesheet/daily_timesheet.py
sas.fajri 70356de750 fix: ganti BASE_URL ke odoo.aplikasi.web.id
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-29 10:13:23 +07:00

454 lines
17 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ambil semua commit hari ini dari semua repo, distribusikan 8 jam pro-rata,
lalu upload ke Odoo timesheet.
Pro-rata: bobot = time span per task (last commit - first commit).
Single commit mendapat minimum 30 menit.
Contoh: Task A span 2 jam + Task B span 1.5 jam + Task C 1 commit →
bobot: 120 + 90 + 30 = 240 menit
Task A = 8 × 120/240 = 4.00h, Task B = 3.00h, Task C = 1.00h
"""
import json
import re
import subprocess
import urllib.request
import urllib.error
import argparse
from datetime import date, datetime, timedelta
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP
BASE_URL = "https://odoo.aplikasi.web.id"
TOTAL_HOURS = 8
PROJECT_MAP = {
"CPONE": 123,
"IBL": 186,
"Support Pramita": 70,
"SAS": 92,
"Support Kedungdoro": 77,
}
REPOS = [
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/BE_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/FE_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"},
{"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"},
{"path": "/Users/fajrihardhitamurti/SAS_TASK", "project": "SAS"},
]
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
# ── HTTP ──────────────────────────────────────────────────────────────────────
def build_headers(session_id: str) -> dict:
return {
"accept": "*/*",
"content-type": "application/json",
"origin": BASE_URL,
"referer": f"{BASE_URL}/web",
"user-agent": "Mozilla/5.0",
"cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta",
}
def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict:
url = f"{BASE_URL}{endpoint}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e
if "error" in response:
err = response["error"]
raise RuntimeError(f"Odoo error: {err.get('message')}")
return response
# ── Odoo helpers ──────────────────────────────────────────────────────────────
def check_session(session_id: str) -> bool:
"""Return True jika session masih valid, False jika expired."""
payload = {"jsonrpc": "2.0", "method": "call", "id": 1, "params": {}}
url = f"{BASE_URL}/web/session/get_session_info"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
uid = response.get("result", {}).get("uid")
return bool(uid)
except Exception:
return False
def notify_session_expired():
import os
os.system(
'osascript -e \'display notification '
'"Session Odoo expired. Buka run_daily.sh dan ganti SESSION_ID." '
'with title "⚠️ Timesheet Gagal" '
'subtitle "Session ID perlu diperbarui" '
'sound name "Basso"\''
)
print("SESSION EXPIRED — update SESSION_ID di run_daily.sh")
def search_task_in_project(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
"""Cari task di satu project spesifik."""
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "project.task", "method": "name_search", "args": [],
"kwargs": {
"name": f"[{code}]", "operator": "ilike",
"args": [
"&", "&", "&",
["company_id", "=", 1],
["project_id.allow_timesheets", "=", True],
["stage_id.fold", "=", False],
["project_id", "=", project_id],
],
"limit": 1,
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": 41,
"allowed_company_ids": [1], "is_timesheet": 1,
"default_project_id": project_id,
},
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
items = result.get("result", [])
return tuple(items[0]) if items else None
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str, int] | None:
"""
Cari task mulai dari project default repo.
Kalau tidak ketemu, fallback cari di semua project lain.
Return (task_id, task_name, actual_project_id).
"""
result = search_task_in_project(session_id, code, project_id)
if result:
return result[0], result[1], project_id
for name, pid in PROJECT_MAP.items():
if pid == project_id:
continue
result = search_task_in_project(session_id, code, pid)
if result:
return result[0], result[1], pid
return None
def upload_timesheet(session_id: str, entry: dict, user_id: int) -> int:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "account.analytic.line", "method": "create",
"args": [entry],
"kwargs": {
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": user_id,
"allowed_company_ids": [1], "is_timesheet": 1,
}
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
return result.get("result")
# ── Git ───────────────────────────────────────────────────────────────────────
def get_commits_today(repo_path: str, author: str, today: str) -> list[dict]:
cmd = [
"git", "-C", repo_path, "log",
f"--author={author}",
f"--since={today} 00:00:00",
f"--until={today} 23:59:59",
"--format=%h|%ad|%s",
"--date=format:%Y-%m-%d %H:%M",
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
except subprocess.CalledProcessError:
return []
commits = []
for line in out.strip().splitlines():
if not line:
continue
parts = line.split("|", 2)
if len(parts) == 3:
dt = datetime.strptime(parts[1], "%Y-%m-%d %H:%M")
commits.append({
"hash": parts[0],
"date": dt.strftime("%Y-%m-%d"),
"time": dt.strftime("%H:%M"),
"dt": dt,
"message": parts[2],
})
return commits
# ── Pro-rata ──────────────────────────────────────────────────────────────────
SINGLE_COMMIT_MINUTES = 30 # bobot minimum untuk task dengan 1 commit
WORK_START_HOUR = 8 # jam mulai kerja
def span_minutes(timestamps: list[datetime], extend_to_work_start: bool = False) -> int:
"""
Menit dari start sampai commit terakhir. Minimum 30 menit.
Jika extend_to_work_start=True, start dihitung dari jam 08:00
(berlaku untuk task yang punya commit paling awal di hari itu).
"""
first = min(timestamps)
last = max(timestamps)
if extend_to_work_start:
start = first.replace(hour=WORK_START_HOUR, minute=0, second=0, microsecond=0)
else:
start = first
return max(
int((last - start).total_seconds() / 60),
SINGLE_COMMIT_MINUTES,
)
def distribute_hours(weights: list[int], total: float = 8.0) -> list[float]:
"""Pro-rata berdasarkan bobot (menit span). Sum = total. Adjustment di entry terakhir."""
total_weight = sum(weights)
raw = [Decimal(str(total)) * Decimal(w) / Decimal(total_weight) for w in weights]
rounded = [float(r.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) for r in raw]
diff = round(total - sum(rounded), 2)
rounded[-1] = round(rounded[-1] + diff, 2)
return rounded
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
today = date.today().isoformat()
parser = argparse.ArgumentParser(description="Daily timesheet: commit → Odoo (8 jam pro-rata)")
parser.add_argument("--session-id", required=True, help="session_id cookie Odoo")
parser.add_argument("--author", required=True, help="Git author filter, e.g. 'fajri'")
parser.add_argument("--user-id", required=True, type=int, help="ID user Odoo")
parser.add_argument("--employee-id", required=True, type=int, help="ID employee Odoo")
parser.add_argument("--date", default=today, help=f"Tanggal YYYY-MM-DD (default: {today})")
parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload")
parser.add_argument("--save-pending", action="store_true", help="Simpan ke pending.json + notifikasi macOS")
parser.add_argument("--yes", action="store_true", help="Skip konfirmasi, langsung upload")
args = parser.parse_args()
print(f"Tanggal : {args.date}")
print(f"Author : {args.author}")
if args.dry_run:
mode = "DRY RUN"
elif args.save_pending:
mode = "SAVE PENDING"
else:
mode = "UPLOAD"
print(f"Mode : {mode}")
print("=" * 65)
# Cek session sebelum mulai
if not check_session(args.session_id):
notify_session_expired()
return
# 1. Kumpulkan semua commit hari ini dari semua repo
raw_commits = []
for repo in REPOS:
commits = get_commits_today(repo["path"], args.author, args.date)
repo_name = repo["path"].split("/")[-1]
for c in commits:
m = COMMIT_RE.match(c["message"])
if m:
raw_commits.append({
"code": m.group(1),
"description": m.group(2),
"date": c["date"],
"time": c["time"],
"dt": c["dt"],
"hash": c["hash"],
"repo": repo_name,
"project": repo["project"],
"project_id": PROJECT_MAP[repo["project"]],
})
else:
print(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
if not raw_commits:
print("\nTidak ada commit dengan format TASKCODE - deskripsi hari ini.")
return
# 2. Cari task_id di Odoo, group by (task_id, project_id)
print(f"\nMencari task di Odoo untuk {len(raw_commits)} commit...\n")
task_cache = {} # (code, project_id) → (task_id, task_name) | None
# Group: key = (task_id, project_id)
groups = defaultdict(lambda: {
"descriptions": [], "timestamps": [], "count": 0,
"project_id": None, "project": None, "task_name": None, "date": None,
})
not_found = []
for c in raw_commits:
cache_key = (c["code"], c["project_id"])
if cache_key not in task_cache:
task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"])
task = task_cache[cache_key]
if not task:
not_found.append(c)
print(f" NOT FOUND [{c['code']}] di semua project ({c['hash']})")
continue
task_id, task_name, actual_project_id = task
if actual_project_id != c["project_id"]:
actual_name = next(k for k, v in PROJECT_MAP.items() if v == actual_project_id)
print(f" FALLBACK [{c['code']}] tidak ada di {c['project']}, ditemukan di {actual_name}")
key = (task_id, actual_project_id)
groups[key]["descriptions"].append(c["description"])
groups[key]["timestamps"].append(c["dt"])
groups[key]["count"] += 1
groups[key]["project_id"] = actual_project_id
groups[key]["project"] = next(k for k, v in PROJECT_MAP.items() if v == actual_project_id)
groups[key]["task_id"] = task_id
groups[key]["task_name"] = task_name
groups[key]["date"] = c["date"]
if not groups:
print("\nTidak ada task yang ditemukan di Odoo.")
return
# 3. Hitung pro-rata berdasarkan time span
keys = list(groups.keys())
# Task yang punya commit paling awal di hari itu → spannya dihitung dari 08:00
global_first_dt = min(min(groups[k]["timestamps"]) for k in keys)
first_task_key = next(
k for k in keys if global_first_dt in groups[k]["timestamps"]
)
spans = [
span_minutes(groups[k]["timestamps"], extend_to_work_start=(k == first_task_key))
for k in keys
]
hours = distribute_hours(spans, total=float(TOTAL_HOURS))
# 4. Tampilkan preview
print(f"\n{'' * 78}")
print(f" {'PROJECT':<20} {'TASK':<10} {'WAKTU':<15} {'SPAN':>6} {'JAM':>5} DESKRIPSI")
print(f"{'' * 78}")
entries = []
for key, h, span in zip(keys, hours, spans):
g = groups[key]
ts = sorted(g["timestamps"])
is_first = (key == first_task_key)
if is_first:
start_label = f"08:00→{ts[-1].strftime('%H:%M')}"
elif len(ts) == 1:
start_label = ts[0].strftime("%H:%M") + " (1x)"
else:
start_label = f"{ts[0].strftime('%H:%M')}{ts[-1].strftime('%H:%M')}"
span_label = f"{span}m{'' if is_first else ''}" if span > SINGLE_COMMIT_MINUTES else "30m*"
desc = "; ".join(dict.fromkeys(g["descriptions"]))
print(f" {g['project']:<20} {str(g['task_id']):<10} {start_label:<15} {span_label:>6} {h:>5.2f} {desc[:28]}")
entries.append({
"odoo": {
"name": desc,
"date": g["date"],
"unit_amount": h,
"user_id": args.user_id,
"employee_id": args.employee_id,
"task_id": g["task_id"],
"project_id": g["project_id"],
},
"display": g,
})
print(f"{'' * 78}")
total_span = sum(spans)
print(f" {'TOTAL':<20} {'':<10} {'':<15} {total_span:>5}m {sum(hours):>5.2f}")
print(f" ↑ span dihitung dari 08:00 | * single commit → minimum {SINGLE_COMMIT_MINUTES} menit")
if not_found:
print(f"\n {len(not_found)} commit diabaikan (task tidak ditemukan di Odoo)")
if args.dry_run:
print("\n[DRY RUN] Tidak ada yang diupload.")
return
# 5. Simpan pending.json + notifikasi macOS
if args.save_pending:
import os
pending_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
f"pending_{args.date}.json"
)
pending = {
"date": args.date,
"user_id": args.user_id,
"employee_id": args.employee_id,
"entries": [e["odoo"] for e in entries],
}
with open(pending_path, "w") as f:
json.dump(pending, f, indent=2)
summary = ", ".join(
f"{e['odoo']['unit_amount']}h {e['display']['project']}"
for e in entries
)
msg = f"{len(entries)} entry siap ({summary})"
os.system(
f'osascript -e \'display notification "{msg}" '
f'with title "Timesheet {args.date}" '
f'subtitle "Jalankan: python3 upload_pending.py --session-id XXX"\''
)
print(f"\nPending disimpan: {pending_path}")
print("Notifikasi macOS dikirim.")
return
# 6. Konfirmasi & upload
if not args.yes:
confirm = input(f"\nUpload {len(entries)} timesheet entry? (y/N): ").strip().lower()
if confirm != "y":
print("Dibatalkan.")
return
print()
ok, fail = 0, 0
for e in entries:
try:
new_id = upload_timesheet(args.session_id, e["odoo"], args.user_id)
g = e["display"]
print(f" OK ID {new_id:<8} {g['project']:<20} {e['odoo']['unit_amount']}h {e['odoo']['name'][:40]}")
ok += 1
except RuntimeError as ex:
print(f" ERR {e['odoo']['name'][:40]}{ex}")
fail += 1
print(f"\nSelesai: {ok} sukses, {fail} gagal.")
if __name__ == "__main__":
main()