import json from flask import Flask, request from nacos import NacosClient app = Flask(__name__) config_json = { "local": { "address": "192.168.3.80:8848", "namespace": "dev", "username": "nacos", "password": "nacos", "data_id": "python-mendunr", "group": "DEFAULT_GROUP" }, "production": { "address": "localhost:8848", "namespace": "dev", "username": "nacos", "password": "nacos", "data_id": "python-mendunr", "group": "DEFAULT_GROUP" } } class NacosConfig: def __init__(self, environment='local'): env_config = config_json[environment] self.SERVER_ADDRESSES = env_config['address'] self.NAMESPACE = env_config['namespace'] self.USERNAME = env_config['username'] self.PASSWORD = env_config['password'] self.data_id = env_config['data_id'] self.group = env_config['group'] self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE, username=self.USERNAME, password=self.PASSWORD) self.config_data = None def load_config(self): """从Nacos加载配置""" config_str = self.client.get_config(data_id=self.data_id, group=self.group) self.config_data = json.loads(config_str) if config_str else {} return self.config_data def get_config(self): """返回当前加载的配置数据""" if self.config_data is None: self.load_config() return self.config_data # 使用配置文件中的生产环境配置 (本地是local,提交代码是production)!! nacos_config = NacosConfig(environment='production') configs = nacos_config.get_config() @app.route('/config') def get_config_route(): """返回当前配置,或者可以选择在这里重新加载配置""" # 可以在这里选择是否重新加载配置,或者根据需要进行其他逻辑处理 # nacos_config.load_config() # 如果需要每次请求都检查更新,可以取消注释这行代码 return json.dumps(nacos_config.get_config()) if __name__ == '__main__': app.run(debug=True)