import time
from IPython import display
from enum import Enum
from pprint import pprint
from fastcore.test import *
from starlette.testclient import TestClient
from starlette.requests import Headers
from starlette.datastructures import UploadFile
核心
FastHTML subclass of Starlette, along with the RouterX and RouteX classes it automatically uses.
这是fasthtml的源代码。除非您想了解幕后构建的方式或需要某个特定API的详细信息,否则您无需阅读此内容。该笔记本使用nbdev转换为Python模块fasthtml/core.py。
导入和工具
我们先编写源代码,然后再编写测试。测试既是确认代码正常工作的手段,也可作为可运行的示例。第一个导出函数 parsed_date 就是这种模式的一个例子。
解析日期
parsed_date (s:str)
将 s 转换为日期时间
parsed_date('2pm')datetime.datetime(2025, 1, 31, 14, 0)
isinstance(date.fromtimestamp(0), date)True
蛇转连字符
snake2hyphens (s:str)
将 s 从蛇形命名法转换为带连字符和首字母大写的形式
snake2hyphens("snake_case")'Snake-Case'
Htmx头部
HtmxHeaders (boosted:str|None=None, current_url:str|None=None, history_restore_request:str|None=None, prompt:str|None=None, request:str|None=None, target:str|None=None, trigger_name:str|None=None, trigger:str|None=None)
def test_request(url: str='/', headers: dict|None=None, method: str='get') -> Request:
    """Build a minimal Starlette `Request` for testing, without running a server.

    Args:
        url: Value stored as the ASGI scope's `path`.
        headers: Mapping of header names to values; `None` means no headers.
        method: HTTP method stored in the scope.

    Returns:
        A `Request` whose body is empty.
    """
    scope = {
        'type': 'http',
        'method': method,
        'path': url,
        # Default to a fresh empty mapping per call instead of a shared mutable default.
        'headers': Headers({} if headers is None else headers).raw,
        'query_string': b'',
        'scheme': 'http',
        'client': ('127.0.0.1', 8000),
        'server': ('127.0.0.1', 8000),
    }

    async def receive():
        # ASGI `receive` is awaited by Starlette and each event must carry a `type`;
        # a plain lambda returning a dict breaks any attempt to read the body.
        return {"type": "http.request", "body": b"", "more_body": False}

    return Request(scope, receive)


h = test_request(headers=Headers({'HX-Request':'1'}))
_get_htmx(h.headers)HtmxHeaders(boosted=None, current_url=None, history_restore_request=None, prompt=None, request='1', target=None, trigger_name=None, trigger=None)
请求与响应
test_eq(_fix_anno(Union[str,None], 'a'), 'a')
test_eq(_fix_anno(float, 0.9), 0.9)
test_eq(_fix_anno(int, '1'), 1)
test_eq(_fix_anno(int, ['1','2']), 2)
test_eq(_fix_anno(list[int], ['1','2']), [1,2])
test_eq(_fix_anno(list[int], '1'), [1])d = dict(k=int, l=List[int])
test_eq(_form_arg('k', "1", d), 1)
test_eq(_form_arg('l', "1", d), [1])
test_eq(_form_arg('l', ["1","2"], d), [1,2])Http头部
HttpHeader (k:str, v:str)
_to_htmx_header('trigger_after_settle')'HX-Trigger-After-Settle'
Htmx响应头
HtmxResponseHeaders (location=None, push_url=None, redirect=None, refresh=None, replace_url=None, reswap=None, retarget=None, reselect=None, trigger=None, trigger_after_settle=None, trigger_after_swap=None)
HTMX响应头
HtmxResponseHeaders(trigger_after_settle='hi')HttpHeader(k='HX-Trigger-After-Settle', v='hi')
表单转字典
form2dict (form:starlette.datastructures.FormData)
将starlette表单数据转换为字典
d = [('a',1),('a',2),('b',0)]
fd = FormData(d)
res = form2dict(fd)
test_eq(res['a'], [1,2])
test_eq(res['b'], 0)解析表单
parse_form (req:starlette.requests.Request)
Starlette 在处理空的 multipart 表单时会报错,因此这里会先检查这种情况
async def f(req):
    """Parse the request body into an `HttpHeader` and return its fields as JSON."""
    # Throwaway signature: its single parameter carries the `HttpHeader` annotation to parse into.
    def _sig(p:HttpHeader): ...
    hdr_param = first(_params(_sig).values())
    parsed = await _from_body(req, hdr_param)
    return JSONResponse(vars(parsed))


client = TestClient(
    Starlette(routes=[Route('/', f, methods=['POST'])])
)
# `v` has two values on purpose: the form is posted with a repeated field.
d = {'k': 'value1', 'v': ['value2', 'value3']}
response = client.post('/', data=d)
print(response.json()){'k': 'value1', 'v': 'value3'}
async def f(req): return Response(str(req.query_params.getlist('x')))
client = TestClient(Starlette(routes=[Route('/', f, methods=['GET'])]))
client.get('/?x=1&x=2').text"['1', '2']"
def g(req, this:Starlette, a:str, b:HttpHeader): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]
def g(req, this:Starlette, a:str, b:HttpHeader): ...
async def f(req):
a = await _wrap_req(req, _params(g))
return Response(str(a))
client = TestClient(Starlette(routes=[Route('/', f, methods=['POST'])]))
response = client.post('/?a=1', data=d)
print(response.text)[<starlette.requests.Request object>, <starlette.applications.Starlette object>, '1', HttpHeader(k='value1', v='value3')]
flat_xt
flat_xt (lst)
扁平化列表
x = ft('a',1)
test_eq(flat_xt([x, x, [x,x]]), (x,)*4)
test_eq(flat_xt(x), (x,))
Beforeware
Beforeware (f, skip=None)
初始化自身。请参阅 help(type(self)) 以获取准确的签名。
网页套接字 / SSE
def on_receive(self, msg:str):
    """Return the reply text for an incoming websocket message `msg`."""
    return f"Message text was: {msg}"


# Wrap the handler once and reuse that endpoint for the route,
# rather than building a second, identical endpoint and leaving `c` unused.
c = _ws_endp(on_receive)
cli = TestClient(Starlette(routes=[WebSocketRoute('/', c)]))
with cli.websocket_connect('/') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'事件流
EventStream (s)
从 s 创建一个 text/event-stream 响应
信号关闭
signal_shutdown ()
路由与应用
统一资源标识符
uri (_arg, **kwargs)
解码 URI
decode_uri (s)
StringConvertor.to_string
StringConvertor.to_string (value:str)
HTTPConnection.url_path_for
HTTPConnection.url_path_for (name:str, **path_params)
平坦元组
flat_tuple (o)
扁平化列表
无操作主体
noop_body (c, req)
默认的主体包装函数,只返回内容
响应
respond (req, heads, bdy)
默认的 FT 响应创建函数
重定向
Redirect (loc)
根据需要使用 HTMX 或 Starlette RedirectResponse 进行重定向到 loc
获取密钥
get_key (key=None, fname='.sesskey')
get_key()'cc87253c-bfc1-4544-bbc0-58dd8d3291bc'
qp
qp (p:str, **kw)
将参数 kw 添加到路径 p
qp 将查询参数添加到路由路径字符串中
vals = {'a':5, 'b':False, 'c':[1,2], 'd':'bar', 'e':None, 'ab':42}
res = qp('/foo', **vals)
test_eq(res, '/foo?a=5&b=&c=1&c=2&d=bar&e=&ab=42')
qp 会检查每个参数应作为查询参数发送,还是作为路由路径的一部分发送,并对其进行正确编码。
path = '/foo/{a}/{d}/{ab:int}'
res = qp(path, **vals)
test_eq(res, '/foo/5/bar/42?b=&c=1&c=2&e=')定义标题
def_hdrs (htmx=True, surreal=True)
FastHTML 应用的默认头部(headers)
FastHTML
FastHTML (debug=False, routes=None, middleware=None, title:str='FastHTML page', exception_handlers=None, on_startup=None, on_shutdown=None, lifespan=None, hdrs=None, ftrs=None, exts=None, before=None, after=None, surreal=True, htmx=True, default_hdrs=True, sess_cls=<class 'starlette.middleware.sessions.SessionMiddleware'>, secret_key=None, session_cookie='session_', max_age=31536000, sess_path='/', same_site='lax', sess_https_only=False, sess_domain=None, key_fname='.sesskey', body_wrap=<function noop_body>, htmlkw=None, nb_hdrs=False, **bodykw)
创建一个 Starlette 应用程序。
FastHTML.ws
FastHTML.ws (path:str, conn=None, disconn=None, name=None, middleware=None)
在 path 上添加一个 websocket 路由
嵌套名称
nested_name (f)
获取函数 f 的名称,嵌套函数的各级名称用 '_' 连接
def f():
    """Return the inner function `g`, so there is a nested function to name."""
    def g(): ...
    return g


# Kept as a separate statement: in the source it was fused onto `return g`.
func = f()
nested_name(func)'f_g'
FastHTML.route
FastHTML.route (path:str=None, methods=None, name=None, include_in_schema=True, body_wrap=None)
在 path 添加一个路由
app = FastHTML()
@app.get
def foo(a:str, b:list[int]): ...
foo.to(a='bar', b=[1,2])'/foo?a=bar&b=1&b=2'
@app.get('/foo/{a}')
def foo(a:str, b:list[int]): ...
foo.to(a='bar', b=[1,2])'/foo/bar?b=1&b=2'
服务
serve (appname=None, app='app', host='0.0.0.0', port=None, reload=True, reload_includes:list[str]|str|None=None, reload_excludes:list[str]|str|None=None)
在异步服务器中运行应用程序,默认设置为实时重新加载。
| | 类型 | 默认值 | 详细信息 |
|---|---|---|---|
| appname | NoneType | None | 模块名称 |
| app | str | app | 要提供服务的 App 实例 |
| host | str | 0.0.0.0 | 如果 host 是 0.0.0.0,将转换为 localhost |
| port | NoneType | None | 如果 port 为 None,则默认使用 5001 或 PORT 环境变量 |
| reload | bool | True | 默认在代码更改时重新加载应用程序 |
| reload_includes | list[str] \| str \| None | None | 要监视更改的附加文件 |
| reload_excludes | list[str] \| str \| None | None | 要忽略更改的文件 |
客户端
Client (app, url='http://testserver')
一个简单的 httpx ASGI 客户端,不需要 async
app = FastHTML(routes=[Route('/', lambda _: Response('test'))])
cli = Client(app)
cli.get('/').text'test'
请注意,您也可以使用 Starlette 的 TestClient,而不是 FastHTML 的 Client。它们基本上是可以互换的。
FastHTML 测试
def get_cli(app):
    """Return `(app, TestClient(app), app.route)` so all three can be unpacked in one statement."""
    return app, TestClient(app), app.route


app,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))

# Separate example: rebinds app/cli/rt to an app with a custom page title.
app,cli,rt = get_cli(FastHTML(title="My Custom Title"))
@app.get
def foo(): return Div("Hello World")
print(app.routes)
response = cli.get('/foo')
assert '<title>My Custom Title</title>' in response.text
foo.to(param='value')[Route(path='/foo', name='foo', methods=['GET', 'HEAD'])]
'/foo?param=value'
app,cli,rt = get_cli(FastHTML())
@rt('/xt2')
def get(): return H1('bar')
txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt@rt("/hi")
def get(): return 'Hi there'
r = cli.get('/hi')
r.text'Hi there'
@rt("/hi")
def post(): return 'Postal'
cli.post('/hi').text'Postal'
@app.get("/hostie")
def show_host(req): return req.headers['host']
cli.get('/hostie').text'testserver'
@app.get("/setsess")
def set_sess(session):
session['foo'] = 'bar'
return 'ok'
@app.ws("/ws")
def ws(self, msg:str, ws:WebSocket, session): return f"Message text was: {msg} with session {session.get('foo')}, from client: {ws.client}"
cli.get('/setsess')
with cli.websocket_connect('/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert 'Message text was: Hi! with session bar' in data
print(data)Message text was: Hi! with session bar, from client: Address(host='testclient', port=50000)
@rt
def yoyo(): return 'a yoyo'
cli.post('/yoyo').text'a yoyo'
@app.get
def autopost(): return Html(Div('Text.', hx_post=yoyo()))
print(cli.get('/autopost').text) <!doctype html>
<html>
<div hx-post="a yoyo">Text.</div>
</html>
@app.get
def autopost2(): return Html(Body(Div('Text.', cls='px-2', hx_post=show_host.to(a='b'))))
print(cli.get('/autopost2').text) <!doctype html>
<html>
<body>
<div class="px-2" hx-post="/hostie?a=b">Text.</div>
</body>
</html>
@app.get
def autoget2(): return Html(Div('Text.', hx_get=show_host))
print(cli.get('/autoget2').text) <!doctype html>
<html>
<div hx-get="/hostie">Text.</div>
</html>
@rt('/user/{nm}', name='gday')
def get(nm:str=''): return f"Good day to you, {nm}!"
cli.get('/user/Alexis').text'Good day to you, Alexis!'
@app.get
def autolink(): return Html(Div('Text.', link=uri('gday', nm='Alexis')))
print(cli.get('/autolink').text) <!doctype html>
<html>
<div href="/user/Alexis">Text.</div>
</html>
@rt('/link')
def get(req): return f"{req.url_for('gday', nm='Alexis')}; {req.url_for('show_host')}"
cli.get('/link').text'http://testserver/user/Alexis; http://testserver/hostie'
@app.get("/background")
async def background_task(request):
async def long_running_task():
await asyncio.sleep(0.1)
print("Background task completed!")
return P("Task started"), BackgroundTask(long_running_task)
response = cli.get("/background")Background task completed!
test_eq(app.router.url_path_for('gday', nm='Jeremy'), '/user/Jeremy')hxhdr = {'headers':{'hx-request':"1"}}
@rt('/ft')
def get(): return Title('Foo'),H1('bar')
txt = cli.get('/ft').text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt
@rt('/xt2')
def get(): return H1('bar')
txt = cli.get('/xt2').text
assert '<title>FastHTML page</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt
assert cli.get('/xt2', **hxhdr).text.strip() == '<h1>bar</h1>'
@rt('/xt3')
def get(): return Html(Head(Title('hi')), Body(P('there')))
txt = cli.get('/xt3').text
assert '<title>FastHTML page</title>' not in txt and '<title>hi</title>' in txt and '<p>there</p>' in txt@rt('/oops')
def get(nope): return nope
test_warns(lambda: cli.get('/oops?nope=1'))


def test_r(cli, path, exp, meth='get', hx=False, **kwargs):
    """Request `path` with `cli.<meth>` and check that the response text equals `exp`.

    Args:
        cli: Client object exposing HTTP-verb methods (`get`, `post`, ...).
        path: URL path to request.
        exp: Expected response text.
        meth: Name of the client method to call.
        hx: If true, add an `hx-request: 1` header to the request.
        **kwargs: Forwarded unchanged to the client method.
    """
    # Merge instead of overwrite, so headers passed by the caller survive when `hx=True`.
    if hx: kwargs['headers'] = {**(kwargs.get('headers') or {}), 'hx-request':"1"}
    test_eq(getattr(cli, meth)(path, **kwargs).text, exp)
ModelName = str_enum('ModelName', "alexnet", "resnet", "lenet")
fake_db = [{"name": "Foo"}, {"name": "Bar"}]@rt('/html/{idx}')
async def get(idx:int): return Body(H4(f'Next is {idx+1}.'))@rt("/models/{nm}")
def get(nm:ModelName): return nm
@rt("/files/{path}")
async def get(path: Path): return path.with_suffix('.txt')
@rt("/items/")
def get(idx:int|None = 0): return fake_db[idx]
@rt("/idxl/")
def get(idx:list[int]): return str(idx)r = cli.get('/html/1', headers={'hx-request':"1"})
assert '<h4>Next is 2.</h4>' in r.text
test_r(cli, '/models/alexnet', 'alexnet')
test_r(cli, '/files/foo', 'foo.txt')
test_r(cli, '/items/?idx=1', '{"name":"Bar"}')
test_r(cli, '/items/', '{"name":"Foo"}')
assert cli.get('/items/?idx=g').text=='404 Not Found'
assert cli.get('/items/?idx=g').status_code == 404
test_r(cli, '/idxl/?idx=1&idx=2', '[1, 2]')
assert cli.get('/idxl/?idx=1&idx=g').status_code == 404app = FastHTML()
rt = app.route
cli = TestClient(app)
@app.route(r'/static/{path:path}.jpg')
def index(path:str): return f'got {path}'
cli.get('/static/sub/a.b.jpg').text'got sub/a.b'
app.chk = 'foo'@app.get("/booly/")
def _(coming:bool=True): return 'Coming' if coming else 'Not coming'
@app.get("/datie/")
def _(d:parsed_date): return d
@app.get("/ua")
async def _(user_agent:str): return user_agent
@app.get("/hxtest")
def _(htmx): return htmx.request
@app.get("/hxtest2")
def _(foo:HtmxHeaders, req): return foo.request
@app.get("/app")
def _(app): return app.chk
@app.get("/app2")
def _(foo:FastHTML): return foo.chk,HttpHeader("mykey", "myval")
@app.get("/app3")
def _(foo:FastHTML): return HtmxResponseHeaders(location="http://example.org")
@app.get("/app4")
def _(foo:FastHTML): return Redirect("http://example.org")test_r(cli, '/booly/?coming=true', 'Coming')
test_r(cli, '/booly/?coming=no', 'Not coming')
date_str = "17th of May, 2024, 2p"
test_r(cli, f'/datie/?d={date_str}', '2024-05-17 14:00:00')
test_r(cli, '/ua', 'FastHTML', headers={'User-Agent':'FastHTML'})
test_r(cli, '/hxtest' , '1', headers={'HX-Request':'1'})
test_r(cli, '/hxtest2', '1', headers={'HX-Request':'1'})
test_r(cli, '/app' , 'foo')r = cli.get('/app2', **hxhdr)
test_eq(r.text, 'foo')
test_eq(r.headers['mykey'], 'myval')r = cli.get('/app3')
test_eq(r.headers['HX-Location'], 'http://example.org')r = cli.get('/app4', follow_redirects=False)
test_eq(r.status_code, 303)r = cli.get('/app4', headers={'HX-Request':'1'})
test_eq(r.headers['HX-Redirect'], 'http://example.org')@rt
def meta():
return ((Title('hi'),H1('hi')),
(Meta(property='image'), Meta(property='site_name'))
)
t = cli.post('/meta').text
assert re.search(r'<body>\s*<h1>hi</h1>\s*</body>', t)
assert '<meta' in t@app.post('/profile/me')
def profile_update(username: str): return username
test_r(cli, '/profile/me', 'Alexis', 'post', data={'username' : 'Alexis'})
test_r(cli, '/profile/me', 'Missing required field: username', 'post', data={})# Example post request with parameter that has a default value
@app.post('/pet/dog')
def pet_dog(dogname: str = None): return dogname
# Working post request with optional parameter
test_r(cli, '/pet/dog', '', 'post', data={})@dataclass
class Bodie: a:int;b:str
@rt("/bodie/{nm}")
def post(nm:str, data:Bodie):
res = asdict(data)
res['nm'] = nm
return res
@app.post("/bodied/")
def bodied(data:dict): return data
nt = namedtuple('Bodient', ['a','b'])
@app.post("/bodient/")
def bodient(data:nt): return asdict(data)
class BodieTD(TypedDict): a:int;b:str='foo'
@app.post("/bodietd/")
def bodient(data:BodieTD): return data
class Bodie2:
a:int|None; b:str
def __init__(self, a, b='foo'): store_attr()
@rt("/bodie2/", methods=['get','post'])
def bodie(d:Bodie2): return f"a: {d.a}; b: {d.b}"from fasthtml.xtend import Titledd = dict(a=1, b='foo')
test_r(cli, '/bodie/me', '{"a":1,"b":"foo","nm":"me"}', 'post', data=dict(a=1, b='foo', nm='me'))
test_r(cli, '/bodied/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodie2/', 'a: 1; b: foo', 'post', data={'a':1})
test_r(cli, '/bodie2/?a=1&b=foo&nm=me', 'a: 1; b: foo')
test_r(cli, '/bodient/', '{"a":"1","b":"foo"}', 'post', data=d)
test_r(cli, '/bodietd/', '{"a":1,"b":"foo"}', 'post', data=d)# Testing POST with Content-Type: application/json
@app.post("/")
def index(it: Bodie): return Titled("It worked!", P(f"{it.a}, {it.b}"))
s = json.dumps({"b": "Lorem", "a": 15})
response = cli.post('/', headers={"Content-Type": "application/json"}, data=s).text
assert "<title>It worked!</title>" in response and "<p>15, Lorem</p>" in response# Testing POST with Content-Type: application/json
@app.post("/bodytext")
def index(body): return body
response = cli.post('/bodytext', headers={"Content-Type": "application/json"}, data=s).text
test_eq(response, '{"b": "Lorem", "a": 15}')files = [ ('files', ('file1.txt', b'content1')),
('files', ('file2.txt', b'content2')) ]@rt("/uploads")
async def post(files:list[UploadFile]):
return ','.join([(await file.read()).decode() for file in files])
res = cli.post('/uploads', files=files)
print(res.status_code)
print(res.text)200
content1,content2
res = cli.post('/uploads', files=[files[0]])
print(res.status_code)
print(res.text)200
content1
@rt("/setsess")
def get(sess, foo:str=''):
now = datetime.now()
sess['auth'] = str(now)
return f'Set to {now}'
@rt("/getsess")
def get(sess): return f'Session time: {sess["auth"]}'
print(cli.get('/setsess').text)
time.sleep(0.01)
cli.get('/getsess').textSet to 2025-01-31 14:35:31.433371
'Session time: 2025-01-31 14:35:31.433371'
@rt("/sess-first")
def post(sess, name: str):
sess["name"] = name
return str(sess)
cli.post('/sess-first', data={'name': 2})
@rt("/getsess-all")
def get(sess): return sess['name']
test_eq(cli.get('/getsess-all').text, '2')@rt("/upload")
async def post(uf:UploadFile): return (await uf.read()).decode()
with open('../../CHANGELOG.md', 'rb') as f:
print(cli.post('/upload', files={'uf':f}, data={'msg':'Hello'}).text[:15])# Release notes
@rt("/form-submit/{list_id}")
def options(list_id: str):
    """Reply with status 200 and permissive `Access-Control-*` headers."""
    headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'POST',
        'Access-Control-Allow-Headers': '*',
    }
    return Response(status_code=200, headers=headers)


# Kept as a separate statement: in the source it was fused onto the `return` above.
h = cli.options('/form-submit/2').headers
test_eq(h['Access-Control-Allow-Methods'], 'POST')from fasthtml.authmw import user_pwd_authdef _not_found(req, exc): return Div('nope')
app,cli,rt = get_cli(FastHTML(exception_handlers={404:_not_found}))
txt = cli.get('/').text
assert '<div>nope</div>' in txt
assert '<!doctype html>' in txtapp,cli,rt = get_cli(FastHTML())
@rt("/{name}/{age}")
def get(name: str, age: int):
return Titled(f"Hello {name.title()}, age {age}")
assert '<title>Hello Uma, age 5</title>' in cli.get('/uma/5').text
assert '404 Not Found' in cli.get('/uma/five').textauth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))
@rt("/locked")
def get(auth): return 'Hello, ' + auth
test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')auth = user_pwd_auth(testuser='spycraft')
app,cli,rt = get_cli(FastHTML(middleware=[auth]))
@rt("/locked")
def get(auth): return 'Hello, ' + auth
test_eq(cli.get('/locked').text, 'not authenticated')
test_eq(cli.get('/locked', auth=("testuser","spycraft")).text, 'Hello, testuser')API路由器
路由函数
RouteFuncs ()
初始化 self。请查看 help(type(self)) 以获取准确的签名。
API路由器
APIRouter (prefix:str|None=None, body_wrap=<function noop_body>)
为应用程序添加路由
ar = APIRouter()@ar("/hi")
def get(): return 'Hi there'
@ar("/hi")
def post(): return 'Postal'
@ar
def ho(): return 'Ho ho'
@ar("/hostie")
def show_host(req): return req.headers['host']
@ar
def yoyo(): return 'a yoyo'
@ar
def index(): return "home page"
@ar.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"app,cli,_ = get_cli(FastHTML())
ar.to_app(app)assert str(yoyo) == '/yoyo'
# ensure route functions are properly discoverable on `APIRouter` and `APIRouter.rt_funcs`
assert ar.prefix == ''
assert str(ar.rt_funcs.index) == '/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar.blah()
with ExceptionExpected(): ar.rt_funcs.blah()
# ensure any route functions named using an HTTPMethod are not discoverable via `rt_funcs`
assert "get" not in ar.rt_funcs._funcs.keys()test_eq(cli.get('/hi').text, 'Hi there')
test_eq(cli.post('/hi').text, 'Postal')
test_eq(cli.get('/hostie').text, 'testserver')
test_eq(cli.post('/yoyo').text, 'a yoyo')
test_eq(cli.get('/ho').text, 'Ho ho')
test_eq(cli.post('/ho').text, 'Ho ho')with cli.websocket_connect('/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'ar2 = APIRouter("/products")@ar2("/hi")
def get(): return 'Hi there'
@ar2("/hi")
def post(): return 'Postal'
@ar2
def ho(): return 'Ho ho'
@ar2("/hostie")
def show_host(req): return req.headers['host']
@ar2
def yoyo(): return 'a yoyo'
@ar2
def index(): return "home page"
@ar2.ws("/ws")
def ws(self, msg:str): return f"Message text was: {msg}"app,cli,_ = get_cli(FastHTML())
ar2.to_app(app)assert str(yoyo) == '/products/yoyo'
assert ar2.prefix == '/products'
assert str(ar2.rt_funcs.index) == '/products/'
assert str(ar2.index) == '/products/'
assert str(ar.index) == '/'
with ExceptionExpected(): ar2.blah()
with ExceptionExpected(): ar2.rt_funcs.blah()
assert "get" not in ar2.rt_funcs._funcs.keys()test_eq(cli.get('/products/hi').text, 'Hi there')
test_eq(cli.post('/products/hi').text, 'Postal')
test_eq(cli.get('/products/hostie').text, 'testserver')
test_eq(cli.post('/products/yoyo').text, 'a yoyo')
test_eq(cli.get('/products/ho').text, 'Ho ho')
test_eq(cli.post('/products/ho').text, 'Ho ho')with cli.websocket_connect('/products/ws') as ws:
ws.send_text('{"msg":"Hi!"}')
data = ws.receive_text()
assert data == 'Message text was: Hi!'@ar.get
def hi2(): return 'Hi there'
@ar.get("/hi3")
def _(): return 'Hi there'
@ar.post("/post2")
def _(): return 'Postal'
@ar2.get
def hi2(): return 'Hi there'
@ar2.get("/hi3")
def _(): return 'Hi there'
@ar2.post("/post2")
def _(): return 'Postal'额外内容
app,cli,rt = get_cli(FastHTML(secret_key='soopersecret'))cookie
cookie (key:str, value='', max_age=None, expires=None, path='/', domain=None, secure=False, httponly=False, samesite='lax')
创建一个‘set-cookie’ HttpHeader
@rt("/setcookie")
def get(req): return cookie('now', datetime.now())
@rt("/getcookie")
def get(now:parsed_date): return f'Cookie was set at time {now.time()}'
print(cli.get('/setcookie').text)
time.sleep(0.01)
cli.get('/getcookie').text
'Cookie was set at time 14:35:32.085198'
正则表达式参数
reg_re_param (m, s)
FastHTML.static_route_exts
FastHTML.static_route_exts (prefix='/', static_path='.', exts='static')
在URL路径 prefix 添加一个静态路由,使用来自 static_path 的文件和由 reg_re_param() 定义的 exts
reg_re_param("imgext", "ico|gif|jpg|jpeg|webm|pdf")
@rt(r'/static/{path:path}{fn}.{ext:imgext}')
def get(fn:str, path:str, ext:str): return f"Getting {fn}.{ext} from /{path}"
test_r(cli, '/static/foo/jph.me.ico', 'Getting jph.me.ico from /foo/')app.static_route_exts()
assert 'These are the source notebooks for FastHTML' in cli.get('/README.txt').text
FastHTML.static_route
FastHTML.static_route (ext='', prefix='/', static_path='.')
在URL路径 prefix 添加一个静态路由,使用来自 static_path 的文件和单个 ext(包括‘.’)
app.static_route('.md', static_path='../..')
assert 'THIS FILE WAS AUTOGENERATED' in cli.get('/README.md').text中间件基础
MiddlewareBase ()
初始化 self。请查看 help(type(self)) 以获取准确的签名。
FtResponse
FtResponse (content, status_code:int=200, headers=None, cls=<class 'starlette.responses.HTMLResponse'>, media_type:str|None=None)
使用任何 Starlette Response 包装 FT 响应
@rt('/ftr')
def get():
cts = Title('Foo'),H1('bar')
return FtResponse(cts, status_code=201, headers={'Location':'/foo/1'})
r = cli.get('/ftr')
test_eq(r.status_code, 201)
test_eq(r.headers['location'], '/foo/1')
txt = r.text
assert '<title>Foo</title>' in txt and '<h1>bar</h1>' in txt and '<html>' in txt唯一标识符
unqid ()
setup_ws
setup_ws (app, f=<function noop>)