
[ray] Consistent Crashes w/ Actor-Handle-Based Implementation of Parquet File Compactor #8687

Closed
2 tasks
pdames opened this issue May 31, 2020 · 6 comments · Fixed by #8756
Labels
bug: Something that is supposed to be working; but isn't
needs-repro-script: Issue needs a runnable script to be reproduced
P1: Issue that should be fixed within a few weeks

Comments

@pdames
Member

pdames commented May 31, 2020

What is the problem?

I’ve been running a task-based compactor against production Parquet datasets successfully and stably for the last few days, but just had my first crash with an actor-handle-based compactor running against a relatively small ~7GB parquet input dataset (divided into 20MB chunks).

The original pure-task-based compactor launches one distributed task per input delta in a table’s stream, takes a hash of the primary key modulo a desired number of buckets to group “like” primary keys together, saves each parallel task’s bucket groupings into distinct S3 files, and then "compacts" (i.e. sorts by sort key columns and dedupes by primary keys) these like-groupings by creating one parallel compaction task per hash bucket index.
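
As a rough sketch of those two steps (this is not the actual compactor code; the function names, the pandas usage, and Python's built-in hash() are illustrative assumptions):

import pandas as pd

NUM_BUCKETS = 75  # example bucket count; a real run configures this

def bucket_index(primary_key_value):
    # Group "like" primary keys: identical keys always land in the same bucket.
    # A distributed implementation needs a deterministic hash rather than
    # Python's per-process salted hash().
    return hash(primary_key_value) % NUM_BUCKETS

def compact(bucket_dataframes, sort_keys, primary_keys):
    # "Compact" one bucket: concatenate its groupings, sort by the sort key
    # columns, and keep a single record per primary key.
    merged = pd.concat(bucket_dataframes, axis=0, copy=False)
    merged = merged.sort_values(sort_keys)
    return merged.drop_duplicates(subset=primary_keys, keep="last")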

The actor-based compactor I was just testing tries to reduce the number of small hash bucket grouping files by passing a list of hash bucket actor handles into the task - in this case 75 for 75 hash buckets. Basically this:

import pandas as pd
import ray

@ray.remote
class HashBucket:
    def __init__(self):
        # Buffers every dataframe appended for this hash bucket in actor memory.
        self.dataframes = list()

    def append(self, dataframe):
        self.dataframes.append(dataframe)

    def write(self, file_path):
        # Concatenate all buffered dataframes and write a single Parquet output.
        output = next(iter(self.dataframes)) if len(self.dataframes) == 1 \
            else pd.concat(self.dataframes, axis=0, copy=False)
        output.to_parquet(file_path)
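
For reference, the hb_actors handles used below would be created along these lines (a sketch; the actual script may differ):

# One HashBucket actor per hash bucket; the same handles are passed to every task.
hb_actors = [HashBucket.remote() for _ in range(75)]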

So, whereas each parallel task would previously write multiple small dataframe Parquet files to S3 for the same bucket, it now calls hashBucket.append.remote(dataframe) instead.

This step went just fine.
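
A minimal sketch of that append step, assuming one task per input delta (the task name, arguments, and local pd.read_parquet call are illustrative; the real compactor reads its deltas from S3):

import pandas as pd
import ray

@ray.remote
def append_delta_to_buckets(delta_path, primary_key, hb_actors):
    # Read one input delta and hash-partition its rows across the bucket actors.
    # (A deterministic hash would be needed in practice instead of hash().)
    dataframe = pd.read_parquet(delta_path)
    bucket_ids = dataframe[primary_key].apply(hash) % len(hb_actors)
    append_refs = []
    for bucket_index, group in dataframe.groupby(bucket_ids):
        # Instead of writing a small per-bucket file to S3, hand the grouping
        # to the corresponding actor, which buffers it in memory.
        append_refs.append(hb_actors[int(bucket_index)].append.remote(group))
    # Block until every grouping from this delta has been received.
    ray.get(append_refs)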

Once all appends have completed, I iterate through the completed hash buckets to produce 1 output file in S3 per bucket, like so:

write_tasks_pending = []
for hash_bucket_index in all_hash_bucket_indices:
    file_path = #[...]
    write_task_promise = hb_actors[hash_bucket_index].write.remote(file_path)
    write_tasks_pending.append(write_task_promise)
while len(write_tasks_pending):
    # Drain the pending writes one at a time as they complete.
    write_task_complete, write_tasks_pending = ray.wait(write_tasks_pending)

The failure occurred here - it wrote several outputs successfully, then crashed with the following stack trace (while calling hb_actors[hash_bucket_index].write.remote(file_path)):
simple_compactor_actor_stack_trace.txt

Subsequent ray exec attempts then fail to connect to the cluster:

2020-05-27 23:24:06,509	INFO updater.py:201 -- NodeUpdater: i-09bf37eaf088eef3d: Waiting for IP...
2020-05-27 23:24:06,510	INFO log_timer.py:17 -- NodeUpdater: i-09bf37eaf088eef3d: Got IP [LogTimer=571ms]
2020-05-27 23:24:06,510	INFO updater.py:257 -- NodeUpdater: i-09bf37eaf088eef3d: Running python ./simple_compactor_actors.py “compactor” “compactor” “testTableStreamId” ‘[“customer_id”,“session_id”]’ ‘[“dw_insert_date”]’ 1_000_000 75 on 54.84.223.16...
E0528 06:24:29.959751 24475 24475 raylet_client.cc:69] Retrying to connect to socket for pathname /tmp/ray/session_2020-05-27_14-47-54_876303_4354/sockets/raylet (num_attempts = 1, num_retries = 5)
E0528 06:24:30.459939 24475 24475 raylet_client.cc:69] Retrying to connect to socket for pathname /tmp/ray/session_2020-05-27_14-47-54_876303_4354/sockets/raylet (num_attempts = 2, num_retries = 5)
E0528 06:24:30.960027 24475 24475 raylet_client.cc:69] Retrying to connect to socket for pathname /tmp/ray/session_2020-05-27_14-47-54_876303_4354/sockets/raylet (num_attempts = 3, num_retries = 5)
E0528 06:24:31.460111 24475 24475 raylet_client.cc:69] Retrying to connect to socket for pathname /tmp/ray/session_2020-05-27_14-47-54_876303_4354/sockets/raylet (num_attempts = 4, num_retries = 5)
F0528 06:24:31.960207 24475 24475 raylet_client.cc:78] Could not connect to socket /tmp/ray/session_2020-05-27_14-47-54_876303_4354/sockets/raylet
*** Check failure stack trace: ***
  @   0x7f2cd19a6afd google::LogMessage::Fail()
  @   0x7f2cd19a7f6c google::LogMessage::SendToLog()
  @   0x7f2cd19a67d9 google::LogMessage::Flush()
  @   0x7f2cd19a69f1 google::LogMessage::~LogMessage()
  @   0x7f2cd1726439 ray::RayLog::~RayLog()
  @   0x7f2cd15010cd ray::raylet::RayletConnection::RayletConnection()
  @   0x7f2cd1501a5c ray::raylet::RayletClient::RayletClient()
  @   0x7f2cd149dc37 ray::CoreWorker::CoreWorker()
  @   0x7f2cd14a1ac4 ray::CoreWorkerProcess::CreateWorker()
  @   0x7f2cd14a204f ray::CoreWorkerProcess::CoreWorkerProcess()
  @   0x7f2cd14a269b ray::CoreWorkerProcess::Initialize()
  @   0x7f2cd140d064 __pyx_pw_3ray_7_raylet_10CoreWorker_1__cinit__()
  @   0x7f2cd140e145 __pyx_tp_new_3ray_7_raylet_CoreWorker()
  @   0x55eb57a1a725 type_call
  @   0x55eb5799338b _PyObject_FastCallDict
  @   0x55eb57a1a58e call_function
  @   0x55eb57a3ea1a _PyEval_EvalFrameDefault
  @   0x55eb57a13b3e _PyEval_EvalCodeWithName
  @   0x55eb57a146b1 fast_function
  @   0x55eb57a1a515 call_function
  @   0x55eb57a3f734 _PyEval_EvalFrameDefault
  @   0x55eb57a13814 _PyEval_EvalCodeWithName
  @   0x55eb57a146b1 fast_function
  @   0x55eb57a1a515 call_function
  @   0x55eb57a3f734 _PyEval_EvalFrameDefault
  @   0x55eb57a151c9 PyEval_EvalCodeEx
  @   0x55eb57a15f5c PyEval_EvalCode
  @   0x55eb57a985e4 run_mod
  @   0x55eb57a989e1 PyRun_FileExFlags
  @   0x55eb57a98be4 PyRun_SimpleFileExFlags
  @   0x55eb57a9c6c4 Py_Main
  @   0x55eb5796452e main
Shared connection to 54.84.223.16 closed.
Error: Command failed:

Ray version and other system information (Python version, TensorFlow version, OS):
Python 3.6
99cc2e2
4 r5n.8xlarge instances in us-east-1 w/ ami-0dbb717f493016a1a (Deep Learning AMI, Ubuntu 18.04, Version 27.0)

Reproduction (REQUIRED)

Reproduction relies on the files available at #8707.

  1. Create an S3 bucket like "compactor-8687"
  2. Create an "input" S3 folder within this bucket, and place 7GB+ of Parquet input files in it (~20MB per file to reproduce the original scenario). Ensure that each input file follows the naming convention {tableStreamId}-{eventTimeInt}.[parq|parquet]. For example, "testTableStreamId-0.parq" is a valid input file name for table stream ID "testTableStreamId" and event time "0".
  3. Stand up a new Ray cluster from https://github.com/ray-project/ray/blob/2d167062ccd64fc8c65a2a1be75cbe6a675fb978/doc/examples/compactor/compactor.yaml, sync https://github.com/ray-project/ray/blob/2d167062ccd64fc8c65a2a1be75cbe6a675fb978/doc/examples/compactor/compactor_actors_example.py, and run it with a command of the following form:
ray exec ./compactor.yaml 'python ./compactor_actors_example.py "compactor-8687" "compactor-8687" "testTableStreamId" --primary-keys "primary_key_column_1" "primary_key_column_2" --sort-keys "sort_key_column_1" --records-per-output-file 1_000_000 --hash-bucket-count 75'
  4. Verify that the above failure occurs while attempting to write Parquet output files with the HashBucket actor.
  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.
@pdames added the bug and triage labels on May 31, 2020
@ericl added the P1 and needs-repro-script labels and removed the triage label on May 31, 2020
@ericl
Contributor

ericl commented May 31, 2020

Is it possible to create a simplified reproduction script you can share?

@pdames
Member Author

pdames commented Jun 1, 2020

@ericl
Contributor

ericl commented Jun 1, 2020

Can you mock out the data/etc dependencies so the file can be run without any additional setup?

@rkooo567
Contributor

rkooo567 commented Jun 6, 2020

@pdames we merged a possible fix. Is it possible for you to try again with the latest master?

@rkooo567 reopened this Jun 6, 2020
@pdames
Member Author

pdames commented Jun 9, 2020

@rkooo567 Fix verified on the latest build from master - 10/10 job runs succeeded using the same source dataset and cluster config that consistently crashed before. Thanks!

@pdames closed this as completed Jun 9, 2020
@rkooo567
Contributor

rkooo567 commented Jun 9, 2020

@pdames Happy to hear that!!
