123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- """
- 系统用户认证模块
- 提供用户注册、登录验证等功能
- """
- import logging
- import base64
- import time
- import uuid
- import psycopg2
- from psycopg2 import pool
- logger = logging.getLogger(__name__)
- # PostgreSQL连接池
- pg_pool = None
- def get_pg_connection():
- """
- 获取PostgreSQL数据库连接
-
- Returns:
- connection: PostgreSQL连接对象
- """
- global pg_pool
-
- if pg_pool is None:
- try:
- # 创建连接池
- pg_pool = psycopg2.pool.SimpleConnectionPool(
- 1, 20,
- host="localhost",
- database="dataops",
- user="postgres",
- password="postgres",
- port="5432"
- )
- logger.info("PostgreSQL连接池初始化成功")
- except Exception as e:
- logger.error(f"PostgreSQL连接池初始化失败: {str(e)}")
- raise
-
- return pg_pool.getconn()
- def release_pg_connection(conn):
- """
- 释放PostgreSQL连接到连接池
-
- Args:
- conn: 数据库连接对象
- """
- global pg_pool
- if pg_pool and conn:
- pg_pool.putconn(conn)
- def encode_password(password):
- """
- 对密码进行base64编码
-
- Args:
- password: 原始密码
-
- Returns:
- str: 编码后的密码
- """
- return base64.b64encode(password.encode('utf-8')).decode('utf-8')
- def create_user_table():
- """
- 创建用户表,如果不存在
-
- Returns:
- bool: 是否成功创建
- """
- conn = None
- try:
- conn = get_pg_connection()
- cursor = conn.cursor()
-
- # 创建用户表
- create_table_query = """
- CREATE TABLE IF NOT EXISTS users (
- id VARCHAR(100) PRIMARY KEY,
- username VARCHAR(50) UNIQUE NOT NULL,
- password VARCHAR(100) NOT NULL,
- created_at FLOAT NOT NULL,
- last_login FLOAT,
- is_admin BOOLEAN DEFAULT FALSE
- );
- """
- cursor.execute(create_table_query)
-
- # 创建索引加速查询
- create_index_query = """
- CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
- """
- cursor.execute(create_index_query)
-
- conn.commit()
- cursor.close()
-
- logger.info("用户表创建成功")
- return True
- except Exception as e:
- logger.error(f"创建用户表失败: {str(e)}")
- if conn:
- conn.rollback()
- return False
- finally:
- if conn:
- release_pg_connection(conn)
- def register_user(username, password):
- """
- 注册新用户
-
- Args:
- username: 用户名
- password: 密码
-
- Returns:
- tuple: (是否成功, 消息)
- """
- conn = None
- try:
- # 确保表已创建
- create_user_table()
-
- # 对密码进行编码
- encoded_password = encode_password(password)
-
- # 生成用户ID
- user_id = str(uuid.uuid4())
-
- conn = get_pg_connection()
- cursor = conn.cursor()
-
- # 检查用户名是否存在
- check_query = "SELECT username FROM users WHERE username = %s"
- cursor.execute(check_query, (username,))
-
- if cursor.fetchone():
- return False, "用户名已存在"
-
- # 创建用户
- insert_query = """
- INSERT INTO users (id, username, password, created_at, last_login)
- VALUES (%s, %s, %s, %s, %s)
- """
- cursor.execute(
- insert_query,
- (user_id, username, encoded_password, time.time(), None)
- )
-
- conn.commit()
- cursor.close()
-
- return True, "注册成功"
- except Exception as e:
- logger.error(f"用户注册失败: {str(e)}")
- if conn:
- conn.rollback()
- return False, f"注册失败: {str(e)}"
- finally:
- if conn:
- release_pg_connection(conn)
- def login_user(username, password):
- """
- 用户登录验证
-
- Args:
- username: 用户名
- password: 密码
-
- Returns:
- tuple: (是否成功, 用户信息/错误消息)
- """
- conn = None
- try:
- # 对输入的密码进行编码
- encoded_password = encode_password(password)
-
- conn = get_pg_connection()
- cursor = conn.cursor()
-
- # 查询用户
- query = """
- SELECT id, username, password, created_at, last_login, is_admin
- FROM users WHERE username = %s
- """
- cursor.execute(query, (username,))
-
- user = cursor.fetchone()
-
- # 检查用户是否存在
- if not user:
- return False, "用户名或密码错误"
-
- # 验证密码
- if user[2] != encoded_password:
- return False, "用户名或密码错误"
-
- # 更新最后登录时间
- current_time = time.time()
- update_query = """
- UPDATE users SET last_login = %s WHERE username = %s
- """
- cursor.execute(update_query, (current_time, username))
-
- conn.commit()
-
- # 构建用户信息
- user_info = {
- "id": user[0],
- "username": user[1],
- "created_at": user[3],
- "last_login": current_time,
- "is_admin": user[5] if len(user) > 5 else False
- }
-
- cursor.close()
-
- return True, user_info
- except Exception as e:
- logger.error(f"用户登录失败: {str(e)}")
- if conn:
- conn.rollback()
- return False, f"登录失败: {str(e)}"
- finally:
- if conn:
- release_pg_connection(conn)
- def get_user_by_username(username):
- """
- 根据用户名获取用户信息
-
- Args:
- username: 用户名
-
- Returns:
- dict: 用户信息(不包含密码)
- """
- conn = None
- try:
- conn = get_pg_connection()
- cursor = conn.cursor()
-
- query = """
- SELECT id, username, created_at, last_login, is_admin
- FROM users WHERE username = %s
- """
- cursor.execute(query, (username,))
-
- user = cursor.fetchone()
- cursor.close()
-
- if not user:
- return None
-
- user_info = {
- "id": user[0],
- "username": user[1],
- "created_at": user[2],
- "last_login": user[3],
- "is_admin": user[4] if user[4] is not None else False
- }
-
- return user_info
- except Exception as e:
- logger.error(f"获取用户信息失败: {str(e)}")
- return None
- finally:
- if conn:
- release_pg_connection(conn)
- def init_db():
- """
- 初始化数据库,创建用户表
-
- Returns:
- bool: 是否成功初始化
- """
- return create_user_table()
|