import nacos
import socket
import logging
import time
import os
import requests
import threading


class NacosService:
    """Singleton helper that registers this process with a Nacos server.

    It registers a service instance through the Nacos HTTP API, keeps it
    alive with a periodic heartbeat sent from a background thread, looks up
    other services, and fetches configuration.
    """

    CLIENT = None
    DATA_ID = None
    GROUP = None
    LOCAL_IP = None
    SERVER_ADDRESSES = None
    NAMESPACE = None
    # Port reported to Nacos for this instance (used by register and beat).
    PORT = 3333
    # Seconds to wait for any single HTTP call, so a hung server can never
    # block the heartbeat thread (and therefore stop()) indefinitely.
    REQUEST_TIMEOUT = 5
    # Seconds between two heartbeats.
    HEARTBEAT_INTERVAL = 4
    logger = logging.getLogger('app')  # logger name is set here
    __instance = None
    _registered = False  # set once the instance has been registered
    _stop_event = threading.Event()  # signals the heartbeat loop to stop
    heartbeat_thread = None  # created by start()

    def __init__(self):
        """Populate the connection settings (runs on every ``NacosService()``)."""
        if os.name == 'posix':  # 'posix' means Unix-like systems, incl. Linux
            default_address = "127.0.0.1:8848"
        else:
            default_address = "192.168.3.80:8848"
        # Resolve the host IP only when the environment does not provide one;
        # hostname resolution can fail and must not break an explicit setting.
        local_ip = os.getenv("NACOS_REGISTER_IP")
        if local_ip is None:
            local_ip = socket.gethostbyname(socket.gethostname())
        self.LOCAL_IP = str(local_ip)
        self.SERVER_ADDRESSES = default_address
        self.NAMESPACE = "dev"
        self.DATA_ID = "python-mendunr"
        self.GROUP = "DEFAULT_GROUP"

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if NacosService.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded.
            NacosService.__instance = super().__new__(cls)
        return NacosService.__instance

    def _service_name(self, name=None):
        """Return ``<group>@@<name>``; *name* defaults to this service's DATA_ID."""
        return self.GROUP + "@@" + (self.DATA_ID if name is None else name)

    # Fetch configuration
    def getConfig(self):
        """Return the config for DATA_ID/GROUP via the client created in start()."""
        return self.CLIENT.get_config(self.DATA_ID, self.GROUP)

    # Register instance
    def register_instance(self):
        """Register this instance with Nacos unless it is already registered.

        On HTTP 200 the ``_registered`` flag is set; any other status is
        logged. Exceptions raised by ``requests`` propagate to the caller.
        """
        if self._registered:
            return
        res = requests.post(
            url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance",
            data={
                'groupName': self.GROUP,
                'metadata': '{"preserved.register.source":"SPRING_CLOUD","preserved.heart.beat.interval": 5500, "preserved.heart.beat.timeout": 30000, "preserved.ip.delete.timeout": 60000}',
                'namespaceId': self.NAMESPACE,
                'port': self.PORT,
                # NOTE(review): the Nacos Open API names this field
                # "enabled"; confirm whether "enable" is honoured.
                'enable': True,
                'healthy': True,
                'clusterName': 'DEFAULT',
                'ip': self.LOCAL_IP,
                'weight': 1,
                'ephemeral': False,  # changed to False here
                'serviceName': self._service_name(),
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        if res.status_code == 200:
            self.logger.info("register_instance res %s", str(res.content, 'UTF-8'))
            self._registered = True  # mark registration as successful
        else:
            self.logger.error(
                "Failed to register instance: %s - %s", res.status_code, res.text
            )

    def send_heartbeat(self):
        """Send a heartbeat every HEARTBEAT_INTERVAL seconds until stopped.

        When the server answers with code 20404 the instance is registered
        again. Errors are logged and the loop continues.
        """
        while not self._stop_event.is_set():
            try:
                res = requests.put(
                    url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
                    data={
                        "ip": self.LOCAL_IP,
                        "port": str(self.PORT),
                        "namespaceId": self.NAMESPACE,
                        "serviceName": self._service_name(),
                        "clusterName": "DEFAULT",
                    },
                    timeout=self.REQUEST_TIMEOUT,
                )
                if res.status_code == 200:
                    if res.json().get("code") == 20404:
                        # Resource not found: register again. The flag must
                        # be cleared first, otherwise register_instance()
                        # returns immediately without doing anything.
                        self._registered = False
                        self.register_instance()
                else:
                    self.logger.error(
                        "Failed to send heartbeat: %s - %s", res.status_code, res.text
                    )
            except Exception as e:
                self.logger.error("send_heartbeat error: %s", e)
            # Wait outside the try block so a failing request cannot turn
            # the loop into a busy spin; wait() returns early on stop().
            self._stop_event.wait(self.HEARTBEAT_INTERVAL)

    def getServer(self, serverName):
        """Return the first host entry of *serverName*, or None.

        None is returned when the host list is empty, the HTTP status is
        not 200, or the request fails (failures are logged).
        """
        try:
            res = requests.get(
                url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/list",
                params={
                    'namespaceId': self.NAMESPACE,
                    'serviceName': self._service_name(serverName),
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            if res.status_code == 200:
                hosts = res.json()["hosts"]
                if hosts:
                    return hosts[0]
            else:
                self.logger.error(
                    "Failed to get server list: %s - %s", res.status_code, res.text
                )
        except Exception as e:
            self.logger.error("getServer error: %s", e)
        return None

    def start(self):
        """Create the Nacos client, register, and start the heartbeat thread.

        Any exception is logged rather than raised.
        """
        try:
            self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
            self.register_instance()
            running = self.heartbeat_thread
            if running is not None and running.is_alive():
                # A heartbeat loop is already active; do not start a second.
                return
            # Allow start() to work again after a previous stop().
            self._stop_event.clear()
            self.heartbeat_thread = threading.Thread(target=self.send_heartbeat)
            self.heartbeat_thread.start()
        except Exception as e:
            self.logger.error("start error: %s", e)

    def stop(self):
        """Signal the heartbeat loop to stop and wait for its thread to exit."""
        self._stop_event.set()
        worker = self.heartbeat_thread
        # start() may have failed before the thread was created.
        if worker is not None and worker.is_alive():
            worker.join()

    def get_nacos_configs(self):
        """Fetch this service's config over HTTP and return it parsed as JSON.

        Returns None when the HTTP status is not 200 or on any error
        (including a body that is not valid JSON); failures are logged.
        """
        url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
        params = {
            'tenant': self.NAMESPACE,
            'dataId': self.DATA_ID,
            'group': self.GROUP,
        }
        try:
            r = requests.get(url, params, timeout=self.REQUEST_TIMEOUT)
            r.encoding = r.apparent_encoding
            if r.status_code == 200:
                return r.json()
            self.logger.error(
                "Failed to get Nacos configs: %s - %s", r.status_code, r.text
            )
        except Exception as e:
            self.logger.error("get_nacos_configs error: %s", e)
        return None


nacos_service_single = NacosService()


# Send an HTTP request to fetch the configuration stored in Nacos
def get_nacos_configs():
    """Return the config fetched by the module-level NacosService singleton."""
    return nacos_service_single.get_nacos_configs()