Micropython 上轻量级异步网页服务器
m24h2024/04/18原创 软件综合 IP:上海
关键词
MicropythonWeb serverHTTPD

项目托管在XXXXXXXXXXXXXXXXXX/m24h/aweb

我是个喜欢造轮子的人 本来microdot用得好好的 但是越看越觉得不够清秀 而且不需要的功能完全是浪费内存 所以我干脆自己写了个异步的http服务器 打成mpy也才几k 暂时无屎山  合适用途简单的地方

支持路由 支持预处理后处理 支持响应字串 文件 json form-urlencoded 支持cookie 自动识别功能函数是普通的还是协程

只支持utf-8编码 不支持文件上传 不支持模板 websocket官方已提供 ssl普通人用不起 但要用需要小改 用官方ssl包wrap

目前只进行了简单的测试 在我这里 做完静态文件服务 也只需要下面代码

web=aweb.Web()
    
@web('*', 'get')
def _get(flow):
    path='web/'+flow.path.strip('/')
    if '..' in path or '/_' in path or path.startswith('_') or path.endswith('.py'):
        flow.send_text('!!! Not found !!!', status=404, reason='BADPATH')
    try:
        while os.stat(path)[0]&0x4000: # dir
            path=path+'/index.html'
        flow.send_file(path, max_age=86400)
    except OSError:
        flow.send_text('!!! Not found !!!', status=404, reason='NOTFOUND')
...
asyncio.get_event_loop().run_until_complete( \
        aweb.server(web, port=conf.port, limit=1024, clients=10))
...
asyncio.get_event_loop().run_forever()

具体模块代码如下 开头是一段各种使用的例子

'''
A light-weight async http web server for light-weight usage. (not fully tested)
Examples:

import aweb

web=aweb.Web()

@web('index.html')
def root(flow):
    flow.send_file('web/index.html')
        
@web('post/*', ':before')
def auth(flow):
    flow.var['user']=flow.cookie('user')
    if flow.head['host']=='vhoost':
        flow.path='vhost/'+flow.path

@web('', ':after')
def after_all(flow):
    flow.tail['Access-Control-Allow-Origin'] = '*' # header to send
    flow.set_cookie('user', flow.var['user'], max_age=3600)
    if not hasattr(flow, 'send'):
        flow.send_file('web/404.html', status=404, reason='LOST')

# in fact, more than one server with more path-router can be run at the same time
# ssl is not supported, it's too expensive for most of DIYers, but it's also easy to be wrapped
asyncio.get_event_loop().run_until_complete( \
    aweb.server(web, port=80, limit=1024, clients=5))
asyncio.get_event_loop().run_until_complete( \
    aweb.server(web2, port=8080, limit=1024, clients=5))

@web('test/*', 'get', "I'm robot", generator)
async def test(flow, title, gen): # async function is also automatically supported
    await asyncio.sleep(1)
    #support outside content generator, also can send text/html/redirect/file/json/form directly
    if gen:
        flow.tail['Content-Type']=...
        flow.send=generator
    else:
        flow.send_json({'return':title})

# initialize other aync services

# loop all async services
asyncio.get_event_loop().run_forever()

'''

import asyncio
import json
import sys

_minetypes={
    'css': 'text/css',
    'gif': 'image/gif',
    'html': 'text/html',
    'htm': 'text/html',
    'jpg': 'image/jpeg',
    'js': 'application/javascript',
    'json': 'application/json',
    'png': 'image/png',
    'txt': 'text/plain',
    }

def minetype(ext):
    return _minetypes.get(ext.lower(), 'application/octet-stream')

# support only utf-8 in micropython
def url_decode(b):
    ret=bytearray()
    l=len(b)
    i=0
    while i<l:
        t=b[i]
        if t==0x28: # +
            t=0x20  # [space]
        elif t==0x25 and i+2<l: # %
            t=int(b[i+1:i+3], 16)
            i=i+2
        ret.append(t)
        i=i+1
    return ret.decode('utf-8')

def url_encode(s, safe=False):
    ret=bytearray()
    for b in s.encode('utf-8'):
        if (b>=65 and b<=90) or (b>=97 and b<=122) or b==45 or b==46 or (b==47 and not safe) or b==95 or b==126:
            ret.append(b)
        else:
            ret.extend(b'%{:02X}'.format(b))
    return ret
    
def param_decode(ret, b):
    for kv in b.split(b'&'):
        kv=kv.split(b'=',1)
        if not kv[0]:
            continue
        ret.append((url_decode(kv[0].strip()), url_decode(kv[1]) if len(kv)>1 else ''))

def param_encode(ret, l):
    for k,v in l:
        if not k:
            continue
        if len(ret)>0:
            ret.extend(b'&')
        ret.extend(url_encode(k))
        ret.extend(b'=')
        ret.extend(url_encode(v or ''))
        
def param_get(tp, name):
    for k,v in tp:
        if k==name:
            return v
    return None

def param_array(tp, name):
    ret=[]
    for k,v in tp:
        if k==name:
            ret.append(v)
    return ret

class Web(list):
    #decorator to specify a function as web routing
    #path is likely 'test/index.html' 'test/path', without case sensitive
    #path is from root without leading '/'
    #using '*' at the tail of path as wildcard
    #method can be 'get' 'post', or use ',' to combine them
    #args and kwargs will be used to call the mapped function
    #longest path matches first
    #method ':before' ':after' is specially for function running before or after
    def __call__(self, path, method='get,post', *args, **kwargs):
        def decorator(func):
            p=path.lower()
            if p.endswith('*'):
                p=p[:-1]
                wc=True
            else:
                wc=False
            order=(len(p)<<2)+(0 if wc else 1)
            i=0
            n=len(self)
            while i<n:
                if self[i][0]<=order:
                    break
            for m in method.split(','):
                self.insert(i, (order, p, m.strip().lower(), wc, func, args, kwargs))
            return func
        return decorator
    
    # find the path, return the longest one, then matching method, then matching wildcard
    # return a tuple as (order, path, method, wildcard, func, args, kwargs)
    def find(self, path, method):
        p=path.lower() if path else ''
        m=method.lower() if method else 'get'
        for t in self:
            if (t[1]==p or (t[3] and p.startswith(t[1]))) and t[2]==m:
                return t
        return None

def _send_file(fname):
    mv=memoryview(bytearray(1024))
    with open(fname, 'rb') as f:
        while t:=f.readinto(mv):
            yield mv[:t]

# micropython does not support async yield yet
class AsyncGenRead:
    def __init__(self, r):
        self.r=r
        
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        while t:=self.r.read(1024):
            return t
        raise StopAsyncIteration

class Flow:
    def __init__(self, r, w, limit):
        self.req=r
        self.resp=w
        self.limit=limit
        self.var={} # for unspecified usage during whole flow
        
    async def readallb(self):
        t=self.head.get('content-length')
        if t:
            t=int(t)
            if t>self.limit:
                raise MemoryError('Out of limit size')
            return await self.req.readexactly(t)
        else:
            t=await self.req.read(self.limit)
            if len(t)>=self.limit: # seems not end
                raise MemoryError('Out of limit size')
            return t
   
    async def readlineb(self, mv):
        r=self.req
        p=0
        limit=len(mv)
        while p<limit and await r.readinto(mv[p:p+1]):
            if mv[p]==0x0A:
                return bytes(mv[:p]).rstrip(b'\r\n')
            p=p+1
        raise MemoryError('Out of limit size')
        
    async def _start(self):
        mv=memoryview(bytearray(self.limit))
        t=await self.readlineb(mv)
        t=t.split()
        if len(t)<3:
            raise ValueError('Bad protocol')
        self.method=t[0].strip().decode('utf-8').lower()
        v=t[2].strip().split(b'/', 1)
        self.ver=v[1].strip().decode('utf-8') if len(v)>1 else '1.1'
        t=t[1].strip().split(b'?', 1)
        v=t[0].replace(b'\\', b'/').split(b'/', 1)
        self.path=url_decode(v[1]).strip().lower() if len(v)>1 else ''
        self._query_b=t[1].rstrip().lstrip('? \t') if len(t)>1 else b''
        self.head={}
        self.cookie={}
        while t:=await self.readlineb(mv):
            t=t.split(b':',1)
            v=t[0].strip().decode('utf-8').lower()
            if v=='cookie':
                if len(t)>1:
                    for v in t[1].split(b';'):
                        t=v.split(b'=', 1)
                        self.cookie[url_decode(t[0].strip())]=url_decode(t[1].strip()) if len(t)>1 else ''
                continue
            self.head[v]=t[1].strip().decode('utf-8') if len(t)>1 else ''
        del mv
        self.status=200
        self.reason='OK'
        self.tail={'Connection':'Close'}
        self._setcookie={}

    async def _finish(self):
        if not hasattr(self, 'send'):
            self.send='!!! NOT FOUND !!!'
            self.status=404
            self.reason='NOROUTER'
            self.tail['Content-Type']='text/plain'
        resp=self.resp
        resp.write(b'HTTP/1.0 {} {}\r\n'.format(self.status, self.reason, encoding='utf-8'))
        for k,v in self.tail.items():
            if not k:
                continue
            if isinstance(v, tuple) or isinstance(v, list):
                for t in v:
                    resp.write(b'{}: {}\r\n'.format(k, t or '', encoding='utf-8'))
            else:
                resp.write(b'{}: {}\r\n'.format(k, v or '', encoding='utf-8'))
        await resp.drain()
        for k,v in self._setcookie.items():
            if not k:
                continue
            resp.write(b'Set-Cookie: '+url_encode(k)+b'='+(v or b'')+b'\r\n')
        resp.write(b'\r\n')
        await resp.drain()
        send=self.send
        if isinstance(send, str):
            resp.write(send.encode('utf-8'))
            await resp.drain()
        elif isinstance(send, bytes) or isinstance(send, bytearray) or isinstance(send, memoryview):
            resp.write(send)
            await resp.drain()
        elif isinstance(send, dict):
            resp.write(json.dumps(send, separators=(',', ':')).encode('utf-8'))
            await resp.drain()
        elif isinstance(send, list) or isinstance(send, tuple):
            t=bytearray()
            param_encode(t, send)
            resp.write(t)
            await resp.drain()
        elif hasattr(send, '__aiter__') and callable(send.__aiter__):
            async for v in send:
                if isinstance(v, str):
                    resp.write(v.encode('utf-8'))
                else:
                    resp.write(v)
                await resp.drain()
        elif hasattr(send, 'send') and callable(send.send):
            for v in send:
                if isinstance(v, str):
                    resp.write(v.encode('utf-8'))
                else:
                    resp.write(v)
                await resp.drain()
        elif callable(send):
            t=send()
            if isinstance(t, str):
                resp.write(t.encode('utf-8'))
            else:
                resp.write(t)
            await resp.drain()
    
    def query(self):
        if hasattr(self, '_query_b'):
            self._query=[]
            param_decode(self._query, self._query_b)
            delattr(self, '_query_b')
        return self._query
                
    async def recv_json(self):
        if not hasattr(self, 'recv'):
            t=await self.readallb()
            t=t.decode('utf-8')
            t=json.loads(t)
            self.recv=t
        return self.recv
                
    async def recv_form(self):
        if not hasattr(self, 'recv'):
            t=await self.readallb()
            self.recv=[]
            param_decode(self.recv, t)
        return self.recv
    
    # return a async generator to retrieve bytes body using 'async for'
    def recv_bytes(self):
        if not hasattr(self, 'recv'):
            self.recv=AsyncGenRead(self.req)
        return self.recv
        
    def set_cookie(self, name, value, path=None, domain=None, expires=None, \
                   max_age=None, secure=False, http_only=False, partitioned=False):
        t=bytearray()
        t.extend(url_encode(value))
        if path:
            t.extend(b'; Path={}'.format(path, encoding='utf-8'))
        if domain:
            t.extend(b'; Domain={}'.format(domain, encoding='utf-8'))
        if expires:
            t.extend(b'; Expires={}'.format(expires, encoding='utf-8'))
        if isinstance(max_age, int):
            t.extend(b'; Max-Age={}'.format(max_age))
        if secure:
            t.extend(b'; Secure')
        if http_only:
            t.extend(b'; HttpOnly')
        if partitioned:
            t.extend(b'; Partitioned')
        self._setcookie[name]=t
    
    def del_cookie(self, name):
        self._setcookie[name]=b'; Expires=Thu, 01 Jan 1970 00:00:01 GMT; Max-Age=0'
    
    def send_text(self, str, status=200, reason='OK'):
        self.tail['Content-Type']='text/plain'
        self.send=str
        self.status=status
        self.reason=reason
        
    def send_html(self, str, status=200, reason='OK'):
        self.tail['Content-Type']='text/html'
        self.send=str
        self.status=status
        self.reason=reason
        
    def send_redirect(self, url):
        self.tail['Location']=url
        self.status=302
        self.reason='REDIR'
        
    def send_json(self, obj, status=200, reason='OK'):
        self.tail['Content-Type']='application/json'
        self.send=obj
        self.status=status
        self.reason=reason

    def send_form(self, obj, status=200, reason='OK'):
        self.tail['Content-Type']='application/x-www-form-urlencoded'
        self.send=obj
        self.status=status
        self.reason=reason
        
    def send_file(self, file, max_age=86400, status=200, reason='OK'):
        t=file.rsplit('.',1)
        t=t[1] if len(t)>1 else ''
        self.tail['Content-Type']=minetype(t)
        if max_age is not None:
            self.tail['Cache-Control']='Max-Age={}'.format(max_age)
        self.send=_send_file(file)          
        self.status=status
        self.reason=reason
                
    def send_obj(self, obj, content_type, max_age=None, status=200, reason='OK'):
        self.tail['Content-Type']=content_type
        if max_age is not None:
            self.tail['Cache-Control']='Max-Age={}'.format(max_age)
        self.send=obj          
        self.status=status
        self.reason=reason              
                
#start a server listening, return an asyncio.Server object
async def server(web, host='0.0.0.0', port=80, limit=1024, clients=10):
    clnt=0
    async def dispatcher(r, w):
        nonlocal clnt
        if clnt<=clients:
            flow=Flow(r, w, limit=limit)
            clnt=clnt+1
            try:
                try:
                    await flow._start()
                    ctn=True
                    if exe:=web.find(flow.path, ':before'):
                        coro=exe[4](flow, *(exe[5]), **(exe[6]))
                        if hasattr(coro, 'send') and callable(coro.send):
                            await coro
                    if not hasattr(flow, 'send') and (exe:=web.find(flow.path, flow.method)):
                        coro=exe[4](flow, *(exe[5]), **(exe[6]))
                        if hasattr(coro, 'send') and callable(coro.send):
                            await coro
                    if exe:=web.find(flow.path, ':after'):
                        coro=exe[4](flow, *(exe[5]), **(exe[6]))
                        if hasattr(coro, 'send') and callable(coro.send):
                            await coro
                except:
                    w.write(b'HTTP/1.0 500 INTERR\r\nContent-Type: text/plain\r\n\r\n!!! Internal Error !!!')
                    await w.drain()
                    raise
                else:
                    await flow._finish()
            except OSError:
                pass
            except asyncio.CancelledError:
                pass
            except Exception as e:
                sys.print_exception(e)
            finally:
                clnt=clnt-1
        try:
            w.close()
            await w.wait_closed()
        except:
            pass
    return await asyncio.start_server(dispatcher, host, port)


[修改于 6个月15天前 - 2024/04/18 11:02:12]

来自:计算机科学 / 软件综合
0
2
已屏蔽 原因:{{ notice.reason }}已屏蔽
{{notice.noticeContent}}
~~空空如也

想参与大家的讨论?现在就 登录 或者 注册

所属专业
上级专业
同级专业
m24h
进士 学者 机友
文章
52
回复
861
学术分
1
2020/01/22注册,47分0秒前活动

个人开源项目: XXXXXXXXXXXXXX

主体类型:个人
所属领域:无
认证方式:手机号
IP归属地:上海
文件下载
加载中...
{{errorInfo}}
{{downloadWarning}}
你在 {{downloadTime}} 下载过当前文件。
文件名称:{{resource.defaultFile.name}}
下载次数:{{resource.hits}}
上传用户:{{uploader.username}}
所需积分:{{costScores}},{{holdScores}}下载当前附件免费{{description}}
积分不足,去充值
文件已丢失

当前账号的附件下载数量限制如下:
时段 个数
{{f.startingTime}}点 - {{f.endTime}}点 {{f.fileCount}}
视频暂不能访问,请登录试试
仅供内部学术交流或培训使用,请先保存到本地。本内容不代表科创观点,未经原作者同意,请勿转载。
音频暂不能访问,请登录试试
支持的图片格式:jpg, jpeg, png
插入公式
评论控制
加载中...
文号:{{pid}}
投诉或举报
加载中...
{{tip}}
请选择违规类型:
{{reason.type}}

空空如也

加载中...
详情
详情
推送到专栏从专栏移除
设为匿名取消匿名
查看作者
回复
只看作者
加入收藏取消收藏
收藏
取消收藏
折叠回复
置顶取消置顶
评学术分
鼓励
设为精选取消精选
管理提醒
编辑
通过审核
评论控制
退修或删除
历史版本
违规记录
投诉或举报
加入黑名单移除黑名单
查看IP
{{format('YYYY/MM/DD HH:mm:ss', toc)}}