Source code for soap.http

from suds.transport import Transport, Reply
from . import settings
import urllib.request
import urllib.parse
import logging
import requests
import io


logger = logging.getLogger(__name__)


class HttpTransport(Transport):
    """
    Custom HTTPTransport to replace
    :class:`suds.transport.http.HttpTransport <suds.transport.http.HttpTransport>`.

    The default
    :class:`suds.transport.http.HttpTransport <suds.transport.http.HttpTransport>`
    class has issues with sending SOAP traffic through HTTP proxies like
    Squid. This Transport fixes the HTTP Proxy issues by using
    python-requests instead of urllib2.
    """

    #: Timeout for opening a WSDL file. Tuple (CONNECT_TIMEOUT, READ_TIMEOUT)
    open_timeout = settings.OPEN_TIMEOUT

    #: Timeout for sending a SOAP call. Tuple (CONNECT_TIMEOUT, READ_TIMEOUT)
    send_timeout = settings.SEND_TIMEOUT

    def open(self, request):
        """
        Open a SOAP WSDL

        ``file://`` URLs are opened with :func:`urllib.request.urlopen` and the
        resulting response object is returned directly. Any other URL is
        fetched with ``requests.get`` (honouring :meth:`proxies` and
        :attr:`open_timeout`) and the body is wrapped in an in-memory buffer.

        :param request: :class:`suds.transport.Request <suds.transport.Request>` object
        :return: WSDL Content as a file-like object
        :rtype: io.BytesIO
        :raises requests.HTTPError: if the server answers with an error status
        """
        url = request.url
        # Pass the URL as a logging argument so the message is only built
        # when DEBUG logging is actually enabled.
        logger.debug("Opening WSDL: %s ", url)

        if url.startswith("file://"):
            # Local files bypass requests (and therefore proxies) entirely.
            return urllib.request.urlopen(url)

        resp = requests.get(
            url,
            proxies=self.proxies(url),
            timeout=self.open_timeout,
        )
        resp.raise_for_status()
        return io.BytesIO(resp.content)

    def send(self, request):
        """
        Send a SOAP method call

        The request message is POSTed to the request URL with the request
        headers, using :meth:`proxies` and :attr:`send_timeout`.

        :param request: :class:`suds.transport.Request <suds.transport.Request>` object
        :return: :class:`suds.transport.Reply <suds.transport.Reply>` object
        :rtype: suds.transport.Reply
        :raises requests.HTTPError: if the server answers with an error status
        """
        url = request.url
        logger.debug("Sending SOAP request: %s", url)

        resp = requests.post(
            url,
            proxies=self.proxies(url),
            timeout=self.send_timeout,
            data=request.message,
            headers=request.headers,
        )
        resp.raise_for_status()

        # The reply code is always reported as OK; any response that gets past
        # raise_for_status() is not distinguished further by status.
        return Reply(requests.codes.OK, resp.headers, resp.content)

    def proxies(self, url):
        """
        Get the transport proxy configuration

        A proxy configured for the URL's network location in
        ``settings.PROXIES`` takes precedence; otherwise the global
        ``settings.PROXY_URL`` is used. The selected proxy is applied to both
        the ``http`` and ``https`` schemes. When neither setting yields a
        proxy, an empty dictionary is returned.

        :param url: string
        :return: Proxy configuration dictionary
        :rtype: Dictionary
        """
        netloc = urllib.parse.urlparse(url).netloc

        # Per-host override first, then fall back to the global proxy URL.
        host_proxy = settings.PROXIES.get(netloc) if settings.PROXIES else None
        proxy_url = host_proxy or settings.PROXY_URL
        if not proxy_url:
            return {}
        return {"http": proxy_url, "https": proxy_url}