Demo代码(Python)
阿星
发布于:2025-06-17
import requests
import json
# Demo: create a task by POSTing an app id and its inputs to the tasks endpoint.
url = "https://api.comfylink.com/v1/openapi/tasks"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
    "Content-Type": "application/json",
}
data = {
    "remark": "",
    "appId": "YOUR_APPID",
    "inputs": [{"name": "6:text", "value": "test ..."}],
}

# Step 1: transport / HTTP-status failures.
try:
    response = requests.post(url, headers=headers, json=data, timeout=10)
    response.raise_for_status()
except requests.RequestException as e:
    print("网络请求失败:", e)
    # SystemExit instead of exit(): exit() is injected by the `site` module
    # and is not guaranteed to exist in every interpreter setup.
    raise SystemExit(1)

# Step 2: body parsing, kept in its own try. In recent requests releases the
# JSON decode error is also a RequestException, so sharing one try block with
# the request would report a bad body as a network failure. ValueError is the
# common base of every decode error requests may raise here.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

# Step 3: the API signals success with code == 0.
if res_json.get("code") == 0:
    task_id = res_json["data"]["taskId"]
    print("任务创建成功,任务ID:", task_id)
else:
    print("任务创建失败:", res_json.get("msg"))
import requests
# Demo: cancel a task by sending DELETE to the task's URL.
task_id = "创建任务返回的任务ID"
url = f"https://api.comfylink.com/v1/openapi/tasks/{task_id}"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
}

# NOTE(review): unlike the other snippets there is no raise_for_status() here,
# so a non-2xx response that still carries a JSON body is reported through its
# "msg" field below - presumably intentional; confirm.
try:
    response = requests.delete(url, headers=headers, timeout=10)
    res_json = response.json()
except (requests.RequestException, ValueError) as e:
    # Only the two expected failure classes are caught: transport errors and
    # an undecodable body. Anything else is a programming error and should
    # surface with a traceback rather than be reported as a request failure.
    print("请求或解析响应失败:", e)
    # SystemExit instead of exit(): exit() only exists when `site` is loaded.
    raise SystemExit(1)

if res_json.get("code") == 0:
    print("任务已取消")
else:
    print("取消任务失败:", res_json.get("msg"))
import requests
# Demo: query a task's status with GET on the task's URL.
task_id = "创建任务返回的任务ID"
url = f"https://api.comfylink.com/v1/openapi/tasks/{task_id}"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
}

# Transport / HTTP-status failures.
try:
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
except requests.RequestException as e:
    print("网络请求失败:", e)
    # SystemExit instead of exit(): exit() only exists when `site` is loaded.
    raise SystemExit(1)

# Body parsing is separated from the request: in recent requests releases the
# JSON decode error is also a RequestException, so a shared try block would
# mislabel a bad body as a network failure.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

if res_json.get("code") == 0:
    task = res_json["data"]
    status = task["status"]
    # The message is only descriptive text; default it (as the WebSocket
    # handler in this file does) instead of crashing when it is absent.
    message = task.get("message", "")
    if status == "finished":
        print("任务完成,输出结果:", task.get("outputs", []))
    elif status == "failed":
        print("任务失败:", message)
    elif status == "canceled":
        print("任务已取消:", message)
    else:
        # Any other status is reported verbatim.
        print("当前状态:", status, "描述:", message)
else:
    print("查询任务失败:", res_json.get("msg"))
import requests
# Demo: upload a local file as multipart/form-data under the "file" field.
url = "https://api.comfylink.com/v1/openapi/files"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
}

try:
    # The file only needs to stay open while the request body is being sent.
    with open("comfylink.jpg", "rb") as f:
        response = requests.post(url, headers=headers, files={"file": f}, timeout=20)
    response.raise_for_status()
except FileNotFoundError:
    print("文件未找到,请检查文件路径。")
    # SystemExit instead of exit(): exit() only exists when `site` is loaded.
    raise SystemExit(1)
except requests.RequestException as e:
    # Must precede the OSError handler: RequestException derives from IOError.
    print("网络请求失败:", e)
    raise SystemExit(1)
except OSError as e:
    # Other file errors (permissions, path is a directory, ...). `response`
    # is not bound on this path, so it must not be referenced here.
    print("读取文件失败:", e)
    raise SystemExit(1)

# Body parsing is kept apart so a bad body is not reported as a network error.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

if res_json.get("code") == 0:
    # `or {}` also covers an explicit `"data": null` in the response.
    file_url = (res_json.get("data") or {}).get("path")
    print("文件上传成功,访问地址:", file_url)
else:
    print("文件上传失败:", res_json.get("msg"))
import websocket
import json
def on_message(ws, message) -> None:
    """Handle one incoming WebSocket message and print the task status.

    The message is expected to be a JSON object whose ``data`` field is an
    object carrying ``status`` and, optionally, ``message`` and ``outputs``.
    Anything else is reported and ignored; this callback never raises for
    malformed input.

    Args:
        ws: The WebSocket app that received the message (unused).
        message: Raw message payload.
    """
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        print("收到非JSON消息:", message)
        return
    except Exception as e:
        # e.g. TypeError when the payload is neither str nor bytes.
        print("解析消息出错:", str(e))
        return

    # Valid JSON is not necessarily an object (it may be a list, string or
    # number), so check the envelope itself before calling .get() on it, then
    # check that its "data" field is a dict.
    data_field = data.get('data') if isinstance(data, dict) else None
    if not isinstance(data_field, dict):
        print("未知消息格式:", message)
        return

    status = data_field.get('status')
    msg = data_field.get('message', '')
    if status == "finished":
        print("【任务完成】")
        print("输出结果:", data_field.get('outputs', []))
    elif status == "failed":
        print("【任务失败】错误信息:", msg)
    elif status == "canceled":
        print("【任务已取消】原因:", msg)
    elif status == "pending":
        print("【任务排队中】当前状态:", msg)
    else:
        print("未知状态:", status, "消息:", msg)
def on_error(ws, error) -> None:
    """Print an error reported for the WebSocket connection.

    Args:
        ws: The WebSocket app the error belongs to (unused).
        error: The error object or message to report.
    """
    print("发生错误:", error)
def on_close(ws, close_status_code, close_msg) -> None:
    """Print a notice that the WebSocket connection has been closed.

    Args:
        ws: The WebSocket app that was closed (unused).
        close_status_code: Close status code passed to the callback.
        close_msg: Close message passed to the callback.
    """
    print("连接已关闭", close_status_code, close_msg)
def on_open(ws) -> None:
    """Print a notice that the WebSocket connection has been established.

    Args:
        ws: The WebSocket app that was opened (unused).
    """
    print("WebSocket 连接已建立")
def main():
    """Open the task-status WebSocket and dispatch events until it ends.

    Builds a ``WebSocketApp`` for the ComfyLink endpoint with the API key and
    client id sent as handshake headers, wires in the module's callbacks, and
    blocks in ``run_forever``. Ctrl+C ends the loop and closes the connection.
    """
    ws_url = "wss://api.comfylink.com/v1/openapi/ws"
    headers = {
        "X-API-Key": "YOUR_API_KEY",
        "X-Client-Id": "YOUR_UUID",
    }
    ws = websocket.WebSocketApp(
        ws_url,
        # Handshake headers are passed as "Name: value" strings.
        header=[f"{k}: {v}" for k, v in headers.items()],
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open,
    )
    try:
        # Keep-alive pings; see the websocket-client docs for the exact
        # semantics of ping_interval / ping_timeout.
        ws.run_forever(ping_interval=30, ping_timeout=10)
    except KeyboardInterrupt:
        print("用户中断,关闭连接。")
        # Actually close the connection the message above says is being
        # closed; close() is harmless if the socket is already torn down.
        ws.close()


if __name__ == "__main__":
    main()