|
@@ -1,11 +1,10 @@
|
|
import nacos
|
|
import nacos
|
|
import socket
|
|
import socket
|
|
-from threading import Timer
|
|
|
|
import logging
|
|
import logging
|
|
import time
|
|
import time
|
|
import os
|
|
import os
|
|
import requests
|
|
import requests
|
|
-
|
|
|
|
|
|
+import threading
|
|
|
|
|
|
class NacosService:
|
|
class NacosService:
|
|
CLIENT = None
|
|
CLIENT = None
|
|
@@ -18,6 +17,7 @@ class NacosService:
|
|
|
|
|
|
__instance = None
|
|
__instance = None
|
|
_registered = False # 添加注册标志位
|
|
_registered = False # 添加注册标志位
|
|
|
|
+ _stop_event = threading.Event() # 用于控制心跳任务的停止
|
|
|
|
|
|
def __init__(self):
|
|
def __init__(self):
|
|
if os.name == 'posix': # 'posix' 表示类Unix系统,包括Linux
|
|
if os.name == 'posix': # 'posix' 表示类Unix系统,包括Linux
|
|
@@ -65,25 +65,25 @@ class NacosService:
|
|
self.logger.error(f"Failed to register instance: {res.status_code} - {res.text}")
|
|
self.logger.error(f"Failed to register instance: {res.status_code} - {res.text}")
|
|
|
|
|
|
def send_heartbeat(self):
|
|
def send_heartbeat(self):
|
|
- try:
|
|
|
|
- res = requests.put(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
|
|
|
|
- data={
|
|
|
|
- "ip": self.LOCAL_IP,
|
|
|
|
- "port": "3333",
|
|
|
|
- "namespaceId": self.NAMESPACE,
|
|
|
|
- "serviceName": self.GROUP + "@@" + self.DATA_ID,
|
|
|
|
- "clusterName": "DEFAULT"}
|
|
|
|
- )
|
|
|
|
- if res.status_code == 200:
|
|
|
|
- res_json = res.json()
|
|
|
|
- if res_json["code"] == 20404: ### 找不到资源重新注册
|
|
|
|
- self.register_instance()
|
|
|
|
- else:
|
|
|
|
- self.logger.error(f"Failed to send heartbeat: {res.status_code} - {res.text}")
|
|
|
|
- t = Timer(4, self.send_heartbeat) ## 4秒执行一次
|
|
|
|
- t.start()
|
|
|
|
- except Exception as e:
|
|
|
|
- self.logger.error(f"send_heartbeat error: {e}")
|
|
|
|
|
|
+ while not self._stop_event.is_set():
|
|
|
|
+ try:
|
|
|
|
+ res = requests.put(url=f"http://{self.SERVER_ADDRESSES}/nacos/v1/ns/instance/beat",
|
|
|
|
+ data={
|
|
|
|
+ "ip": self.LOCAL_IP,
|
|
|
|
+ "port": "3333",
|
|
|
|
+ "namespaceId": self.NAMESPACE,
|
|
|
|
+ "serviceName": self.GROUP + "@@" + self.DATA_ID,
|
|
|
|
+ "clusterName": "DEFAULT"}
|
|
|
|
+ )
|
|
|
|
+ if res.status_code == 200:
|
|
|
|
+ res_json = res.json()
|
|
|
|
+ if res_json["code"] == 20404: ### 找不到资源重新注册
|
|
|
|
+ self.register_instance()
|
|
|
|
+ else:
|
|
|
|
+ self.logger.error(f"Failed to send heartbeat: {res.status_code} - {res.text}")
|
|
|
|
+ self._stop_event.wait(4) ## 4秒执行一次
|
|
|
|
+ except Exception as e:
|
|
|
|
+ self.logger.error(f"send_heartbeat error: {e}")
|
|
|
|
|
|
def getServer(self, serverName):
|
|
def getServer(self, serverName):
|
|
try:
|
|
try:
|
|
@@ -103,10 +103,15 @@ class NacosService:
|
|
try:
|
|
try:
|
|
self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
|
|
self.CLIENT = nacos.NacosClient(self.SERVER_ADDRESSES, namespace=self.NAMESPACE)
|
|
self.register_instance()
|
|
self.register_instance()
|
|
- self.send_heartbeat()
|
|
|
|
|
|
+ self.heartbeat_thread = threading.Thread(target=self.send_heartbeat)
|
|
|
|
+ self.heartbeat_thread.start()
|
|
except Exception as e:
|
|
except Exception as e:
|
|
self.logger.error(f"start error: {e}")
|
|
self.logger.error(f"start error: {e}")
|
|
|
|
|
|
|
|
+ def stop(self):
|
|
|
|
+ self._stop_event.set()
|
|
|
|
+ self.heartbeat_thread.join()
|
|
|
|
+
|
|
def get_nacos_configs(self):
|
|
def get_nacos_configs(self):
|
|
url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
|
|
url = f"http://{self.SERVER_ADDRESSES}/nacos/v1/cs/configs"
|
|
params = {
|
|
params = {
|
|
@@ -132,4 +137,4 @@ nacos_service_single = NacosService()
|
|
|
|
|
|
# 发送url请求,获取nacos上的配置
|
|
# 发送url请求,获取nacos上的配置
|
|
def get_nacos_configs():
|
|
def get_nacos_configs():
|
|
- return nacos_service_single.get_nacos_configs()
|
|
|
|
|
|
+ return nacos_service_single.get_nacos_configs()
|