cengal.parallel_execution.coroutines.integrations.uvicorn.versions.v_0.uvicorn
1#!/usr/bin/env python 2# coding=utf-8 3 4# Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space> 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17 18 19__all__ = ['get_uvicorn_awaitable'] 20 21 22from uvicorn.main import * 23from h11._connection import DEFAULT_MAX_INCOMPLETE_EVENT_SIZE 24 25 26""" 27Module Docstring 28Docstrings: http://www.python.org/dev/peps/pep-0257/ 29""" 30 31__author__ = "ButenkoMS <gtalk@butenkoms.space>" 32__copyright__ = "Copyright © 2012-2024 ButenkoMS. All rights reserved. Contacts: <gtalk@butenkoms.space>" 33__credits__ = ["ButenkoMS <gtalk@butenkoms.space>", ] 34__license__ = "Apache License, Version 2.0" 35__version__ = "4.4.1" 36__maintainer__ = "ButenkoMS <gtalk@butenkoms.space>" 37__email__ = "gtalk@butenkoms.space" 38# __status__ = "Prototype" 39__status__ = "Development" 40# __status__ = "Production" 41 42 43class UvicornStartupFailureError(Exception): 44 pass 45 46 47def get_uvicorn_awaitable( 48 app: typing.Union["ASGIApplication", typing.Callable, str], 49 *, 50 host: str = "127.0.0.1", 51 port: int = 8000, 52 uds: typing.Optional[str] = None, 53 fd: typing.Optional[int] = None, 54 loop: LoopSetupType = "auto", 55 http: typing.Union[typing.Type[asyncio.Protocol], HTTPProtocolType] = "auto", 56 ws: typing.Union[typing.Type[asyncio.Protocol], WSProtocolType] = "auto", 57 ws_max_size: int = 16777216, 58 ws_ping_interval: typing.Optional[float] = 20.0, 59 ws_ping_timeout: typing.Optional[float] = 20.0, 60 ws_per_message_deflate: bool = True, 61 lifespan: LifespanType = "auto", 62 interface: InterfaceType = "auto", 63 reload: bool = False, 64 reload_dirs: typing.Optional[typing.Union[typing.List[str], str]] = None, 65 reload_includes: typing.Optional[typing.Union[typing.List[str], str]] = None, 66 reload_excludes: typing.Optional[typing.Union[typing.List[str], str]] = None, 67 reload_delay: float = 0.25, 68 workers: typing.Optional[int] = None, 69 env_file: typing.Optional[typing.Union[str, os.PathLike]] = None, 70 log_config: typing.Optional[ 71 typing.Union[typing.Dict[str, typing.Any], str] 72 ] = LOGGING_CONFIG, 73 log_level: typing.Optional[typing.Union[str, int]] = None, 74 access_log: bool = True, 75 proxy_headers: bool = True, 76 server_header: bool = True, 77 date_header: bool = True, 78 forwarded_allow_ips: typing.Optional[typing.Union[typing.List[str], str]] = None, 79 root_path: str = "", 80 limit_concurrency: typing.Optional[int] = None, 81 backlog: int = 2048, 82 limit_max_requests: typing.Optional[int] = None, 83 timeout_keep_alive: int = 5, 84 ssl_keyfile: typing.Optional[str] = None, 85 ssl_certfile: typing.Optional[typing.Union[str, os.PathLike]] = None, 86 ssl_keyfile_password: typing.Optional[str] = None, 87 ssl_version: int = SSL_PROTOCOL_VERSION, 88 ssl_cert_reqs: int = ssl.CERT_NONE, 89 ssl_ca_certs: typing.Optional[str] = None, 90 ssl_ciphers: str = "TLSv1", 91 headers: typing.Optional[typing.List[typing.Tuple[str, str]]] = None, 92 use_colors: typing.Optional[bool] = None, 93 app_dir: typing.Optional[str] = None, 94 factory: bool = False, 95 h11_max_incomplete_event_size: int = DEFAULT_MAX_INCOMPLETE_EVENT_SIZE, 96) -> None: 97 reload = False 98 workers = None 99 100 if app_dir is not None: 101 sys.path.insert(0, app_dir) 102 103 config = Config( 104 app, 105 host=host, 106 port=port, 107 uds=uds, 108 fd=fd, 109 loop=loop, 110 http=http, 111 ws=ws, 112 ws_max_size=ws_max_size, 113 ws_ping_interval=ws_ping_interval, 114 ws_ping_timeout=ws_ping_timeout, 115 ws_per_message_deflate=ws_per_message_deflate, 116 lifespan=lifespan, 117 interface=interface, 118 reload=reload, 119 reload_dirs=reload_dirs, 120 reload_includes=reload_includes, 121 reload_excludes=reload_excludes, 122 reload_delay=reload_delay, 123 workers=workers, 124 env_file=env_file, 125 log_config=log_config, 126 log_level=log_level, 127 access_log=access_log, 128 proxy_headers=proxy_headers, 129 server_header=server_header, 130 date_header=date_header, 131 forwarded_allow_ips=forwarded_allow_ips, 132 root_path=root_path, 133 limit_concurrency=limit_concurrency, 134 backlog=backlog, 135 limit_max_requests=limit_max_requests, 136 timeout_keep_alive=timeout_keep_alive, 137 ssl_keyfile=ssl_keyfile, 138 ssl_certfile=ssl_certfile, 139 ssl_keyfile_password=ssl_keyfile_password, 140 ssl_version=ssl_version, 141 ssl_cert_reqs=ssl_cert_reqs, 142 ssl_ca_certs=ssl_ca_certs, 143 ssl_ciphers=ssl_ciphers, 144 headers=headers, 145 use_colors=use_colors, 146 factory=factory, 147 h11_max_incomplete_event_size=h11_max_incomplete_event_size, 148 ) 149 server = Server(config=config) 150 151 async def uvicorn_awaitable(): 152 await server.serve() 153 154 if config.uds and os.path.exists(config.uds): 155 os.remove(config.uds) # pragma: py-win32 156 157 if not server.started and not config.should_reload and config.workers == 1: 158 raise UvicornStartupFailureError 159 160 return uvicorn_awaitable()
def
get_uvicorn_awaitable( app: Union[ForwardRef('ASGIApplication'), Callable, str], *, host: str = '127.0.0.1', port: int = 8000, uds: Union[str, NoneType] = None, fd: Union[int, NoneType] = None, loop: Literal['none', 'auto', 'asyncio', 'uvloop'] = 'auto', http: Union[Type[asyncio.protocols.Protocol], Literal['auto', 'h11', 'httptools']] = 'auto', ws: Union[Type[asyncio.protocols.Protocol], Literal['auto', 'none', 'websockets', 'wsproto']] = 'auto', ws_max_size: int = 16777216, ws_ping_interval: Union[float, NoneType] = 20.0, ws_ping_timeout: Union[float, NoneType] = 20.0, ws_per_message_deflate: bool = True, lifespan: Literal['auto', 'on', 'off'] = 'auto', interface: Literal['auto', 'asgi3', 'asgi2', 'wsgi'] = 'auto', reload: bool = False, reload_dirs: Union[List[str], str, NoneType] = None, reload_includes: Union[List[str], str, NoneType] = None, reload_excludes: Union[List[str], str, NoneType] = None, reload_delay: float = 0.25, workers: Union[int, NoneType] = None, env_file: Union[str, os.PathLike, NoneType] = None, log_config: Union[Dict[str, Any], str, NoneType] = {'version': 1, 'disable_existing_loggers': False, 'formatters': {'default': {'()': 'uvicorn.logging.DefaultFormatter', 'fmt': '%(levelprefix)s %(message)s', 'use_colors': None}, 'access': {'()': 'uvicorn.logging.AccessFormatter', 'fmt': '%(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s'}}, 'handlers': {'default': {'formatter': 'default', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stderr'}, 'access': {'formatter': 'access', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout'}}, 'loggers': {'uvicorn': {'handlers': ['default'], 'level': 'INFO', 'propagate': False}, 'uvicorn.error': {'level': 'INFO'}, 'uvicorn.access': {'handlers': ['access'], 'level': 'INFO', 'propagate': False}}}, log_level: Union[str, int, NoneType] = None, access_log: bool = True, proxy_headers: bool = True, server_header: bool = True, date_header: bool = True, forwarded_allow_ips: Union[List[str], str, NoneType] = None, root_path: str = '', limit_concurrency: Union[int, NoneType] = None, backlog: int = 2048, limit_max_requests: Union[int, NoneType] = None, timeout_keep_alive: int = 5, ssl_keyfile: Union[str, NoneType] = None, ssl_certfile: Union[str, os.PathLike, NoneType] = None, ssl_keyfile_password: Union[str, NoneType] = None, ssl_version: int = <_SSLMethod.PROTOCOL_TLS_SERVER: 17>, ssl_cert_reqs: int = <VerifyMode.CERT_NONE: 0>, ssl_ca_certs: Union[str, NoneType] = None, ssl_ciphers: str = 'TLSv1', headers: Union[List[Tuple[str, str]], NoneType] = None, use_colors: Union[bool, NoneType] = None, app_dir: Union[str, NoneType] = None, factory: bool = False, h11_max_incomplete_event_size: int = 16384) -> None:
48def get_uvicorn_awaitable( 49 app: typing.Union["ASGIApplication", typing.Callable, str], 50 *, 51 host: str = "127.0.0.1", 52 port: int = 8000, 53 uds: typing.Optional[str] = None, 54 fd: typing.Optional[int] = None, 55 loop: LoopSetupType = "auto", 56 http: typing.Union[typing.Type[asyncio.Protocol], HTTPProtocolType] = "auto", 57 ws: typing.Union[typing.Type[asyncio.Protocol], WSProtocolType] = "auto", 58 ws_max_size: int = 16777216, 59 ws_ping_interval: typing.Optional[float] = 20.0, 60 ws_ping_timeout: typing.Optional[float] = 20.0, 61 ws_per_message_deflate: bool = True, 62 lifespan: LifespanType = "auto", 63 interface: InterfaceType = "auto", 64 reload: bool = False, 65 reload_dirs: typing.Optional[typing.Union[typing.List[str], str]] = None, 66 reload_includes: typing.Optional[typing.Union[typing.List[str], str]] = None, 67 reload_excludes: typing.Optional[typing.Union[typing.List[str], str]] = None, 68 reload_delay: float = 0.25, 69 workers: typing.Optional[int] = None, 70 env_file: typing.Optional[typing.Union[str, os.PathLike]] = None, 71 log_config: typing.Optional[ 72 typing.Union[typing.Dict[str, typing.Any], str] 73 ] = LOGGING_CONFIG, 74 log_level: typing.Optional[typing.Union[str, int]] = None, 75 access_log: bool = True, 76 proxy_headers: bool = True, 77 server_header: bool = True, 78 date_header: bool = True, 79 forwarded_allow_ips: typing.Optional[typing.Union[typing.List[str], str]] = None, 80 root_path: str = "", 81 limit_concurrency: typing.Optional[int] = None, 82 backlog: int = 2048, 83 limit_max_requests: typing.Optional[int] = None, 84 timeout_keep_alive: int = 5, 85 ssl_keyfile: typing.Optional[str] = None, 86 ssl_certfile: typing.Optional[typing.Union[str, os.PathLike]] = None, 87 ssl_keyfile_password: typing.Optional[str] = None, 88 ssl_version: int = SSL_PROTOCOL_VERSION, 89 ssl_cert_reqs: int = ssl.CERT_NONE, 90 ssl_ca_certs: typing.Optional[str] = None, 91 ssl_ciphers: str = "TLSv1", 92 headers: typing.Optional[typing.List[typing.Tuple[str, str]]] = None, 93 use_colors: typing.Optional[bool] = None, 94 app_dir: typing.Optional[str] = None, 95 factory: bool = False, 96 h11_max_incomplete_event_size: int = DEFAULT_MAX_INCOMPLETE_EVENT_SIZE, 97) -> None: 98 reload = False 99 workers = None 100 101 if app_dir is not None: 102 sys.path.insert(0, app_dir) 103 104 config = Config( 105 app, 106 host=host, 107 port=port, 108 uds=uds, 109 fd=fd, 110 loop=loop, 111 http=http, 112 ws=ws, 113 ws_max_size=ws_max_size, 114 ws_ping_interval=ws_ping_interval, 115 ws_ping_timeout=ws_ping_timeout, 116 ws_per_message_deflate=ws_per_message_deflate, 117 lifespan=lifespan, 118 interface=interface, 119 reload=reload, 120 reload_dirs=reload_dirs, 121 reload_includes=reload_includes, 122 reload_excludes=reload_excludes, 123 reload_delay=reload_delay, 124 workers=workers, 125 env_file=env_file, 126 log_config=log_config, 127 log_level=log_level, 128 access_log=access_log, 129 proxy_headers=proxy_headers, 130 server_header=server_header, 131 date_header=date_header, 132 forwarded_allow_ips=forwarded_allow_ips, 133 root_path=root_path, 134 limit_concurrency=limit_concurrency, 135 backlog=backlog, 136 limit_max_requests=limit_max_requests, 137 timeout_keep_alive=timeout_keep_alive, 138 ssl_keyfile=ssl_keyfile, 139 ssl_certfile=ssl_certfile, 140 ssl_keyfile_password=ssl_keyfile_password, 141 ssl_version=ssl_version, 142 ssl_cert_reqs=ssl_cert_reqs, 143 ssl_ca_certs=ssl_ca_certs, 144 ssl_ciphers=ssl_ciphers, 145 headers=headers, 146 use_colors=use_colors, 147 factory=factory, 148 h11_max_incomplete_event_size=h11_max_incomplete_event_size, 149 ) 150 server = Server(config=config) 151 152 async def uvicorn_awaitable(): 153 await server.serve() 154 155 if config.uds and os.path.exists(config.uds): 156 os.remove(config.uds) # pragma: py-win32 157 158 if not server.started and not config.should_reload and config.workers == 1: 159 raise UvicornStartupFailureError 160 161 return uvicorn_awaitable()