我想要连接到这个website后面的网络插座(点击一个球员,这样高尔夫球场就可以看到了).当我判断客户端和服务器之间的流量时,我看到第一条消息是握手:

{"type": "connection_init",
 "payload":
           {
            "accept-language": "en",
            "ec-version": "5.1.88",
            "sport": "GOLF",
            "referrer": "https://www.europeantour.com/",
            "operator": "europeantour"
           }
}

根据格式(DICT的关键字),我得出结论,WebSocket使用了Apollo WebSocket传输协议.

接下来,我将按照gql‘S的WebSocket-example文档进行操作.

import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport


logging.basicConfig(level=logging.INFO)
    
async def main():
    transport = WebsocketsTransport(url='wss://btec-websocket.services.imgarena.com',
                                    init_payload={'accept-language': 'en',
                                                  'ec-version': '5.1.88',
                                                  'operator': 'europeantour',
                                                  'referrer': 'https://www.europeantour.com/',
                                                  'sport': 'GOLF'})
    async with Client(transport=transport,
                      fetch_schema_from_transport=False) as session:
        # do something
    

asyncio.run(main())

在阅读了更多关于协议here的内容后,我仍然不明白如何在我的Python脚本中向服务器发送消息 如何将下面的消息发送到WebSocket?

{
 "id": "1073897396",
 "type": "start",
 "operationName": "SubscribeToGolfTournamentStatus",
 "eventId": 488
}

Edit

使用@Vonc的建议的不同方法

import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport

logging.basicConfig(level=logging.INFO)


async def main():
    transport = WebsocketsTransport(url='wss://btec-websocket.services.imgarena.com',
                                    init_payload={
                                        'accept-language': 'en',
                                        'ec-version': '5.1.88',
                                        'operator': 'europeantour',
                                        'referrer': 'https://www.europeantour.com/',
                                        'sport': 'GOLF'
                                    })
    async with Client(transport=transport, fetch_schema_from_transport=False) as session:
        # Define the subscription
        subscription = gql('''
          subscription MapVisualisationSubscribeTo3DShots($input: SubscribeToGolfTeam3DShotsInput!) {
            subscribeToGolf3DShots(input: $input) {
              __typename
              x
              y
              z
              addressing
              provisionalIndex
              ballHoled
              isApproximate
              strokeNo
              holeNo
              surfaceTypeCode
              teamId
              courseId
              seqNum
              isBallDrop
              isProvisional
              isProvisionalSelected
              shotDistance
              score
              distanceToPin
              player {
                id
              }
            }
          }
        ''')

        # Subscribe and handle incoming data
        async for result in session.subscribe(subscription, variable_values={"input": {"tournamentId": 669,
                                                                                       "holeNo": 1,
                                                                                       "roundNo": 4,
                                                                                       "matchNo": 4001,
                                                                                       "holeOrder": 1
                                                                                       }
                                                                             }):
            print(result)


asyncio.run(main())

以上是基于在Network中观察到的以下WebSocket消息

{"id":"2097853415","type":"full-subscription","payload":{"variables":{"input":{"tournamentId":669,"holeNo":1,"roundNo":4,"matchNo":4001,"holeOrder":1}},"query":"\n  subscription MapVisualisationSubscribeTo3DShots($input: SubscribeToGolfTeam3DShotsInput!) {\n    subscribeToGolf3DShots(input: $input) {\n      __typename\n      x\n      y\n      z\n      addressing\n      provisionalIndex\n      ballHoled\n      isApproximate\n      strokeNo\n      holeNo\n      surfaceTypeCode\n      teamId\n      courseId\n      seqNum\n      isBallDrop\n      isProvisional\n      isProvisionalSelected\n      shotDistance\n      score\n      distanceToPin\n      player {\n        id\n      }\n    }\n  }\n","operationName":"MapVisualisationSubscribeTo3DShots","subscriptionName":"subscribeToGolf3DShots"}}

推荐答案

Europe Tour网站的API不使用Apollo WebSocket协议,因此您不能使用标准的WebsocketTransport类.

我发现:

  • 它不是在connection_init之后返回connection_ack消息,而是返回wsIdentity消息
  • 对于订阅,客户端将首先发送包含OPERATIONAME和EventID的START消息,而不使用任何GraphQL查询
  • 然后,服务器将回复request-full-subscription消息,要求客户端发送完整的订阅查询
  • 然后,客户端必须发送包含完整的所请求查询的full-subscription消息,包括operationName和新的字段subscriptionName
  • ID实际上不是随机的,它是通过将subscriptionName和变量散列在一起而产生的,它是根据字典关键字排序的JSON格式生成的.我花了一段时间才弄明白,因为如果id不正确,就不会产生错误
  • 服务器似乎会根据IP地址缓存GraphQL订阅,因此如果它之前已经接收到订阅,则不会使用请求-完全订阅消息来请求订阅

以下是显示这request-full-subscription条消息的Chrome截图:

Chrome websocket messages

然后,您可以通过继承WebsocketTransport类并进行必要的修改来创建自己的传输.

以下是工作代码的示例:


import asyncio
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional, Tuple

from graphql import DocumentNode, ExecutionResult, print_ast

from gql import Client, gql
from gql.transport.exceptions import TransportProtocolError
from gql.transport.websockets import WebsocketsTransport

logging.basicConfig(level=logging.INFO)


class EuropeanTourWebsocketsTransport(WebsocketsTransport):
    def _hash(self, e):
        t = 5381
        r = len(e)
        while r:
            r -= 1
            t = t * 33 ^ ord(e[r])
        return t & 0xFFFFFFFF

    def _calc_id(self, subscription_name, variables):

        obj = {
            "subscriptionName": subscription_name,
            "variables": variables,
        }

        obj_stringified = json.dumps(
            obj,
            separators=(",", ":"),
            sort_keys=True,
        )

        hashed_value = self._hash(obj_stringified)

        return hashed_value

    async def _send_query(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
    ) -> int:

        # Calculate the id by hashing the subscription name and the variables
        query_id = self._calc_id(self.latest_subscription_name, variable_values)

        # Creating the payload for the full subscription
        payload: Dict[str, Any] = {"query": print_ast(document)}
        if variable_values:
            payload["variables"] = variable_values
        if operation_name:
            payload["operationName"] = operation_name
            payload["subscriptionName"] = self.latest_subscription_name

        # Saving the full query first and waiting for the server to request it later
        self.saved_full_subscriptions[str(query_id)] = payload

        # Then first start to request the subscription only with the operation name
        query_str = json.dumps(
            {
                "id": str(query_id),
                "type": "start",
                "operationName": operation_name,
                "eventId": self.latest_event_id,
            }
        )

        await self._send(query_str)

        return query_id

    async def subscribe(
        self,
        document: DocumentNode,
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: str,
        subscription_name: str,
        event_id: int,
        send_stop: Optional[bool] = True,
    ) -> AsyncGenerator[ExecutionResult, None]:

        self.latest_event_id = event_id
        self.latest_subscription_name = subscription_name

        async for result in super().subscribe(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            send_stop=send_stop,
        ):
            yield result

    async def _wait_ack(self) -> None:

        self.saved_full_subscriptions = {}

        while True:
            init_answer = await self._receive()

            answer_type, answer_id, execution_result = self._parse_answer(init_answer)

            if answer_type == "wsIdentity":
                return

            raise TransportProtocolError(
                "Websocket server did not return a wsIdentity response"
            )

    def _parse_answer(
        self, answer: str
    ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
        try:
            json_answer = json.loads(answer)
        except ValueError:
            raise TransportProtocolError(
                f"Server did not return a GraphQL result: {answer}"
            )

        if "wsIdentity" in json_answer:
            return ("wsIdentity", json_answer["wsIdentity"], None)

        elif (
            "type" in json_answer and json_answer["type"] == "request-full-subscription"
        ):
            return ("request-full-subscription", json_answer["id"], None)

        else:

            return self._parse_answer_apollo(json_answer)

    async def send_full_subscription(self, answer_id: str):

        if answer_id not in self.saved_full_subscriptions:
            raise Exception(f"Full subscription not found for id {answer_id}")

        payload = self.saved_full_subscriptions[answer_id]

        query_str = json.dumps(
            {"id": answer_id, "type": "full-subscription", "payload": payload}
        )

        await self._send(query_str)

    async def _handle_answer(
        self,
        answer_type: str,
        answer_id: Optional[int],
        execution_result: Optional[ExecutionResult],
    ) -> None:

        if answer_type == "request-full-subscription":
            await self.send_full_subscription(answer_id)

        else:
            await super()._handle_answer(answer_type, answer_id, execution_result)


async def main():

    transport = EuropeanTourWebsocketsTransport(
        url="wss://btec-websocket.services.imgarena.com",
        init_payload={
            "accept-language": "en",
            "ec-version": "5.1.88",
            "operator": "europeantour",
            "referrer": "https://www.europeantour.com/",
            "sport": "GOLF",
        },
    )

    async with Client(
        transport=transport, fetch_schema_from_transport=False
    ) as session:

        query = gql(
            """
subscription ShotTrackerSubscribeToGolfTournamentGroupScores($input: SubscribeToGolfTournamentGroupScoresInput!) {
  subscribeToGolfTournamentGroupScores(input: $input) {
    groupId
    l1Course
    teamId
    players {
      id
      lastName
      firstName
    }
    roundScores {
      courseId
      roundNo
      toParToday {
        value
      }
      holesThrough {
        value
      }
      startHole
      holes {
        holePar
        holeStrokes
        holeOrder
        holeNumber
      }
      isPlayoff
    }
    toPar {
      value
    }
    tournamentPosition {
      format
      value
      displayValue
    }
    status
  }
}
"""
        )

        variables = {
            "input": {
                "teamId": 21,
                "tournamentId": 488,
                "roundNo": 4,
            },
        }

        async for result in session.subscribe(
            query,
            operation_name="ShotTrackerSubscribeToGolfTournamentGroupScores",
            variable_values=variables,
            subscription_name="subscribeToGolfTournamentGroupScores",
            event_id=488,
        ):

            print(result)


asyncio.run(main())

Python相关问答推荐

无法使用python.h文件; Python嵌入错误

opencv Python稳定的图标识别

具有症状的分段函数:如何仅针对某些输入值定义函数?

pandas DataFrame GroupBy.diff函数的意外输出

将jit与numpy linSpace函数一起使用时出错

类型错误:输入类型不支持ufuncisnan-在执行Mann-Whitney U测试时[SOLVED]

未删除映射表的行

PMMLPipeline._ fit()需要2到3个位置参数,但给出了4个位置参数

如何在Python数据框架中加速序列的符号化

组/群集按字符串中的子字符串或子字符串中的字符串轮询数据框

如何使用scipy的curve_fit与约束,其中拟合的曲线总是在观测值之下?

在Django admin中自动完成相关字段筛选

转换为浮点,pandas字符串列,混合千和十进制分隔符

未调用自定义JSON编码器

在方法中设置属性值时,如何处理语句不可达[Unreacable]";的问题?

在Admin中显示从ManyToMany通过模型的筛选结果

如何使用pytest在traceback中找到特定的异常

如何获取包含`try`外部堆栈的`__traceback__`属性的异常

pytest、xdist和共享生成的文件依赖项

修改.pdb文件中的值并另存为新的