Skip to content

Commit

Permalink
Avoid libc buffered IO
Browse files Browse the repository at this point in the history
The FILE-related IO functions in libc do buffering inside the C library,
and are generally less performant than the file-descriptor-based
open()/read()/write() functions.

The FILE-based approach somewhat limits the maximum throughput of
librepo and also increases CPU usage. In my benchmarks of a reposync
of the Amazon Linux 2023 x86-64 repositories, this move to
file-descriptor-based IO saves about 1 second of user time and 0.5 seconds
of system time, for a wall-clock benefit of a few seconds (102s before vs
99s after).
  • Loading branch information
stewartsmith committed Jun 5, 2024
1 parent ba5365f commit ebd4be0
Showing 1 changed file with 84 additions and 56 deletions.
140 changes: 84 additions & 56 deletions librepo/downloader.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ typedef struct {
Current protocol */
CURL *curl_handle; /*!<
Used curl handle or NULL */
FILE *f; /*!<
fdopened file descriptor from LrDownloadTarget and used
int fd; /*!<
opened file descriptor from LrDownloadTarget and used
in curl_handle. */
char errorbuffer[CURL_ERROR_SIZE]; /*!<
Error buffer used in curl handle */
Expand Down Expand Up @@ -280,7 +280,7 @@ typedef struct {
* | LrDownloadTarget *target -----------/ | int fd |
* | LrMirror *mirror --------/ | LrChecksumType checks.. |
* | CURL *curl_handle |-+ | char *checksum |
* | FILE *f | | int resume |
* | int fd | | int resume |
* | GSList *tried_mirrors | | LrProgressCb progresscb |
* | gint64 original_offset | | void *cbdata |
* | GSlist *lrmirrors ---\ | GStringChunk *chunk |
Expand Down Expand Up @@ -604,22 +604,51 @@ lr_zck_writecb(char *ptr, size_t size, size_t nmemb, void *userdata)
size_t
lr_writecb(char *ptr, size_t size, size_t nmemb, void *userdata)
{
size_t cur_written_expected = nmemb;
size_t cur_written;
LrTarget *target = (LrTarget *) userdata;
#ifdef WITH_ZCHUNK
if(target->target->is_zchunk && !target->range_fail && target->mirror->mirror->protocol == LR_PROTOCOL_HTTP)
return lr_zck_writecb(ptr, size, nmemb, userdata);
#endif /* WITH_ZCHUNK */

gint64 all = size * nmemb; // Total number of bytes from curl
gint64 range_start = target->target->byterangestart;
gint64 range_end = target->target->byterangeend;

/* libcurl docs tell us size is always 1, but don't bet on it */
size_t all;
ssize_t r;
ssize_t written;
/*
* We write up to 32MB at a time, mainly to ensure that the below loops
* never bitrot, and it's regularly tested with real world RPMs.
*/
#define WRITECB_CHUNK_MAX 32*1024*1024
/*
* The libcurl docs point to size always being 1, and the max value for
* nmemb being CURL_MAX_WRITE_SIZE (default 16kb, compile time setting),
* and if CURLOPT_HEADER is set, then it's CURL_MAX_HTTP_HEADER, usually
* meaning 100k.
*/
all = size * nmemb;
written = 0;
if ((size != 0 && all / size != nmemb) || all > SSIZE_MAX) {
/*
* But if libcurl changes (likely breaking the world), let's handle the
* overflow by just writing one chunk at a time until we're done.
*/
all = WRITECB_CHUNK_MAX;
}

if (range_start <= 0 && range_end <= 0) {
// Write everything curl gives us
target->writecb_recieved += all;
return fwrite(ptr, size, nmemb, target->f);
while(( all - written ) > 0) {
r = write(target->fd, ptr + written, ((all - written) > WRITECB_CHUNK_MAX)? WRITECB_CHUNK_MAX : all - written);
if (r < 0 && errno != EINTR) {
g_warning("Error while writing file: %s", g_strerror(errno));
return (size_t)written;
}
written += r;
target->writecb_recieved += r;
}
return (size_t)written;
}

/* Deal with situation when user wants only specific byte range of the
Expand All @@ -629,8 +658,6 @@ lr_writecb(char *ptr, size_t size, size_t nmemb, void *userdata)
gint64 cur_range_start = target->writecb_recieved;
gint64 cur_range_end = cur_range_start + all;

target->writecb_recieved += all;

if (target->target->byterangestart > 0) {
// If byterangestart is specified, then CURLOPT_RESUME_FROM_LARGE
// is used by default
Expand Down Expand Up @@ -684,13 +711,25 @@ lr_writecb(char *ptr, size_t size, size_t nmemb, void *userdata)
}

assert(nmemb > 0);
cur_written = fwrite(ptr, size, nmemb, target->f);
if (cur_written != nmemb) {
g_warning("Error while writing file: %s", g_strerror(errno));
return 0; // There was an error
}

return cur_written_expected;
all = size * nmemb;
written = 0;
if ((size != 0 && all / size != nmemb) || all > SSIZE_MAX) {
/*
* But if libcurl changes (likely breaking the world), let's handle the
* overflow by just writing one chunk at a time until we're done.
*/
all = WRITECB_CHUNK_MAX;
}
while(( all - written ) > 0) {
r = write(target->fd, ptr + written, ((all - written) > WRITECB_CHUNK_MAX)? WRITECB_CHUNK_MAX : all - written);
if (r < 0 && errno != EINTR) {
g_warning("Error while writing file: %s", g_strerror(errno));
return (size_t)written;
}
written += r;
target->writecb_recieved += r;
}
return (size_t)written;
}

/** Select a suitable mirror
Expand Down Expand Up @@ -1033,9 +1072,9 @@ remove_librepo_xattr(LrDownloadTarget * target)
gboolean
lr_zck_clear_header(LrTarget *target, GError **err)
{
assert(target && target->f && target->target && target->target->path);
assert(target && target->fd >= 0 && target->target && target->target->path);

int fd = fileno(target->f);
int fd = target->fd;
lseek(fd, 0, SEEK_END);
if(ftruncate(fd, 0) < 0) {
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_IO,
Expand All @@ -1051,7 +1090,7 @@ find_local_zck_header(LrTarget *target, GError **err)
{
zckCtx *zck = NULL;
gboolean found = FALSE;
int fd = fileno(target->f);
int fd = target->fd;

if(target->target->handle->cachedir) {
g_debug("%s: Cache directory: %s\n", __func__,
Expand Down Expand Up @@ -1129,7 +1168,7 @@ static gboolean
prep_zck_header(LrTarget *target, GError **err)
{
zckCtx *zck = NULL;
int fd = fileno(target->f);
int fd = target->fd;
GError *tmp_err = NULL;

if(lr_zck_valid_header(target->target, target->target->path, fd,
Expand Down Expand Up @@ -1187,7 +1226,7 @@ find_local_zck_chunks(LrTarget *target, GError **err)
assert(target && target->target && target->target->zck_dl);

zckCtx *zck = zck_dl_get_zck(target->target->zck_dl);
int fd = fileno(target->f);
int fd = target->fd;
if(zck && fd != zck_get_fd(zck) && !zck_set_fd(zck, fd)) {
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_ZCK,
"Unable to set zchunk file descriptor for %s: %s",
Expand Down Expand Up @@ -1255,7 +1294,7 @@ static gboolean
prep_zck_body(LrTarget *target, GError **err)
{
zckCtx *zck = zck_dl_get_zck(target->target->zck_dl);
int fd = fileno(target->f);
int fd = target->fd;
if(zck && fd != zck_get_fd(zck) && !zck_set_fd(zck, fd)) {
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_ZCK,
"Unable to set zchunk file descriptor for %s: %s",
Expand Down Expand Up @@ -1295,7 +1334,7 @@ static gboolean
check_zck(LrTarget *target, GError **err)
{
assert(!err || *err == NULL);
assert(target && target->f && target->target);
assert(target && target->fd >= 0 && target->target);

if(target->mirror->max_ranges == 0 || target->mirror->mirror->protocol != LR_PROTOCOL_HTTP) {
target->zck_state = LR_ZCK_DL_BODY;
Expand Down Expand Up @@ -1391,11 +1430,10 @@ check_zck(LrTarget *target, GError **err)

/** Open the file to write to
*/
static FILE*
static int
open_target_file(LrTarget *target, GError **err)
{
int fd;
FILE *f;

if (target->target->fd != -1) {
// Use supplied filedescriptor
Expand All @@ -1404,7 +1442,7 @@ open_target_file(LrTarget *target, GError **err)
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_IO,
"dup(%d) failed: %s",
target->target->fd, g_strerror(errno));
return NULL;
return -1;
}
} else {
// Use supplied filename
Expand All @@ -1417,20 +1455,11 @@ open_target_file(LrTarget *target, GError **err)
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_IO,
"Cannot open %s: %s",
target->target->fn, g_strerror(errno));
return NULL;
return -1;
}
}

f = fdopen(fd, "w+b");
if (!f) {
close(fd);
g_set_error(err, LR_DOWNLOADER_ERROR, LRE_IO,
"fdopen(%d) failed: %s",
fd, g_strerror(errno));
return NULL;
}

return f;
return fd;
}

/** Prepare next transfer
Expand Down Expand Up @@ -1511,8 +1540,8 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)
}

// Prepare file descriptor
target->f = open_target_file(target, err);
if (!target->f)
target->fd = open_target_file(target, err);
if (target->fd < 0)
goto fail;
target->writecb_recieved = 0;
target->writecb_required_range_written = FALSE;
Expand All @@ -1538,15 +1567,15 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)
target->curl_handle = NULL;
g_free(target->headercb_interrupt_reason);
target->headercb_interrupt_reason = NULL;
fclose(target->f);
target->f = NULL;
close(target->fd);
target->fd = -1;
lr_downloadtarget_set_error(target->target, LRE_OK, NULL);
return prepare_next_transfer(dd, candidatefound, err);
}
}
# endif /* WITH_ZCHUNK */

int fd = fileno(target->f);
int fd = target->fd;

// Allow resume only for files that were originally being
// downloaded by librepo
Expand All @@ -1573,8 +1602,7 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)

if (target->original_offset == -1) {
// Determine offset
fseek(target->f, 0L, SEEK_END);
gint64 determined_offset = ftell(target->f);
off_t determined_offset = lseek(target->fd, 0L, SEEK_END);
if (determined_offset == -1) {
// An error while determining offset =>
// Download the whole file again
Expand Down Expand Up @@ -1689,9 +1717,9 @@ prepare_next_transfer(LrDownload *dd, gboolean *candidatefound, GError **err)
curl_easy_cleanup(target->curl_handle);
target->curl_handle = NULL;
}
if (target->f != NULL) {
fclose(target->f);
target->f = NULL;
if (target->fd != -1) {
close(target->fd);
target->fd = -1;
}

return FALSE;
Expand Down Expand Up @@ -2262,8 +2290,7 @@ check_transfer_statuses(LrDownload *dd, GError **err)
//
// Checksum checking
//
fflush(target->f);
fd = fileno(target->f);
fd = target->fd;

// Preserve timestamp of downloaded file if requested
if (target->target->handle && target->target->handle->preservetime) {
Expand Down Expand Up @@ -2353,8 +2380,8 @@ check_transfer_statuses(LrDownload *dd, GError **err)
target->curl_handle = NULL;
g_free(target->headercb_interrupt_reason);
target->headercb_interrupt_reason = NULL;
fclose(target->f);
target->f = NULL;
close(target->fd);
target->fd = -1;
if (target->curl_rqheaders) {
curl_slist_free_all(target->curl_rqheaders);
target->curl_rqheaders = NULL;
Expand Down Expand Up @@ -2725,6 +2752,7 @@ lr_download(GSList *targets,
target->target->rcode = LRE_UNFINISHED;
target->target->err = "Not finished";
target->handle = dtarget->handle;
target->fd = -1;
dd.targets = g_slist_append(dd.targets, target);
// Add list of handle internal mirrors to dd.handle_mirrors
// if doesn't exists yet and set the list reference
Expand Down Expand Up @@ -2756,8 +2784,8 @@ lr_download(GSList *targets,
curl_multi_remove_handle(dd.multi_handle, target->curl_handle);
curl_easy_cleanup(target->curl_handle);
target->curl_handle = NULL;
fclose(target->f);
target->f = NULL;
close(target->fd);
target->fd = -1;
g_free(target->headercb_interrupt_reason);
target->headercb_interrupt_reason = NULL;

Expand Down Expand Up @@ -2803,7 +2831,7 @@ lr_download(GSList *targets,
for (GSList *elem = dd.targets; elem; elem = g_slist_next(elem)) {
LrTarget *target = elem->data;
assert(target->curl_handle == NULL);
assert(target->f == NULL);
assert(target->fd == -1);

// Remove file created for the target if download was
// unsuccessful and the file doesn't exists before or
Expand Down

0 comments on commit ebd4be0

Please sign in to comment.