Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

irmin-pack: appending into the sparse file #2192

Merged
merged 1 commit into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
Suffix.read_range_exn (Fm.suffix t.fm) ~off ~min_len ~max_len buf
| None -> Sparse.read_range_exn (get_prefix t) ~off ~min_len ~max_len buf

let read_bytes_exn t ~f ~off ~len =
let read_seq_exn t ~off ~len =
let bytes_in_prefix =
let open Int63.Syntax in
let prefix_bytes_after_off = suffix_start_offset t - off in
Expand All @@ -112,28 +112,38 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
let len = Int63.to_int len in
let max_read_size = min 8192 len in
let buffer = Bytes.create max_read_size in
let rec aux read_exn ~off ~len =
if len = 0 then ()
let rec aux read_exn ~off ~len () =
if len <= 0 then Seq.Nil
else
let read_len = min len max_read_size in
read_exn ~off ~len:read_len buffer;
f (Bytes.sub_string buffer 0 read_len);
aux read_exn
~off:Int63.Syntax.(off + Int63.of_int read_len)
~len:(len - read_len)
Seq.Cons
( Bytes.sub_string buffer 0 read_len,
aux read_exn
~off:Int63.Syntax.(off + Int63.of_int read_len)
~len:(len - read_len) )
in
if bytes_in_prefix > Int63.zero then
aux
(Sparse.read_exn (get_prefix t))
~off
~len:(Int63.to_int bytes_in_prefix);
if bytes_in_suffix > Int63.zero then
let off = Int63.Syntax.(off + bytes_in_prefix) in
let off = soff_of_offset t off in
aux
(Suffix.read_exn (get_suffix t))
~off
~len:(Int63.to_int bytes_in_suffix)
let prefix =
if bytes_in_prefix <= Int63.zero then Seq.empty
else
aux
(Sparse.read_exn (get_prefix t))
~off
~len:(Int63.to_int bytes_in_prefix)
in
let suffix =
if bytes_in_suffix <= Int63.zero then Seq.empty
else
let off = Int63.Syntax.(off + bytes_in_prefix) in
let off = soff_of_offset t off in
aux
(Suffix.read_exn (get_suffix t))
~off
~len:(Int63.to_int bytes_in_suffix)
in
Seq.append prefix suffix

(** [read_bytes_exn t ~f ~off ~len] applies [f], in order, to every string
    chunk of the sequence returned by [read_seq_exn t ~off ~len]. *)
let read_bytes_exn t ~f ~off ~len = read_seq_exn t ~off ~len |> Seq.iter f

let next_valid_offset t ~off =
let open Int63.Syntax in
Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ module type S = sig
(** [soff_of_offset t global_offset] converts a global offset to a suffix
offset. *)

val read_seq_exn : t -> off:int63 -> len:int63 -> string Seq.t
(** [read_seq_exn t ~off ~len] is the slice of the global offset space defined
    by [off] and [len], as a sequence of string chunks of at most 8192 bytes
    each. Chunks located in the prefix come before chunks located in the
    suffix. The sequence is lazy: a chunk is only read when its node is
    forced. *)

val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
and [len].
Expand Down
2 changes: 0 additions & 2 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ struct
module Dict = Append_only_file.Make (Io) (Errs)
module Suffix = Chunked_suffix.Make (Io) (Errs)
module Sparse = Sparse_file.Make (Io)
module Mapping_file = Sparse.Mapping_file
module Lower = Lower.Make (Io) (Errs)

type after_reload_consumer = { after_reload : unit -> (unit, Errs.t) result }
Expand Down Expand Up @@ -200,7 +199,6 @@ struct
let* () = Dict.fsync t.dict in
let* () = Suffix.fsync t.suffix in
let* () = Control.fsync t.control in
let* () = Option.might Sparse.fsync t.prefix in
Index.flush ~with_fsync:true t.index

(* Constructors *********************************************************** *)
Expand Down
144 changes: 66 additions & 78 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ module Make (Args : Gc_args.S) = struct
open Args
module Io = Fm.Io
module Sparse = Dispatcher.Fm.Sparse
module Mapping_file = Sparse.Mapping_file
module Ao = Append_only_file.Make (Fm.Io) (Errs)

module Ao = struct
include Append_only_file.Make (Fm.Io) (Errs)
let string_of_key = Irmin.Type.to_string key_t

let create_rw_exn ~path =
create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_procedure:`Internal
|> Errs.raise_if_error
end
module Live = struct
type t = (int63 * int) list

let string_of_key = Irmin.Type.to_string key_t
let empty = []

(** [add ~off ~len t] pushes the range [(off, len)] at the head of [t]. When
    that range ends exactly where the current head range starts, the two are
    merged into a single longer range instead of adding a new element. *)
let add ~off ~len (t : t) : t =
  let stop = Int63.Syntax.(off + Int63.of_int len) in
  match t with
  | (head_off, head_len) :: tail when stop = head_off ->
      (* Contiguous with the head: extend it backwards to start at [off]. *)
      (off, len + head_len) :: tail
  | [] | _ :: _ -> (off, len) :: t
end

module Priority_queue = struct
module Offset_rev = struct
Expand Down Expand Up @@ -111,26 +114,21 @@ module Make (Args : Gc_args.S) = struct
disk in order to correctly deserialise the gced commit. *)
let magic_parent =
  (* The magic of a dangling parent commit, as a one-character string. *)
  String.make 1
    (Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit)

(* Transfer the commit with a different magic. Note that this is modifying
existing written data. *)
let transfer_parent_commit_exn ~read_exn ~write_exn key =
let transfer_parent_commit_exn ~write_exn key =
match Pack_key.inspect key with
| Indexed _ ->
(* It's possible that some parents are referenced by hash. *)
()
| Direct { offset = off; length = len; _ } ->
let buffer = Bytes.create len in
read_exn ~off ~len buffer;
Bytes.set buffer Hash.hash_size magic_parent;
(* Bytes.unsafe_to_string usage: We assume read_exn returns unique
ownership of buffer to this function. Then at the call to
Bytes.unsafe_to_string we give up unique ownership to buffer (we do
not modify it thereafter) in return for ownership of the resulting
string, which we pass to write_exn. This usage is safe. *)
write_exn ~off ~len (Bytes.unsafe_to_string buffer)
| Direct { offset = off; _ } ->
(* Targeted write to change the parent commit kind to dangling. *)
let off = Int63.(Syntax.(off + of_int Hash.hash_size)) in
write_exn ~off ~len:1 magic_parent

let report_old_file_sizes ~root ~generation stats =
let prefix_file_sizes ~root ~generation =
let open Result_syntax in
let* prefix_size =
if generation = 0 then Ok Int63.zero
Expand All @@ -140,9 +138,20 @@ module Make (Args : Gc_args.S) = struct
if generation = 0 then Ok Int63.zero
else Irmin_pack.Layout.V3.mapping ~root ~generation |> Io.size_of_path
in
(mapping_size, prefix_size)

(** [report_old_file_sizes ~root ~generation stats] queries
    [prefix_file_sizes ~root ~generation] and, on success, records the
    resulting sizes in [stats] under the names ["old_prefix"] and
    ["old_mapping"]. *)
let report_old_file_sizes ~root ~generation stats =
  let open Result_syntax in
  let record name size =
    stats := Gc_stats.Worker.add_file_size !stats name size
  in
  let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
  record "old_prefix" prefix_size;
  record "old_mapping" mapping_size

(** [report_new_file_sizes ~root ~generation stats] queries
    [prefix_file_sizes ~root ~generation] and, on success, records the
    resulting sizes in [stats] under the names ["prefix"] and ["mapping"]. *)
let report_new_file_sizes ~root ~generation stats =
  let open Result_syntax in
  let record name size =
    stats := Gc_stats.Worker.add_file_size !stats name size
  in
  let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
  record "prefix" prefix_size;
  record "mapping" mapping_size

type suffix_params = {
start_offset : int63;
chunk_start_idx : int;
Expand Down Expand Up @@ -193,23 +202,18 @@ module Make (Args : Gc_args.S) = struct
| Some commit -> commit
in

(* Step 3. Create the new mapping. *)
let mapping =
(* Step 3.1 Start [Mapping_file] routine which will create the
reachable file. *)
(* Step 3. Compute the list of [offset, length] ranges of live objects
reachable from the GC commit. *)
let live_entries =
stats := Gc_stats.Worker.finish_current_step !stats "mapping: start";
(fun f ->
let report_mapping_size size =
stats := Gc_stats.Worker.add_file_size !stats "mapping" size
in
let path =
Irmin_pack.Layout.V3.mapping ~generation ~root:new_files_path
in
Mapping_file.create ~report_mapping_size ~path ~register_entries:f ()
|> Errs.raise_if_error)
@@ fun ~register_entry ->
(* Step 3.1 The compact list of reachable [offset, length] *)
let live_entries = ref Live.empty in
let register_entry ~off ~len =
stats := Gc_stats.Worker.incr_objects_traversed !stats;
live_entries := Live.add ~off ~len !live_entries
in
(* Step 3.2 If the commit parents are referenced by offset, then include
the commit parents in the reachable file. The parent(s) of [commit_key]
the commit parents in the reachable list. The parent(s) of [commit_key]
must be included in the iteration because, when decoding the
[Commit_value.t] at [commit_key], the parents will have to be read in
order to produce a key for them. If the parent is referenced by hash,
Expand All @@ -218,82 +222,66 @@ module Make (Args : Gc_args.S) = struct
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: commits to reachable";
let register_entry ~off ~len =
stats := Gc_stats.Worker.incr_objects_traversed !stats;
register_entry ~off ~len
in

(* Step 3.3 Put the commit in the reachable list. *)
let off = Node_store.get_offset node_store commit_key in
let len = Node_store.get_length node_store commit_key in
register_entry ~off ~len;

(* Step 3.4 Put the nodes and contents in the reachable file. *)
(* Step 3.4 Put the nodes and contents in the reachable list. *)
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: objects to reachable";
iter_reachable commit node_store ~f:register_entry;

(* Step 3.5 Return and let the [Mapping_file] routine create the mapping
file. *)
(* Step 3.5 Return *)
stats :=
Gc_stats.Worker.finish_current_step !stats "mapping: of reachable";
()
!live_entries
in

let () =
(* Step 4. Create the new prefix. *)
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let prefix =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Ao.create_rw_exn ~path
let mapping =
Irmin_pack.Layout.V4.mapping ~root:new_files_path ~generation
in
let data = Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation in
let () =
Errors.finalise_exn (fun _outcome ->
stats :=
Gc_stats.Worker.add_file_size !stats "prefix" (Ao.end_poff prefix);
Ao.close prefix |> Errs.log_if_error "GC: Close prefix")
@@ fun () ->
let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in
(* Step 5. Transfer to the new prefix, flush and close. *)
[%log.debug "GC: transfering to the new prefix"];
stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer";
Errors.finalise_exn (fun _outcome ->
Sparse.Ao.flush prefix
>>= (fun _ -> Sparse.Ao.close prefix)
|> Errs.log_if_error "GC: Close prefix after data copy")
@@ fun () ->
(* Step 5.1. Transfer all. *)
let append_exn = Ao.append_exn prefix in
let f ~off ~len =
let len = Int63.of_int len in
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
let () = Mapping_file.iter_exn mapping f in
Ao.flush prefix |> Errs.raise_if_error
List.iter
(fun (off, len) ->
let len = Int63.of_int len in
let str = Dispatcher.read_seq_exn dispatcher ~off ~len in
Sparse.Ao.append_seq_exn prefix ~off str)
live_entries
in
(* Step 5.2. Transfer again the parent commits but with a modified
magic. Reopen the new prefix, this time _not_ in append-only
as we have to modify data inside the file. *)
(* Step 5.2. Update the parent commits to be dangling.
Reopen the new prefix, this time in write-only mode, as we have to modify data inside the file. *)
stats :=
Gc_stats.Worker.finish_current_step !stats
"prefix: rewrite commit parents";
let read_exn ~off ~len buf =
Dispatcher.read_exn dispatcher ~off ~len buf
in
let prefix_data =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Io.open_ ~path ~readonly:false |> Errs.raise_if_error
in
let prefix = Sparse.v ~mapping ~data:prefix_data in
let prefix = Sparse.Wo.open_wo ~mapping ~data |> Errs.raise_if_error in
Errors.finalise_exn (fun _outcome ->
Sparse.fsync prefix
>>= (fun _ -> Sparse.close prefix)
Sparse.Wo.fsync prefix
>>= (fun _ -> Sparse.Wo.close prefix)
|> Errs.log_if_error "GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Sparse.write_exn prefix in
let write_exn = Sparse.Wo.write_exn prefix in
List.iter
(fun key -> transfer_parent_commit_exn ~read_exn ~write_exn key)
(fun key -> transfer_parent_commit_exn ~write_exn key)
(Commit_value.parents commit)
in
let () = report_new_file_sizes ~root ~generation stats |> ignore in

(* Step 6. Calculate post-GC suffix parameters. *)
let suffix_params, removable_chunk_idxs =
Expand Down
Loading