"""Flask service that exposes a JSON configuration fetched from Nacos.

At import time the module builds a ``NacosConfig`` for the ``python-mendunr``
data id and loads it once; the ``/config`` route then serves the cached value.
"""

# ---------------------------------------------------------------------------
# Legacy implementation, kept commented out for reference only. It shows the
# config-watcher / service-registration calls that the current code does not
# perform.
# ---------------------------------------------------------------------------
# import json
# import threading
#
# from flask import Flask
# from  nacos import NacosClient
#
# app = Flask(__name__)
#
# class nacos:
#     def __init__(self,address,namespace,username,password,data_id,group):
#         self.SERVER_ADDRESSES = address
#         self.NAMESPACE = namespace
#         self.USERNAME = username
#         self.PASSWORD = password
#         self.client = NacosClient(server_addresses=self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
#         self.data_id = data_id
#         self.group = group
#     def load_config(self):
#         """Load the configuration from Nacos and update the application state."""
#         config_data = self.client.get_config(data_id=self.data_id, group=self.group)
#         app.config['CONFIG_DATA'] = config_data
#         print(config_data)
#         return json.loads(config_data)
#
#     def on_config_change(self,cfg):
#         """When the configuration in Nacos changes, update the config data kept in the app config."""
#         config_data = json.loads(cfg['content'])  # Parse the JSON string to a dictionary
#         app.config['CONFIG_DATA'] = config_data
#         return config_data
#
#     def watch_config_changes(self):
#         try:
#             self.client.add_config_watcher(data_id='python-mendunr', group='DEFAULT_GROUP', cb=self.on_config_change)
#             self.client.add_naming_instance(service_name='python-mendunr',
#                                        ip='192.168.3.175',
#                                        port=3333,
#                                        group_name='DEFAULT_GROUP',
#                                        ephemeral=False)
#         except Exception as e:
#             raise e
#
#
#     # def start_background_tasks(self):
#     #     """Start a background thread that watches for configuration changes."""
#     #     t = threading.Thread(target=self.watch_config_changes)
#     #     t.daemon = True
#     #     t.start()
#     #     # self.watch_config_changes()
#
# nacos_single = nacos(address="192.168.3.80:8848",namespace = "dev",username = 'nacos',
#                      password ='nacos',data_id = 'python-mendunr',group = 'DEFAULT_GROUP')
# nacos_single.watch_config_changes()
# configs = nacos_single.load_config()

import json

from flask import Flask, request
from nacos import NacosClient

app = Flask(__name__)


class NacosConfig:
    """Fetch one Nacos configuration entry and cache its parsed JSON value.

    The entry is identified by ``data_id`` and ``group``. The parsed value is
    kept in ``config_data`` (``None`` until the first successful load).
    """

    def __init__(self, address, namespace, username, password, data_id, group):
        """Create the Nacos client; no configuration is fetched here.

        Args:
            address: Passed to ``NacosClient`` as ``server_addresses``.
            namespace: Passed to ``NacosClient`` as ``namespace``.
            username: Passed to ``NacosClient`` as ``username``.
            password: Passed to ``NacosClient`` as ``password``.
            data_id: Data id used for every ``get_config`` call.
            group: Group used for every ``get_config`` call.
        """
        self.SERVER_ADDRESSES = address
        self.NAMESPACE = namespace
        self.USERNAME = username
        self.PASSWORD = password
        self.client = NacosClient(
            server_addresses=self.SERVER_ADDRESSES,
            namespace=self.NAMESPACE,
            username=self.USERNAME,
            password=self.PASSWORD,
        )
        self.data_id = data_id
        self.group = group
        # None means "never loaded"; get_config() uses it to trigger the first fetch.
        self.config_data = None

    def load_config(self):
        """Fetch the configuration from Nacos, parse it and refresh the cache.

        Returns:
            The value parsed from the JSON text, or ``{}`` when the client
            returns a falsy result (for example ``None`` or an empty string).

        Raises:
            json.JSONDecodeError: If the fetched text is not valid JSON. The
                previously cached value is left untouched in that case.
        """
        config_str = self.client.get_config(data_id=self.data_id, group=self.group)
        self.config_data = json.loads(config_str) if config_str else {}
        return self.config_data

    def get_config(self):
        """Return the cached configuration, loading it on first use.

        Only a cache value of ``None`` triggers a fetch, so an empty ``{}``
        result is also cached and is not re-fetched automatically.
        """
        if self.config_data is None:
            self.load_config()
        return self.config_data


# NOTE(review): server address and credentials are hard-coded; consider moving
# them to environment variables or a secrets store.
nacos_config = NacosConfig(
    address="192.168.3.80:8848",
    namespace="dev",
    username='nacos',
    password='nacos',
    data_id='python-mendunr',
    group='DEFAULT_GROUP',
)
# Loaded eagerly at import time, so importing this module calls the Nacos client.
configs = nacos_config.get_config()


@app.route('/config')
def get_config_route():
    """Return the current configuration as a JSON response.

    The cached value is served; call ``nacos_config.load_config()`` here
    instead if every request should re-read the configuration from Nacos.
    """
    # nacos_config.load_config()  # uncomment to check for updates on every request
    payload = json.dumps(nacos_config.get_config())
    # A bare string would be sent as text/html; declare the JSON media type
    # explicitly while keeping the response body byte-for-byte the same.
    return app.response_class(payload, mimetype='application/json')


if __name__ == '__main__':
    # NOTE(review): debug=True enables Flask's debug mode; do not use this
    # entry point for a production deployment.
    app.run(debug=True)