Demo代码(Python)

阿星

发布于:2025-06-17

import requests
import json

# Demo: create a task by POSTing the app ID and its input values.
url = "https://api.comfylink.com/v1/openapi/tasks"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
    "Content-Type": "application/json",
}
data = {
    "remark": "",
    "appId": "YOUR_APPID",
    "inputs": [{"name": "6:text", "value": "test ..."}],
}

# Step 1: transport and HTTP-status failures only.
try:
    response = requests.post(url, headers=headers, json=data, timeout=10)
    response.raise_for_status()
except requests.RequestException as e:
    print("网络请求失败:", e)
    # SystemExit is what exit() raises, but does not depend on the `site`
    # module having injected the `exit` helper.
    raise SystemExit(1)

# Step 2: decode the body in its own try block. Recent requests releases raise
# a JSON error that is also a RequestException, so keeping this call inside the
# block above would report a malformed body as a network failure. Every JSON
# decode error (stdlib, simplejson, requests) derives from ValueError.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

# Step 3: a "code" of 0 is treated as success; otherwise print the "msg" field.
if res_json.get("code") == 0:
    task_id = res_json["data"]["taskId"]
    print("任务创建成功,任务ID:", task_id)
else:
    print("任务创建失败:", res_json.get("msg"))
import requests

# Demo: cancel a task by sending DELETE to the task's URL.
task_id = "创建任务返回的任务ID"
url = f"https://api.comfylink.com/v1/openapi/tasks/{task_id}"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
}

# The HTTP status is not checked here, so a non-2xx reply that still carries a
# JSON body reaches the code/msg check below.
try:
    response = requests.delete(url, headers=headers, timeout=10)
    res_json = response.json()
except (requests.RequestException, ValueError) as e:
    # Only request failures and JSON decode errors (all ValueError subclasses)
    # are expected here; anything else is a programming error and should
    # surface with a traceback instead of being hidden behind this message.
    print("请求或解析响应失败:", e)
    # SystemExit is what exit() raises, without relying on the `site` helper.
    raise SystemExit(1)

if res_json.get("code") == 0:
    print("任务已取消")
else:
    print("取消任务失败:", res_json.get("msg"))
import requests

# Demo: query a task's status by sending GET to the task's URL.
task_id = "创建任务返回的任务ID"
url = f"https://api.comfylink.com/v1/openapi/tasks/{task_id}"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID",
}

# Step 1: transport and HTTP-status failures only.
try:
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
except requests.RequestException as e:
    print("网络请求失败:", e)
    # SystemExit is what exit() raises, without relying on the `site` helper.
    raise SystemExit(1)

# Step 2: decode the body separately. Recent requests releases raise a JSON
# error that is also a RequestException, so decoding inside the block above
# would report a malformed body as a network failure. All JSON decode errors
# derive from ValueError.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

# Step 3: report the task state. Fields are read with .get so that a reply
# lacking "message" (or "data") prints a status line instead of raising
# KeyError.
if res_json.get("code") == 0:
    task = res_json.get("data") or {}
    status = task.get("status")
    message = task.get("message", "")
    if status == "finished":
        print("任务完成,输出结果:", task.get("outputs", []))
    elif status == "failed":
        print("任务失败:", message)
    elif status == "canceled":
        print("任务已取消:", message)
    else:
        print("当前状态:", status, "描述:", message)
else:
    print("查询任务失败:", res_json.get("msg"))
import requests

# Demo: upload a local file as multipart form data under the "file" field.
url = "https://api.comfylink.com/v1/openapi/files"
headers = {
    "X-API-Key": "YOUR_API_KEY",
    "X-Client-Id": "YOUR_UUID"
}

# Step 1: open the file and send it. The handlers below never touch
# `response`, because it may not exist yet when opening the file fails.
try:
    with open('comfylink.jpg', 'rb') as f:
        files = {'file': f}
        response = requests.post(url, headers=headers, files=files, timeout=20)
    response.raise_for_status()
except FileNotFoundError:
    print("文件未找到,请检查文件路径。")
    # SystemExit is what exit() raises, without relying on the `site` helper.
    raise SystemExit(1)
except requests.RequestException as e:
    # Must come before the OSError handler: RequestException is an OSError.
    print("网络请求失败:", e)
    raise SystemExit(1)
except OSError as e:
    # Remaining file errors, e.g. permission denied or the path is a directory.
    print("读取文件失败:", e)
    raise SystemExit(1)

# Step 2: decode the body separately so a malformed body is not reported as a
# network failure (recent requests releases raise a JSON error that is also a
# RequestException). All JSON decode errors derive from ValueError.
try:
    res_json = response.json()
except ValueError:
    print("响应内容不是合法的JSON:", response.text)
    raise SystemExit(1)

if res_json.get("code") == 0:
    # `or {}` also covers a reply whose "data" is present but null.
    file_url = (res_json.get("data") or {}).get("path")
    print("文件上传成功,访问地址:", file_url)
else:
    print("文件上传失败:", res_json.get("msg"))
import websocket
import json

def on_message(ws, message):
    """Handle one incoming WebSocket message and print the task state it reports.

    The message is expected to be a JSON object whose ``data`` member is an
    object carrying ``status``, ``message`` and, for finished tasks,
    ``outputs``. Anything else is printed as an unrecognized message and
    ignored; this callback never raises for malformed input.

    Args:
        ws: The WebSocket application that delivered the message (unused).
        message: Raw message text received from the server.
    """
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        print("收到非JSON消息:", message)
        return
    except Exception as e:
        print("解析消息出错:", str(e))
        return

    # Valid JSON is not necessarily an object: a list, string or number has no
    # .get(), so check the top level before looking for the "data" member,
    # which must itself be a dict.
    data_field = payload.get('data') if isinstance(payload, dict) else None
    if not isinstance(data_field, dict):
        print("未知消息格式:", message)
        return

    status = data_field.get('status')
    msg = data_field.get('message', '')

    if status == "finished":
        print("【任务完成】")
        print("输出结果:", data_field.get('outputs', []))
    elif status == "failed":
        print("【任务失败】错误信息:", msg)
    elif status == "canceled":
        print("【任务已取消】原因:", msg)
    elif status == "pending":
        print("【任务排队中】当前状态:", msg)
    else:
        print("未知状态:", status, "消息:", msg)

def on_error(ws, error):
    """Print an error reported for the WebSocket connection.

    Args:
        ws: The WebSocket application that reported the error (unused).
        error: The error value to print.
    """
    label = "发生错误:"
    print(label, error)

def on_close(ws, close_status_code, close_msg):
    """Print a notice that the connection closed, with the code and message given.

    Args:
        ws: The WebSocket application whose connection closed (unused).
        close_status_code: Close status code passed to the callback.
        close_msg: Close message passed to the callback.
    """
    close_details = (close_status_code, close_msg)
    print("连接已关闭", *close_details)

def on_open(ws):
    """Print a notice that the WebSocket connection has been established.

    Args:
        ws: The WebSocket application that just connected (unused).
    """
    notice = "WebSocket 连接已建立"
    print(notice)

def main():
    """Connect to the WebSocket endpoint and dispatch events to the callbacks.

    Builds a ``WebSocketApp`` with the API key and client ID sent as request
    headers, wires in the module's ``on_open`` / ``on_message`` / ``on_error``
    / ``on_close`` callbacks, and runs it with ``ping_interval=30`` and
    ``ping_timeout=10``. A ``KeyboardInterrupt`` raised out of the run loop is
    reported with a printed notice instead of a traceback.
    """
    auth_headers = {
        "X-API-Key": "YOUR_API_KEY",
        "X-Client-Id": "YOUR_UUID"
    }
    # WebSocketApp is given the headers as a list of "Name: value" strings.
    header_lines = [": ".join(pair) for pair in auth_headers.items()]

    app = websocket.WebSocketApp(
        "wss://api.comfylink.com/v1/openapi/ws",
        header=header_lines,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    try:
        app.run_forever(ping_interval=30, ping_timeout=10)
    except KeyboardInterrupt:
        print("用户中断,关闭连接。")

# Start the WebSocket demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

提交反馈