Path: blob/main/external/curl/tests/http/testenv/ws_echo_server.py
2066 views
#!/usr/bin/env python31# -*- coding: utf-8 -*-2#***************************************************************************3# _ _ ____ _4# Project ___| | | | _ \| |5# / __| | | | |_) | |6# | (__| |_| | _ <| |___7# \___|\___/|_| \_\_____|8#9# Copyright (C) Daniel Stenberg, <[email protected]>, et al.10#11# This software is licensed as described in the file COPYING, which12# you should have received as part of this distribution. The terms13# are also available at https://curl.se/docs/copyright.html.14#15# You may opt to use, copy, modify, merge, publish, distribute and/or sell16# copies of the Software, and permit persons to whom the Software is17# furnished to do so, under the terms of the COPYING file.18#19# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY20# KIND, either express or implied.21#22# SPDX-License-Identifier: curl23#24###########################################################################25#26import argparse27import asyncio28import logging2930from websockets import server31from websockets.exceptions import ConnectionClosedError323334async def echo(websocket):35try:36async for message in websocket:37await websocket.send(message)38except ConnectionClosedError:39pass404142async def run_server(port):43async with server.serve(echo, "localhost", port):44await asyncio.Future() # run forever454647def main():48parser = argparse.ArgumentParser(prog='scorecard', description="""49Run a websocket echo server.50""")51parser.add_argument("--port", type=int,52default=9876, help="port to listen on")53args = parser.parse_args()5455logging.basicConfig(56format="%(asctime)s %(message)s",57level=logging.DEBUG,58)5960asyncio.run(run_server(args.port))616263if __name__ == "__main__":64main()656667