Files
daily_odoo_timesheet/daily_timesheet.py
2026-05-28 10:22:37 +07:00

395 lines
15 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ambil semua commit hari ini dari semua repo, distribusikan 8 jam pro-rata,
lalu upload ke Odoo timesheet.
Pro-rata: bobot = time span per task (last commit - first commit).
Single commit mendapat minimum 30 menit.
Contoh: Task A span 2 jam + Task B span 1.5 jam + Task C 1 commit →
bobot: 120 + 90 + 30 = 240 menit
Task A = 8 × 120/240 = 4.00h, Task B = 3.00h, Task C = 1.00h
"""
import json
import re
import subprocess
import urllib.request
import urllib.error
import argparse
from datetime import date, datetime, timedelta
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP
BASE_URL = "https://odoo.minipc.sismedika.biz.id"
TOTAL_HOURS = 8
PROJECT_MAP = {
"CPONE": 123,
"IBL": 186,
"Support Pramita": 70,
"SAS": 92,
"Support Kedungdoro": 77,
}
REPOS = [
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"},
{"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"},
]
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
# ── HTTP ──────────────────────────────────────────────────────────────────────
def build_headers(session_id: str) -> dict:
return {
"accept": "*/*",
"content-type": "application/json",
"origin": BASE_URL,
"referer": f"{BASE_URL}/web",
"user-agent": "Mozilla/5.0",
"cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta",
}
def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict:
url = f"{BASE_URL}{endpoint}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e
if "error" in response:
err = response["error"]
raise RuntimeError(f"Odoo error: {err.get('message')}")
return response
# ── Odoo helpers ──────────────────────────────────────────────────────────────
def check_session(session_id: str) -> bool:
"""Return True jika session masih valid, False jika expired."""
payload = {"jsonrpc": "2.0", "method": "call", "id": 1, "params": {}}
url = f"{BASE_URL}/web/session/get_session_info"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
uid = response.get("result", {}).get("uid")
return bool(uid)
except Exception:
return False
def notify_session_expired():
import os
os.system(
'osascript -e \'display notification '
'"Session Odoo expired. Buka run_daily.sh dan ganti SESSION_ID." '
'with title "⚠️ Timesheet Gagal" '
'subtitle "Session ID perlu diperbarui" '
'sound name "Basso"\''
)
print("SESSION EXPIRED — update SESSION_ID di run_daily.sh")
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "project.task", "method": "name_search", "args": [],
"kwargs": {
"name": f"[{code}]", "operator": "ilike",
"args": [
"&", "&", "&",
["company_id", "=", 1],
["project_id.allow_timesheets", "=", True],
["stage_id.fold", "=", False],
["project_id", "=", project_id],
],
"limit": 1,
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": 41,
"allowed_company_ids": [1], "is_timesheet": 1,
"default_project_id": project_id,
},
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
items = result.get("result", [])
return tuple(items[0]) if items else None
def upload_timesheet(session_id: str, entry: dict, user_id: int) -> int:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "account.analytic.line", "method": "create",
"args": [entry],
"kwargs": {
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": user_id,
"allowed_company_ids": [1], "is_timesheet": 1,
}
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
return result.get("result")
# ── Git ───────────────────────────────────────────────────────────────────────
def get_commits_today(repo_path: str, author: str, today: str) -> list[dict]:
until = (date.fromisoformat(today) + timedelta(days=1)).isoformat()
cmd = [
"git", "-C", repo_path, "log",
f"--author={author}",
f"--after={today}",
f"--before={until}",
"--format=%h|%ad|%s",
"--date=format:%Y-%m-%d %H:%M",
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
except subprocess.CalledProcessError:
return []
commits = []
for line in out.strip().splitlines():
if not line:
continue
parts = line.split("|", 2)
if len(parts) == 3:
dt = datetime.strptime(parts[1], "%Y-%m-%d %H:%M")
commits.append({
"hash": parts[0],
"date": dt.strftime("%Y-%m-%d"),
"time": dt.strftime("%H:%M"),
"dt": dt,
"message": parts[2],
})
return commits
# ── Pro-rata ──────────────────────────────────────────────────────────────────
SINGLE_COMMIT_MINUTES = 30 # bobot minimum untuk task dengan 1 commit
def span_minutes(timestamps: list[datetime]) -> int:
"""Menit antara commit pertama dan terakhir. Minimum 30 menit."""
if len(timestamps) <= 1:
return SINGLE_COMMIT_MINUTES
return max(
int((max(timestamps) - min(timestamps)).total_seconds() / 60),
SINGLE_COMMIT_MINUTES,
)
def distribute_hours(weights: list[int], total: float = 8.0) -> list[float]:
"""Pro-rata berdasarkan bobot (menit span). Sum = total. Adjustment di entry terakhir."""
total_weight = sum(weights)
raw = [Decimal(str(total)) * Decimal(w) / Decimal(total_weight) for w in weights]
rounded = [float(r.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) for r in raw]
diff = round(total - sum(rounded), 2)
rounded[-1] = round(rounded[-1] + diff, 2)
return rounded
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
today = date.today().isoformat()
parser = argparse.ArgumentParser(description="Daily timesheet: commit → Odoo (8 jam pro-rata)")
parser.add_argument("--session-id", required=True, help="session_id cookie Odoo")
parser.add_argument("--author", required=True, help="Git author filter, e.g. 'fajri'")
parser.add_argument("--user-id", required=True, type=int, help="ID user Odoo")
parser.add_argument("--employee-id", required=True, type=int, help="ID employee Odoo")
parser.add_argument("--date", default=today, help=f"Tanggal YYYY-MM-DD (default: {today})")
parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload")
parser.add_argument("--save-pending", action="store_true", help="Simpan ke pending.json + notifikasi macOS")
args = parser.parse_args()
print(f"Tanggal : {args.date}")
print(f"Author : {args.author}")
print(f"Mode : {'DRY RUN' if args.dry_run else 'UPLOAD'}")
print("=" * 65)
# Cek session sebelum mulai
if not check_session(args.session_id):
notify_session_expired()
return
# 1. Kumpulkan semua commit hari ini dari semua repo
raw_commits = []
for repo in REPOS:
commits = get_commits_today(repo["path"], args.author, args.date)
repo_name = repo["path"].split("/")[-1]
for c in commits:
m = COMMIT_RE.match(c["message"])
if m:
raw_commits.append({
"code": m.group(1),
"description": m.group(2),
"date": c["date"],
"time": c["time"],
"dt": c["dt"],
"hash": c["hash"],
"repo": repo_name,
"project": repo["project"],
"project_id": PROJECT_MAP[repo["project"]],
})
else:
print(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
if not raw_commits:
print("\nTidak ada commit dengan format TASKCODE - deskripsi hari ini.")
return
# 2. Cari task_id di Odoo, group by (task_id, project_id)
print(f"\nMencari task di Odoo untuk {len(raw_commits)} commit...\n")
task_cache = {} # (code, project_id) → (task_id, task_name) | None
# Group: key = (task_id, project_id)
groups = defaultdict(lambda: {
"descriptions": [], "timestamps": [], "count": 0,
"project_id": None, "project": None, "task_name": None, "date": None,
})
not_found = []
for c in raw_commits:
cache_key = (c["code"], c["project_id"])
if cache_key not in task_cache:
task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"])
task = task_cache[cache_key]
if not task:
not_found.append(c)
print(f" NOT FOUND [{c['code']}] di project {c['project']} ({c['hash']})")
continue
task_id, task_name = task
key = (task_id, c["project_id"])
groups[key]["descriptions"].append(c["description"])
groups[key]["timestamps"].append(c["dt"])
groups[key]["count"] += 1
groups[key]["project_id"] = c["project_id"]
groups[key]["project"] = c["project"]
groups[key]["task_id"] = task_id
groups[key]["task_name"] = task_name
groups[key]["date"] = c["date"]
if not groups:
print("\nTidak ada task yang ditemukan di Odoo.")
return
# 3. Hitung pro-rata berdasarkan time span
keys = list(groups.keys())
spans = [span_minutes(groups[k]["timestamps"]) for k in keys]
hours = distribute_hours(spans, total=float(TOTAL_HOURS))
# 4. Tampilkan preview
print(f"\n{'' * 75}")
print(f" {'PROJECT':<20} {'TASK':<10} {'WAKTU':<13} {'SPAN':>6} {'JAM':>5} DESKRIPSI")
print(f"{'' * 75}")
entries = []
for key, h, span in zip(keys, hours, spans):
g = groups[key]
ts = sorted(g["timestamps"])
if len(ts) == 1:
waktu = ts[0].strftime("%H:%M") + " (1 commit)"
else:
waktu = f"{ts[0].strftime('%H:%M')}{ts[-1].strftime('%H:%M')}"
span_label = f"{span}m" if span > SINGLE_COMMIT_MINUTES else "30m*"
desc = "; ".join(dict.fromkeys(g["descriptions"]))
print(f" {g['project']:<20} {str(g['task_id']):<10} {waktu:<13} {span_label:>6} {h:>5.2f} {desc[:30]}")
entries.append({
"odoo": {
"name": desc,
"date": g["date"],
"unit_amount": h,
"user_id": args.user_id,
"employee_id": args.employee_id,
"task_id": g["task_id"],
"project_id": g["project_id"],
},
"display": g,
})
print(f"{'' * 75}")
total_span = sum(spans)
print(f" {'TOTAL':<20} {'':<10} {'':<13} {total_span:>5}m {sum(hours):>5.2f}")
print(f" * single commit → minimum {SINGLE_COMMIT_MINUTES} menit")
if not_found:
print(f"\n {len(not_found)} commit diabaikan (task tidak ditemukan di Odoo)")
if args.dry_run:
print("\n[DRY RUN] Tidak ada yang diupload.")
return
# 5. Simpan pending.json + notifikasi macOS
if args.save_pending:
import os
pending_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
f"pending_{args.date}.json"
)
pending = {
"date": args.date,
"user_id": args.user_id,
"employee_id": args.employee_id,
"entries": [e["odoo"] for e in entries],
}
with open(pending_path, "w") as f:
json.dump(pending, f, indent=2)
summary = ", ".join(
f"{e['odoo']['unit_amount']}h {e['display']['project']}"
for e in entries
)
msg = f"{len(entries)} entry siap ({summary})"
os.system(
f'osascript -e \'display notification "{msg}" '
f'with title "Timesheet {args.date}" '
f'subtitle "Jalankan: python3 upload_pending.py --session-id XXX"\''
)
print(f"\nPending disimpan: {pending_path}")
print("Notifikasi macOS dikirim.")
return
# 6. Konfirmasi & upload
confirm = input(f"\nUpload {len(entries)} timesheet entry? (y/N): ").strip().lower()
if confirm != "y":
print("Dibatalkan.")
return
print()
ok, fail = 0, 0
for e in entries:
try:
new_id = upload_timesheet(args.session_id, e["odoo"], args.user_id)
g = e["display"]
print(f" OK ID {new_id:<8} {g['project']:<20} {e['odoo']['unit_amount']}h {e['odoo']['name'][:40]}")
ok += 1
except RuntimeError as ex:
print(f" ERR {e['odoo']['name'][:40]}{ex}")
fail += 1
print(f"\nSelesai: {ok} sukses, {fail} gagal.")
if __name__ == "__main__":
main()