Skip to content

Commit

Permalink
Merge 7e85e83 into 0a6fbbe
Browse files Browse the repository at this point in the history
  • Loading branch information
art-w authored Feb 24, 2023
2 parents 0a6fbbe + 7e85e83 commit 4a5e92c
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 346 deletions.
48 changes: 29 additions & 19 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
Suffix.read_range_exn (Fm.suffix t.fm) ~off ~min_len ~max_len buf
| None -> Sparse.read_range_exn (get_prefix t) ~off ~min_len ~max_len buf

let read_bytes_exn t ~f ~off ~len =
let read_seq_exn t ~off ~len =
let bytes_in_prefix =
let open Int63.Syntax in
let prefix_bytes_after_off = suffix_start_offset t - off in
Expand All @@ -112,28 +112,38 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) :
let len = Int63.to_int len in
let max_read_size = min 8192 len in
let buffer = Bytes.create max_read_size in
let rec aux read_exn ~off ~len =
if len = 0 then ()
let rec aux read_exn ~off ~len () =
if len <= 0 then Seq.Nil
else
let read_len = min len max_read_size in
read_exn ~off ~len:read_len buffer;
f (Bytes.sub_string buffer 0 read_len);
aux read_exn
~off:Int63.Syntax.(off + Int63.of_int read_len)
~len:(len - read_len)
Seq.Cons
( Bytes.sub_string buffer 0 read_len,
aux read_exn
~off:Int63.Syntax.(off + Int63.of_int read_len)
~len:(len - read_len) )
in
if bytes_in_prefix > Int63.zero then
aux
(Sparse.read_exn (get_prefix t))
~off
~len:(Int63.to_int bytes_in_prefix);
if bytes_in_suffix > Int63.zero then
let off = Int63.Syntax.(off + bytes_in_prefix) in
let off = soff_of_offset t off in
aux
(Suffix.read_exn (get_suffix t))
~off
~len:(Int63.to_int bytes_in_suffix)
let prefix =
if bytes_in_prefix <= Int63.zero then Seq.empty
else
aux
(Sparse.read_exn (get_prefix t))
~off
~len:(Int63.to_int bytes_in_prefix)
in
let suffix =
if bytes_in_suffix <= Int63.zero then Seq.empty
else
let off = Int63.Syntax.(off + bytes_in_prefix) in
let off = soff_of_offset t off in
aux
(Suffix.read_exn (get_suffix t))
~off
~len:(Int63.to_int bytes_in_suffix)
in
Seq.append prefix suffix

let read_bytes_exn t ~f ~off ~len = Seq.iter f (read_seq_exn t ~off ~len)

let next_valid_offset t ~off =
let open Int63.Syntax in
Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/unix/dispatcher_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ module type S = sig
(** [soff_of_offset t global_offset] converts a global offset to a suffix
offset. *)

val read_seq_exn : t -> off:int63 -> len:int63 -> string Seq.t

val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit
(** [read_bytes_exn] reads a slice of the global offset space defined by [off]
and [len].
Expand Down
2 changes: 0 additions & 2 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ struct
module Dict = Append_only_file.Make (Io) (Errs)
module Suffix = Chunked_suffix.Make (Io) (Errs)
module Sparse = Sparse_file.Make (Io)
module Mapping_file = Sparse.Mapping_file
module Lower = Lower.Make (Io) (Errs)

type after_reload_consumer = { after_reload : unit -> (unit, Errs.t) result }
Expand Down Expand Up @@ -200,7 +199,6 @@ struct
let* () = Dict.fsync t.dict in
let* () = Suffix.fsync t.suffix in
let* () = Control.fsync t.control in
let* () = Option.might Sparse.fsync t.prefix in
Index.flush ~with_fsync:true t.index

(* Constructors *********************************************************** *)
Expand Down
144 changes: 66 additions & 78 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,21 @@ module Make (Args : Gc_args.S) = struct
open Args
module Io = Fm.Io
module Sparse = Dispatcher.Fm.Sparse
module Mapping_file = Sparse.Mapping_file
module Ao = Append_only_file.Make (Fm.Io) (Errs)

module Ao = struct
include Append_only_file.Make (Fm.Io) (Errs)
let string_of_key = Irmin.Type.to_string key_t

let create_rw_exn ~path =
create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_procedure:`Internal
|> Errs.raise_if_error
end
module Live = struct
type t = (int63 * int) list

let string_of_key = Irmin.Type.to_string key_t
let empty = []

let add ~off ~len (t : t) : t =
let off_end = Int63.(Syntax.(off + of_int len)) in
match t with
| (off', len') :: rest when off_end = off' -> (off, len + len') :: rest
| _ -> (off, len) :: t
end

module Priority_queue = struct
module Offset_rev = struct
Expand Down Expand Up @@ -111,26 +114,21 @@ module Make (Args : Gc_args.S) = struct
disk in order to correctly deserialised the gced commit. *)
let magic_parent =
Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit
|> String.make 1

(* Transfer the commit with a different magic. Note that this is modifying
existing written data. *)
let transfer_parent_commit_exn ~read_exn ~write_exn key =
let transfer_parent_commit_exn ~write_exn key =
match Pack_key.inspect key with
| Indexed _ ->
(* Its possible that some parents are referenced by hash. *)
()
| Direct { offset = off; length = len; _ } ->
let buffer = Bytes.create len in
read_exn ~off ~len buffer;
Bytes.set buffer Hash.hash_size magic_parent;
(* Bytes.unsafe_to_string usage: We assume read_exn returns unique
ownership of buffer to this function. Then at the call to
Bytes.unsafe_to_string we give up unique ownership to buffer (we do
not modify it thereafter) in return for ownership of the resulting
string, which we pass to write_exn. This usage is safe. *)
write_exn ~off ~len (Bytes.unsafe_to_string buffer)
| Direct { offset = off; _ } ->
(* Targeted write to change the parent commit kind to dangling. *)
let off = Int63.(Syntax.(off + of_int Hash.hash_size)) in
write_exn ~off ~len:1 magic_parent

let report_old_file_sizes ~root ~generation stats =
let prefix_file_sizes ~root ~generation =
let open Result_syntax in
let* prefix_size =
if generation = 0 then Ok Int63.zero
Expand All @@ -140,9 +138,20 @@ module Make (Args : Gc_args.S) = struct
if generation = 0 then Ok Int63.zero
else Irmin_pack.Layout.V3.mapping ~root ~generation |> Io.size_of_path
in
(mapping_size, prefix_size)

let report_old_file_sizes ~root ~generation stats =
let open Result_syntax in
let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
stats := Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size;
stats := Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size

let report_new_file_sizes ~root ~generation stats =
let open Result_syntax in
let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in
stats := Gc_stats.Worker.add_file_size !stats "prefix" prefix_size;
stats := Gc_stats.Worker.add_file_size !stats "mapping" mapping_size

type suffix_params = {
start_offset : int63;
chunk_start_idx : int;
Expand Down Expand Up @@ -193,23 +202,18 @@ module Make (Args : Gc_args.S) = struct
| Some commit -> commit
in

(* Step 3. Create the new mapping. *)
let mapping =
(* Step 3.1 Start [Mapping_file] routine which will create the
reachable file. *)
(* Step 3. Compute the list of [offset, length] ranges of live objects
reachable from the GC commit. *)
let live_entries =
stats := Gc_stats.Worker.finish_current_step !stats "mapping: start";
(fun f ->
let report_mapping_size size =
stats := Gc_stats.Worker.add_file_size !stats "mapping" size
in
let path =
Irmin_pack.Layout.V3.mapping ~generation ~root:new_files_path
in
Mapping_file.create ~report_mapping_size ~path ~register_entries:f ()
|> Errs.raise_if_error)
@@ fun ~register_entry ->
(* Step 3.1 The compact list of reachable [offset, length] *)
let live_entries = ref Live.empty in
let register_entry ~off ~len =
stats := Gc_stats.Worker.incr_objects_traversed !stats;
live_entries := Live.add ~off ~len !live_entries
in
(* Step 3.2 If the commit parents are referenced by offset, then include
the commit parents in the reachable file. The parent(s) of [commit_key]
the commit parents in the reachable list. The parent(s) of [commit_key]
must be included in the iteration because, when decoding the
[Commit_value.t] at [commit_key], the parents will have to be read in
order to produce a key for them. If the parent is referenced by hash,
Expand All @@ -218,82 +222,66 @@ module Make (Args : Gc_args.S) = struct
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: commits to reachable";
let register_entry ~off ~len =
stats := Gc_stats.Worker.incr_objects_traversed !stats;
register_entry ~off ~len
in

(* Step 3.3 Put the commit in the reachable file. *)
let off = Node_store.get_offset node_store commit_key in
let len = Node_store.get_length node_store commit_key in
register_entry ~off ~len;

(* Step 3.4 Put the nodes and contents in the reachable file. *)
(* Step 3.4 Put the nodes and contents in the reachable list. *)
stats :=
Gc_stats.Worker.finish_current_step !stats
"mapping: objects to reachable";
iter_reachable commit node_store ~f:register_entry;

(* Step 3.5 Return and let the [Mapping_file] routine create the mapping
file. *)
(* Step 3.5 Return *)
stats :=
Gc_stats.Worker.finish_current_step !stats "mapping: of reachable";
()
!live_entries
in

let () =
(* Step 4. Create the new prefix. *)
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let prefix =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Ao.create_rw_exn ~path
let mapping =
Irmin_pack.Layout.V4.mapping ~root:new_files_path ~generation
in
let data = Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation in
let () =
Errors.finalise_exn (fun _outcome ->
stats :=
Gc_stats.Worker.add_file_size !stats "prefix" (Ao.end_poff prefix);
Ao.close prefix |> Errs.log_if_error "GC: Close prefix")
@@ fun () ->
let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in
(* Step 5. Transfer to the new prefix, flush and close. *)
[%log.debug "GC: transfering to the new prefix"];
stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer";
Errors.finalise_exn (fun _outcome ->
Sparse.Ao.flush prefix
>>= (fun _ -> Sparse.Ao.close prefix)
|> Errs.log_if_error "GC: Close prefix after data copy")
@@ fun () ->
(* Step 5.1. Transfer all. *)
let append_exn = Ao.append_exn prefix in
let f ~off ~len =
let len = Int63.of_int len in
Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len
in
let () = Mapping_file.iter_exn mapping f in
Ao.flush prefix |> Errs.raise_if_error
List.iter
(fun (off, len) ->
let len = Int63.of_int len in
let str = Dispatcher.read_seq_exn dispatcher ~off ~len in
Sparse.Ao.append_seq_exn prefix ~off str)
live_entries
in
(* Step 5.2. Transfer again the parent commits but with a modified
magic. Reopen the new prefix, this time _not_ in append-only
as we have to modify data inside the file. *)
(* Step 5.2. Update the parent commits to be dangling.
Reopen the new prefix, this time in write-only as we have to modify data inside the file. *)
stats :=
Gc_stats.Worker.finish_current_step !stats
"prefix: rewrite commit parents";
let read_exn ~off ~len buf =
Dispatcher.read_exn dispatcher ~off ~len buf
in
let prefix_data =
let path =
Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation
in
Io.open_ ~path ~readonly:false |> Errs.raise_if_error
in
let prefix = Sparse.v ~mapping ~data:prefix_data in
let prefix = Sparse.Wo.open_wo ~mapping ~data |> Errs.raise_if_error in
Errors.finalise_exn (fun _outcome ->
Sparse.fsync prefix
>>= (fun _ -> Sparse.close prefix)
Sparse.Wo.fsync prefix
>>= (fun _ -> Sparse.Wo.close prefix)
|> Errs.log_if_error "GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Sparse.write_exn prefix in
let write_exn = Sparse.Wo.write_exn prefix in
List.iter
(fun key -> transfer_parent_commit_exn ~read_exn ~write_exn key)
(fun key -> transfer_parent_commit_exn ~write_exn key)
(Commit_value.parents commit)
in
let () = report_new_file_sizes ~root ~generation stats |> ignore in

(* Step 6. Calculate post-GC suffix parameters. *)
let suffix_params, removable_chunk_idxs =
Expand Down
Loading

0 comments on commit 4a5e92c

Please sign in to comment.