Files
daily_odoo_timesheet/daily_timesheet.py
sas.fajri dd1094e48a fix: koreksi pembulatan ke entry terbesar, filter author date ketat
- distribute_hours: simpan diff pembulatan ke entry terbesar (bukan
  terakhir) agar tidak bisa jadi 0.00h saat banyak entry
- get_commits_today: filter strict author date = today, cegah commit
  dari hari lain yg di-rebase/push masuk ke timesheet hari ini

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 17:26:11 +07:00

464 lines
18 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ambil semua commit hari ini dari semua repo, distribusikan 8 jam pro-rata,
lalu upload ke Odoo timesheet.
Pro-rata per commit: bobot = gap waktu dari commit sebelumnya (atau dari 08:00 untuk commit pertama).
Setiap commit → satu timesheet entry terpisah.
Contoh (commit berurutan):
commit 1 09:00 → span 08:00→09:00 = 60m
commit 2 10:30 → span 09:00→10:30 = 90m
commit 3 10:30 → gap 0 → minimum 30m
Total = 180m → commit 1 = 8×60/180=2.67h, commit 2 = 4.00h, commit 3 = 1.33h
"""
import json
import os
import re
import subprocess
import urllib.request
import urllib.error
import argparse
from datetime import date, datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
def load_env() -> dict:
"""Baca .env dari direktori script, return dict key=value."""
env_path = Path(__file__).parent / ".env"
env = {}
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
env[k.strip()] = v.strip()
return env
ENV = load_env()
BASE_URL = "https://odoo.aplikasi.web.id"
TOTAL_HOURS = 8
PROJECT_MAP = {
"CPONE": 123,
"IBL": 186,
"Support Pramita": 70,
"SAS": 92,
"Support Kedungdoro": 77,
}
REPOS = [
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/LIVE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/BE_IBL/one-api-lab", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/FE_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/MERGE_REPORT/ibl_merge_report_service", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"},
{"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"},
{"path": "/Users/fajrihardhitamurti/SAS_TASK", "project": "SAS"},
]
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
# ── HTTP ──────────────────────────────────────────────────────────────────────
def build_headers(session_id: str) -> dict:
return {
"accept": "*/*",
"content-type": "application/json",
"origin": BASE_URL,
"referer": f"{BASE_URL}/web",
"user-agent": "Mozilla/5.0",
"cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta",
}
def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict:
url = f"{BASE_URL}{endpoint}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e
if "error" in response:
err = response["error"]
raise RuntimeError(f"Odoo error: {err.get('message')}")
return response
# ── Odoo helpers ──────────────────────────────────────────────────────────────
def check_session(session_id: str) -> bool:
"""Return True jika session masih valid, False jika expired."""
payload = {"jsonrpc": "2.0", "method": "call", "id": 1, "params": {}}
url = f"{BASE_URL}/web/session/get_session_info"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
uid = response.get("result", {}).get("uid")
return bool(uid)
except Exception:
return False
def notify_session_expired():
import os
os.system(
'osascript -e \'display notification '
'"Session Odoo expired. Buka run_daily.sh dan ganti SESSION_ID." '
'with title "⚠️ Timesheet Gagal" '
'subtitle "Session ID perlu diperbarui" '
'sound name "Basso"\''
)
print("SESSION EXPIRED — update SESSION_ID di run_daily.sh")
def search_task_in_project(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
"""Cari task di satu project spesifik."""
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "project.task", "method": "name_search", "args": [],
"kwargs": {
"name": f"[{code}]", "operator": "ilike",
"args": [
"&", "&", "&",
["company_id", "=", 1],
["project_id.allow_timesheets", "=", True],
["stage_id.fold", "=", False],
["project_id", "=", project_id],
],
"limit": 1,
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": 41,
"allowed_company_ids": [1], "is_timesheet": 1,
"default_project_id": project_id,
},
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
items = result.get("result", [])
return tuple(items[0]) if items else None
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str, int] | None:
"""
Cari task mulai dari project default repo.
Kalau tidak ketemu, fallback cari di semua project lain.
Return (task_id, task_name, actual_project_id).
"""
result = search_task_in_project(session_id, code, project_id)
if result:
return result[0], result[1], project_id
for name, pid in PROJECT_MAP.items():
if pid == project_id:
continue
result = search_task_in_project(session_id, code, pid)
if result:
return result[0], result[1], pid
return None
def upload_timesheet(session_id: str, entry: dict, user_id: int) -> int:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "account.analytic.line", "method": "create",
"args": [entry],
"kwargs": {
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": user_id,
"allowed_company_ids": [1], "is_timesheet": 1,
}
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
return result.get("result")
# ── Git ───────────────────────────────────────────────────────────────────────
def get_commits_today(repo_path: str, author: str, today: str) -> list[dict]:
cmd = [
"git", "-C", repo_path, "log",
f"--author={author}",
f"--since={today} 00:00:00",
f"--until={today} 23:59:59",
"--format=%h|%ad|%s",
"--date=format:%Y-%m-%d %H:%M",
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
except subprocess.CalledProcessError:
return []
commits = []
for line in out.strip().splitlines():
if not line:
continue
parts = line.split("|", 2)
if len(parts) == 3:
dt = datetime.strptime(parts[1], "%Y-%m-%d %H:%M")
# Filter ketat: pastikan author date = today (bukan commit dari hari lain yg di-rebase)
if dt.strftime("%Y-%m-%d") != today:
continue
commits.append({
"hash": parts[0],
"date": dt.strftime("%Y-%m-%d"),
"time": dt.strftime("%H:%M"),
"dt": dt,
"message": parts[2],
})
return commits
# ── Pro-rata ──────────────────────────────────────────────────────────────────
SINGLE_COMMIT_MINUTES = 30 # bobot minimum untuk task dengan 1 commit
WORK_START_HOUR = 8 # jam mulai kerja
def commit_spans(sorted_commits: list[dict]) -> list[int]:
"""
Hitung span (menit) per commit, diurutkan secara kronologis.
- Commit pertama: span dari jam 08:00 ke waktu commit (min 30 menit).
- Commit berikutnya: gap dari commit sebelumnya (min 30 menit).
"""
result = []
for i, c in enumerate(sorted_commits):
if i == 0:
work_start = c["dt"].replace(hour=WORK_START_HOUR, minute=0, second=0, microsecond=0)
span = max(int((c["dt"] - work_start).total_seconds() / 60), SINGLE_COMMIT_MINUTES)
else:
span = max(int((c["dt"] - sorted_commits[i - 1]["dt"]).total_seconds() / 60), SINGLE_COMMIT_MINUTES)
result.append(span)
return result
def distribute_hours(weights: list[int], total: float = 8.0) -> list[float]:
"""Pro-rata berdasarkan bobot (menit span). Sum = total. Adjustment di entry terbesar."""
total_weight = sum(weights)
raw = [Decimal(str(total)) * Decimal(w) / Decimal(total_weight) for w in weights]
rounded = [float(r.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) for r in raw]
diff = round(total - sum(rounded), 2)
# Terapkan koreksi ke entry terbesar agar tidak bisa jadi 0
largest_idx = rounded.index(max(rounded))
rounded[largest_idx] = round(rounded[largest_idx] + diff, 2)
return rounded
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
today = date.today().isoformat()
parser = argparse.ArgumentParser(description="Daily timesheet: commit → Odoo (8 jam pro-rata)")
parser.add_argument("--session-id", default=ENV.get("SESSION_ID"), help="session_id cookie Odoo")
parser.add_argument("--author", default=ENV.get("AUTHOR"), help="Git author filter")
parser.add_argument("--user-id", default=ENV.get("USER_ID"), type=int, help="ID user Odoo")
parser.add_argument("--employee-id", default=ENV.get("EMPLOYEE_ID"), type=int, help="ID employee Odoo")
parser.add_argument("--date", default=today, help=f"Tanggal YYYY-MM-DD (default: {today})")
parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload")
parser.add_argument("--save-pending", action="store_true", help="Simpan ke pending.json + notifikasi macOS")
parser.add_argument("--yes", action="store_true", help="Skip konfirmasi, langsung upload")
args = parser.parse_args()
print(f"Tanggal : {args.date}")
print(f"Author : {args.author}")
if args.dry_run:
mode = "DRY RUN"
elif args.save_pending:
mode = "SAVE PENDING"
else:
mode = "UPLOAD"
print(f"Mode : {mode}")
print("=" * 65)
# Cek session sebelum mulai
if not check_session(args.session_id):
notify_session_expired()
return
# 1. Kumpulkan semua commit hari ini dari semua repo
raw_commits = []
for repo in REPOS:
commits = get_commits_today(repo["path"], args.author, args.date)
repo_name = repo["path"].split("/")[-1]
for c in commits:
m = COMMIT_RE.match(c["message"])
if m:
raw_commits.append({
"code": m.group(1),
"description": m.group(2),
"date": c["date"],
"time": c["time"],
"dt": c["dt"],
"hash": c["hash"],
"repo": repo_name,
"project": repo["project"],
"project_id": PROJECT_MAP[repo["project"]],
})
else:
print(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
if not raw_commits:
print("\nTidak ada commit dengan format TASKCODE - deskripsi hari ini.")
return
# 2. Cari task_id di Odoo untuk setiap commit
print(f"\nMencari task di Odoo untuk {len(raw_commits)} commit...\n")
task_cache = {} # (code, project_id) → (task_id, task_name, actual_pid) | None
resolved = []
not_found = []
for c in raw_commits:
cache_key = (c["code"], c["project_id"])
if cache_key not in task_cache:
task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"])
task = task_cache[cache_key]
if not task:
not_found.append(c)
print(f" NOT FOUND [{c['code']}] di semua project ({c['hash']})")
continue
task_id, task_name, actual_project_id = task
if actual_project_id != c["project_id"]:
actual_name = next(k for k, v in PROJECT_MAP.items() if v == actual_project_id)
print(f" FALLBACK [{c['code']}] tidak ada di {c['project']}, ditemukan di {actual_name}")
resolved.append({
"task_id": task_id,
"task_name": task_name,
"project_id": actual_project_id,
"project": next(k for k, v in PROJECT_MAP.items() if v == actual_project_id),
"description": c["description"],
"date": c["date"],
"time": c["time"],
"dt": c["dt"],
"hash": c["hash"],
})
if not resolved:
print("\nTidak ada task yang ditemukan di Odoo.")
return
# 3. Sort by time, hitung span per commit
resolved.sort(key=lambda x: x["dt"])
spans = commit_spans(resolved)
hours = distribute_hours(spans, total=float(TOTAL_HOURS))
# 4. Tampilkan preview (satu baris per commit)
print(f"\n{'' * 78}")
print(f" {'PROJECT':<20} {'TASK':<10} {'WAKTU':<10} {'SPAN':>6} {'JAM':>5} DESKRIPSI")
print(f"{'' * 78}")
entries = []
for i, (c, h, span) in enumerate(zip(resolved, hours, spans)):
is_first = (i == 0)
if is_first:
time_label = f"08:00→{c['dt'].strftime('%H:%M')}"
else:
time_label = c["dt"].strftime("%H:%M")
if span == SINGLE_COMMIT_MINUTES:
span_label = f"{'30m↑' if is_first else '30m*'}"
else:
span_label = f"{span}m{'' if is_first else ''}"
print(f" {c['project']:<20} {str(c['task_id']):<10} {time_label:<10} {span_label:>6} {h:>5.2f} {c['description'][:28]}")
entries.append({
"odoo": {
"name": c["description"],
"date": c["date"],
"unit_amount": h,
"user_id": args.user_id,
"employee_id": args.employee_id,
"task_id": c["task_id"],
"project_id": c["project_id"],
},
"display": c,
})
print(f"{'' * 78}")
total_span = sum(spans)
print(f" {'TOTAL':<20} {'':<10} {'':<10} {total_span:>5}m {sum(hours):>5.2f}")
print(f" ↑ span dari 08:00 | * gap ke commit sebelumnya → minimum {SINGLE_COMMIT_MINUTES} menit")
if not_found:
print(f"\n {len(not_found)} commit diabaikan (task tidak ditemukan di Odoo)")
if args.dry_run:
print("\n[DRY RUN] Tidak ada yang diupload.")
return
# 5. Simpan pending.json + notifikasi macOS
if args.save_pending:
import os
pending_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
f"pending_{args.date}.json"
)
pending = {
"date": args.date,
"user_id": args.user_id,
"employee_id": args.employee_id,
"entries": [e["odoo"] for e in entries],
}
with open(pending_path, "w") as f:
json.dump(pending, f, indent=2)
summary = ", ".join(
f"{e['odoo']['unit_amount']}h {e['display']['project']}"
for e in entries[:3]
)
if len(entries) > 3:
summary += f" +{len(entries) - 3} lagi"
msg = f"{len(entries)} entry siap ({summary})"
os.system(
f'osascript -e \'display notification "{msg}" '
f'with title "Timesheet {args.date}" '
f'subtitle "Jalankan: python3 upload_pending.py --session-id XXX"\''
)
print(f"\nPending disimpan: {pending_path}")
print("Notifikasi macOS dikirim.")
return
# 6. Konfirmasi & upload
if not args.yes:
confirm = input(f"\nUpload {len(entries)} timesheet entry? (y/N): ").strip().lower()
if confirm != "y":
print("Dibatalkan.")
return
print()
ok, fail = 0, 0
for e in entries:
try:
new_id = upload_timesheet(args.session_id, e["odoo"], args.user_id)
print(f" OK ID {new_id:<8} {e['display']['project']:<20} {e['odoo']['unit_amount']}h {e['odoo']['name'][:40]}")
ok += 1
except RuntimeError as ex:
print(f" ERR {e['odoo']['name'][:40]}{ex}")
fail += 1
print(f"\nSelesai: {ok} sukses, {fail} gagal.")
if __name__ == "__main__":
main()