feat(): new feature

This commit is contained in:
2026-01-29 15:59:08 +01:00
parent e9cca3daeb
commit c365b4fa6b
17 changed files with 617 additions and 83 deletions

137
backend/api/routes.py Normal file
View File

@@ -0,0 +1,137 @@
from fastapi import APIRouter, UploadFile, File, HTTPException, Form
from models.quote_request import QuoteRequest, QuoteResponse
from slicer import slicer_service
from calculator import GCodeParser, QuoteCalculator
from config import settings
from profile_manager import ProfileManager
import os
import shutil
import uuid
import logging
import json
router = APIRouter()
logger = logging.getLogger("api")
profile_manager = ProfileManager()
def cleanup_files(files: list):
for f in files:
try:
if os.path.exists(f):
os.remove(f)
except Exception as e:
logger.warning(f"Failed to delete temp file {f}: {e}")
def format_time(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
if h > 0:
return f"{int(h)}h {int(m)}m"
return f"{int(m)}m {int(s)}s"
@router.post("/quote", response_model=QuoteResponse)
async def calculate_quote(
file: UploadFile = File(...),
# Compatible with form data if we parse manually or use specific dependencies.
# FastAPI handling of mixed File + JSON/Form is tricky.
# Easiest is to use Form(...) for fields.
machine: str = Form("bambu_a1"),
filament: str = Form("pla_basic"),
quality: str = Form("standard"),
layer_height: str = Form(None), # Form data comes as strings usually
infill_density: int = Form(None),
infill_pattern: str = Form(None),
support_enabled: bool = Form(False),
print_speed: int = Form(None)
):
"""
Endpoint for calculating print quote.
Accepts Multipart Form Data:
- file: The STL file
- machine, filament, quality: strings
- other overrides
"""
if not file.filename.lower().endswith(".stl"):
raise HTTPException(status_code=400, detail="Only .stl files are supported.")
if machine != "bambu_a1":
raise HTTPException(status_code=400, detail="Unsupported machine.")
req_id = str(uuid.uuid4())
input_filename = f"{req_id}.stl"
output_filename = f"{req_id}.gcode"
input_path = os.path.join(settings.TEMP_DIR, input_filename)
output_path = os.path.join(settings.TEMP_DIR, output_filename)
try:
# 1. Save File
with open(input_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 2. Build Overrides
overrides = {}
if layer_height is not None and layer_height != "":
overrides["layer_height"] = layer_height
if infill_density is not None:
overrides["sparse_infill_density"] = f"{infill_density}%"
if infill_pattern:
overrides["sparse_infill_pattern"] = infill_pattern
if support_enabled: overrides["enable_support"] = "1"
if print_speed is not None:
overrides["default_print_speed"] = str(print_speed)
# 3. Slice
# Pass parameters to slicer service
slicer_service.slice_stl(
input_stl_path=input_path,
output_gcode_path=output_path,
machine=machine,
filament=filament,
quality=quality,
overrides=overrides
)
# 4. Parse
stats = GCodeParser.parse_metadata(output_path)
if stats["print_time_seconds"] == 0 and stats["filament_weight_g"] == 0:
raise HTTPException(status_code=500, detail="Slicing returned empty stats.")
# 5. Calculate
# We could allow filament cost override here too if passed in params
quote = QuoteCalculator.calculate(stats)
return QuoteResponse(
success=True,
data={
"print_time_seconds": stats["print_time_seconds"],
"print_time_formatted": format_time(stats["print_time_seconds"]),
"material_grams": stats["filament_weight_g"],
"cost": {
"material": quote["breakdown"]["material_cost"],
"machine": quote["breakdown"]["machine_cost"],
"energy": quote["breakdown"]["energy_cost"],
"markup": quote["breakdown"]["markup_amount"],
"total": quote["total_price"]
},
"parameters": {
"machine": machine,
"filament": filament,
"quality": quality
}
}
)
except Exception as e:
logger.error(f"Quote error: {e}", exc_info=True)
return QuoteResponse(success=False, error=str(e))
finally:
cleanup_files([input_path, output_path])
@router.get("/profiles/available")
def get_profiles():
return {
"machines": profile_manager.list_machines(),
"filaments": profile_manager.list_filaments(),
"processes": profile_manager.list_processes()
}

View File

@@ -58,7 +58,6 @@ async def legacy_calculate(file: UploadFile = File(...)):
# Map Check response to old format
data = resp.data
return {
"printer": data.get("printer", "Unknown"),
"print_time_seconds": data.get("print_time_seconds", 0),
"print_time_formatted": data.get("print_time_formatted", ""),
"material_grams": data.get("material_grams", 0.0),
@@ -72,4 +71,4 @@ def health_check():
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,37 @@
from pydantic import BaseModel, Field, validator
from typing import Optional, Literal, Dict, Any
class QuoteRequest(BaseModel):
# File STL (base64 or path)
file_path: Optional[str] = None
file_base64: Optional[str] = None
# Parametri slicing
machine: str = Field(default="bambu_a1", description="Machine type")
filament: str = Field(default="pla_basic", description="Filament type")
quality: Literal["draft", "standard", "fine"] = Field(default="standard")
# Parametri opzionali
layer_height: Optional[float] = Field(None, ge=0.08, le=0.32)
infill_density: Optional[int] = Field(None, ge=0, le=100)
support_enabled: Optional[bool] = None
print_speed: Optional[int] = Field(None, ge=20, le=300)
# Pricing overrides
filament_cost_override: Optional[float] = None
@validator('machine')
def validate_machine(cls, v):
# This list should ideally be dynamic, but for validation purposes we start with known ones.
# Logic in ProfileManager can be looser or strict.
# For now, we allow the string through and let ProfileManager validate availability.
return v
@validator('filament')
def validate_filament(cls, v):
return v
class QuoteResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None

15
backend/profile_cache.py Normal file
View File

@@ -0,0 +1,15 @@
from functools import lru_cache
import hashlib
from typing import Dict, Tuple
# We can't cache the profile manager instance itself easily if it's not a singleton,
# but we can cache the result of a merge function if we pass simple types.
# However, to avoid circular imports or complex dependency injection,
# we will just provide a helper to generate cache keys and a holder for logic if needed.
# For now, the ProfileManager will strictly determine *what* to merge.
# Validating the cache strategy: since file I/O is the bottleneck, we want to cache the *content*.
def get_cache_key(machine: str, filament: str, process: str) -> str:
"""Helper to create a unique cache key"""
data = f"{machine}|{filament}|{process}"
return hashlib.md5(data.encode()).hexdigest()

193
backend/profile_manager.py Normal file
View File

@@ -0,0 +1,193 @@
import os
import json
import logging
from typing import Dict, List, Tuple, Optional
from profile_cache import get_cache_key
logger = logging.getLogger(__name__)
class ProfileManager:
def __init__(self, profiles_root: str = "profiles"):
# Assuming profiles_root is relative to backend or absolute
if not os.path.isabs(profiles_root):
base_dir = os.path.dirname(os.path.abspath(__file__))
self.profiles_root = os.path.join(base_dir, profiles_root)
else:
self.profiles_root = profiles_root
if not os.path.exists(self.profiles_root):
logger.warning(f"Profiles root not found: {self.profiles_root}")
def get_profiles(self, machine: str, filament: str, process: str) -> Tuple[Dict, Dict, Dict]:
"""
Main entry point to get merged profiles.
Args:
machine: e.g. "Bambu Lab A1 0.4 nozzle"
filament: e.g. "Bambu PLA Basic @BBL A1"
process: e.g. "0.20mm Standard @BBL A1"
"""
# Try cache first (although specific logic is needed if we cache the *result* or the *files*)
# Since we implemented a simple external cache helper, let's use it if we want,
# but for now we will rely on internal logic or the lru_cache decorator on a helper method.
# But wait, the `get_cached_profiles` in profile_cache.py calls `build_merged_profiles` which is logic WE need to implement.
# So we should probably move the implementation here and have the cache wrapper call it,
# OR just implement it here and wrap it.
return self._build_merged_profiles(machine, filament, process)
def _build_merged_profiles(self, machine_name: str, filament_name: str, process_name: str) -> Tuple[Dict, Dict, Dict]:
# We need to find the files.
# The naming convention in OrcaSlicer profiles usually involves the Vendor (e.g. BBL).
# We might need a mapping or search.
# For this implementation, we will assume we know the relative paths or search for them.
# Strategy: Search in all vendor subdirs for the specific JSON files.
# Because names are usually unique enough or we can specify the expected vendor.
# However, to be fast, we can map "machine_name" to a file path.
machine_file = self._find_profile_file(machine_name, "machine")
filament_file = self._find_profile_file(filament_name, "filament")
process_file = self._find_profile_file(process_name, "process")
if not machine_file:
raise FileNotFoundError(f"Machine profile not found: {machine_name}")
if not filament_file:
raise FileNotFoundError(f"Filament profile not found: {filament_name}")
if not process_file:
raise FileNotFoundError(f"Process profile not found: {process_name}")
machine_profile = self._merge_chain(machine_file)
filament_profile = self._merge_chain(filament_file)
process_profile = self._merge_chain(process_file)
# Apply patches
machine_profile = self._apply_patches(machine_profile, "machine")
process_profile = self._apply_patches(process_profile, "process")
return machine_profile, process_profile, filament_profile
def _find_profile_file(self, profile_name: str, profile_type: str) -> Optional[str]:
"""
Searches for a profile file by name in the profiles directory.
The name should match the filename (without .json possibly) or be a precise match.
"""
# Add .json if missing
filename = profile_name if profile_name.endswith(".json") else f"{profile_name}.json"
for root, dirs, files in os.walk(self.profiles_root):
if filename in files:
# Check if it is in the correct type folder (machine, filament, process)
# OrcaSlicer structure: Vendor/process/file.json
# We optionally verify parent dir
if os.path.basename(root) == profile_type or profile_type in root:
return os.path.join(root, filename)
# Fallback: if we simply found it, maybe just return it?
# Some common files might be in root or other places.
# Let's return it if we are fairly sure.
return os.path.join(root, filename)
return None
def _merge_chain(self, final_file_path: str) -> Dict:
"""
Resolves inheritance and merges.
"""
chain = []
current_path = final_file_path
# 1. Build chain
while current_path:
chain.insert(0, current_path) # Prepend
with open(current_path, 'r', encoding='utf-8') as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {current_path}")
raise e
inherits = data.get("inherits")
if inherits:
# Resolve inherited file
# It is usually in the same directory or relative.
# OrcaSlicer logic: checks same dir, then parent, etc.
# Usually it's in the same directory.
parent_dir = os.path.dirname(current_path)
inherited_path = os.path.join(parent_dir, inherits)
# Special case: if not found, it might be in a common folder?
# But OrcaSlicer usually keeps them local or in specific common dirs.
if not os.path.exists(inherited_path) and not inherits.endswith(".json"):
inherited_path += ".json"
if os.path.exists(inherited_path):
current_path = inherited_path
else:
# Could be a system common file not in the same dir?
# For simplicty, try to look up in the same generic type folder across the vendor?
# Or just fail for now.
# Often "fdm_machine_common.json" is at the Vendor root or similar?
# Let's try searching recursively if not found in place.
found = self._find_profile_file(inherits, "any") # "any" type
if found:
current_path = found
else:
logger.warning(f"Inherited profile '{inherits}' not found for '{current_path}' (Root: {self.profiles_root})")
current_path = None
else:
current_path = None
# 2. Merge
merged = {}
for path in chain:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Shallow update
merged.update(data)
# Remove metadata
merged.pop("inherits", None)
return merged
def _apply_patches(self, profile: Dict, profile_type: str) -> Dict:
if profile_type == "machine":
# Patch: G92 E0 to ensure extrusion reference text matches
lcg = profile.get("layer_change_gcode", "")
if "G92 E0" not in lcg:
# Append neatly
if lcg and not lcg.endswith("\n"):
lcg += "\n"
lcg += "G92 E0"
profile["layer_change_gcode"] = lcg
# Patch: ensure printable height is sufficient?
# Only if necessary. For now, trust the profile.
elif profile_type == "process":
# Optional: Disable skirt/brim if we want a "clean" print estimation?
# Actually, for accurate cost, we SHOULD include skirt/brim if the profile has it.
pass
return profile
def list_machines(self) -> List[str]:
# Simple helper to list available machine JSONs
return self._list_profiles_by_type("machine")
def list_filaments(self) -> List[str]:
return self._list_profiles_by_type("filament")
def list_processes(self) -> List[str]:
return self._list_profiles_by_type("process")
def _list_profiles_by_type(self, ptype: str) -> List[str]:
results = []
for root, dirs, files in os.walk(self.profiles_root):
if os.path.basename(root) == ptype:
for f in files:
if f.endswith(".json") and "common" not in f:
results.append(f.replace(".json", ""))
return sorted(results)

View File

@@ -0,0 +1,24 @@
{
"quality_to_process": {
"draft": "0.28mm Extra Draft @BBL A1",
"standard": "0.20mm Standard @BBL A1",
"fine": "0.12mm Fine @BBL A1"
},
"filament_costs": {
"pla_basic": 20.0,
"petg_basic": 25.0,
"abs_basic": 22.0,
"tpu_95a": 35.0
},
"filament_to_profile": {
"pla_basic": "Bambu PLA Basic @BBL A1",
"petg_basic": "Bambu PETG Basic @BBL A1",
"abs_basic": "Bambu ABS @BBL A1",
"tpu_95a": "Bambu TPU 95A @BBL A1"
},
"machine_to_profile": {
"bambu_a1": "Bambu Lab A1 0.4 nozzle",
"bambu_x1": "Bambu Lab X1 Carbon 0.4 nozzle",
"bambu_p1s": "Bambu Lab P1S 0.4 nozzle"
}
}

View File

@@ -0,0 +1,58 @@
import sys
import os
import unittest
import json
# Add backend to path
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from profile_manager import ProfileManager
from profile_cache import get_cache_key
class TestProfileManager(unittest.TestCase):
def setUp(self):
self.pm = ProfileManager(profiles_root="profiles")
def test_list_machines(self):
machines = self.pm.list_machines()
print(f"Found machines: {len(machines)}")
self.assertTrue(len(machines) > 0, "No machines found")
# Check for a known machine
self.assertTrue(any("Bambu Lab A1" in m for m in machines), "Bambu Lab A1 should be in the list")
def test_find_profile(self):
# We know "Bambu Lab A1 0.4 nozzle" should exist (based on user context and mappings)
# It might be in profiles/BBL/machine/
path = self.pm._find_profile_file("Bambu Lab A1 0.4 nozzle", "machine")
self.assertIsNotNone(path, "Could not find Bambu Lab A1 machine profile")
print(f"Found profile at: {path}")
def test_scan_profiles_inheritance(self):
# Pick a profile we expect to inherit stuff
# e.g. "Bambu Lab A1 0.4 nozzle" inherits "fdm_bbl_3dp_001_common" which inherits "fdm_machine_common"
merged, _, _ = self.pm.get_profiles(
"Bambu Lab A1 0.4 nozzle",
"Bambu PLA Basic @BBL A1",
"0.20mm Standard @BBL A1"
)
self.assertIsNotNone(merged)
# Check if inherits is gone
self.assertNotIn("inherits", merged)
# Check if patch applied (G92 E0)
self.assertIn("G92 E0", merged.get("layer_change_gcode", ""))
# Check specific key from base
# "printer_technology": "FFF" is usually in common
# We can't be 100% sure of keys without seeing file, but let's check something likely
self.assertTrue("nozzle_diameter" in merged or "extruder_clearance_height_to_lid" in merged or "printable_height" in merged)
def test_mappings_resolution(self):
# Test if the slicer service would resolve correctly?
# We can just test the manager with mapped names if the manager supported it,
# but the manager deals with explicit names.
# Integration test handles the mapping.
pass
if __name__ == '__main__':
unittest.main()