Initial commit: Odoo timesheet automation scripts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sas.fajri
2026-05-28 10:18:08 +07:00
commit c9bf004a4f
9 changed files with 1410 additions and 0 deletions

362
daily_timesheet.py Executable file
View File

@@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""
Ambil semua commit hari ini dari semua repo, distribusikan 8 jam pro-rata,
lalu upload ke Odoo timesheet.
Pro-rata: bobot = jumlah commit per task. Total = 8 jam.
Contoh: Task A 3 commit + Task B 1 commit → A=6h, B=2h.
"""
import json
import re
import subprocess
import urllib.request
import urllib.error
import argparse
from datetime import date, timedelta
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP
BASE_URL = "https://odoo.minipc.sismedika.biz.id"
TOTAL_HOURS = 8
PROJECT_MAP = {
"CPONE": 123,
"IBL": 186,
"Support Pramita": 70,
"SAS": 92,
"Support Kedungdoro": 77,
}
REPOS = [
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"},
{"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"},
]
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
# ── HTTP ──────────────────────────────────────────────────────────────────────
def build_headers(session_id: str) -> dict:
return {
"accept": "*/*",
"content-type": "application/json",
"origin": BASE_URL,
"referer": f"{BASE_URL}/web",
"user-agent": "Mozilla/5.0",
"cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta",
}
def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict:
url = f"{BASE_URL}{endpoint}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e
if "error" in response:
err = response["error"]
raise RuntimeError(f"Odoo error: {err.get('message')}")
return response
# ── Odoo helpers ──────────────────────────────────────────────────────────────
def check_session(session_id: str) -> bool:
"""Return True jika session masih valid, False jika expired."""
payload = {"jsonrpc": "2.0", "method": "call", "id": 1, "params": {}}
url = f"{BASE_URL}/web/session/get_session_info"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
uid = response.get("result", {}).get("uid")
return bool(uid)
except Exception:
return False
def notify_session_expired():
import os
os.system(
'osascript -e \'display notification '
'"Session Odoo expired. Buka run_daily.sh dan ganti SESSION_ID." '
'with title "⚠️ Timesheet Gagal" '
'subtitle "Session ID perlu diperbarui" '
'sound name "Basso"\''
)
print("SESSION EXPIRED — update SESSION_ID di run_daily.sh")
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "project.task", "method": "name_search", "args": [],
"kwargs": {
"name": f"[{code}]", "operator": "ilike",
"args": [
"&", "&", "&",
["company_id", "=", 1],
["project_id.allow_timesheets", "=", True],
["stage_id.fold", "=", False],
["project_id", "=", project_id],
],
"limit": 1,
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": 41,
"allowed_company_ids": [1], "is_timesheet": 1,
"default_project_id": project_id,
},
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
items = result.get("result", [])
return tuple(items[0]) if items else None
def upload_timesheet(session_id: str, entry: dict, user_id: int) -> int:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "account.analytic.line", "method": "create",
"args": [entry],
"kwargs": {
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": user_id,
"allowed_company_ids": [1], "is_timesheet": 1,
}
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
return result.get("result")
# ── Git ───────────────────────────────────────────────────────────────────────
def get_commits_today(repo_path: str, author: str, today: str) -> list[dict]:
until = (date.fromisoformat(today) + timedelta(days=1)).isoformat()
cmd = [
"git", "-C", repo_path, "log",
f"--author={author}",
f"--after={today}",
f"--before={until}",
"--format=%h|%ad|%s",
"--date=short",
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
except subprocess.CalledProcessError:
return []
commits = []
for line in out.strip().splitlines():
if not line:
continue
parts = line.split("|", 2)
if len(parts) == 3:
commits.append({"hash": parts[0], "date": parts[1], "message": parts[2]})
return commits
# ── Pro-rata ──────────────────────────────────────────────────────────────────
def distribute_hours(weights: list[int], total: float = 8.0) -> list[float]:
"""
Distribusi jam pro-rata berdasarkan bobot (jumlah commit).
Adjustment di entry terakhir supaya sum persis = total.
"""
total_weight = sum(weights)
raw = [Decimal(str(total)) * Decimal(w) / Decimal(total_weight) for w in weights]
rounded = [float(r.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)) for r in raw]
diff = round(total - sum(rounded), 2)
rounded[-1] = round(rounded[-1] + diff, 2)
return rounded
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
today = date.today().isoformat()
parser = argparse.ArgumentParser(description="Daily timesheet: commit → Odoo (8 jam pro-rata)")
parser.add_argument("--session-id", required=True, help="session_id cookie Odoo")
parser.add_argument("--author", required=True, help="Git author filter, e.g. 'fajri'")
parser.add_argument("--user-id", required=True, type=int, help="ID user Odoo")
parser.add_argument("--employee-id", required=True, type=int, help="ID employee Odoo")
parser.add_argument("--date", default=today, help=f"Tanggal YYYY-MM-DD (default: {today})")
parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload")
parser.add_argument("--save-pending", action="store_true", help="Simpan ke pending.json + notifikasi macOS")
args = parser.parse_args()
print(f"Tanggal : {args.date}")
print(f"Author : {args.author}")
print(f"Mode : {'DRY RUN' if args.dry_run else 'UPLOAD'}")
print("=" * 65)
# Cek session sebelum mulai
if not check_session(args.session_id):
notify_session_expired()
return
# 1. Kumpulkan semua commit hari ini dari semua repo
raw_commits = []
for repo in REPOS:
commits = get_commits_today(repo["path"], args.author, args.date)
repo_name = repo["path"].split("/")[-1]
for c in commits:
m = COMMIT_RE.match(c["message"])
if m:
raw_commits.append({
"code": m.group(1),
"description": m.group(2),
"date": c["date"],
"hash": c["hash"],
"repo": repo_name,
"project": repo["project"],
"project_id": PROJECT_MAP[repo["project"]],
})
else:
print(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
if not raw_commits:
print("\nTidak ada commit dengan format TASKCODE - deskripsi hari ini.")
return
# 2. Cari task_id di Odoo, group by (task_id, project_id)
print(f"\nMencari task di Odoo untuk {len(raw_commits)} commit...\n")
task_cache = {} # (code, project_id) → (task_id, task_name) | None
# Group: key = (task_id, project_id), value = list of commit descriptions & count
groups = defaultdict(lambda: {"descriptions": [], "count": 0, "project_id": None,
"project": None, "task_name": None, "date": None})
not_found = []
for c in raw_commits:
cache_key = (c["code"], c["project_id"])
if cache_key not in task_cache:
task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"])
task = task_cache[cache_key]
if not task:
not_found.append(c)
print(f" NOT FOUND [{c['code']}] di project {c['project']} ({c['hash']})")
continue
task_id, task_name = task
key = (task_id, c["project_id"])
groups[key]["descriptions"].append(c["description"])
groups[key]["count"] += 1
groups[key]["project_id"] = c["project_id"]
groups[key]["project"] = c["project"]
groups[key]["task_id"] = task_id
groups[key]["task_name"] = task_name
groups[key]["date"] = c["date"]
if not groups:
print("\nTidak ada task yang ditemukan di Odoo.")
return
# 3. Hitung pro-rata jam
keys = list(groups.keys())
weights = [groups[k]["count"] for k in keys]
hours = distribute_hours(weights, total=float(TOTAL_HOURS))
# 4. Tampilkan preview
print(f"\n{'' * 65}")
print(f" {'PROJECT':<20} {'TASK':<10} {'JAM':>5} {'COMMIT':>6} DESKRIPSI")
print(f"{'' * 65}")
entries = []
for key, h in zip(keys, hours):
g = groups[key]
desc = g["descriptions"][0] if len(g["descriptions"]) == 1 else \
"; ".join(dict.fromkeys(g["descriptions"])) # deduplicate, preserve order
print(f" {g['project']:<20} {str(g['task_id']):<10} {h:>5.2f} {g['count']:>6}x {desc[:35]}")
entries.append({
"odoo": {
"name": desc,
"date": g["date"],
"unit_amount": h,
"user_id": args.user_id,
"employee_id": args.employee_id,
"task_id": g["task_id"],
"project_id": g["project_id"],
},
"display": g,
})
print(f"{'' * 65}")
print(f" {'TOTAL':<20} {'':<10} {sum(hours):>5.2f} {sum(weights):>6}x")
if not_found:
print(f"\n {len(not_found)} commit diabaikan (task tidak ditemukan di Odoo)")
if args.dry_run:
print("\n[DRY RUN] Tidak ada yang diupload.")
return
# 5. Simpan pending.json + notifikasi macOS
if args.save_pending:
import os
pending_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
f"pending_{args.date}.json"
)
pending = {
"date": args.date,
"user_id": args.user_id,
"employee_id": args.employee_id,
"entries": [e["odoo"] for e in entries],
}
with open(pending_path, "w") as f:
json.dump(pending, f, indent=2)
summary = ", ".join(
f"{e['odoo']['unit_amount']}h {e['display']['project']}"
for e in entries
)
msg = f"{len(entries)} entry siap ({summary})"
os.system(
f'osascript -e \'display notification "{msg}" '
f'with title "Timesheet {args.date}" '
f'subtitle "Jalankan: python3 upload_pending.py --session-id XXX"\''
)
print(f"\nPending disimpan: {pending_path}")
print("Notifikasi macOS dikirim.")
return
# 6. Konfirmasi & upload
confirm = input(f"\nUpload {len(entries)} timesheet entry? (y/N): ").strip().lower()
if confirm != "y":
print("Dibatalkan.")
return
print()
ok, fail = 0, 0
for e in entries:
try:
new_id = upload_timesheet(args.session_id, e["odoo"], args.user_id)
g = e["display"]
print(f" OK ID {new_id:<8} {g['project']:<20} {e['odoo']['unit_amount']}h {e['odoo']['name'][:40]}")
ok += 1
except RuntimeError as ex:
print(f" ERR {e['odoo']['name'][:40]}{ex}")
fail += 1
print(f"\nSelesai: {ok} sukses, {fail} gagal.")
if __name__ == "__main__":
main()