diff --git a/modules/ngx_http_xquic_module/ngx_http_v3_stream.c b/modules/ngx_http_xquic_module/ngx_http_v3_stream.c index 89c01b8dfd..2d631edaef 100644 --- a/modules/ngx_http_xquic_module/ngx_http_v3_stream.c +++ b/modules/ngx_http_xquic_module/ngx_http_v3_stream.c @@ -20,6 +20,7 @@ static void ngx_http_v3_run_request(ngx_http_request_t *r, ngx_http_v3_stream_t static void ngx_http_v3_stream_free(ngx_http_v3_stream_t *h3_stream); void ngx_http_v3_stream_cancel(ngx_http_v3_stream_t *h3_stream, ngx_int_t status); +static void ngx_http_v3_upstream_write_and_recv_body_handler(ngx_event_t *rev); static void @@ -845,6 +846,43 @@ ngx_http_v3_init_request_body(ngx_http_request_t *r) } +static void +ngx_http_v3_upstream_write_and_recv_body_handler(ngx_event_t *ev) +{ + ngx_http_request_t *r; + ngx_http_v3_stream_t *stream; + xqc_h3_request_t *h3_request; + ngx_http_upstream_t *u; + ngx_connection_t *fc; + + stream = ev->data; + r = stream->request; + fc = r->connection; + u = r->upstream; + h3_request = stream->h3_request; + + + ngx_http_set_log_request(fc->log, r); + + ngx_log_debug2(NGX_LOG_DEBUG_HTTP, fc->log, 0, + "http upstream request: \"%V?%V\"", &r->uri, &r->args); + + if (ev->delayed && ev->timedout) { + ev->delayed = 0; + ev->timedout = 0; + } + + u->write_event_handler(r, u); + ngx_http_run_posted_requests(fc); + + if (ngx_http_v3_recv_body(r, stream, h3_request) != NGX_OK) { + ngx_log_error(NGX_LOG_WARN, ngx_cycle->log, 0, + "|xquic|ngx_http_v3_upstream_write_and_recv_body_handler error| ngx_http_v3_recv_body error"); + } + +} + + ngx_int_t ngx_http_v3_recv_body(ngx_http_request_t *r, ngx_http_v3_stream_t *stream, @@ -853,11 +891,15 @@ ngx_http_v3_recv_body(ngx_http_request_t *r, ngx_http_core_loc_conf_t *clcf; ngx_buf_t *buf; ngx_int_t rc; - ngx_connection_t *fc; + ngx_connection_t *fc, *c, *pc; + ngx_http_upstream_t *u; off_t len; clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); fc = r->connection; + c = stream->connection->connection; + u = r->upstream; + pc = u->peer.connection; /* check if skip data */ if (stream->skip_data) { @@ -893,31 +935,65 @@ ngx_http_v3_recv_body(ngx_http_request_t *r, ssize_t size = 0; uint8_t fin = 0; + ssize_t wait_write = 0; do { if (buf->last == buf->end) { - len = buf->end - buf->start; - off_t pos_len = buf->pos - buf->start; - u_char *new_buf = ngx_pcalloc(r->pool, len * 2); - if (new_buf == NULL) { - ngx_log_error(NGX_LOG_WARN, fc->log, 0, - "|xquic|ngx_http_v3_recv_body|ngx_pcalloc error|"); + if (r->request_body_no_buffering) { - return NGX_ERROR; - } + if (!pc) { + ngx_log_error(NGX_LOG_WARN, fc->log, 0, + "|xquic|ngx_http_v3_recv_body|the peer connection to upstream has been closed"); + return NGX_ERROR; + } - ngx_memcpy(new_buf, buf->start, len); - buf->pos = new_buf + pos_len; - buf->last = new_buf + len; - buf->end = new_buf + len * 2; - ngx_pfree(r->pool, buf->start); - buf->start = new_buf; + if (ngx_handle_write_event(pc->write, u->conf->send_lowat) != NGX_OK) { + return NGX_ERROR; + } + + if (u->request_sent) { + if (pc->write->ready) { + pc->write->handler(pc->write); + } + if (!wait_write) { + wait_write = 1; + continue; + } + + if (!c) { + ngx_log_error(NGX_LOG_WARN, fc->log, 0, + "|xquic|ngx_http_v3_recv_body|the connection to client has been closed"); + return NGX_ERROR; + } + + pc->write->ready = 1; + pc->write->data = stream; + // pc->write->saved_handler = pc->write->handler; + pc->write->handler = ngx_http_v3_upstream_write_and_recv_body_handler; + c->read->active = 0; + + return NGX_OK; + } + + return NGX_OK; + } + return NGX_ERROR; } ngx_log_error(NGX_LOG_DEBUG, fc->log, 0, "|xquic|ngx_http_v3_recv_body|buf->size:%z|", buf->end - buf->last); + if (c && c->read->active == 0) { + c->read->active = 1; + c->read->ready = 0; + c->read->data = c; + c->read->handler = ngx_http_xquic_read_handler; + } + + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + ngx_add_timer(fc->read, clcf->client_body_timeout); + size = xqc_h3_request_recv_body(h3_request, buf->last, buf->end - buf->last, &fin); if (size == -XQC_EAGAIN) { break; diff --git a/modules/ngx_http_xquic_module/ngx_http_xquic.c b/modules/ngx_http_xquic_module/ngx_http_xquic.c index 33bbf8415f..80d559ae50 100644 --- a/modules/ngx_http_xquic_module/ngx_http_xquic.c +++ b/modules/ngx_http_xquic_module/ngx_http_xquic.c @@ -634,7 +634,7 @@ ngx_http_v3_filter_request_body(ngx_http_request_t *r) ngx_log_error(NGX_LOG_DEBUG, fc->log, 0, "|xquic|ngx_http_v3_filter_request_body|size:%O, fin:%O|", buf->last - buf->pos, fin); - if (buf->pos == buf->last && rb->rest) { + if (buf->pos == buf->last && (rb->rest || rb->last_sent)) { cl = NULL; goto update; } @@ -682,9 +682,10 @@ ngx_http_v3_filter_request_body(ngx_http_request_t *r) b->last = buf->last; b->start = b->pos; b->end = b->last; + + buf->pos = buf->last; } - buf->pos = buf->last = buf->start; ngx_log_error(NGX_LOG_DEBUG, fc->log, 0, "|xquic|ngx_http_v3_filter_request_body|received:%O|", rb->received); @@ -701,6 +702,7 @@ ngx_http_v3_filter_request_body(ngx_http_request_t *r) } b->last_buf = 1; + rb->last_sent = 1; } b->tag = (ngx_buf_tag_t) &ngx_http_v3_filter_request_body; @@ -979,7 +981,7 @@ ngx_http_xquic_session_process_packet(ngx_http_xquic_connection_t *qc, /** * used to recv udp packets */ -static void +void ngx_http_xquic_read_handler(ngx_event_t *rev) { ssize_t n; diff --git a/modules/ngx_http_xquic_module/ngx_http_xquic.h b/modules/ngx_http_xquic_module/ngx_http_xquic.h index c710ced252..d59e1454fa 100644 --- a/modules/ngx_http_xquic_module/ngx_http_xquic.h +++ b/modules/ngx_http_xquic_module/ngx_http_xquic.h @@ -177,6 +177,7 @@ void ngx_http_v3_read_client_request_body_handler(ngx_http_request_t *r); xqc_int_t ngx_http_v3_cert_cb(const char *sni, void **chain, void **cert, void **key, void *conn_user_data); +void ngx_http_xquic_read_handler(ngx_event_t *rev); #endif /* _NGX_HTTP_XQUIC_H_INCLUDED_ */ diff --git a/src/http/modules/ngx_http_upstream_keepalive_module.c b/src/http/modules/ngx_http_upstream_keepalive_module.c index 1a4dfd7766..119b5055cc 100644 --- a/src/http/modules/ngx_http_upstream_keepalive_module.c +++ b/src/http/modules/ngx_http_upstream_keepalive_module.c @@ -289,6 +289,9 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) c->read->log = pc->log; c->write->log = pc->log; c->pool->log = pc->log; +#if (T_NGX_XQUIC) + c->write->data = c; +#endif if (c->read->timer_set) { ngx_del_timer(c->read);