# (c) 2005 Ian Bicking and contributors; written for Paste
# (http://pythonpaste.org)
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""
Routines for testing WSGI applications.
Most interesting is TestApp
"""
import os
import re
import json
import random
import fnmatch
import mimetypes
from base64 import b64encode
from http import cookiejar as http_cookiejar
from io import BytesIO, StringIO
from webtest.compat import urlparse
from webtest.compat import to_bytes
from webtest.compat import escape_cookie_value
from webtest.response import TestResponse
from webtest import forms
from webtest import lint
from webtest import utils
import webob
__all__ = ['TestApp', 'TestRequest']
class AppError(Exception):
    """Error raised when a response does not meet the test's expectations.

    :param message:
        A ``%``-style format string.  UTF-8 encoded bytes are decoded
        first.
    :param args:
        Values interpolated into ``message``.  Each one is made printable
        beforehand: a :class:`webob.Response` with a bytes body is replaced
        by that body (decoded with the response charset, or its ``repr()``
        when no charset is set or decoding fails), and raw bytes are decoded
        as UTF-8 with the same ``repr()`` fallback.
    """

    def __init__(self, message, *args):
        if isinstance(message, bytes):
            message = message.decode('utf8')
        str_args = tuple(self._printable(arg) for arg in args)
        # Interpolate even without args so that ``%%`` collapses to ``%``.
        message = message % str_args
        Exception.__init__(self, message)

    @staticmethod
    def _printable(arg):
        """Return ``arg`` in a form that is safe to interpolate into text."""
        if isinstance(arg, webob.Response):
            body = arg.body
            if not isinstance(body, bytes):
                return arg
            if not arg.charset:
                return repr(body)
            try:
                return body.decode(arg.charset)
            except (UnicodeDecodeError, LookupError):
                # A body that contradicts its declared charset (or names an
                # unknown codec) must not turn the failure report itself
                # into a decoding error.
                return repr(body)
        if isinstance(arg, bytes):
            try:
                return arg.decode('utf8')
            except UnicodeDecodeError:
                return repr(arg)
        return arg
class CookiePolicy(http_cookiejar.DefaultCookiePolicy):
    """Cookie policy that additionally accepts ``Domain=.localhost``.

    Every other domain is judged by the stock
    :class:`http.cookiejar.DefaultCookiePolicy` rules.
    """

    def return_ok_domain(self, cookie, request):
        """Allow ``.localhost`` cookies to be returned; otherwise defer."""
        if cookie.domain != '.localhost':
            return http_cookiejar.DefaultCookiePolicy.return_ok_domain(
                self, cookie, request)
        return True

    def set_ok_domain(self, cookie, request):
        """Allow ``.localhost`` cookies to be stored; otherwise defer."""
        if cookie.domain != '.localhost':
            return http_cookiejar.DefaultCookiePolicy.set_ok_domain(
                self, cookie, request)
        return True
[docs]
class TestRequest(webob.BaseRequest):
    """Request class that :class:`TestApp` uses to build its requests.

    A subclass of :class:`webob.BaseRequest` whose ``ResponseClass`` is
    :class:`webtest.response.TestResponse`.
    """
    # Response type associated with requests of this class; TestApp also
    # stores its ``parser_features`` setting on this class.
    ResponseClass = TestResponse
[docs]
class TestApp:
    """
    Wraps a WSGI application in a more convenient interface for
    testing. It uses an extended version of :class:`webob.BaseRequest`
    and :class:`webob.Response`.

    :param app:
        May be a WSGI application or a Paste Deploy app,
        like ``'config:filename.ini#test'``.

        .. versionadded:: 2.0

        It can also be an actual full URL to an http server and webtest
        will proxy requests with `WSGIProxy2
        <https://pypi.org/project/WSGIProxy2/>`_.
    :type app:
        WSGI application

    :param extra_environ:
        A dictionary of values that should go
        into the environment for each request. These can provide a
        communication channel with the application.
    :type extra_environ:
        dict

    :param relative_to:
        The directory that file upload paths are calculated relative
        to. It is also used to resolve ``config:`` URIs that aren't
        absolute.
    :type relative_to:
        string

    :param use_unicode:
        Stored on the instance and copied onto every response as
        ``_use_unicode``.
    :type use_unicode:
        boolean

    :param cookiejar:
        An object with the :class:`http.cookiejar.CookieJar` API that
        keeps cookies across requests.
    :type cookiejar:
        CookieJar instance

    .. attribute:: cookies

        A convenient shortcut for a dict of all cookies in
        ``cookiejar``.

    :param parser_features:
        Passed to BeautifulSoup when parsing responses.
    :type parser_features:
        string or list

    :param json_encoder:
        Passed to json.dumps when encoding json
    :type json_encoder:
        A subclass of json.JSONEncoder

    :param lint:
        If True (default) then check that the application is WSGI compliant
    :type lint:
        A boolean
    """

    # Request class used to build every request issued by this app.
    RequestClass = TestRequest

    # Tell pytest not to collect this class as tests
    __test__ = False
def __init__(self, app, extra_environ=None, relative_to=None,
             use_unicode=True, cookiejar=None, parser_features=None,
             json_encoder=None, lint=True):
    """Resolve ``app`` into a WSGI callable and store the test settings.

    The ``WEBTEST_TARGET_URL`` environment variable, when set, replaces
    the ``app`` argument.  A string starting with ``http`` is proxied
    through WSGIProxy2 (an optional ``#client`` suffix selects the HTTP
    client, ``httplib`` by default); any other string is loaded with
    ``paste.deploy.loadapp``.
    """
    # The environment override always wins over the argument.
    app = os.environ.get('WEBTEST_TARGET_URL', app)

    if isinstance(app, str):
        if not app.startswith('http'):
            from paste.deploy import loadapp
            # @@: Should pick up relative_to from calling module's
            # __file__
            app = loadapp(app, relative_to=relative_to)
        else:
            try:
                from wsgiproxy import HostProxy
            except ImportError:  # pragma: no cover
                raise ImportError(
                    'Using webtest with a real url requires WSGIProxy2. '
                    'Please install it with: '
                    'pip install WSGIProxy2')
            url, separator, client = app.partition('#')
            if not separator:
                client = 'httplib'
            app = HostProxy(url, client=client)

    self.app = app
    self.lint = lint
    self.relative_to = relative_to
    self.use_unicode = use_unicode
    self.extra_environ = {} if extra_environ is None else extra_environ

    if cookiejar is None:
        cookiejar = http_cookiejar.CookieJar(policy=CookiePolicy())
    self.cookiejar = cookiejar

    # Note: the parser setting lives on the response class, not on this
    # instance.
    self.RequestClass.ResponseClass.parser_features = (
        'html.parser' if parser_features is None else parser_features)

    self.JSONEncoder = (
        json.JSONEncoder if json_encoder is None else json_encoder)
[docs]
def get_authorization(self):
    """Allow to set the HTTP_AUTHORIZATION environ key. Value should look
    like one of the following:

    * ``('Basic', ('user', 'password'))``
    * ``('Bearer', 'mytoken')``
    * ``('JWT', 'myjwt')``

    If value is None then HTTP_AUTHORIZATION is removed.

    :returns: the value last assigned, or ``None`` when no authorization
        has been assigned yet.
    """
    # ``authorization_value`` only exists once the setter has run; report
    # "no authorization" instead of raising AttributeError before that.
    return getattr(self, 'authorization_value', None)
def set_authorization(self, value):
    """Set or clear the ``HTTP_AUTHORIZATION`` entry of ``extra_environ``.

    :param value:
        ``None`` removes the header.  Otherwise a two-item list or tuple:
        ``('Basic', ('user', 'password'))``, ``('Bearer', 'token')`` or
        ``('JWT', 'token')``.
    :raises ValueError:
        If ``value`` has any other shape.  Nothing is modified in that
        case: neither ``authorization_value`` nor ``extra_environ``.
    """
    if value is None:
        self.authorization_value = None
        self.extra_environ.pop('HTTP_AUTHORIZATION', None)
        return

    invalid_value = (
        "You should use a value like ('Basic', ('user', 'password'))"
        " OR ('Bearer', 'token') OR ('JWT', 'token')"
    )
    if not (isinstance(value, (list, tuple)) and len(value) == 2):
        raise ValueError(invalid_value)

    authtype, val = value
    if authtype == 'Basic' and val and isinstance(val, (list, tuple)):
        # Basic credentials are sent as base64("user:password").
        val = ':'.join(list(val))
        val = b64encode(to_bytes(val)).strip()
        val = val.decode('latin1')
    elif authtype in ('Bearer', 'JWT') and val and isinstance(val, str):
        val = val.strip()
    else:
        raise ValueError(invalid_value)
    header = str('%s %s' % (authtype, val))

    # Commit only after validation succeeded so that a rejected value
    # cannot leave the stored value and the environ out of step.
    self.authorization_value = value
    self.extra_environ.update({
        'HTTP_AUTHORIZATION': header,
    })
# Public ``authorization`` attribute: reads go through
# ``get_authorization``; assignments go through ``set_authorization``,
# which validates the value and updates ``extra_environ``.
authorization = property(get_authorization, set_authorization)
@property
def cookies(self):
    """Dict mapping each cookie name in ``cookiejar`` to its value.

    When several cookies share a name, the one iterated last wins.
    """
    by_name = {}
    for entry in self.cookiejar:
        by_name[entry.name] = entry.value
    return by_name
[docs]
def set_cookie(self, name, value):
    """
    Sets a cookie to be passed through with requests.

    The cookie domain is the ``HTTP_HOST`` entry of ``extra_environ``
    without its port (``.localhost`` when unset); a host name without
    any dot gets ``.local`` appended.  The value is passed through
    ``escape_cookie_value`` before being stored.
    """
    host = self.extra_environ.get('HTTP_HOST', '.localhost')
    cookie_domain = host.partition(':')[0]
    if '.' not in cookie_domain:
        cookie_domain = "%s.local" % cookie_domain

    attributes = dict(
        version=0,
        name=name,
        value=escape_cookie_value(value),
        port=None,
        port_specified=False,
        domain=cookie_domain,
        domain_specified=True,
        domain_initial_dot=False,
        path='/',
        path_specified=True,
        secure=False,
        expires=None,
        discard=False,
        comment=None,
        comment_url=None,
        rest=None,
    )
    self.cookiejar.set_cookie(http_cookiejar.Cookie(**attributes))
[docs]
def reset(self):
    """
    Resets the state of the application; currently just clears
    saved cookies.
    """
    # Only the cookie jar is emptied; ``extra_environ`` and the
    # authorization settings are not touched here.
    self.cookiejar.clear()
[docs]
def set_parser_features(self, parser_features):
    """
    Changes the parser used by BeautifulSoup. See its documentation to
    know the supported parsers.

    :param parser_features:
        The new value stored as ``parser_features`` on the response
        class.
    """
    # The value is assigned on the response class itself rather than on
    # this instance.
    self.RequestClass.ResponseClass.parser_features = parser_features
[docs]
def get(self, url, params=None, headers=None, extra_environ=None,
        status=None, expect_errors=False, xhr=False):
    """
    Do a GET request given the url path.

    :param params:
        A query string, or a dictionary that will be encoded
        into a query string. You may also include a URL query
        string on the ``url``.
    :param headers:
        Extra headers to send.
    :type headers:
        dictionary
    :param extra_environ:
        Environmental variables that should be added to the request.
    :type extra_environ:
        dictionary
    :param status:
        The HTTP status code you expect in response (if not 200 or 3xx).
        You can also use a wildcard, like ``'3*'`` or ``'*'``.
    :type status:
        integer or string
    :param expect_errors:
        If this is False, then if anything is written to
        environ ``wsgi.errors`` it will be an error.
        If it is True, then non-200/3xx responses are also okay.
    :type expect_errors:
        boolean
    :param xhr:
        If this is true, then marks response as ajax. The same as
        headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
    :type xhr:
        boolean

    :returns: :class:`webtest.TestResponse` instance.
    """
    environ = self._make_environ(extra_environ)
    url = self._remove_fragment(str(url))
    if params:
        url = utils.build_params(url, params)

    # Everything after the first "?" travels in QUERY_STRING; it is the
    # empty string when the url carries no query.
    url, _, environ['QUERY_STRING'] = url.partition('?')

    req = self.RequestClass.blank(url, environ)
    if xhr:
        headers = self._add_xhr_header(headers)
    if headers:
        req.headers.update(headers)
    return self.do_request(
        req, status=status, expect_errors=expect_errors)
[docs]
def post(self, url, params='', headers=None, extra_environ=None,
         status=None, upload_files=None, expect_errors=False,
         content_type=None, xhr=False):
    """
    Do a POST request. Similar to :meth:`~webtest.TestApp.get`.

    :param params:
        Are put in the body of the request. If params is an
        iterator, it will be urlencoded. If it is a string, it will not
        be encoded, but placed in the body directly.

        Can be a :class:`python:collections.OrderedDict` with
        :class:`webtest.forms.Upload` fields included::

            app.post('/myurl', collections.OrderedDict([
                ('textfield1', 'value1'),
                ('uploadfield',
                 webtest.forms.Upload('filename.txt', b'contents')),
                ('textfield2', 'value2')]))

    :param upload_files:
        It should be a list of ``(fieldname, filename, file_content)``.
        You can also use just ``(fieldname, filename)`` and the file
        contents will be read from disk.
    :type upload_files:
        list
    :param content_type:
        HTTP content type, for example `application/json`.
    :type content_type:
        string
    :param xhr:
        If this is true, then marks response as ajax. The same as
        headers={'X-REQUESTED-WITH': 'XMLHttpRequest', }
    :type xhr:
        boolean

    :returns: :class:`webtest.TestResponse` instance.
    """
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'POST', url,
        params=params,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=upload_files,
        expect_errors=expect_errors,
        content_type=content_type,
    )
[docs]
def put(self, url, params='', headers=None, extra_environ=None,
        status=None, upload_files=None, expect_errors=False,
        content_type=None, xhr=False):
    """
    Do a PUT request. Similar to :meth:`~webtest.TestApp.post`.

    :returns: :class:`webtest.TestResponse` instance.
    """
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'PUT', url,
        params=params,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=upload_files,
        expect_errors=expect_errors,
        content_type=content_type,
    )
[docs]
def patch(self, url, params='', headers=None, extra_environ=None,
          status=None, upload_files=None, expect_errors=False,
          content_type=None, xhr=False):
    """
    Do a PATCH request. Similar to :meth:`~webtest.TestApp.post`.

    :returns: :class:`webtest.TestResponse` instance.
    """
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'PATCH', url,
        params=params,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=upload_files,
        expect_errors=expect_errors,
        content_type=content_type,
    )
[docs]
def delete(self, url, params='', headers=None,
           extra_environ=None, status=None, expect_errors=False,
           content_type=None, xhr=False):
    """
    Do a DELETE request. Similar to :meth:`~webtest.TestApp.get`.

    No files are uploaded with this verb (``upload_files`` is always
    ``None``).

    :returns: :class:`webtest.TestResponse` instance.
    """
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'DELETE', url,
        params=params,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=None,
        expect_errors=expect_errors,
        content_type=content_type,
    )
[docs]
def options(self, url, headers=None, extra_environ=None,
            status=None, expect_errors=False, xhr=False):
    """
    Do a OPTIONS request. Similar to :meth:`~webtest.TestApp.get`.

    :returns: :class:`webtest.TestResponse` instance.
    """
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'OPTIONS', url,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=None,
        expect_errors=expect_errors,
    )
[docs]
def head(self, url, params=None, headers=None, extra_environ=None,
         status=None, expect_errors=False, xhr=False):
    """
    Do a HEAD request. Similar to :meth:`~webtest.TestApp.get`.

    ``params``, when given, are merged into the url with
    ``utils.build_params`` before the request is issued.

    :returns: :class:`webtest.TestResponse` instance.
    """
    if params:
        url = utils.build_params(url, params)
    request_headers = self._add_xhr_header(headers) if xhr else headers
    return self._gen_request(
        'HEAD', url,
        headers=request_headers,
        extra_environ=extra_environ,
        status=status,
        upload_files=None,
        expect_errors=expect_errors,
    )
# ``*_json`` variants of the verbs above, each produced by
# ``utils.json_method`` for the named HTTP method.  The helper is defined
# outside this file; presumably the generated methods send ``params``
# serialised as JSON -- confirm against ``webtest.utils``.
post_json = utils.json_method('POST')
put_json = utils.json_method('PUT')
patch_json = utils.json_method('PATCH')
delete_json = utils.json_method('DELETE')
[docs]
def encode_multipart(self, params, files):
    """
    Encodes a set of parameters (typically a name/value list) and
    a set of files (a list of (name, filename, file_body, mimetype)) into a
    typical POST body, returning the (content_type, body).

    Parameter values may be ``str``, ``bytes``, ``int``,
    :class:`webtest.forms.File` or :class:`webtest.forms.Upload`; any
    other type raises :class:`ValueError`.  Field names given as ``str``
    must be ASCII, file names given as ``str`` are encoded as UTF-8.
    """
    boundary = to_bytes(str(random.random()))[2:]
    boundary = b'----------a_BoUnDaRy' + boundary + b'$'
    delimiter = b'--' + boundary
    parts = []

    def add_file_part(file_info):
        # Emit one file part; _get_file_info normalises the tuple and
        # reads the content from disk when only a file name was given.
        key, filename, value, fcontent = self._get_file_info(file_info)
        if isinstance(key, str):
            key = key.encode('ascii')  # field names must be ascii
        if isinstance(filename, str):
            filename = filename.encode('utf8')  # ascii or utf8 only
        if not fcontent:
            fcontent = mimetypes.guess_type(filename.decode('utf8'))[0]
        fcontent = to_bytes(fcontent) or b'application/octet-stream'
        disposition = b''.join([
            b'Content-Disposition: form-data; ',
            b'name="', key, b'"; filename="', filename, b'"',
        ])
        parts.extend([
            delimiter,
            disposition,
            b'Content-Type: ' + fcontent,
            b'',
            value,
        ])

    for key, value in params:
        if isinstance(key, str):
            key = key.encode('ascii')  # field names are always ascii

        if isinstance(value, forms.File):
            if value.value:
                add_file_part([key] + list(value.value))
            else:
                # If no file was uploaded simulate an empty file with no
                # name like real browsers do:
                add_file_part([key, b'', b''])
            continue

        if isinstance(value, forms.Upload):
            file_info = [key, value.filename]
            if value.content is not None:
                file_info.append(value.content)
                if value.content_type is not None:
                    file_info.append(value.content_type)
            add_file_part(file_info)
            continue

        if isinstance(value, int):
            value = str(value).encode('utf8')
        elif isinstance(value, str):
            value = value.encode('utf8')
        elif not isinstance(value, (bytes, str)):
            raise ValueError((
                'Value for field {} is a {} ({}). '
                'It must be str, bytes or an int'
            ).format(key, type(value), value))
        parts.extend([
            delimiter,
            b'Content-Disposition: form-data; name="' + key + b'"',
            b'',
            value,
        ])

    for file_info in files:
        add_file_part(file_info)

    # Closing delimiter, then an empty item so the body ends with CRLF.
    parts.extend([delimiter + b'--', b''])
    body = b'\r\n'.join(parts)
    content_type = (
        'multipart/form-data; boundary=%s' % boundary.decode('ascii'))
    return content_type, body
[docs]
def request(self, url_or_req, status=None, expect_errors=False,
            **req_params):
    """
    Creates and executes a request. You may either pass in an
    instantiated :class:`TestRequest` object, or you may pass in a
    URL and keyword arguments to be passed to
    :meth:`TestRequest.blank`.

    You can use this to run a request without the intermediary
    functioning of :meth:`TestApp.get` etc.  For instance, to
    test a WebDAV method::

        resp = app.request('/new-col', method='MKCOL')

    Note that the request won't have a body unless you specify it,
    like::

        resp = app.request('/test.txt', method='PUT', body='test')

    You can use :class:`webtest.TestRequest`::

        req = webtest.TestRequest.blank('/url/', method='GET')
        resp = app.do_request(req)

    When a request object is passed, it is copied and every keyword
    argument is set on the copy as an attribute.
    """
    if isinstance(url_or_req, str):
        url_or_req = str(url_or_req)
    for key, val in req_params.items():
        if isinstance(val, str):
            req_params[key] = str(val)

    if not isinstance(url_or_req, str):
        # Work on a copy so the caller's request object stays untouched.
        req = url_or_req.copy()
        for attr, attr_value in req_params.items():
            setattr(req, attr, attr_value)
    else:
        req = self.RequestClass.blank(url_or_req, **req_params)

    req.environ['paste.throw_errors'] = True
    # Values already present on the request win over the app defaults.
    for env_key, env_value in self.extra_environ.items():
        req.environ.setdefault(env_key, env_value)

    return self.do_request(
        req, status=status, expect_errors=expect_errors)
[docs]
def do_request(self, req, status=None, expect_errors=None):
    """
    Executes the given webob Request (``req``), with the expected
    ``status``.  Generally :meth:`~webtest.TestApp.get` and
    :meth:`~webtest.TestApp.post` are used instead.

    To use this::

        req = webtest.TestRequest.blank('url', ...args...)
        resp = app.do_request(req)

    .. note::

        You can pass any keyword arguments to
        ``TestRequest.blank()``, which will be set on the request.
        These can be arguments like ``content_type``, ``accept``, etc.

    :param req: the request to run; its ``environ`` is modified in place.
    :param status: expected status, checked unless ``expect_errors`` is
        true.
    :param expect_errors: when true, neither the status nor the text
        written to ``wsgi.errors`` is checked.
    :returns: the response, with ``request``, ``app``, ``test_app`` and
        ``errors`` attributes set, plus one attribute per entry of
        ``paste.testing_variables``.
    :raises ValueError: if a ``paste.testing_variables`` name clashes
        with an existing response attribute.
    """
    # Capture whatever the application writes to wsgi.errors.
    errors = StringIO()
    req.environ['wsgi.errors'] = errors
    # Remove a leading SCRIPT_NAME from the path so it is not counted twice.
    script_name = req.environ.get('SCRIPT_NAME', '')
    if script_name and req.path_info.startswith(script_name):
        req.path_info = req.path_info[len(script_name):]

    # set framework hooks
    req.environ['paste.testing'] = True
    req.environ['paste.testing_variables'] = {}

    # set request cookies
    self.cookiejar.add_cookie_header(utils._RequestCookieAdapter(req))

    # verify wsgi compatibility
    app = lint.middleware(self.app) if self.lint else self.app

    # FIXME: should it be an option to not catch exc_info?
    res = req.get_response(app, catch_exc_info=True)

    # be sure to decode the content
    res.decode_content()

    # set a few handy attributes
    res._use_unicode = self.use_unicode
    res.request = req
    res.app = app
    res.test_app = self

    # We do this to make sure the app_iter is exhausted:
    try:
        res.body
    except TypeError:  # pragma: no cover
        pass
    # Read the error stream only after the body has been consumed, so
    # output produced while iterating the response is included.
    res.errors = errors.getvalue()

    # Expose the variables the application published as response
    # attributes, refusing to shadow an attribute that already exists.
    for name, value in req.environ['paste.testing_variables'].items():
        if hasattr(res, name):
            raise ValueError(
                "paste.testing_variables contains the variable %r, but "
                "the response object already has an attribute by that "
                "name" % name)
        setattr(res, name, value)
    if not expect_errors:
        self._check_status(status, res)
        self._check_errors(res)
    # merge cookies back in
    # (this runs after the checks, so it is skipped when one of them
    # raises)
    self.cookiejar.extract_cookies(utils._ResponseCookieAdapter(res),
                                   utils._RequestCookieAdapter(req))

    return res
def _check_status(self, status, res):
if status == '*':
return
res_status = res.status
if (isinstance(status, str) and '*' in status):
if re.match(fnmatch.translate(status), res_status, re.I):
return
if isinstance(status, str):
if status == res_status:
return
if isinstance(status, (list, tuple)):
if res.status_int not in status:
raise AppError(
"Bad response: %s (not one of %s for %s)\n%s",
res_status, ', '.join(map(str, status)),
res.request.url, res)
return
if status is None:
if res.status_int >= 200 and res.status_int < 400:
return
raise AppError(
"Bad response: %s (not 200 OK or 3xx redirect for %s)\n%s",
res_status, res.request.url,
res)
if status != res.status_int:
raise AppError(
"Bad response: %s (not %s)\n%s", res_status, status, res)
def _check_errors(self, res):
errors = res.errors
if errors:
raise AppError(
"Application had errors logged:\n%s", errors)
def _make_environ(self, extra_environ=None):
environ = self.extra_environ.copy()
environ['paste.throw_errors'] = True
if extra_environ:
environ.update(extra_environ)
return environ
def _remove_fragment(self, url):
scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
return urlparse.urlunsplit((scheme, netloc, path, query, ""))
def _gen_request(self, method, url, params=utils.NoDefault,
                 headers=None, extra_environ=None, status=None,
                 upload_files=None, expect_errors=False,
                 content_type=None):
    """
    Do a generic request.

    Encodes ``params`` into the request body -- as multipart when the
    params contain file fields, when ``upload_files`` is given or when
    ``content_type`` starts with ``multipart``; otherwise through
    ``utils.encode_params`` -- then builds the request and hands it to
    :meth:`do_request`.
    """
    environ = self._make_environ(extra_environ)

    # this supports OrderedDict
    if isinstance(params, dict) or hasattr(params, 'items'):
        params = list(params.items())

    inline_uploads = []
    if isinstance(params, (list, tuple)):
        inline_uploads = [
            field for (_name, field) in params
            if isinstance(field, (forms.File, forms.Upload))]

    if inline_uploads:
        content_type, params = self.encode_multipart(
            params, upload_files or ())
        environ['CONTENT_TYPE'] = content_type
    else:
        params = utils.encode_params(params, content_type)
        wants_multipart = upload_files or (
            content_type and
            to_bytes(content_type).startswith(b'multipart'))
        if wants_multipart:
            params = urlparse.parse_qsl(params, keep_blank_values=True)
            content_type, params = self.encode_multipart(
                params, upload_files or ())
            environ['CONTENT_TYPE'] = content_type
        elif params:
            environ.setdefault('CONTENT_TYPE',
                               'application/x-www-form-urlencoded')

    # An explicit (or multipart-generated) content type always wins.
    if content_type is not None:
        environ['CONTENT_TYPE'] = content_type
    environ['REQUEST_METHOD'] = str(method)

    url = self._remove_fragment(str(url))
    req = self.RequestClass.blank(url, environ)

    if isinstance(params, str):
        params = params.encode(req.charset or 'utf8')
    req.environ['wsgi.input'] = BytesIO(params)
    req.content_length = len(params)

    if headers:
        req.headers.update(headers)
    return self.do_request(
        req, status=status, expect_errors=expect_errors)
def _get_file_info(self, file_info):
if len(file_info) == 2:
# It only has a filename
filename = file_info[1]
if self.relative_to:
filename = os.path.join(self.relative_to, filename)
f = open(filename, 'rb')
content = f.read()
f.close()
return (file_info[0], filename, content, None)
elif 3 <= len(file_info) <= 4:
content = file_info[2]
if not isinstance(content, bytes):
raise ValueError('File content must be %s not %s'
% (bytes, type(content)))
if len(file_info) == 3:
return tuple(file_info) + (None,)
else:
return file_info
else:
raise ValueError(
"upload_files need to be a list of tuples of (fieldname, "
"filename, filecontent, mimetype) or (fieldname, "
"filename, filecontent) or (fieldname, filename); "
"you gave: %r"
% repr(file_info)[:100])
@staticmethod
def _add_xhr_header(headers):
headers = headers or {}
# if remove str we will be have an error in lint.middleware
headers.update({'X-REQUESTED-WITH': 'XMLHttpRequest'})
return headers