from flask import Flask, request, jsonify import cv2 import numpy as np from sklearn.metrics.pairwise import cosine_similarity import os import mysql.connector from flask_cors import CORS import dlib app = Flask(__name__) # 修改 CORS 配置,明确指定允许的来源和方法 # 允许所有源的访问(仅在开发环境使用) CORS(app, resources={ r"/*": { "origins": ["http://localhost:8080"], "methods": ["GET", "POST", "OPTIONS"], "allow_headers": ["Content-Type"] } }) CORS(app, supports_credentials=True) # 预先保存的标准图片路径 STANDARD_IMAGES = { 'oval': 'oval.jpg', 'square': 'square.jpg', 'heart': 'heart.jpg', 'round': 'round.jpg', 'long': 'long.jpg', 'diamond': 'diamond.jpg', 'pear': 'pear.jpg' } # MySQL数据库配置 db_config = { 'user': 'root', 'password': '123456', 'host': '192.168.3.80', 'database': 'citu_new' } # 获取当前文件所在目录的绝对路径 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 构建数据文件的完整路径 SHAPE_PREDICTOR_PATH = os.path.join(BASE_DIR, 'data', 'shape_predictor_68_face_landmarks.dat') def compare_faces(image1, image2): # 使用OpenCV进行脸部轮廓对比 face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') gray1 = cv2.cvtColor(image1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(image2, cv2.COLOR_BGR2GRAY) faces1 = face_cascade.detectMultiScale(gray1, 1.1, 4) faces2 = face_cascade.detectMultiScale(gray2, 1.1, 4) if len(faces1) == 0 or len(faces2) == 0: return 0.0 # 计算相似度 face1 = gray1[faces1[0][1]:faces1[0][1]+faces1[0][3], faces1[0][0]:faces1[0][0]+faces1[0][2]] face2 = gray2[faces2[0][1]:faces2[0][1]+faces2[0][3], faces2[0][0]:faces2[0][0]+faces2[0][2]] face1 = cv2.resize(face1, (100, 100)) face2 = cv2.resize(face2, (100, 100)) face1 = face1.flatten() face2 = face2.flatten() similarity = cosine_similarity([face1], [face2])[0][0] return similarity * 100 @app.route('/upload', methods=['POST']) def upload(): try: if not os.path.exists(SHAPE_PREDICTOR_PATH): return jsonify({ 'error': True, 'message': f'找不到面部特征点数据文件: {SHAPE_PREDICTOR_PATH}', 'data': None }), 500 if 'file' not in request.files: return jsonify({ 'error': True, 'message': 'No file part', 'data': None }), 400 file = request.files['file'] image = cv2.imdecode(np.frombuffer(file.read(), np.uint8), cv2.IMREAD_COLOR) # 使用绝对路径加载模型 detector = dlib.get_frontal_face_detector() predictor = dlib.shape_predictor(SHAPE_PREDICTOR_PATH) # 检测人脸 faces = detector(image) if len(faces) == 0: return jsonify({ 'error': True, 'message': '未检测到人脸', 'data': None }), 400 # 获取第一个检测到的人脸的关键点 landmarks = predictor(image, faces[0]) # 获取特定点的坐标 # dlib的68个关键点中: # 左脸颊点约为点1-2 # 右脸颊点约为点15-16 # 鼻梁点约为点27 facial_points = { 'cheek_left_3': {'x': landmarks.part(2).x, 'y': landmarks.part(2).y}, 'cheek_right_3': {'x': landmarks.part(14).x, 'y': landmarks.part(14).y}, 'nose_bridge_1': {'x': landmarks.part(27).x, 'y': landmarks.part(27).y} } # 进行人脸对比 results = [] for name, path in STANDARD_IMAGES.items(): standard_image = cv2.imread(path) similarity = compare_faces(image, standard_image) results.append({ 'name': name, 'similarity': similarity, 'facial_points': facial_points # 添加面部关键点数据 }) # 挑选相似度最高的两个 results.sort(key=lambda x: x['similarity'], reverse=True) top_results = results[:2] return jsonify({ 'error': False, 'message': 'success', 'data': top_results }) except Exception as e: error_message = str(e) print(f"Error in upload: {error_message}") return jsonify({ 'error': True, 'message': f'处理图片时出错: {error_message}', 'data': None }), 500 @app.route('/select', methods=['POST']) def select(): try: data = request.get_json() if not data or 'name' not in data: return jsonify({ 'error': True, 'message': '请求数据格式错误', 'data': None }), 400 face_type = data['name'] # 从前端传来的name就是脸型 # 从MySQL数据库读取对应的记录 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor(dictionary=True) # 使用dictionary=True返回字典格式的结果 # 修改查询语句匹配新表结构 query = """ SELECT id, facetype, brand, frametype, material, pic_url, detail_info FROM zeiss_glass_shelf WHERE facetype = %s """ cursor.execute(query, (face_type,)) rows = cursor.fetchall() # 格式化返回数据 formatted_rows = [] for row in rows: formatted_rows.append({ 'id': row['id'], 'facetype': row['facetype'], 'brand': row['brand'], 'frametype': row['frametype'], 'material': row['material'], 'pic_url': row['pic_url'], 'detail_info': row['detail_info'] }) return jsonify({ 'error': False, 'message': 'success', 'data': formatted_rows }) except mysql.connector.Error as db_err: error_message = str(db_err) print(f"Database error: {error_message}") return jsonify({ 'error': True, 'message': f'数据库操作失败: {error_message}', 'data': None }), 500 finally: if 'conn' in locals() and conn.is_connected(): cursor.close() conn.close() except Exception as e: error_message = str(e) print(f"General error: {error_message}") return jsonify({ 'error': True, 'message': f'服务器内部错误: {error_message}', 'data': None }), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=8080, debug=True)