Files
daily_odoo_timesheet/sync_timesheet.py
2026-05-29 19:29:35 +07:00

265 lines
9.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Baca commit dari semua repo, parse task code, cari task di Odoo, lalu upload timesheet.
Format commit yang dikenali: "TASKCODE - deskripsi pekerjaan"
Contoh: "6D9QD6 - buat api baru"
"""
import json
import re
import subprocess
import urllib.request
import urllib.error
import argparse
from datetime import date, timedelta
from collections import defaultdict
BASE_URL = "https://odoo.aplikasi.web.id"
PROJECT_MAP = {
"CPONE": 123,
"IBL": 186,
"Support Pramita": 70,
"SAS": 92,
"Support Kedungdoro": 77,
}
REPOS = [
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE/LIVE_CPONE", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD", "project": "CPONE"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/BE_IBL/one-api-lab", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/FE_IBL", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_IBL/MERGE_REPORT/ibl_merge_report_service", "project": "IBL"},
{"path": "/Users/fajrihardhitamurti/REPO_GITEA_KD", "project": "Support Kedungdoro"},
{"path": "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone", "project": "Support Pramita"},
{"path": "/Users/fajrihardhitamurti/SAS_TASK", "project": "SAS"},
]
# Regex: "TASKCODE - deskripsi"
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
def build_headers(session_id: str) -> dict:
return {
"accept": "*/*",
"content-type": "application/json",
"origin": BASE_URL,
"referer": f"{BASE_URL}/web",
"user-agent": "Mozilla/5.0",
"cookie": f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta",
}
def odoo_call(session_id: str, endpoint: str, payload: dict) -> dict:
url = f"{BASE_URL}{endpoint}"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")
try:
with urllib.request.urlopen(req) as resp:
response = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
raise RuntimeError(f"HTTP {e.code}: {e.read().decode()}") from e
if "error" in response:
err = response["error"]
raise RuntimeError(f"Odoo error: {err.get('message')}")
return response
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "project.task", "method": "name_search",
"args": [],
"kwargs": {
"name": f"[{code}]",
"operator": "ilike",
"args": [
"&", "&", "&",
["company_id", "=", 1],
["project_id.allow_timesheets", "=", True],
["stage_id.fold", "=", False],
["project_id", "=", project_id],
],
"limit": 1,
"context": {
"lang": "en_US", "tz": "Asia/Jakarta", "uid": 41,
"allowed_company_ids": [1],
"is_timesheet": 1,
"default_project_id": project_id,
},
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
items = result.get("result", [])
return tuple(items[0]) if items else None
def upload_timesheet(session_id: str, entry: dict) -> int:
payload = {
"id": 1, "jsonrpc": "2.0", "method": "call",
"params": {
"model": "account.analytic.line", "method": "create",
"args": [entry],
"kwargs": {
"context": {
"lang": "en_US", "tz": "Asia/Jakarta",
"uid": entry["user_id"],
"allowed_company_ids": [1],
"is_timesheet": 1,
}
},
},
}
result = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
return result.get("result")
def get_commits(repo_path: str, author: str, since: str, until: str) -> list[dict]:
cmd = [
"git", "-C", repo_path,
"log",
f"--author={author}",
f"--since={since} 00:00:00",
f"--until={until} 23:59:59",
"--format=%h|%ad|%s",
"--date=short",
]
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, text=True)
except subprocess.CalledProcessError:
return []
commits = []
for line in out.strip().splitlines():
if not line:
continue
parts = line.split("|", 2)
if len(parts) == 3:
commits.append({"hash": parts[0], "date": parts[1], "message": parts[2]})
return commits
def main():
today = date.today().isoformat()
parser = argparse.ArgumentParser(
description="Sync commit ke timesheet Odoo",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument("--session-id", required=True, help="session_id cookie Odoo")
parser.add_argument("--author", required=True, help="Filter git author, e.g. 'fajri'")
parser.add_argument("--user-id", required=True, type=int, help="ID user Odoo")
parser.add_argument("--employee-id", required=True, type=int, help="ID employee Odoo")
parser.add_argument("--unit-amount", default=1.0, type=float, help="Jam per commit (default: 1.0)")
parser.add_argument("--since", default=today, help=f"Tanggal mulai YYYY-MM-DD (default: {today})")
parser.add_argument("--until", default=today, help=f"Tanggal akhir YYYY-MM-DD (default: {today})")
parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload ke Odoo")
args = parser.parse_args()
until_exclusive = (date.fromisoformat(args.until) + timedelta(days=1)).isoformat()
print(f"Author : {args.author}")
print(f"Periode : {args.since} s/d {args.until}")
print(f"Mode : {'DRY RUN (preview only)' if args.dry_run else 'UPLOAD'}")
print("=" * 65)
# Kumpulkan commit yang formatnya cocok, per project
candidates = []
skipped = []
for repo in REPOS:
commits = get_commits(repo["path"], args.author, args.since, until_exclusive)
repo_name = repo["path"].split("/")[-1]
project_name = repo["project"]
project_id = PROJECT_MAP[project_name]
for c in commits:
m = COMMIT_RE.match(c["message"])
if m:
candidates.append({
"code": m.group(1),
"description": m.group(2),
"date": c["date"],
"hash": c["hash"],
"repo": repo_name,
"project": project_name,
"project_id": project_id,
})
else:
skipped.append(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
if skipped:
print(f"\n{len(skipped)} commit dilewati (format tidak cocok):")
for s in skipped:
print(s)
if not candidates:
print("\nTidak ada commit dengan format yang cocok.")
return
print(f"\n{len(candidates)} commit akan diproses:\n")
# Search task_id di Odoo untuk tiap commit
entries = []
for c in candidates:
task = search_task(args.session_id, c["code"], c["project_id"])
if task:
task_id, task_name = task
status = f"task {task_id}"
else:
task_id, task_name = None, "(task tidak ditemukan)"
status = "TIDAK DITEMUKAN"
print(f" [{c['project']}] {c['date']} {c['hash']}")
print(f" commit : {c['code']} - {c['description']}")
print(f" task : {status}{task_name}")
print()
if task_id:
entries.append({
"name": c["description"],
"date": c["date"],
"unit_amount": args.unit_amount,
"user_id": args.user_id,
"employee_id": args.employee_id,
"task_id": task_id,
"project_id": c["project_id"],
})
if not entries:
print("Tidak ada entry yang bisa diupload.")
return
print(f"{'=' * 65}")
print(f"Siap upload: {len(entries)} entry, {len(candidates) - len(entries)} gagal (task tidak ditemukan)")
if args.dry_run:
print("\n[DRY RUN] Tidak ada yang diupload.")
return
confirm = input(f"\nUpload {len(entries)} timesheet? (y/N): ").strip().lower()
if confirm != "y":
print("Dibatalkan.")
return
print()
ok, fail = 0, 0
for e in entries:
try:
new_id = upload_timesheet(args.session_id, e)
print(f" OK ID {new_id} | {e['date']} {e['name']}")
ok += 1
except RuntimeError as ex:
print(f" ERR {e['date']} {e['name']} -> {ex}")
fail += 1
print(f"\nSelesai: {ok} sukses, {fail} gagal.")
if __name__ == "__main__":
main()