# skiAI/createDataset.py
# 2026-01-11 19:44:48 +01:00
#
# 165 lines
# 5.2 KiB
# Python

import json
import os
import random
import boto3
from urllib.parse import urlparse
from tqdm.auto import tqdm
# s3 bucket configuration
# Each value can be overridden through an environment variable so that real
# credentials do not have to be committed to the repository; the literals
# below remain the defaults when the variable is not set.
MINIO_CONFIG = {
    'endpoint_url': os.environ.get('MINIO_ENDPOINT_URL', 'https://minio.hgk.ch'),
    'access_key': os.environ.get('MINIO_ACCESS_KEY', 'meinAccessKey'),
    'secret_key': os.environ.get('MINIO_SECRET_KEY', 'meinSecretKey'),
    'bucket': os.environ.get('MINIO_BUCKET', 'skiai'),
}

# input specs, annotations (Label Studio JSON export)
JSON_PATH = 'datasets/skier_pose/labelstudio_export.json'

# input specs, keypoint order must stay consistent:
# keypoints are written to every label line in exactly this order
KP_ORDER = [
    "leftski_tip", "leftski_tail", "rightski_tip", "rightski_tail",
    "leftpole_top", "leftpole_bottom", "rightpole_top", "rightpole_bottom",
]

# output specs
OUTPUT_DIR = 'datasets/skier_pose'
# fraction of the shuffled entries that goes into the 'train' split
TRAIN_RATIO = 0.8
# create folder structure
def __setup_directories():
    """Create the image and label folders for both splits below OUTPUT_DIR.

    Folders that already exist are left as they are.
    """
    for split_name in ('train', 'val'):
        split_root = os.path.join(OUTPUT_DIR, split_name)
        for subfolder in ('images', 'labels'):
            os.makedirs(os.path.join(split_root, subfolder), exist_ok=True)
# lazily created S3 client, shared by every download in this process
_S3_CLIENT = None


# download image from s3
def __download_from_minio(s3_path, local_path):
    """Download one object from the configured MinIO bucket to ``local_path``.

    Args:
        s3_path: Object location, either an ``s3://bucket/key`` URL or a
            plain (optionally slash-prefixed) key.
        local_path: Destination file path on the local disk.

    Raises:
        ValueError: If no object key can be extracted from ``s3_path``.
        Exception: Whatever boto3 raises when the download fails.

    Note:
        The bucket is always taken from ``MINIO_CONFIG['bucket']``; a bucket
        name contained in ``s3_path`` is ignored.
    """
    global _S3_CLIENT

    # For 's3://bucket/key' urlparse puts the bucket into netloc, so .path
    # only holds '/key'; for a plain path only the leading slash is stripped.
    key = urlparse(s3_path).path.lstrip('/')
    if not key:
        raise ValueError(f"No object key found in S3 path: {s3_path!r}")

    # Build the client once and reuse it instead of re-creating it for
    # every single image.
    if _S3_CLIENT is None:
        _S3_CLIENT = boto3.client(
            's3',
            endpoint_url=MINIO_CONFIG['endpoint_url'],
            aws_access_key_id=MINIO_CONFIG['access_key'],
            aws_secret_access_key=MINIO_CONFIG['secret_key'],
        )

    _S3_CLIENT.download_file(MINIO_CONFIG['bucket'], key, local_path)
# split the result items of one annotation into boxes, keypoints, hidden flags
def _parse_results(results):
    """Sort Label Studio result items by type.

    Args:
        results: List of result dicts of a single annotation.

    Returns:
        A tuple ``(bboxes, keypoints, visibility_map)``. Coordinates are
        divided by 100 (percent -> fraction). ``visibility_map`` maps the id
        of a keypoint flagged as "hidden" to the visibility value 1.

    Raises:
        KeyError: If a rectangle or keypoint item lacks a coordinate field.
    """
    bboxes = []
    keypoints = []
    visibility_map = {}

    for res in results:
        # .get(): items without 'id'/'type' must not abort the whole
        # conversion; they simply match none of the branches below.
        res_id = res.get('id')
        res_type = res.get('type')
        val = res.get('value', {})

        if res_type == 'rectanglelabels':
            w = val['width'] / 100.0
            h = val['height'] / 100.0
            x = val['x'] / 100.0
            y = val['y'] / 100.0
            bboxes.append({
                'x_center': x + (w / 2.0),
                'y_center': y + (h / 2.0),
                'width': w,
                'height': h,
                'x_min': x,
                'y_min': y,
                'x_max': x + w,
                'y_max': y + h,
                'assigned_kps': {},  # label -> keypoint lying inside this box
            })
        elif res_type == 'keypointlabels':
            keypoints.append({
                'id': res_id,
                'label': val['keypointlabels'][0],
                'x': val['x'] / 100.0,
                'y': val['y'] / 100.0,
            })
        elif res_type == 'choices':
            # Link the choice to its keypoint via 'parent_id' when present.
            # NOTE(review): per-region choices presumably share the 'id' of
            # their region in Label Studio exports, hence the fallback to the
            # item's own id - confirm against a real export.
            parent_id = res.get('parent_id') or res_id
            if parent_id and "hidden" in val.get('choices', []):
                visibility_map[parent_id] = 1

    return bboxes, keypoints, visibility_map


# attach every keypoint to the first box that contains it
def _assign_keypoints(bboxes, keypoints):
    """Store each keypoint in ``assigned_kps`` of the first enclosing box.

    Keypoints that lie in no box are dropped. A later keypoint with the same
    label inside the same box replaces the earlier one.
    """
    for kp in keypoints:
        for bbox in bboxes:
            inside_x = bbox['x_min'] <= kp['x'] <= bbox['x_max']
            inside_y = bbox['y_min'] <= kp['y'] <= bbox['y_max']
            if inside_x and inside_y:
                bbox['assigned_kps'][kp['label']] = kp
                break  # point belongs to this box, continue with next point


# turn the boxes (with their keypoints) into YOLO pose label lines
def _format_yolo_lines(bboxes, visibility_map):
    """Return one label line per box: class 0, box, then KP_ORDER keypoints.

    Every keypoint contributes ``x y v``; ``v`` is 2 by default, 1 when the
    keypoint id is in ``visibility_map`` and 0 (with zero coordinates) when
    the box has no keypoint of that name.
    """
    lines = []
    for bbox in bboxes:
        # Class (0) + BBox
        parts = [
            "0",
            f"{bbox['x_center']:.6f}", f"{bbox['y_center']:.6f}",
            f"{bbox['width']:.6f}", f"{bbox['height']:.6f}",
        ]
        # keypoints in the fixed order so all lines have the same columns
        for kp_name in KP_ORDER:
            kp = bbox['assigned_kps'].get(kp_name)
            if kp is None:
                # 0 (missing)
                parts.extend(["0.000000", "0.000000", "0"])
            else:
                v = visibility_map.get(kp['id'], 2)
                parts.extend([f"{kp['x']:.6f}", f"{kp['y']:.6f}", str(v)])
        lines.append(" ".join(parts))
    return lines


# create YOLO dataset
def createYOLOdataset():
    """Convert the Label Studio export at JSON_PATH into a YOLO pose dataset.

    The entries are shuffled with a fixed seed (42) and the first
    ``TRAIN_RATIO`` share goes to 'train', the rest to 'val'. For every
    annotated entry the image is downloaded from MinIO and a label file with
    the same base name is written. Only the first annotation of an entry is
    used. Entries without annotations are skipped entirely; entries whose
    download fails are reported and skipped.

    Raises:
        OSError: If JSON_PATH cannot be read or a label file cannot be written.
        KeyError: If an entry has no ``data.image`` field or a box/keypoint
            item lacks a coordinate field.
    """
    __setup_directories()

    with open(JSON_PATH, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # fixed seed keeps the train/val assignment reproducible between runs
    random.seed(42)
    random.shuffle(data)
    split_idx = int(len(data) * TRAIN_RATIO)

    # loop over all images
    for i, entry in enumerate(tqdm(data, desc="Importing Images", unit="img")):
        split = 'train' if i < split_idx else 'val'
        image_s3_path = entry['data']['image']
        filename = os.path.basename(image_s3_path)
        base_name = os.path.splitext(filename)[0]
        img_local_path = os.path.join(OUTPUT_DIR, split, 'images', filename)
        label_local_path = os.path.join(OUTPUT_DIR, split, 'labels', f"{base_name}.txt")

        # Check for annotations BEFORE downloading: an image without a label
        # file would otherwise end up in the dataset as an unlabeled sample.
        annotations = entry.get('annotations')
        if not annotations:
            continue

        try:
            __download_from_minio(image_s3_path, img_local_path)
        except Exception as e:
            # best effort: report the failed image and carry on with the rest
            tqdm.write(f"Error treating {filename}: {e}")
            continue

        results = annotations[0].get('result') or []
        bboxes, keypoints, visibility_map = _parse_results(results)
        _assign_keypoints(bboxes, keypoints)
        yolo_lines = _format_yolo_lines(bboxes, visibility_map)

        # an annotation without boxes yields an empty label file
        with open(label_local_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(yolo_lines))

    print(f"Finished! Dataset saved to: {os.path.abspath(OUTPUT_DIR)}")
# build the dataset only when this file is executed as a script
if __name__ == '__main__':
    createYOLOdataset()