""" 系统用户认证模块 提供用户注册、登录验证等功能 """ import logging import base64 import time import uuid import psycopg2 from psycopg2 import pool logger = logging.getLogger(__name__) # PostgreSQL连接池 pg_pool = None def get_pg_connection(): """ 获取PostgreSQL数据库连接 Returns: connection: PostgreSQL连接对象 """ global pg_pool if pg_pool is None: try: # 创建连接池 pg_pool = psycopg2.pool.SimpleConnectionPool( 1, 20, host="localhost", database="dataops", user="postgres", password="postgres", port="5432" ) logger.info("PostgreSQL连接池初始化成功") except Exception as e: logger.error(f"PostgreSQL连接池初始化失败: {str(e)}") raise return pg_pool.getconn() def release_pg_connection(conn): """ 释放PostgreSQL连接到连接池 Args: conn: 数据库连接对象 """ global pg_pool if pg_pool and conn: pg_pool.putconn(conn) def encode_password(password): """ 对密码进行base64编码 Args: password: 原始密码 Returns: str: 编码后的密码 """ return base64.b64encode(password.encode('utf-8')).decode('utf-8') def create_user_table(): """ 创建用户表,如果不存在 Returns: bool: 是否成功创建 """ conn = None try: conn = get_pg_connection() cursor = conn.cursor() # 创建用户表 create_table_query = """ CREATE TABLE IF NOT EXISTS users ( id VARCHAR(100) PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, password VARCHAR(100) NOT NULL, created_at FLOAT NOT NULL, last_login FLOAT, is_admin BOOLEAN DEFAULT FALSE ); """ cursor.execute(create_table_query) # 创建索引加速查询 create_index_query = """ CREATE INDEX IF NOT EXISTS idx_users_username ON users(username); """ cursor.execute(create_index_query) conn.commit() cursor.close() logger.info("用户表创建成功") return True except Exception as e: logger.error(f"创建用户表失败: {str(e)}") if conn: conn.rollback() return False finally: if conn: release_pg_connection(conn) def register_user(username, password): """ 注册新用户 Args: username: 用户名 password: 密码 Returns: tuple: (是否成功, 消息) """ conn = None try: # 确保表已创建 create_user_table() # 对密码进行编码 encoded_password = encode_password(password) # 生成用户ID user_id = str(uuid.uuid4()) conn = get_pg_connection() cursor = conn.cursor() # 检查用户名是否存在 check_query = "SELECT username FROM users WHERE username = %s" cursor.execute(check_query, (username,)) if cursor.fetchone(): return False, "用户名已存在" # 创建用户 insert_query = """ INSERT INTO users (id, username, password, created_at, last_login) VALUES (%s, %s, %s, %s, %s) """ cursor.execute( insert_query, (user_id, username, encoded_password, time.time(), None) ) conn.commit() cursor.close() return True, "注册成功" except Exception as e: logger.error(f"用户注册失败: {str(e)}") if conn: conn.rollback() return False, f"注册失败: {str(e)}" finally: if conn: release_pg_connection(conn) def login_user(username, password): """ 用户登录验证 Args: username: 用户名 password: 密码 Returns: tuple: (是否成功, 用户信息/错误消息) """ conn = None try: # 对输入的密码进行编码 encoded_password = encode_password(password) conn = get_pg_connection() cursor = conn.cursor() # 查询用户 query = """ SELECT id, username, password, created_at, last_login, is_admin FROM users WHERE username = %s """ cursor.execute(query, (username,)) user = cursor.fetchone() # 检查用户是否存在 if not user: return False, "用户名或密码错误" # 验证密码 if user[2] != encoded_password: return False, "用户名或密码错误" # 更新最后登录时间 current_time = time.time() update_query = """ UPDATE users SET last_login = %s WHERE username = %s """ cursor.execute(update_query, (current_time, username)) conn.commit() # 构建用户信息 user_info = { "id": user[0], "username": user[1], "created_at": user[3], "last_login": current_time, "is_admin": user[5] if len(user) > 5 else False } cursor.close() return True, user_info except Exception as e: logger.error(f"用户登录失败: {str(e)}") if conn: conn.rollback() return False, f"登录失败: {str(e)}" finally: if conn: release_pg_connection(conn) def get_user_by_username(username): """ 根据用户名获取用户信息 Args: username: 用户名 Returns: dict: 用户信息(不包含密码) """ conn = None try: conn = get_pg_connection() cursor = conn.cursor() query = """ SELECT id, username, created_at, last_login, is_admin FROM users WHERE username = %s """ cursor.execute(query, (username,)) user = cursor.fetchone() cursor.close() if not user: return None user_info = { "id": user[0], "username": user[1], "created_at": user[2], "last_login": user[3], "is_admin": user[4] if user[4] is not None else False } return user_info except Exception as e: logger.error(f"获取用户信息失败: {str(e)}") return None finally: if conn: release_pg_connection(conn) def init_db(): """ 初始化数据库,创建用户表 Returns: bool: 是否成功初始化 """ return create_user_table()