import logging
import os
import socket
import time
from threading import Timer

import nacos
import requests


class nacos_service:
    """Process-wide singleton that registers this service with Nacos.

    Connection settings are read from environment variables in
    ``__init__``. ``start()`` creates the Nacos SDK client, registers the
    instance through the HTTP API and begins a self-rescheduling heartbeat.
    """

    CLIENT = None
    DATA_ID = None
    GROUP = None
    LOCAL_IP = None
    SERVER_ADDRESSES = None
    NAMESPACE = None
    # Port advertised on registration and on every heartbeat; kept in one
    # place so both requests always agree.
    PORT = 3333
    # Seconds between two heartbeats.
    HEARTBEAT_INTERVAL = 4
    # Seconds before an HTTP call to the Nacos server is abandoned; without
    # it a stalled server would block the caller indefinitely.
    REQUEST_TIMEOUT = 5
    logger = logging.getLogger('app')  # logger name is set here
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if nacos_service.__instance is None:
            # object.__new__ accepts no extra arguments, so none are forwarded.
            nacos_service.__instance = super().__new__(cls)
        return nacos_service.__instance

    def __init__(self):
        """Load the connection settings from the environment."""
        if os.name == 'posix':  # 'posix' means a Unix-like system, including Linux
            default_address = "http://127.0.0.1:8848"
        else:
            default_address = "http://192.168.3.80:8848"

        # Resolve the host name only when no IP was configured, so that an
        # unresolvable host name cannot fail start-up needlessly.
        local_ip = os.getenv("NACOS_REGISTER_IP")
        if local_ip is None:
            local_ip = socket.gethostbyname(socket.gethostname())

        self.LOCAL_IP = str(local_ip)
        self.SERVER_ADDRESSES = str(os.getenv("NACOS_SERVER_ADDRESS", default_address))
        self.NAMESPACE = str(os.getenv("NACOS_NAME_SPACE", "dev"))
        self.DATA_ID = str(os.getenv("NACOS_DATA_ID", "python-mendunr"))
        # NOTE(review): the group is read from an env var literally named
        # DEFAULT_GROUP; kept as-is for compatibility with existing deployments.
        self.GROUP = str(os.getenv("DEFAULT_GROUP", "DEFAULT_GROUP"))
        self._log_startup_params()

    def _log_startup_params(self):
        """Log the effective connection settings."""
        self.logger.info("启动参数: %s", {
            "IP": self.LOCAL_IP,
            "SERVER_ADDRESSES": self.SERVER_ADDRESSES,
            "NAMESPACE": self.NAMESPACE,
            "DATA_ID": self.DATA_ID,
            "nacosGroup": self.GROUP,
        })

    def _service_name(self, name=None):
        """Return ``<group>@@<name>``; *name* defaults to this service's DATA_ID."""
        return self.GROUP + "@@" + (self.DATA_ID if name is None else name)

    def getConfig(self):
        """Fetch this service's configuration through the Nacos SDK client.

        ``start()`` must have been called first, since it creates ``CLIENT``.
        """
        return self.CLIENT.get_config(self.DATA_ID, self.GROUP)

    def register_instance(self):
        """Register this process as an ephemeral instance via the HTTP API.

        Raises:
            requests.RequestException: if the server cannot be reached or
                does not answer within ``REQUEST_TIMEOUT`` seconds.
        """
        self.logger.info("注册nacos实例 分组: %s ", self.GROUP)
        self.logger.info("注册nacos实例 时间: %s ", time.strftime('%H:%M:%S'))
        res = requests.post(
            url=self.SERVER_ADDRESSES + "/nacos/v1/ns/instance",
            data={
                'groupName': self.GROUP,
                'metadata': '{"preserved.register.source":"SPRING_CLOUD","preserved.heart.beat.interval": 5500, "preserved.heart.beat.timeout": 30000, "preserved.ip.delete.timeout": 60000}',
                'namespaceId': self.NAMESPACE,
                'port': self.PORT,
                # NOTE(review): the Nacos Open API names this field
                # "enabled"; confirm whether "enable" is intentional.
                'enable': True,
                'healthy': True,
                'clusterName': 'DEFAULT',
                'ip': self.LOCAL_IP,
                'weight': 1,
                'ephemeral': True,
                'serviceName': self._service_name(),
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        self.logger.info("register_instance res %s", str(res.content, 'UTF-8'))

    def send_heartbeat(self):
        """Send one heartbeat and schedule the next one.

        If the server answers with code 20404 (resource not found) the
        instance is registered again. Failures are logged and never
        propagate; the next beat is scheduled regardless, so a transient
        error cannot silently stop the heartbeat.
        """
        try:
            res = requests.put(
                url=self.SERVER_ADDRESSES + "/nacos/v1/ns/instance/beat",
                data={
                    "ip": self.LOCAL_IP,
                    "port": self.PORT,
                    "namespaceId": self.NAMESPACE,
                    "serviceName": self._service_name(),
                    "clusterName": "DEFAULT",
                },
                timeout=self.REQUEST_TIMEOUT,
            ).json()
            if res.get("code") == 20404:
                # Resource not found on the server: register again.
                self.register_instance()
        except Exception:
            # Best-effort: keep the application running, but leave a trace.
            self.logger.exception("nacos heartbeat failed")
        finally:
            # Re-arm in `finally` so the chain survives a failed beat.
            t = Timer(self.HEARTBEAT_INTERVAL, self.send_heartbeat)
            t.start()

    def getServer(self, serverName):
        """Return the first host entry registered for *serverName*.

        Args:
            serverName: service name, looked up inside this instance's group.

        Returns:
            The first element of the response's ``hosts`` list, or ``None``
            when the list is empty or missing.

        Raises:
            requests.RequestException: on connection failure or timeout.
        """
        res = requests.get(
            url=self.SERVER_ADDRESSES + "/nacos/v1/ns/instance/list",
            params={
                'namespaceId': self.NAMESPACE,
                'serviceName': self._service_name(serverName),
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        hosts = res.json().get("hosts") or []
        return hosts[0] if hosts else None

    def start(self):
        """Create the SDK client, register the instance and start heartbeats.

        Start-up is best-effort: any failure is logged and not re-raised.
        """
        try:
            self._log_startup_params()
            self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
            self.register_instance()
            self.send_heartbeat()
        except Exception:
            self.logger.exception("nacos start failed")

    def get_nacos_configs(self):
        """Fetch this service's configuration over HTTP and parse it as JSON.

        Returns:
            The decoded JSON body of the ``/nacos/v1/cs/configs`` response.

        Raises:
            requests.RequestException: on connection failure or timeout.
            ValueError: if the response body is not valid JSON.
        """
        url = self.SERVER_ADDRESSES + "/nacos/v1/cs/configs"
        params = {
            'tenant': self.NAMESPACE,
            'dataId': self.DATA_ID,
            'group': self.GROUP,
        }
        r = requests.get(url, params, timeout=self.REQUEST_TIMEOUT)
        # Decode with the encoding detected from the body itself.
        r.encoding = r.apparent_encoding
        return r.json()


nacos_service_single = nacos_service()


def get_nacos_configs():
    """Fetch the configuration stored in Nacos for the shared instance.

    Sends the HTTP request through ``nacos_service_single`` and returns the
    decoded JSON body.
    """
    return nacos_service_single.get_nacos_configs()