From 7e85e83eef53c93c335d3b99181fef6a60612561 Mon Sep 17 00:00:00 2001 From: ArthurW Date: Wed, 15 Feb 2023 13:51:47 +0100 Subject: [PATCH] irmin-pack: sparse file, add append operation --- src/irmin-pack/unix/dispatcher.ml | 48 +++-- src/irmin-pack/unix/dispatcher_intf.ml | 2 + src/irmin-pack/unix/file_manager.ml | 2 - src/irmin-pack/unix/gc_worker.ml | 144 ++++++------- src/irmin-pack/unix/sparse_file.ml | 264 +++++++++--------------- src/irmin-pack/unix/sparse_file_intf.ml | 123 +++++++---- test/irmin-pack/test_mapping.ml | 65 +++--- 7 files changed, 302 insertions(+), 346 deletions(-) diff --git a/src/irmin-pack/unix/dispatcher.ml b/src/irmin-pack/unix/dispatcher.ml index 59b2718097..1968c7060e 100644 --- a/src/irmin-pack/unix/dispatcher.ml +++ b/src/irmin-pack/unix/dispatcher.ml @@ -97,7 +97,7 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : Suffix.read_range_exn (Fm.suffix t.fm) ~off ~min_len ~max_len buf | None -> Sparse.read_range_exn (get_prefix t) ~off ~min_len ~max_len buf - let read_bytes_exn t ~f ~off ~len = + let read_seq_exn t ~off ~len = let bytes_in_prefix = let open Int63.Syntax in let prefix_bytes_after_off = suffix_start_offset t - off in @@ -112,28 +112,38 @@ module Make (Fm : File_manager.S with module Io = Io.Unix) : let len = Int63.to_int len in let max_read_size = min 8192 len in let buffer = Bytes.create max_read_size in - let rec aux read_exn ~off ~len = - if len = 0 then () + let rec aux read_exn ~off ~len () = + if len <= 0 then Seq.Nil else let read_len = min len max_read_size in read_exn ~off ~len:read_len buffer; - f (Bytes.sub_string buffer 0 read_len); - aux read_exn - ~off:Int63.Syntax.(off + Int63.of_int read_len) - ~len:(len - read_len) + Seq.Cons + ( Bytes.sub_string buffer 0 read_len, + aux read_exn + ~off:Int63.Syntax.(off + Int63.of_int read_len) + ~len:(len - read_len) ) in - if bytes_in_prefix > Int63.zero then - aux - (Sparse.read_exn (get_prefix t)) - ~off - ~len:(Int63.to_int bytes_in_prefix); - if bytes_in_suffix > Int63.zero then - let off = Int63.Syntax.(off + bytes_in_prefix) in - let off = soff_of_offset t off in - aux - (Suffix.read_exn (get_suffix t)) - ~off - ~len:(Int63.to_int bytes_in_suffix) + let prefix = + if bytes_in_prefix <= Int63.zero then Seq.empty + else + aux + (Sparse.read_exn (get_prefix t)) + ~off + ~len:(Int63.to_int bytes_in_prefix) + in + let suffix = + if bytes_in_suffix <= Int63.zero then Seq.empty + else + let off = Int63.Syntax.(off + bytes_in_prefix) in + let off = soff_of_offset t off in + aux + (Suffix.read_exn (get_suffix t)) + ~off + ~len:(Int63.to_int bytes_in_suffix) + in + Seq.append prefix suffix + + let read_bytes_exn t ~f ~off ~len = Seq.iter f (read_seq_exn t ~off ~len) let next_valid_offset t ~off = let open Int63.Syntax in diff --git a/src/irmin-pack/unix/dispatcher_intf.ml b/src/irmin-pack/unix/dispatcher_intf.ml index 408c506214..97bbc0fc24 100644 --- a/src/irmin-pack/unix/dispatcher_intf.ml +++ b/src/irmin-pack/unix/dispatcher_intf.ml @@ -49,6 +49,8 @@ module type S = sig (** [soff_of_offset t global_offset] converts a global offset to a suffix offset. *) + val read_seq_exn : t -> off:int63 -> len:int63 -> string Seq.t + val read_bytes_exn : t -> f:(string -> unit) -> off:int63 -> len:int63 -> unit (** [read_bytes_exn] reads a slice of the global offset space defined by [off] and [len]. 
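The new [read_seq_exn] exposes the same slice as [read_bytes_exn], but as a
lazy sequence of string chunks, so a caller can stream a range that spans both
the prefix and the suffix without materialising it. A minimal sketch of a
consumer, assuming the module produced by [Make] is in scope as [Dispatcher];
the [copy_range] name and the use of [Buffer] are illustrative only and not
part of this patch:

  (* Accumulate the chunks covering [off, off+len) into a single string. *)
  let copy_range dispatcher ~off ~len =
    let buf = Buffer.create (Int63.to_int len) in
    Seq.iter (Buffer.add_string buf)
      (Dispatcher.read_seq_exn dispatcher ~off ~len);
    Buffer.contents buf
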
diff --git a/src/irmin-pack/unix/file_manager.ml b/src/irmin-pack/unix/file_manager.ml index 51425a1b80..952da9dce4 100644 --- a/src/irmin-pack/unix/file_manager.ml +++ b/src/irmin-pack/unix/file_manager.ml @@ -32,7 +32,6 @@ struct module Dict = Append_only_file.Make (Io) (Errs) module Suffix = Chunked_suffix.Make (Io) (Errs) module Sparse = Sparse_file.Make (Io) - module Mapping_file = Sparse.Mapping_file module Lower = Lower.Make (Io) (Errs) type after_reload_consumer = { after_reload : unit -> (unit, Errs.t) result } @@ -200,7 +199,6 @@ struct let* () = Dict.fsync t.dict in let* () = Suffix.fsync t.suffix in let* () = Control.fsync t.control in - let* () = Option.might Sparse.fsync t.prefix in Index.flush ~with_fsync:true t.index (* Constructors *********************************************************** *) diff --git a/src/irmin-pack/unix/gc_worker.ml b/src/irmin-pack/unix/gc_worker.ml index f9d16a15c9..db08e3f429 100644 --- a/src/irmin-pack/unix/gc_worker.ml +++ b/src/irmin-pack/unix/gc_worker.ml @@ -23,18 +23,21 @@ module Make (Args : Gc_args.S) = struct open Args module Io = Fm.Io module Sparse = Dispatcher.Fm.Sparse - module Mapping_file = Sparse.Mapping_file + module Ao = Append_only_file.Make (Fm.Io) (Errs) - module Ao = struct - include Append_only_file.Make (Fm.Io) (Errs) + let string_of_key = Irmin.Type.to_string key_t - let create_rw_exn ~path = - create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000 - ~auto_flush_procedure:`Internal - |> Errs.raise_if_error - end + module Live = struct + type t = (int63 * int) list - let string_of_key = Irmin.Type.to_string key_t + let empty = [] + + let add ~off ~len (t : t) : t = + let off_end = Int63.(Syntax.(off + of_int len)) in + match t with + | (off', len') :: rest when off_end = off' -> (off, len + len') :: rest + | _ -> (off, len) :: t + end module Priority_queue = struct module Offset_rev = struct @@ -111,26 +114,21 @@ module Make (Args : Gc_args.S) = struct disk in order to correctly deserialised the gced commit. *) let magic_parent = Pack_value.Kind.to_magic Pack_value.Kind.Dangling_parent_commit + |> String.make 1 (* Transfer the commit with a different magic. Note that this is modifying existing written data. *) - let transfer_parent_commit_exn ~read_exn ~write_exn key = + let transfer_parent_commit_exn ~write_exn key = match Pack_key.inspect key with | Indexed _ -> (* Its possible that some parents are referenced by hash. *) () - | Direct { offset = off; length = len; _ } -> - let buffer = Bytes.create len in - read_exn ~off ~len buffer; - Bytes.set buffer Hash.hash_size magic_parent; - (* Bytes.unsafe_to_string usage: We assume read_exn returns unique - ownership of buffer to this function. Then at the call to - Bytes.unsafe_to_string we give up unique ownership to buffer (we do - not modify it thereafter) in return for ownership of the resulting - string, which we pass to write_exn. This usage is safe. *) - write_exn ~off ~len (Bytes.unsafe_to_string buffer) + | Direct { offset = off; _ } -> + (* Targeted write to change the parent commit kind to dangling. 
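+           The kind byte of a pack entry sits immediately after the hash, so a
+           one-byte write at [off + Hash.hash_size] is enough to flip the
+           commit's kind to [Dangling_parent_commit] without copying the rest
+           of the entry.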
*) + let off = Int63.(Syntax.(off + of_int Hash.hash_size)) in + write_exn ~off ~len:1 magic_parent - let report_old_file_sizes ~root ~generation stats = + let prefix_file_sizes ~root ~generation = let open Result_syntax in let* prefix_size = if generation = 0 then Ok Int63.zero @@ -140,9 +138,20 @@ module Make (Args : Gc_args.S) = struct if generation = 0 then Ok Int63.zero else Irmin_pack.Layout.V3.mapping ~root ~generation |> Io.size_of_path in + (mapping_size, prefix_size) + + let report_old_file_sizes ~root ~generation stats = + let open Result_syntax in + let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in stats := Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size; stats := Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size + let report_new_file_sizes ~root ~generation stats = + let open Result_syntax in + let+ mapping_size, prefix_size = prefix_file_sizes ~root ~generation in + stats := Gc_stats.Worker.add_file_size !stats "prefix" prefix_size; + stats := Gc_stats.Worker.add_file_size !stats "mapping" mapping_size + type suffix_params = { start_offset : int63; chunk_start_idx : int; @@ -193,23 +202,18 @@ module Make (Args : Gc_args.S) = struct | Some commit -> commit in - (* Step 3. Create the new mapping. *) - let mapping = - (* Step 3.1 Start [Mapping_file] routine which will create the - reachable file. *) + (* Step 3. Compute the list of [offset, length] ranges of live objects + reachable from the GC commit. *) + let live_entries = stats := Gc_stats.Worker.finish_current_step !stats "mapping: start"; - (fun f -> - let report_mapping_size size = - stats := Gc_stats.Worker.add_file_size !stats "mapping" size - in - let path = - Irmin_pack.Layout.V3.mapping ~generation ~root:new_files_path - in - Mapping_file.create ~report_mapping_size ~path ~register_entries:f () - |> Errs.raise_if_error) - @@ fun ~register_entry -> + (* Step 3.1 The compact list of reachable [offset, length] *) + let live_entries = ref Live.empty in + let register_entry ~off ~len = + stats := Gc_stats.Worker.incr_objects_traversed !stats; + live_entries := Live.add ~off ~len !live_entries + in (* Step 3.2 If the commit parents are referenced by offset, then include - the commit parents in the reachable file. The parent(s) of [commit_key] + the commit parents in the reachable list. The parent(s) of [commit_key] must be included in the iteration because, when decoding the [Commit_value.t] at [commit_key], the parents will have to be read in order to produce a key for them. If the parent is referenced by hash, @@ -218,82 +222,66 @@ module Make (Args : Gc_args.S) = struct stats := Gc_stats.Worker.finish_current_step !stats "mapping: commits to reachable"; - let register_entry ~off ~len = - stats := Gc_stats.Worker.incr_objects_traversed !stats; - register_entry ~off ~len - in (* Step 3.3 Put the commit in the reachable file. *) let off = Node_store.get_offset node_store commit_key in let len = Node_store.get_length node_store commit_key in register_entry ~off ~len; - (* Step 3.4 Put the nodes and contents in the reachable file. *) + (* Step 3.4 Put the nodes and contents in the reachable list. *) stats := Gc_stats.Worker.finish_current_step !stats "mapping: objects to reachable"; iter_reachable commit node_store ~f:register_entry; - (* Step 3.5 Return and let the [Mapping_file] routine create the mapping - file. *) + (* Step 3.5 Return *) stats := Gc_stats.Worker.finish_current_step !stats "mapping: of reachable"; - () + !live_entries in let () = (* Step 4. 
Create the new prefix. *) stats := Gc_stats.Worker.finish_current_step !stats "prefix: start"; - let prefix = - let path = - Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation - in - Ao.create_rw_exn ~path + let mapping = + Irmin_pack.Layout.V4.mapping ~root:new_files_path ~generation in + let data = Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation in let () = - Errors.finalise_exn (fun _outcome -> - stats := - Gc_stats.Worker.add_file_size !stats "prefix" (Ao.end_poff prefix); - Ao.close prefix |> Errs.log_if_error "GC: Close prefix") - @@ fun () -> + let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in (* Step 5. Transfer to the new prefix, flush and close. *) [%log.debug "GC: transfering to the new prefix"]; stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer"; + Errors.finalise_exn (fun _outcome -> + Sparse.Ao.flush prefix + >>= (fun _ -> Sparse.Ao.close prefix) + |> Errs.log_if_error "GC: Close prefix after data copy") + @@ fun () -> (* Step 5.1. Transfer all. *) - let append_exn = Ao.append_exn prefix in - let f ~off ~len = - let len = Int63.of_int len in - Dispatcher.read_bytes_exn dispatcher ~f:append_exn ~off ~len - in - let () = Mapping_file.iter_exn mapping f in - Ao.flush prefix |> Errs.raise_if_error + List.iter + (fun (off, len) -> + let len = Int63.of_int len in + let str = Dispatcher.read_seq_exn dispatcher ~off ~len in + Sparse.Ao.append_seq_exn prefix ~off str) + live_entries in - (* Step 5.2. Transfer again the parent commits but with a modified - magic. Reopen the new prefix, this time _not_ in append-only - as we have to modify data inside the file. *) + (* Step 5.2. Update the parent commits to be dangling. + Reopen the new prefix, this time in write-only as we have to modify data inside the file. *) stats := Gc_stats.Worker.finish_current_step !stats "prefix: rewrite commit parents"; - let read_exn ~off ~len buf = - Dispatcher.read_exn dispatcher ~off ~len buf - in - let prefix_data = - let path = - Irmin_pack.Layout.V4.prefix ~root:new_files_path ~generation - in - Io.open_ ~path ~readonly:false |> Errs.raise_if_error - in - let prefix = Sparse.v ~mapping ~data:prefix_data in + let prefix = Sparse.Wo.open_wo ~mapping ~data |> Errs.raise_if_error in Errors.finalise_exn (fun _outcome -> - Sparse.fsync prefix - >>= (fun _ -> Sparse.close prefix) + Sparse.Wo.fsync prefix + >>= (fun _ -> Sparse.Wo.close prefix) |> Errs.log_if_error "GC: Close prefix after parent rewrite") @@ fun () -> - let write_exn = Sparse.write_exn prefix in + let write_exn = Sparse.Wo.write_exn prefix in List.iter - (fun key -> transfer_parent_commit_exn ~read_exn ~write_exn key) + (fun key -> transfer_parent_commit_exn ~write_exn key) (Commit_value.parents commit) in + let () = report_new_file_sizes ~root ~generation stats |> ignore in (* Step 6. Calculate post-GC suffix parameters. 
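        ([suffix_params] above groups the [start_offset], [chunk_start_idx] and
         [dead_bytes] values computed in this step.)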
*) let suffix_params, removable_chunk_idxs = diff --git a/src/irmin-pack/unix/sparse_file.ml b/src/irmin-pack/unix/sparse_file.ml index 63d62e5ace..9040517b5d 100644 --- a/src/irmin-pack/unix/sparse_file.ml +++ b/src/irmin-pack/unix/sparse_file.ml @@ -18,41 +18,8 @@ open Import include Sparse_file_intf module BigArr1 = Bigarray.Array1 -type int_bigarray = (int, Bigarray.int_elt, Bigarray.c_layout) BigArr1.t type int64_bigarray = (int64, Bigarray.int64_elt, Bigarray.c_layout) BigArr1.t -module Int_mmap : sig - type t = private { - fn : string; - fd : Unix.file_descr; - mutable arr : int_bigarray; - } - - val open_rw : string -> t - val close : t -> unit -end = struct - type t = { fn : string; fd : Unix.file_descr; mutable arr : int_bigarray } - - (* NOTE following mmap is shared *) - - let open_rw fn = - let shared = true in - assert (Sys.file_exists fn); - let fd = Unix.(openfile fn [ O_RDWR ] 0o660) in - let arr = - let open Bigarray in - Unix.map_file fd Int c_layout shared [| -1 |] |> array1_of_genarray - in - { fn; fd; arr } - - let close t = - Unix.close t.fd; - (* following tries to make the array unreachable, so GC'able; however, no guarantee - that arr actually is unreachable *) - t.arr <- Bigarray.(Array1.create Int c_layout 0); - () -end - module Int64_mmap : sig type t = private { fn : string; @@ -91,44 +58,8 @@ end module Make (Io : Io.S) = struct module Io = Io module Errs = Io_errors.Make (Io) - module Ao = Append_only_file.Make (Io) (Errs) module Mapping_file = struct - (** The mapping file is created from a decreasing list of - [(virtual_offset, 0, length)]. We first need to reverse it such that - virtual offsets are in increasing order. *) - let rev_inplace (src : int_bigarray) : unit = - let src_sz = BigArr1.dim src in - let _ = - assert (src_sz >= 3); - assert (src_sz mod 3 = 0) - in - let rec rev i j = - if i < j then ( - let ioff, ilen = (src.{i}, src.{i + 2}) in - let joff, jlen = (src.{j}, src.{j + 2}) in - src.{i} <- joff; - src.{i + 2} <- jlen; - src.{j} <- ioff; - src.{j + 2} <- ilen; - rev (i + 3) (j - 3)) - in - rev 0 (src_sz - 3) - - (** We then replace the [0] component of the triplets with the accumulated - length. This yields triplets - [(virtual_offset, physical_offset, length)], which will allow us to map - virtual offsets to their physical location in the prefix file. *) - let set_prefix_offsets src = - let src_sz = BigArr1.dim src in - let rec go i poff = - if i < src_sz then ( - src.{i + 1} <- poff; - let len = src.{i + 2} in - go (i + 3) (poff + len)) - in - go 0 0 - type t = { arr : int64_bigarray; path : string } let open_map ~path = @@ -148,85 +79,6 @@ module Make (Io : Io.S) = struct ^ ": mapping mmap size did not meet size requirements"))) | _ -> Error (`No_such_file_or_directory path) - let create ?report_mapping_size ~path ~register_entries () = - let open Result_syntax in - let* () = - if Sys.word_size <> 64 then Error `Gc_forbidden_on_32bit_platforms - else Ok () - in - - (* Unlink residual and ignore errors (typically no such file) *) - Io.unlink path |> ignore; - - (* Create [file] *) - let* file = - Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000 - ~auto_flush_procedure:`Internal - in - - (* Fill and close [file] *) - let append_entry ~off ~len = - (* Write [off, 0, len] in native-endian encoding because it will be read - with mmap. The [0] reserves the space for the future prefix offset. 
*) - let buffer = Bytes.create 24 in - Bytes.set_int64_ne buffer 0 (Int63.to_int64 off); - Bytes.set_int64_ne buffer 8 Int64.zero; - Bytes.set_int64_ne buffer 16 (Int64.of_int len); - (* Bytes.unsafe_to_string usage: buffer is uniquely owned; we assume - Bytes.set_int64_ne returns unique ownership; we give up ownership of buffer in - conversion to string. This is safe. *) - Ao.append_exn file (Bytes.unsafe_to_string buffer) - in - (* Check if we can collapse consecutive entries *) - let current_entry = ref None in - let register_entry ~off ~len = - let current = - match !current_entry with - | None -> (off, len) - | Some (off', len') -> - if off >= off' then - invalid_arg - "register_entry: offsets are not strictly decreasing"; - let dist = Int63.to_int (Int63.sub off' off) in - if dist <= len then (off, dist + len') - else ( - append_entry ~off:off' ~len:len'; - (off, len)) - in - current_entry := Some current - in - let* () = - Errs.catch (fun () -> - register_entries ~register_entry; - (* Flush pending entry *) - match !current_entry with - | None -> () - | Some (off, len) -> append_entry ~off ~len) - in - let* () = Ao.flush file in - let* () = Ao.close file in - - (* Reopen [file] but as an mmap *) - let file = Int_mmap.open_rw path in - let* () = - Errs.catch (fun () -> - rev_inplace file.arr; - set_prefix_offsets file.arr) - in - - (* Flush and close new mapping [file] *) - let* () = Errs.catch (fun () -> Unix.fsync file.fd) in - Int_mmap.close file; - - let* mapping_size = Io.size_of_path path in - Option.iter (fun f -> f mapping_size) report_mapping_size; - - (* Open created map *) - open_map ~path - - let entry_count arr = BigArr1.dim arr / 3 - let entry_idx i = i * 3 - let conv_int64 : int64 -> int = fun i -> (if Sys.big_endian then ( @@ -247,6 +99,8 @@ module Make (Io : Io.S) = struct else i) |> Int64.to_int + let entry_count arr = BigArr1.dim arr / 3 + let entry_idx i = i * 3 let entry_off arr i = arr.{entry_idx i} |> conv_int64 |> Int63.of_int let entry_poff arr i = arr.{entry_idx i + 1} |> conv_int64 |> Int63.of_int let entry_len arr i = arr.{entry_idx i + 2} |> conv_int64 @@ -256,10 +110,7 @@ module Make (Io : Io.S) = struct f ~off:(entry_off arr i) ~len:(entry_len arr i) done - let iter t f = - Errs.catch (fun () -> - iter_exn t f; - ()) + let iter t f = Errs.catch (fun () -> iter_exn t f) type entry = { off : int63; poff : int63; len : int } @@ -284,18 +135,15 @@ module Make (Io : Io.S) = struct type t = { mapping : Mapping_file.t; data : Io.t } - let v ~mapping ~data = { mapping; data } - - let open_ro ~mapping ~data = + let open_ ~readonly ~mapping ~data = let open Result_syntax in let* mapping = Mapping_file.open_map ~path:mapping in - let+ data = Io.open_ ~path:data ~readonly:true in - v ~mapping ~data + let+ data = Io.open_ ~path:data ~readonly in + { mapping; data } - let get_mapping t = t.mapping - let get_data t = t.data + let open_ro ~mapping ~data = open_ ~readonly:true ~mapping ~data let close t = Io.close t.data - let fsync t = Io.fsync t.data + let iter t fn = Mapping_file.iter t.mapping fn let get_poff { mapping; _ } ~off = match Mapping_file.find_nearest_geq mapping off with @@ -324,15 +172,101 @@ module Make (Io : Io.S) = struct let len = min max_len max_entry_len in Io.read_exn t.data ~off:poff ~len buf - let write_exn t ~off ~len str = - let poff, max_entry_len = get_poff t ~off in - assert (len <= max_entry_len); - Io.write_exn t.data ~off:poff ~len str - let next_valid_offset { mapping; _ } ~off = match Mapping_file.find_nearest_geq mapping off with | 
None -> None | Some entry -> let open Int63.Syntax in Some (if entry.off < off then off else entry.off) + + module Wo = struct + type nonrec t = t + + let open_wo ~mapping ~data = open_ ~readonly:false ~mapping ~data + + let write_exn t ~off ~len str = + let poff, max_entry_len = get_poff t ~off in + assert (len <= max_entry_len); + Io.write_exn t.data ~off:poff ~len str + + let fsync t = Io.fsync t.data + let close = close + end + + module Ao = struct + module Ao = Append_only_file.Make (Io) (Errs) + + type t = { mapping : Ao.t; data : Ao.t; mutable end_off : Int63.t } + + let end_off t = t.end_off + + let create ~mapping ~data = + let open Result_syntax in + let ao_create path = + Ao.create_rw ~path ~overwrite:false ~auto_flush_threshold:1_000_000 + ~auto_flush_procedure:`Internal + in + let* mapping = ao_create mapping in + let+ data = ao_create data in + { mapping; data; end_off = Int63.zero } + + let open_ao ~mapping_size ~mapping ~data = + let open Result_syntax in + let ao_open ~end_poff path = + Ao.open_rw ~path ~end_poff ~dead_header_size:0 + ~auto_flush_threshold:1_000_000 ~auto_flush_procedure:`Internal + in + let* ao_mapping = ao_open ~end_poff:mapping_size mapping in + let* end_off, end_poff = + if mapping_size <= Int63.zero then Ok (Int63.zero, Int63.zero) + else + let entry_len = 3 * 8 in + let+ entry = + Ao.read_to_string ao_mapping + ~off:Int63.(Syntax.(mapping_size - of_int entry_len)) + ~len:entry_len + in + let entry = Bytes.of_string entry in + let end_off = Bytes.get_int64_le entry 0 |> Int63.of_int64 in + let end_poff = Bytes.get_int64_le entry 8 |> Int63.of_int64 in + let len = Bytes.get_int64_le entry 16 |> Int63.of_int64 in + let open Int63.Syntax in + (end_off + len, end_poff + len) + in + let+ ao_data = ao_open ~end_poff data in + { mapping = ao_mapping; data = ao_data; end_off } + + let check_offset_exn { end_off; _ } ~off = + if Int63.Syntax.(end_off > off) then + Fmt.failwith + "Sparse.Ao.append_exn at offset %a, smaller than latest offset %a" + Int63.pp off Int63.pp end_off + + let append_seq_exn t ~off seq = + check_offset_exn t ~off; + let poff = Ao.end_poff t.data in + let len = + Seq.fold_left + (fun len str -> + Ao.append_exn t.data str; + len + String.length str) + 0 seq + in + let buffer = Bytes.create 24 in + Bytes.set_int64_le buffer 0 (Int63.to_int64 off); + Bytes.set_int64_le buffer 8 (Int63.to_int64 poff); + Bytes.set_int64_le buffer 16 (Int64.of_int len); + Ao.append_exn t.mapping (Bytes.unsafe_to_string buffer); + t.end_off <- Int63.(Syntax.(off + of_int len)) + + let flush t = + let open Result_syntax in + let* () = Ao.flush t.data in + Ao.flush t.mapping + + let close t = + let open Result_syntax in + let* () = Ao.close t.data in + Ao.close t.mapping + end end diff --git a/src/irmin-pack/unix/sparse_file_intf.ml b/src/irmin-pack/unix/sparse_file_intf.ml index ceb558204b..ad5c0ef965 100644 --- a/src/irmin-pack/unix/sparse_file_intf.ml +++ b/src/irmin-pack/unix/sparse_file_intf.ml @@ -20,61 +20,102 @@ module type S = sig module Io : Io.S module Errs : Io_errors.S with module Io = Io - module Mapping_file : sig - type t - (** A mapping file is a collection of chunks which contain 3 integers. A - length, the global offset ([off]) of the chunk and the offset of the - chunk in the prefix file ([poff]). + type t + type open_error := [ Io.open_error | `Corrupted_mapping_file of string ] - The chunks have consecutive [poff] with respect to their lengths. 
+ val open_ro : mapping:string -> data:string -> (t, [> open_error ]) result + (** [open_ro ~mapping ~data] returns a new read-only view of the sparse file, + represented on disk by two files named [mapping] and [data]. *) - There is no need to close a [t] because its underlying file-descriptor - is always closed. *) + val close : t -> (unit, [> Io.close_error ]) result + (** Close the underlying files. *) - val create : - ?report_mapping_size:(int63 -> unit) -> - path:string -> - register_entries:(register_entry:(off:int63 -> len:int -> unit) -> unit) -> - unit -> - (t, Errs.t) result - (** [create ~path ~register_entries] creates a mapping file named [path]. + val read_exn : t -> off:int63 -> len:int -> bytes -> unit + (** [read_exn t ~off ~len buffer] writes into [buffer] the bytes from [off] to + [off+len]. *) - [register_entries] is a user callback that is responsible for calling - [register_entry] for each live entry. It must be called with strictly - decreasing offsets (or fails otherwise). + val read_range_exn : + t -> off:int63 -> min_len:int -> max_len:int -> bytes -> unit + (** Same as [read_exn], the amount read is [max_len] if possible or at least + [min_len] if reading more would step over a hole in the sparse file. *) - Returns an error if the platform is not 64bits. *) + val next_valid_offset : t -> off:int63 -> int63 option + (** [next_valid_offset t ~off] returns [Some off'] such that [off'] is the + smallest readable offset larger or equal to [off]. This enables jumping + over a sparse hole to the next compact range of data. *) - val iter : t -> (off:int63 -> len:int -> unit) -> (unit, Errs.t) result - (** [iter mapping f] calls [f] on each [(off,len)] pair in [mapping]. + val iter : t -> (off:int63 -> len:int -> unit) -> (unit, Errs.t) result + (** [iter t f] calls [f] on each [(off,len)] pair in [mapping]. Only used for + testing and debugging. - It is guaranteed for the offsets to be iterated in monotonic order. + It is guaranteed for the offsets to be iterated in monotonic order. - It is guaranteed that entries don't overlap. + It is guaranteed that entries don't overlap. - The exceptions raised by [f] are caught and returned (as long as they - are known by [Errs]). *) + The exceptions raised by [f] are caught and returned (as long as they are + known by [Errs]). *) - val iter_exn : t -> (off:int63 -> len:int -> unit) -> unit - (** Similar to [iter mapping f] but raises exceptions. *) - end + module Wo : sig + type t - type t - type open_error := [ Io.open_error | `Corrupted_mapping_file of string ] + val open_wo : mapping:string -> data:string -> (t, [> open_error ]) result + (** [open_wo ~mapping ~data] returns a write-only instance of the sparse + file. - val open_ro : mapping:string -> data:string -> (t, [> open_error ]) result - val v : mapping:Mapping_file.t -> data:Io.t -> t - val get_mapping : t -> Mapping_file.t - val get_data : t -> Io.t - val close : t -> (unit, [> Io.close_error ]) result - val fsync : t -> (unit, [> Io.write_error ]) result - val read_exn : t -> off:int63 -> len:int -> bytes -> unit + Note: This is unsafe and is only used by the GC to mark the parent + commits as dangling. One must ensure that no read-only instance is + opened at the same time, as otherwise the writes would be observable by + it. 
*) - val read_range_exn : - t -> off:int63 -> min_len:int -> max_len:int -> bytes -> unit + val write_exn : t -> off:int63 -> len:int -> string -> unit + (** [write_exn t ~off ~len str] writes the first [len] bytes of [str] to [t] + at the virtual offset [off]. *) - val write_exn : t -> off:int63 -> len:int -> string -> unit - val next_valid_offset : t -> off:int63 -> int63 option + val fsync : t -> (unit, [> Io.write_error ]) result + (** [fsync t] persists to the file system the effects of previous writes. *) + + val close : t -> (unit, [> Io.close_error ]) result + (** Close the underlying files. *) + end + + module Ao : sig + type t + + val end_off : t -> Int63.t + (** [end_off t] returns the largest virtual offset contained in the sparse + file [t]. Attempting to append with a strictly smaller virtual offset + will fail. *) + + val create : + mapping:string -> data:string -> (t, [> Io.create_error ]) result + (** [create ~mapping ~data] initializes a new empty sparse file, represented + on disk by two files named [mapping] and [data]. *) + + val open_ao : + mapping_size:Int63.t -> + mapping:string -> + data:string -> + ( t, + [> Io.open_error + | `Closed + | `Invalid_argument + | `Read_out_of_bounds + | `Inconsistent_store ] ) + result + (** [open_ao ~mapping_size ~mapping ~data] returns an append-only instance + of the sparse file. *) + + val append_seq_exn : t -> off:int63 -> string Seq.t -> unit + (** [append_seq_exn t ~off seq] appends the sequence of strings [seq] to the + sparse file [t], at the virtual offset [off] which must be larger than + the previously appended offsets. *) + + val flush : t -> (unit, [> Io.write_error ]) result + (** Flush the append buffer. Does not call [fsync]. *) + + val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result + (** Close the underlying files. *) + end end module type Sigs = sig diff --git a/test/irmin-pack/test_mapping.ml b/test/irmin-pack/test_mapping.ml index e313207e37..fb01f262fe 100644 --- a/test/irmin-pack/test_mapping.ml +++ b/test/irmin-pack/test_mapping.ml @@ -19,57 +19,40 @@ module Int63 = Optint.Int63 module Io = Irmin_pack_unix.Io.Unix module Errs = Irmin_pack_unix.Io_errors.Make (Io) module Sparse_file = Irmin_pack_unix.Sparse_file.Make (Io) -module Mapping_file = Sparse_file.Mapping_file let test_dir = Filename.concat "_build" "test-pack-mapping" +let rec make_string_seq len () = + if len <= 0 then Seq.Nil + else + let quantity = min 8 len in + Seq.Cons (String.make quantity 'X', make_string_seq (len - quantity)) + (** Call the [Mapping_file] routines to process [pairs] *) let process_on_disk pairs = - let register_entries ~register_entry = - List.iter - (fun (off, len) -> - Format.printf "%i (+%i) => %i@." off len (off + len); - register_entry ~off:(Int63.of_int off) ~len) - pairs - in - let path = Irmin_pack.Layout.V3.mapping ~root:test_dir ~generation:1 in - let mapping = - Mapping_file.create ~path ~register_entries () |> Errs.raise_if_error - in + let mapping = Irmin_pack.Layout.V5.mapping ~root:test_dir ~generation:1 in + Io.unlink mapping |> ignore; + let data = Irmin_pack.Layout.V5.prefix ~root:test_dir ~generation:1 in + Io.unlink data |> ignore; + let sparse = Sparse_file.Ao.create ~mapping ~data |> Errs.raise_if_error in + List.iter + (fun (off, len) -> + Format.printf "%i (+%i) => %i@." 
off len (off + len); + let str = make_string_seq len in + let off = Int63.of_int off in + Sparse_file.Ao.append_seq_exn sparse ~off str) + (List.rev pairs); + Sparse_file.Ao.flush sparse |> Errs.raise_if_error; + Sparse_file.Ao.close sparse |> Errs.raise_if_error; + let sparse = Sparse_file.open_ro ~mapping ~data |> Errs.raise_if_error in let l = ref [] in let f ~off ~len = l := (Int63.to_int off, len) :: !l in - Mapping_file.iter mapping f |> Errs.raise_if_error; + Sparse_file.iter sparse f |> Errs.raise_if_error; + Sparse_file.close sparse |> Errs.raise_if_error; !l |> List.rev (** Emulate the behaviour of the [Mapping_file] routines to process [pairs] *) -let process_in_mem pairs = - let length_per_offset = - let tbl = Hashtbl.create 0 in - List.iter - (fun (off, len) -> - let len = - match Hashtbl.find_opt tbl off with - | Some len' when len' > len -> len' - | _ -> len - in - Hashtbl.replace tbl off len) - pairs; - tbl - in - (* [List.map] gets the offsets, [List.sort_uniq] sort/dedups them and - [List.fold_left] merges the contiguous ones. *) - List.map fst pairs - |> List.sort_uniq compare - |> List.fold_left - (fun acc off -> - let len = Hashtbl.find length_per_offset off in - match acc with - | [] -> [ (off, len) ] - | (off', len') :: _ when off' + len' > off -> assert false - | (off', len') :: tl when off' + len' = off -> (off', len' + len) :: tl - | acc -> (off, len) :: acc) - [] - |> List.rev +let process_in_mem pairs = List.rev pairs let test input_entries = let output_entries = process_on_disk input_entries in