265 lines
9.4 KiB
Python
Executable File
265 lines
9.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Read the commits from every repo, parse the task code, look the task up in Odoo, then upload the timesheet.

Recognized commit format: "TASKCODE - work description"
Example: "6D9QD6 - buat api baru"
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import urllib.request
|
|
import urllib.error
|
|
import argparse
|
|
from datetime import date, timedelta
|
|
from collections import defaultdict
|
|
|
|
# Root URL of the Odoo instance; every RPC endpoint path is appended to it.
BASE_URL = "https://odoo.aplikasi.web.id"

# Project name -> numeric project id sent to Odoo as ``project_id``.
PROJECT_MAP = {
    "CPONE": 123,
    "IBL": 186,
    "Support Pramita": 70,
    "SAS": 92,
    "Support Kedungdoro": 77,
}

# Local checkouts to scan, grouped by the project they are booked against.
# Each project name must be a key of PROJECT_MAP.
_REPO_PATHS_BY_PROJECT = {
    "CPONE": [
        "/Users/fajrihardhitamurti/REPO_CPONE/BE_CPONE",
        "/Users/fajrihardhitamurti/REPO_CPONE/FE_CPONE",
        "/Users/fajrihardhitamurti/REPO_CPONE/LIVE_CPONE",
        "/Users/fajrihardhitamurti/REPO_CPONE_DASHBOARD",
    ],
    "IBL": [
        "/Users/fajrihardhitamurti/REPO_GITEA_IBL/BE_IBL/one-api-lab",
        "/Users/fajrihardhitamurti/REPO_GITEA_IBL/FE_IBL",
        "/Users/fajrihardhitamurti/REPO_GITEA_IBL/MERGE_REPORT/ibl_merge_report_service",
    ],
    "Support Kedungdoro": [
        "/Users/fajrihardhitamurti/REPO_GITEA_KD",
    ],
    "Support Pramita": [
        "/Users/fajrihardhitamurti/REPO_GITLAB_PRAMITA/bisone",
    ],
    "SAS": [
        "/Users/fajrihardhitamurti/SAS_TASK",
    ],
}

# One {"path", "project"} record per checkout, in the order listed above.
REPOS = [
    {"path": repo_path, "project": project_name}
    for project_name, repo_paths in _REPO_PATHS_BY_PROJECT.items()
    for repo_path in repo_paths
]

# Commit subject pattern: "TASKCODE - description".
# Group 1 is the task code, group 2 the description.
COMMIT_RE = re.compile(r"^([A-Z0-9]+)\s*-\s*(.+)$")
|
|
|
|
|
|
def build_headers(session_id: str) -> dict:
    """Return the HTTP headers sent with every request to the Odoo server.

    ``session_id`` is embedded in the ``cookie`` header; ``origin`` and
    ``referer`` are derived from ``BASE_URL``.
    """
    cookie = f"frontend_lang=en_US; cids=1; session_id={session_id}; tz=Asia/Jakarta"

    headers = {"accept": "*/*", "content-type": "application/json"}
    headers["origin"] = BASE_URL
    headers["referer"] = f"{BASE_URL}/web"
    headers["user-agent"] = "Mozilla/5.0"
    headers["cookie"] = cookie
    return headers
|
|
|
|
|
|
def odoo_call(session_id: str, endpoint: str, payload: dict, timeout: float = 30.0) -> dict:
    """POST ``payload`` as JSON to ``BASE_URL + endpoint`` and return the decoded reply.

    Args:
        session_id: Session cookie value, forwarded to ``build_headers``.
        endpoint: Path appended to ``BASE_URL``.
        payload: JSON-serialisable request body.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled server cannot hang the script indefinitely.

    Returns:
        The decoded JSON object of the reply (callers read its ``result`` key).

    Raises:
        RuntimeError: on every failure mode -- HTTP error status, network
            failure or timeout, a reply that is not a JSON object, or a reply
            carrying an ``error`` member. Callers only need to catch this type.
    """
    url = f"{BASE_URL}{endpoint}"
    body = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=body, headers=build_headers(session_id), method="POST")

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Decode leniently: an error page in an unexpected encoding must not
        # mask the HTTP status with a UnicodeDecodeError.
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {e.code}: {detail}") from e
    except OSError as e:
        # URLError, connection resets and socket timeouts are all OSError
        # subclasses; HTTPError is handled above because it is one as well.
        raise RuntimeError(f"Request to {url} failed: {e}") from e

    try:
        response = json.loads(raw.decode("utf-8"))
    except ValueError as e:
        # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        raise RuntimeError(f"Invalid JSON response from {endpoint}: {e}") from e

    if not isinstance(response, dict):
        raise RuntimeError(f"Unexpected response from {endpoint}: {response!r}")

    if "error" in response:
        err = response["error"]
        message = err.get("message") if isinstance(err, dict) else err
        raise RuntimeError(f"Odoo error: {message}")

    return response
|
|
|
|
|
|
def search_task(session_id: str, code: str, project_id: int) -> tuple[int, str] | None:
    """Find the task whose name contains ``[code]`` inside one project.

    Sends a ``project.task`` ``name_search`` request through ``odoo_call``,
    limited to a single hit. Returns that hit converted to a tuple (the
    caller unpacks it as ``(task_id, task_name)``), or ``None`` when the
    result list is empty.
    """
    # Prefix-notation domain: the three "&" operators AND the four conditions.
    domain = [
        "&", "&", "&",
        ["company_id", "=", 1],
        ["project_id.allow_timesheets", "=", True],
        ["stage_id.fold", "=", False],
        ["project_id", "=", project_id],
    ]

    # NOTE(review): uid is hard-coded to 41 here, whereas upload_timesheet
    # takes it from the entry -- confirm this is intentional.
    context = {
        "lang": "en_US",
        "tz": "Asia/Jakarta",
        "uid": 41,
        "allowed_company_ids": [1],
        "is_timesheet": 1,
        "default_project_id": project_id,
    }

    search_kwargs = {
        "name": f"[{code}]",
        "operator": "ilike",
        "args": domain,
        "limit": 1,
        "context": context,
    }

    rpc_params = {
        "model": "project.task",
        "method": "name_search",
        "args": [],
        "kwargs": search_kwargs,
    }

    payload = {"id": 1, "jsonrpc": "2.0", "method": "call", "params": rpc_params}

    response = odoo_call(session_id, "/web/dataset/call_kw/project.task/name_search", payload)
    matches = response.get("result", [])
    if not matches:
        return None
    return tuple(matches[0])
|
|
|
|
|
|
def upload_timesheet(session_id: str, entry: dict) -> int:
    """Create one ``account.analytic.line`` record from ``entry``.

    ``entry`` is sent unchanged as the single positional argument of the
    ``create`` call; its ``user_id`` value is also used as ``uid`` in the
    request context. Returns the ``result`` member of the reply.
    """
    context = {
        "lang": "en_US",
        "tz": "Asia/Jakarta",
        "uid": entry["user_id"],
        "allowed_company_ids": [1],
        "is_timesheet": 1,
    }

    rpc_params = {
        "model": "account.analytic.line",
        "method": "create",
        "args": [entry],
        "kwargs": {"context": context},
    }

    payload = {"id": 1, "jsonrpc": "2.0", "method": "call", "params": rpc_params}

    response = odoo_call(session_id, "/web/dataset/call_kw/account.analytic.line/create", payload)
    return response.get("result")
|
|
|
|
|
|
def get_commits(repo_path: str, author: str, since: str, until: str) -> list[dict]:
    """List one author's commits in ``repo_path`` between two dates.

    Runs ``git log`` with ``since`` extended to ``00:00:00`` and ``until``
    extended to ``23:59:59``. Each output line has the form
    ``hash|date|subject``; it is split on the first two ``|`` only, so a
    subject may itself contain ``|``.

    Returns a list of ``{"hash", "date", "message"}`` dicts in git's output
    order, or an empty list when git exits with a non-zero status.
    """
    git_log = ["git", "-C", repo_path, "log"]
    git_log += [
        f"--author={author}",
        f"--since={since} 00:00:00",
        f"--until={until} 23:59:59",
        "--format=%h|%ad|%s",
        "--date=short",
    ]

    try:
        raw = subprocess.check_output(git_log, stderr=subprocess.DEVNULL, text=True)
    except subprocess.CalledProcessError:
        return []

    rows = (row.split("|", 2) for row in raw.strip().splitlines() if row)
    return [
        {"hash": fields[0], "date": fields[1], "message": fields[2]}
        for fields in rows
        if len(fields) == 3
    ]
|
|
|
|
|
|
def _iso_date(value: str) -> str:
    """argparse ``type=`` hook: validate a date and return it as YYYY-MM-DD."""
    try:
        return date.fromisoformat(value).isoformat()
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"tanggal tidak valid: {value!r} (format YYYY-MM-DD)"
        ) from None


def main():
    """Command-line entry point: turn matching commits into Odoo timesheet lines.

    Steps:
      1. Parse the arguments; ``--since`` and ``--until`` are validated as
         ISO dates and ``--since`` must not be later than ``--until``.
      2. Collect the author's commits from every entry in ``REPOS`` and keep
         those whose subject matches ``COMMIT_RE``.
      3. Look up the task for each commit (one request per distinct
         task code / project pair).
      4. Unless ``--dry-run`` is given, ask for confirmation and upload one
         timesheet entry per commit whose task was found.

    No duplicate detection is done here: running the script twice for the
    same period submits the same entries again.

    Raises:
        SystemExit: on invalid arguments, or when a task lookup fails (for
            example because of an expired session or a network error).
    """
    today = date.today().isoformat()

    parser = argparse.ArgumentParser(
        description="Sync commit ke timesheet Odoo",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--session-id", required=True, help="session_id cookie Odoo")
    parser.add_argument("--author", required=True, help="Filter git author, e.g. 'fajri'")
    parser.add_argument("--user-id", required=True, type=int, help="ID user Odoo")
    parser.add_argument("--employee-id", required=True, type=int, help="ID employee Odoo")
    parser.add_argument("--unit-amount", default=1.0, type=float, help="Jam per commit (default: 1.0)")
    parser.add_argument("--since", default=today, type=_iso_date, help=f"Tanggal mulai YYYY-MM-DD (default: {today})")
    parser.add_argument("--until", default=today, type=_iso_date, help=f"Tanggal akhir YYYY-MM-DD (default: {today})")
    parser.add_argument("--dry-run", action="store_true", help="Preview saja, tidak upload ke Odoo")
    args = parser.parse_args()

    # Both values are normalised to YYYY-MM-DD, so string order is date order.
    if args.since > args.until:
        parser.error("--since tidak boleh setelah --until")

    print(f"Author : {args.author}")
    print(f"Periode : {args.since} s/d {args.until}")
    print(f"Mode : {'DRY RUN (preview only)' if args.dry_run else 'UPLOAD'}")
    print("=" * 65)

    # Collect the commits whose subject matches the expected format.
    candidates = []
    skipped = []

    for repo in REPOS:
        # get_commits() already extends `until` to 23:59:59 of that day, so
        # the inclusive end date is passed as-is. Adding another day here
        # would pull in commits from the day after the requested period.
        commits = get_commits(repo["path"], args.author, args.since, args.until)
        repo_name = repo["path"].split("/")[-1]
        project_name = repo["project"]
        project_id = PROJECT_MAP[project_name]

        for c in commits:
            m = COMMIT_RE.match(c["message"])
            if m is None:
                skipped.append(f" SKIP {c['hash']} ({repo_name}): {c['message']}")
                continue
            candidates.append({
                "code": m.group(1),
                "description": m.group(2),
                "date": c["date"],
                "hash": c["hash"],
                "repo": repo_name,
                "project": project_name,
                "project_id": project_id,
            })

    if skipped:
        print(f"\n{len(skipped)} commit dilewati (format tidak cocok):")
        for s in skipped:
            print(s)

    if not candidates:
        print("\nTidak ada commit dengan format yang cocok.")
        return

    print(f"\n{len(candidates)} commit akan diproses:\n")

    # Resolve the task for every commit. Several commits usually share one
    # task code, so each (code, project) pair is looked up only once; a
    # "not found" result (None) is cached as well.
    task_cache = {}
    entries = []
    for c in candidates:
        cache_key = (c["code"], c["project_id"])
        if cache_key not in task_cache:
            try:
                task_cache[cache_key] = search_task(args.session_id, c["code"], c["project_id"])
            except (RuntimeError, OSError) as ex:
                # Nothing has been uploaded yet, so stopping here is safe and
                # avoids repeating the same failure for every commit.
                raise SystemExit(f"Gagal mencari task di Odoo: {ex}")
        task = task_cache[cache_key]

        if task:
            task_id, task_name = task
            status = f"task {task_id}"
        else:
            task_id, task_name = None, "(task tidak ditemukan)"
            status = "TIDAK DITEMUKAN"

        print(f" [{c['project']}] {c['date']} {c['hash']}")
        print(f" commit : {c['code']} - {c['description']}")
        print(f" task : {status} — {task_name}")
        print()

        if task_id:
            entries.append({
                "name": c["description"],
                "date": c["date"],
                "unit_amount": args.unit_amount,
                "user_id": args.user_id,
                "employee_id": args.employee_id,
                "task_id": task_id,
                "project_id": c["project_id"],
            })

    if not entries:
        print("Tidak ada entry yang bisa diupload.")
        return

    print(f"{'=' * 65}")
    print(f"Siap upload: {len(entries)} entry, {len(candidates) - len(entries)} gagal (task tidak ditemukan)")

    if args.dry_run:
        print("\n[DRY RUN] Tidak ada yang diupload.")
        return

    try:
        confirm = input(f"\nUpload {len(entries)} timesheet? (y/N): ").strip().lower()
    except EOFError:
        # No interactive stdin (e.g. piped or cron): treat as "no".
        confirm = ""
    if confirm != "y":
        print("Dibatalkan.")
        return

    print()
    ok, fail = 0, 0
    for e in entries:
        try:
            new_id = upload_timesheet(args.session_id, e)
        except (RuntimeError, OSError) as ex:
            # OSError covers network failures that are not reported as
            # RuntimeError; one failed entry must not stop the remaining ones.
            print(f" ERR {e['date']} {e['name']} -> {ex}")
            fail += 1
        else:
            print(f" OK ID {new_id} | {e['date']} {e['name']}")
            ok += 1

    print(f"\nSelesai: {ok} sukses, {fail} gagal.")
|
|
|
|
|
|
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|