Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Axon upload/download HTTP APIs #1817

Merged
merged 16 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions docs/synapse/httpapi.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,41 @@
" \"subs\": {},\n",
" ...\n",
" }\n",
" }\n"
" }\n",
"\n",
"Axon\n",
"----\n",
"\n",
"A Synapse Axon implements an HTTP API for uploading and downloading files.\n",
"The HTTP APIs use HTTP chunked encoding for handling large files.\n",
"\n",
"/api/v1/axon/files/put\n",
"~~~~~~~~~~~~~~~~~~~~~~\n",
"\n",
"This API allows the caller to upload and save a file to the Axon. This may be called via a PUT or POST request.\n",
"\n",
"*Method*\n",
" PUT, POST\n",
"\n",
" *Input*\n",
" The API expects a stream of byte chunks.\n",
"\n",
" *Returns*\n",
" On successful upload, or if the file already existed, the API returns information about the file::\n",
" \n",
" (size, SHA-256)\n",
"\n",
"\n",
"/api/v1/axon/files/by/sha256/<SHA-256>\n",
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n",
"\n",
"This API allows the caller to retrieve a file from the Axon as identified by the SHA-256. If the file does not exist a 404 will be returned.\n",
"\n",
"*Method*\n",
" GET\n",
" \n",
" *Returns*\n",
" If the file exists a stream of byte chunks will be returned to the caller.\n"
]
}
],
Expand All @@ -447,7 +481,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
"version": "3.7.8"
}
},
"nbformat": 4,
Expand Down
73 changes: 73 additions & 0 deletions synapse/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,80 @@
import synapse.lib.base as s_base
import synapse.lib.const as s_const
import synapse.lib.share as s_share
import synapse.lib.httpapi as s_httpapi
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.slabseqn as s_slabseqn

logger = logging.getLogger(__name__)

CHUNK_SIZE = 16 * s_const.mebibyte
MAX_SPOOL_SIZE = CHUNK_SIZE * 32 # 512 mebibytes
MAX_HTTP_UPLOAD_SIZE = 4 * s_const.tebibyte

class AxonFilesHttpV1(s_httpapi.StreamHandler):

async def prepare(self):
self.upfd = None

if not await self.reqAuthAllowed(('axon', 'upload')):
await self.finish()

# max_body_size defaults to 100MB and requires a value
self.request.connection.set_max_body_size(MAX_HTTP_UPLOAD_SIZE)

self.upfd = await self.cell.upload()

async def data_received(self, chunk):
if chunk is not None:
await self.upfd.write(chunk)
await asyncio.sleep(0)

def on_finish(self):
if self.upfd is not None and not self.upfd.isfini:
self.cell.schedCoroSafe(self.upfd.fini())

def on_connection_close(self):
self.on_finish()

async def _save(self):
size, sha256b = await self.upfd.save()
sha256 = s_common.ehex(sha256b)

self.sendRestRetn((size, sha256))

return

async def post(self):
'''
Called after all data has been read.
'''
await self._save()
return

async def put(self):
await self._save()
return

class AxonFileHttpV1(s_httpapi.Handler):

async def get(self, sha256):

if not await self.reqAuthAllowed(('axon', 'get')):
return

sha256b = s_common.uhex(sha256)

try:
async for byts in self.cell.get(sha256b):
self.write(byts)
await self.flush()
await asyncio.sleep(0)

except s_exc.NoSuchFile as e:
self.set_status(404)
self.sendRestErr('NoSuchFile', e.get('mesg'))

return

class UpLoad(s_base.Base):

Expand Down Expand Up @@ -160,6 +227,8 @@ async def __anit__(self, dirn, conf=None): # type: ignore
# modularize blob storage
await self._initBlobStor()

self._initAxonHttpApi()

async def _axonHealth(self, health):
health.update('axon', 'nominal', '', data=await self.metrics())

Expand All @@ -169,6 +238,10 @@ async def _initBlobStor(self):
self.blobs = self.blobslab.initdb('blobs')
self.onfini(self.blobslab.fini)

def _initAxonHttpApi(self):
self.addHttpApi('/api/v1/axon/files/put', AxonFilesHttpV1, {'cell': self})
self.addHttpApi('/api/v1/axon/files/by/sha256/([0-9a-fA-F]{64}$)', AxonFileHttpV1, {'cell': self})

def _addSyncItem(self, item):
self.axonhist.add(item)
self.axonseqn.add(item)
Expand Down
17 changes: 17 additions & 0 deletions synapse/lib/httpapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ async def get(self):

user = await self.user()
if not user.allowed(path):
self.set_status(403)
mesg = f'User {user.iden} ({user.name}) must have permission {".".join(path)}'
self.sendRestErr('AuthDeny', mesg)
return False
Expand Down Expand Up @@ -281,6 +282,22 @@ async def _reqValidOpts(self, opts):

return opts

@t_web.stream_request_body
class StreamHandler(Handler):
'''
Subclass for Tornado streaming uploads.

Notes:
- Async method prepare() is called after headers are read but before body processing.
- Sync method on_finish() can be used to cleanup after a request.
- Sync method on_connection_close() can be used to cleanup after a client disconnect.
- Async methods post(), put(), etc are called after the streaming has completed.
'''

async def data_received(self, chunk):
raise s_exc.NoSuchImpl(mesg='data_received must be implemented by subclasses.',
name='data_received')

class StormNodesV1(Handler):

async def post(self):
Expand Down
108 changes: 108 additions & 0 deletions synapse/tests/test_axon.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import io
import hashlib
import logging
import unittest.mock as mock

import aiohttp.client_exceptions as a_exc

import synapse.exc as s_exc
import synapse.axon as s_axon
import synapse.common as s_common
Expand Down Expand Up @@ -181,6 +184,111 @@ async def test_axon_proxy(self):
async with axon.getLocalProxy() as prox:
await self.runAxonTestBase(prox)

async def test_axon_http(self):

# HTTP handlers on a standalone Axon
async with self.getTestAxon() as axon:
host, port = await axon.addHttpsPort(0, host='127.0.0.1')

newb = await axon.auth.addUser('newb')
await newb.setPasswd('secret')

url_ul = f'https://localhost:{port}/api/v1/axon/files/put'
url_dl = f'https://localhost:{port}/api/v1/axon/files/by/sha256'

asdfhash_h = s_common.ehex(asdfhash)
bbufhash_h = s_common.ehex(bbufhash)
emptyhash_h = s_common.ehex(emptyhash)

# Perms
async with self.getHttpSess(auth=('newb', 'secret'), port=port) as sess:
async with sess.get(f'{url_dl}/{asdfhash_h}') as resp:
self.eq(403, resp.status)
item = await resp.json()
self.eq('err', item.get('status'))

async with sess.post(url_ul, data=abuf) as resp:
self.eq(403, resp.status)
item = await resp.json()
self.eq('err', item.get('status'))

# Stream file
byts = io.BytesIO(bbuf)

with self.raises(a_exc.ServerDisconnectedError):
async with sess.post(url_ul, data=byts) as resp:
pass

await newb.addRule((True, ('axon', 'get')))
await newb.addRule((True, ('axon', 'upload')))

# Basic
async with self.getHttpSess(auth=('newb', 'secret'), port=port) as sess:
async with sess.get(f'{url_dl}/foobar') as resp:
self.eq(404, resp.status)

async with sess.get(f'{url_dl}/{asdfhash_h}') as resp:
self.eq(404, resp.status)
item = await resp.json()
self.eq('err', item.get('status'))

async with sess.post(url_ul, data=abuf) as resp:
self.eq(200, resp.status)
item = await resp.json()
self.eq('ok', item.get('status'))
self.eq((asdfretn[0], asdfhash_h), item.get('result'))
self.true(await axon.has(asdfhash))

async with sess.put(url_ul, data=abuf) as resp:
self.eq(200, resp.status)
item = await resp.json()
self.eq('ok', item.get('status'))
self.eq((asdfretn[0], asdfhash_h), item.get('result'))
self.true(await axon.has(asdfhash))

async with sess.get(f'{url_dl}/{asdfhash_h}') as resp:
self.eq(200, resp.status)
self.eq(abuf, await resp.read())

# Streaming upload
byts = io.BytesIO(bbuf)

async with sess.post(url_ul, data=byts) as resp:
self.eq(200, resp.status)
item = await resp.json()
self.eq('ok', item.get('status'))
self.eq((bbufretn[0], bbufhash_h), item.get('result'))
self.true(await axon.has(bbufhash))

byts = io.BytesIO(bbuf)

async with sess.put(url_ul, data=byts) as resp:
self.eq(200, resp.status)
item = await resp.json()
self.eq('ok', item.get('status'))
self.eq((bbufretn[0], bbufhash_h), item.get('result'))
self.true(await axon.has(bbufhash))

byts = io.BytesIO(b'')

async with sess.post(url_ul, data=byts) as resp:
self.eq(200, resp.status)
item = await resp.json()
self.eq('ok', item.get('status'))
self.eq((emptyretn[0], emptyhash_h), item.get('result'))
self.true(await axon.has(emptyhash))

# Streaming download
async with sess.get(f'{url_dl}/{bbufhash_h}') as resp:
self.eq(200, resp.status)

byts = []
async for bytz in resp.content.iter_chunked(1024):
byts.append(bytz)

self.gt(len(byts), 1)
self.eq(bbuf, b''.join(byts))

async def test_axon_perms(self):
async with self.getTestAxon() as axon:
user = await axon.auth.addUser('user')
Expand Down
31 changes: 29 additions & 2 deletions synapse/tests/test_lib_httpapi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import asyncio

import aiohttp
import aiohttp.client_exceptions as a_exc

import synapse.cortex as s_cortex

import synapse.lib.httpapi as s_httpapi

Expand Down Expand Up @@ -37,7 +40,7 @@ async def get(self):

async with self.getHttpSess(auth=('user', '12345'), port=port) as sess:
async with sess.get(url) as resp:
self.eq(resp.status, 200)
self.eq(resp.status, 403)
retn = await resp.json()
self.eq(retn.get('status'), 'err')
self.eq(retn.get('code'), 'AuthDeny')
Expand Down Expand Up @@ -820,3 +823,27 @@ async def test_healthcheck(self):
async with sess.get(url) as resp:
result = await resp.json()
self.eq(result.get('status'), 'ok')

async def test_streamhandler(self):

class SadHandler(s_httpapi.StreamHandler):
'''
data_received must be implemented
'''
async def post(self):
self.sendRestRetn('foo')
return

async with self.getTestCore() as core:
core.addHttpApi('/api/v1/sad', SadHandler, {'cell': core})

host, port = await core.addHttpsPort(0, host='127.0.0.1')

root = await core.auth.getUserByName('root')
await root.setPasswd('secret')

url = f'https://localhost:{port}/api/v1/sad'
async with self.getHttpSess(auth=('root', 'secret'), port=port) as sess:
with self.raises(a_exc.ServerDisconnectedError):
async with sess.post(url, data=b'foo') as resp:
pass