"""Convert a Label Studio pose export into a YOLO pose dataset.

The script reads the Label Studio JSON export at ``JSON_PATH``, downloads
every referenced image from the configured MinIO/S3 bucket and writes one
YOLO pose label file per annotated image into a ``train``/``val`` folder
structure below ``OUTPUT_DIR``.
"""
import functools
import json
import os
import random
from urllib.parse import urlparse

import boto3
from tqdm.auto import tqdm

# s3 bucket configuration
# The access and secret key can be supplied through the environment
# (MINIO_ACCESS_KEY / MINIO_SECRET_KEY) so real secrets do not have to be
# committed; the literals below remain the defaults.
MINIO_CONFIG = {
    'endpoint_url': 'https://minio.hgk.ch',
    'access_key': os.environ.get('MINIO_ACCESS_KEY', 'meinAccessKey'),
    'secret_key': os.environ.get('MINIO_SECRET_KEY', 'meinSecretKey'),
    'bucket': 'skiai'
}

# input specs, annotations
JSON_PATH = 'datasets/skier_pose/labelstudio_export.json'

# input specs, keypoint order must stay consistent
KP_ORDER = [
    "leftski_tip", "leftski_tail",
    "rightski_tip", "rightski_tail",
    "leftpole_top", "leftpole_bottom",
    "rightpole_top", "rightpole_bottom"
]

# output specs
OUTPUT_DIR = 'datasets/skier_pose'
TRAIN_RATIO = 0.8

# keypoint visibility flags written to the label files
_KP_MISSING = 0   # keypoint not annotated
_KP_HIDDEN = 1    # annotated and marked "hidden"
_KP_VISIBLE = 2   # annotated, no "hidden" choice attached


# create folder structure
def __setup_directories():
    """Create ``<split>/images`` and ``<split>/labels`` below OUTPUT_DIR."""
    for split in ('train', 'val'):
        for kind in ('images', 'labels'):
            os.makedirs(os.path.join(OUTPUT_DIR, split, kind), exist_ok=True)


@functools.lru_cache(maxsize=1)
def _get_s3_client():
    """Return the S3 client for MINIO_CONFIG, created once and then reused.

    The client is built on first use, so later changes to MINIO_CONFIG are
    not picked up by an already cached client.
    """
    return boto3.client(
        's3',
        endpoint_url=MINIO_CONFIG['endpoint_url'],
        aws_access_key_id=MINIO_CONFIG['access_key'],
        aws_secret_access_key=MINIO_CONFIG['secret_key'],
    )


# download image from s3
def __download_from_minio(s3_path, local_path):
    """Download the object referenced by ``s3_path`` to ``local_path``.

    The bucket always comes from MINIO_CONFIG; only the path component of
    ``s3_path`` (without leading slashes) is used as the object key.

    Args:
        s3_path: Image reference, e.g. ``s3://bucketname/folder/img.jpg``.
        local_path: Destination file path on disk.
    """
    # removes 's3://bucketname/' if existing, otherwise the leading slash
    key = urlparse(s3_path).path.lstrip('/')
    _get_s3_client().download_file(MINIO_CONFIG['bucket'], key, local_path)


def _parse_results(results):
    """Split Label Studio results into boxes, keypoints and hidden flags.

    Coordinates are divided by 100, i.e. converted from percent to the
    0..1 range used in the label files.

    Returns:
        Tuple ``(bboxes, keypoints, visibility_map)`` where ``visibility_map``
        maps a keypoint result id to ``_KP_HIDDEN``.
    """
    bboxes = []
    keypoints = []
    visibility_map = {}

    for res in results:
        # .get(): not every result carries an 'id' (e.g. relation entries),
        # and one such entry must not abort the whole conversion.
        res_id = res.get('id')
        res_type = res.get('type')
        val = res.get('value', {})

        if res_type == 'rectanglelabels':
            w = val['width'] / 100.0
            h = val['height'] / 100.0
            x = val['x'] / 100.0
            y = val['y'] / 100.0
            bboxes.append({
                'x_center': x + (w / 2.0),
                'y_center': y + (h / 2.0),
                'width': w,
                'height': h,
                'x_min': x,
                'y_min': y,
                'x_max': x + w,
                'y_max': y + h,
                'assigned_kps': {}  # keypoints belonging to this box, by label
            })
        elif res_type == 'keypointlabels':
            keypoints.append({
                'id': res_id,
                'label': val['keypointlabels'][0],
                'x': val['x'] / 100.0,
                'y': val['y'] / 100.0
            })
        elif res_type == 'choices':
            # Link the choice to its keypoint: prefer an explicit parent_id,
            # otherwise use the result's own id.
            # NOTE(review): per-region choices in Label Studio exports appear
            # to share the id of the region they describe - confirm on a
            # real export.
            link_id = res.get('parent_id') or res_id
            if link_id and "hidden" in val.get('choices', []):
                visibility_map[link_id] = _KP_HIDDEN

    return bboxes, keypoints, visibility_map


def _assign_keypoints(bboxes, keypoints):
    """Attach each keypoint to the first bounding box that contains it.

    Keypoints outside every box are dropped; a later keypoint with the same
    label replaces an earlier one inside the same box.
    """
    for kp in keypoints:
        for bbox in bboxes:
            inside = (bbox['x_min'] <= kp['x'] <= bbox['x_max']
                      and bbox['y_min'] <= kp['y'] <= bbox['y_max'])
            if inside:
                bbox['assigned_kps'][kp['label']] = kp
                break  # keypoint belongs to this box, go on to the next one


def _build_yolo_lines(bboxes, visibility_map):
    """Return one YOLO pose label line per bounding box.

    Line layout: ``0 xc yc w h`` followed by ``x y v`` for every name in
    KP_ORDER, with ``0 0 0`` for keypoints that were not assigned.
    """
    yolo_lines = []
    for bbox in bboxes:
        # class (0) + bbox
        line_parts = [
            "0",
            f"{bbox['x_center']:.6f}",
            f"{bbox['y_center']:.6f}",
            f"{bbox['width']:.6f}",
            f"{bbox['height']:.6f}"
        ]
        # keypoints in the fixed KP_ORDER
        for kp_name in KP_ORDER:
            kp = bbox['assigned_kps'].get(kp_name)
            if kp is None:
                line_parts.extend(["0.000000", "0.000000", str(_KP_MISSING)])
                continue
            v = visibility_map.get(kp['id'], _KP_VISIBLE)
            line_parts.extend([f"{kp['x']:.6f}", f"{kp['y']:.6f}", str(v)])
        yolo_lines.append(" ".join(line_parts))
    return yolo_lines


# create YOLO dataset
def createYOLOdataset():
    """Build the YOLO pose dataset described by the module constants.

    Entries are shuffled with a fixed seed and split by TRAIN_RATIO into
    ``train`` and ``val``. For each entry the image is downloaded; entries
    whose download fails are reported and skipped. A label file is written
    only for entries that have annotations, using the first annotation.
    """
    __setup_directories()

    with open(JSON_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # fixed seed keeps the train/val split reproducible between runs
    random.seed(42)
    random.shuffle(data)
    split_idx = int(len(data) * TRAIN_RATIO)

    # loop over all images
    for i, entry in enumerate(tqdm(data, desc="Importing Images", unit="img")):
        split = 'train' if i < split_idx else 'val'
        image_s3_path = entry['data']['image']

        # Use the parsed path so the local name matches the downloaded key
        # and never contains a query string or fragment.
        filename = os.path.basename(urlparse(image_s3_path).path)
        base_name = os.path.splitext(filename)[0]
        img_local_path = os.path.join(OUTPUT_DIR, split, 'images', filename)
        label_local_path = os.path.join(
            OUTPUT_DIR, split, 'labels', f"{base_name}.txt")

        try:
            __download_from_minio(image_s3_path, img_local_path)
        except Exception as e:
            # best effort: report the failing image and keep going
            tqdm.write(f"Error treating {filename}: {e}")
            continue

        if not entry.get('annotations'):
            continue

        results = entry['annotations'][0].get('result', [])
        bboxes, keypoints, visibility_map = _parse_results(results)
        _assign_keypoints(bboxes, keypoints)
        yolo_lines = _build_yolo_lines(bboxes, visibility_map)

        with open(label_local_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(yolo_lines))

    print(f"Finished! Dataset saved to: {os.path.abspath(OUTPUT_DIR)}")


if __name__ == "__main__":
    createYOLOdataset()