import functools
import json
import os
import random
from urllib.parse import urlparse

import boto3
from tqdm.auto import tqdm

# --- CONFIGURATION ---
# NOTE(review): credentials are hard-coded here; make sure real keys are never
# committed to version control.
MINIO_CONFIG = {
    'endpoint_url': 'https://minio.hgk.ch',
    'access_key': 'meinAccessKey',
    'secret_key': 'meinSecretKey',
    'bucket': 'skiai',
}

JSON_PATH = 'datasets/skier_pose/labelstudio_export.json'
OUTPUT_DIR = 'datasets/skier_pose'
TRAIN_RATIO = 0.8

# The order MUST stay consistent: it fixes the position of every keypoint
# triple inside each generated label line.
KP_ORDER = [
    "leftski_tip", "leftski_tail",
    "rightski_tip", "rightski_tail",
    "leftpole_top", "leftpole_bottom",
    "rightpole_top", "rightpole_bottom",
]


def setup_directories():
    """Create train/images, train/labels, val/images and val/labels under OUTPUT_DIR."""
    for split in ('train', 'val'):
        for kind in ('images', 'labels'):
            os.makedirs(os.path.join(OUTPUT_DIR, split, kind), exist_ok=True)


@functools.lru_cache(maxsize=1)
def _get_s3_client():
    """Return one shared boto3 S3 client built from MINIO_CONFIG.

    The client is created on first use and cached, so MINIO_CONFIG is read
    only once; later changes to MINIO_CONFIG do not affect the cached client.
    """
    return boto3.client(
        's3',
        endpoint_url=MINIO_CONFIG['endpoint_url'],
        aws_access_key_id=MINIO_CONFIG['access_key'],
        aws_secret_access_key=MINIO_CONFIG['secret_key'],
    )


def download_from_minio(s3_path, local_path):
    """Download the object referenced by ``s3_path`` to ``local_path``.

    Args:
        s3_path: Object reference such as ``s3://bucketname/dir/img.jpg``.
            Only the path component is used as the object key; the bucket is
            always taken from ``MINIO_CONFIG['bucket']``.
        local_path: Destination file path on the local disk.

    Raises:
        Whatever ``boto3``'s ``download_file`` raises on failure; the
        exception is not handled here.
    """
    # For 's3://bucketname/key' urlparse puts the bucket into netloc, so the
    # path is just '/key'; strip the leading slash to obtain the object key.
    key = urlparse(s3_path).path.lstrip('/')
    _get_s3_client().download_file(MINIO_CONFIG['bucket'], key, local_path)


def _build_yolo_lines(results):
    """Turn one annotation's Label Studio result list into YOLO pose label lines.

    Each line has the form ``0 cx cy w h`` followed by one ``x y v`` triple
    per entry of KP_ORDER. Coordinates are Label Studio percentages divided
    by 100. ``v`` is 2 (visible) by default, 1 (occluded) when a ``choices``
    result with the same id as the keypoint contains "1", and a missing
    keypoint is written as ``0.000000 0.000000 0``.

    Note: all keypoints are attached to every bounding box found.

    Args:
        results: The ``result`` list of a single annotation.

    Returns:
        A list with one label line per ``rectanglelabels`` result, empty when
        there is no bounding box.
    """
    kp_map = {}          # result id -> {'label', 'x', 'y'}
    visibility_map = {}  # result id -> visibility flag (1 = occluded)
    bboxes = []          # pre-formatted "cx cy w h" strings

    for res in results:
        # .get() instead of indexing: result items without 'id'/'type'
        # are simply ignored instead of aborting the conversion.
        res_id = res.get('id')
        res_type = res.get('type')
        val = res.get('value', {})

        if res_type == 'keypointlabels':
            kp_map[res_id] = {
                'label': val['keypointlabels'][0],
                'x': val['x'] / 100.0,
                'y': val['y'] / 100.0,
            }
        elif res_type == 'choices':
            # Matching relies on the choice sharing its id with the keypoint.
            # The choice "1" is the alias for "occluded".
            if "1" in val.get('choices', []):
                visibility_map[res_id] = 1
        elif res_type == 'rectanglelabels':
            # Convert top-left + size (percent) to normalised centre + size.
            bw = val['width'] / 100.0
            bh = val['height'] / 100.0
            bx = (val['x'] / 100.0) + (bw / 2.0)
            by = (val['y'] / 100.0) + (bh / 2.0)
            bboxes.append(f"{bx:.6f} {by:.6f} {bw:.6f} {bh:.6f}")

    # Index keypoints by label once; the first keypoint carrying a label wins.
    first_by_label = {}
    for kp_id, kp in kp_map.items():
        first_by_label.setdefault(kp['label'], (kp_id, kp))

    # The keypoint fields are identical for every bbox, so build them once.
    kp_fields = []
    for kp_name in KP_ORDER:
        match = first_by_label.get(kp_name)
        if match is None:
            kp_fields.append("0.000000 0.000000 0")
            continue
        kp_id, kp = match
        visibility = visibility_map.get(kp_id, 2)
        kp_fields.append(f"{kp['x']:.6f} {kp['y']:.6f} {visibility}")

    return [' '.join(["0", bbox] + kp_fields) for bbox in bboxes]


def convert_to_yolo():
    """Convert the Label Studio export at JSON_PATH into a YOLO pose dataset.

    Entries are shuffled deterministically (seed 42) and split into train/val
    according to TRAIN_RATIO. For every entry that has annotations, the image
    is downloaded from MinIO into ``<split>/images`` and a label file built
    from the first annotation is written to ``<split>/labels``.

    Entries without annotations are skipped before downloading, so no image
    is left behind without a label file. Download failures are reported and
    the entry is skipped.
    """
    setup_directories()

    with open(JSON_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # A local generator gives the same order as seeding the global RNG with
    # 42, without touching global random state.
    random.Random(42).shuffle(data)
    split_idx = int(len(data) * TRAIN_RATIO)

    for i, entry in enumerate(tqdm(data, desc="Importing Images", unit="img")):
        split = 'train' if i < split_idx else 'val'

        # File name comes from the 'data' field of the task.
        image_s3_path = entry['data']['image']
        filename = os.path.basename(image_s3_path)
        base_name = os.path.splitext(filename)[0]

        annotations = entry.get('annotations')
        if not annotations:
            tqdm.write(f"Skipping {filename}: no annotations")
            continue

        img_local_path = os.path.join(OUTPUT_DIR, split, 'images', filename)
        label_local_path = os.path.join(
            OUTPUT_DIR, split, 'labels', f"{base_name}.txt"
        )

        try:
            download_from_minio(image_s3_path, img_local_path)
        except Exception as e:  # per-item boundary: report and keep going
            tqdm.write(f"Error treating {filename}: {e}")
            continue

        # Only the first annotation of each task is used.
        yolo_lines = _build_yolo_lines(annotations[0].get('result') or [])

        with open(label_local_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(yolo_lines))

    print(f"Fertig! Daten liegen in: {os.path.abspath(OUTPUT_DIR)}")


if __name__ == "__main__":
    convert_to_yolo()