# import json # import threading # # from flask import Flask # from nacos import NacosClient # # app = Flask(__name__) # # class nacos: # def __init__(self,address,namespace,username,password,data_id,group): # self.SERVER_ADDRESSES = address # self.NAMESPACE = namespace # self.USERNAME = username # self.PASSWORD = password # self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE) # self.data_id = data_id # self.group = group # def load_config(self): # """从Nacos加载配置并更新应用状态""" # config_data = self.client.get_config(data_id=self.data_id, group=self.group) # app.config['CONFIG_DATA'] = config_data # print(config_data) # return json.loads(config_data) # # def on_config_change(self,cfg): # """当Nacos中的配置发生变化时,更新应用配置中的配置数据""" # config_data = json.loads(cfg['content']) # Parse the JSON string to a dictionary # app.config['CONFIG_DATA'] = config_data # return config_data # # def watch_config_changes(self): # try: # self.client.add_config_watcher(data_id='python-mendunr', group='DEFAULT_GROUP', cb=self.on_config_change) # self.client.add_naming_instance(service_name='python-mendunr', # ip='192.168.3.175', # port=3333, # group_name='DEFAULT_GROUP', # ephemeral=False) # except Exception as e: # raise e # # # # def start_background_tasks(self): # # """启动后台任务线程来监控配置变化""" # # t = threading.Thread(target=self.watch_config_changes) # # t.daemon = True # # t.start() # # # self.watch_config_changes() # # nacos_single = nacos(address="192.168.3.80:8848",namespace = "dev",username = 'nacos', # password ='nacos',data_id = 'python-mendunr',group = 'DEFAULT_GROUP') # nacos_single.watch_config_changes() # configs = nacos_single.load_config() import json from flask import Flask, request from nacos import NacosClient app = Flask(__name__) class NacosConfig: def __init__(self, address, namespace, username, password, data_id, group): self.SERVER_ADDRESSES = address self.NAMESPACE = namespace self.USERNAME = username self.PASSWORD = password self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE, username=self.USERNAME, password=self.PASSWORD) self.data_id = data_id self.group = group self.config_data = None def load_config(self): """从Nacos加载配置""" config_str = self.client.get_config(data_id=self.data_id, group=self.group) self.config_data = json.loads(config_str) if config_str else {} return self.config_data def get_config(self): """返回当前加载的配置数据""" if self.config_data is None: self.load_config() return self.config_data nacos_config = NacosConfig(address="192.168.3.80:8848", namespace="dev", username='nacos', password='nacos', data_id='python-mendunr', group='DEFAULT_GROUP') configs = nacos_config.get_config() @app.route('/config') def get_config_route(): """返回当前配置,或者可以选择在这里重新加载配置""" # 可以在这里选择是否重新加载配置,或者根据需要进行其他逻辑处理 # nacos_config.load_config() # 如果需要每次请求都检查更新,可以取消注释这行代码 return json.dumps(nacos_config.get_config()) if __name__ == '__main__': app.run(debug=True)