SHS-NG M4.8: Add a caching layer to the status store when writing. #14 (closed)
vanzin force-pushed the shs-ng/M4.8 branch 3 times, most recently from ef9cd35 to 13af8e4 on April 25, 2017 at 17:43.
It turns out I ended up not using the raw storage methods in KVStore, so this change removes them to simplify the API and save some code.
The application listing is still generated from event logs, but is now stored in LevelDB. No data (other than internal LevelDB pages) is kept in memory. The actual app UIs are, as of now, still untouched.

The provider stores things internally using the public REST API types; I believe this is better going forward, since it will make it easier to get rid of the internal history server API, which is mostly redundant at this point.

I also added a finalizer to LevelDBIterator, to make sure that resources are eventually released. This helps when code iterates but does not exhaust the iterator, thus never triggering the auto-close code.

HistoryServerSuite was modified to not restart the history server unnecessarily; this makes the JSON validation tests run more quickly.
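The finalizer-as-safety-net idea can be sketched roughly like this. This is a minimal illustration, not the actual LevelDBIterator code; the class and method names here are hypothetical:

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: an iterator that releases its underlying resource
// when exhausted or closed, with a finalizer as a safety net for callers
// that abandon iteration early and never trigger the auto-close path.
class ResourceIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<T> underlying;
  private final Runnable release;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ResourceIterator(Iterator<T> underlying, Runnable release) {
    this.underlying = underlying;
    this.release = release;
  }

  @Override
  public boolean hasNext() {
    boolean more = underlying.hasNext();
    if (!more) close();  // auto-close once the iterator is exhausted
    return more;
  }

  @Override
  public T next() {
    return underlying.next();
  }

  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) release.run();
  }

  // Safety net: if the caller neither exhausts nor closes the iterator,
  // the GC eventually runs the finalizer and the resource is released.
  @Override
  protected void finalize() throws Throwable {
    try { close(); } finally { super.finalize(); }
  }
}
```

The finalizer only matters for the early-abandon case; well-behaved callers still get deterministic cleanup through `close()` or exhaustion.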
The initial listener is based on the existing JobProgressListener and tries to mimic its behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history provider code can be reused.

Note that the code here is not 100% correct; it is meant as a foundation for the UI integration in the next milestone. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior.

I also added annotations to the API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types.
This change adds some building blocks for hooking up the new data store to the UI. This is achieved by returning a new SparkUI implementation when using the new KVStoreProvider; this new UI does not currently contain any data for the old UI / API endpoints; that will be implemented in M4.

The interaction between the UI and the underlying store was isolated in a new AppStateStore class. The M4 code will call into this class to retrieve data to populate the UI and API. Some new indexed fields had to be added to the stored types so that the code could efficiently process the API requests.

On the history server side, some changes were made in how the UI is used. Because there's state kept on disk, the code needs to be more careful about closing those resources when the UIs are unloaded, and because of that some locking needs to exist to make sure it's OK to move files around. The app cache was also simplified a bit; it now checks a flag in the UI instance to determine whether it should be used, and tries to re-load the UI when the FS listing code invalidates a loaded one.
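As a rough illustration of why indexed fields matter here: keeping a sorted secondary index on a field lets the API layer answer ordered queries without scanning every stored record. This is a toy in-memory sketch, not Spark's actual KVStore API; `IndexedTaskStore` and all of its names are hypothetical:

```java
import java.util.*;

// Toy sketch of secondary indexing (not the real KVStore API).
class IndexedTaskStore {
  static final class TaskData {
    final long id; final long duration;
    TaskData(long id, long duration) { this.id = id; this.duration = duration; }
  }

  private final Map<Long, TaskData> byId = new HashMap<>();
  // secondary index: duration -> ids of tasks with that duration,
  // kept sorted so ordered queries don't need a full scan
  private final TreeMap<Long, Set<Long>> byDuration = new TreeMap<>();

  void write(TaskData t) {
    TaskData old = byId.put(t.id, t);
    if (old != null) {  // keep the index consistent on overwrite
      Set<Long> ids = byDuration.get(old.duration);
      ids.remove(old.id);
      if (ids.isEmpty()) byDuration.remove(old.duration);
    }
    byDuration.computeIfAbsent(t.duration, d -> new LinkedHashSet<>()).add(t.id);
  }

  // a "slowest n tasks" style query served straight off the sorted index
  List<TaskData> slowest(int n) {
    List<TaskData> out = new ArrayList<>();
    for (Long duration : byDuration.descendingKeySet()) {
      for (Long id : byDuration.get(duration)) {
        if (out.size() == n) return out;
        out.add(byId.get(id));
      }
    }
    return out;
  }
}
```

The trade-off is the same as in any indexed store: each write pays a little extra to keep the index consistent, so reads that the UI and API issue frequently stay cheap.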
This change moves the environment page to the new UI module, making modifications so that its data is stored in and retrieved from the new store. Some temporary internal API changes were made to keep the current code working when using the new UI implementation; these may be cleaned up once the old UI code is removed and only the new implementation remains.
The executors page is built on top of the REST API, so the page itself was easy to hook up to the new code. Some other pages depend on the `ExecutorListener` class that is being removed, though, so they needed to be modified to use data from the new store. Fortunately, all they seemed to need was the map of executor logs, so that was somewhat easy too.

The executor timeline graph required some extra code to save the executor-related events in the UI store. This just implements the existing functionality, without making any changes related to efficiency or scalability of that graph.

I had to change some of the test golden files because the old code would return executors in "random" order (since it used a mutable Map instead of something that returns a sorted list), while the new code returns executors in id order. The static files are still kept in the core/ resources directory since Jetty's default servlet does not handle fetching static files from multiple jars.

TODO: add unit tests for the new ExecutorSummary fields being populated.
This required adding information about StreamBlockId to the UI store, which is not available yet via the API. So an internal type was added until there's a need to expose that information in the API. The UI only lists RDDs that have cached partitions, and that information wasn't being correctly captured in UIListener, so that's also fixed, along with some minor (internal) API adjustments so that the UI can get the correct data.
This change is a little larger because there's a whole lot of logic behind these pages, all of it really tied to internal types and listeners, and a lot of code was moved to the new module.

- Added missing StageData and ExecutorStageSummary fields which are used by the UI. Some JSON golden files needed to be updated to account for the new fields.
- Save RDD graph data in the store. This tries to re-use existing types as much as possible, so that the code doesn't need to be re-written; as a result it's probably not very optimal.
- Some old classes (e.g. JobProgressListener) still remain, since they're used in other parts of the code; they're no longer used by the UI, though, and will be cleaned up in a separate change.
- Save information about active pools in the disk store. This could potentially be avoided, since it's most probably not much data, but it makes it easier to later add this kind of information to the API and to history, if wanted.
- Because the new store sorts things slightly differently from the previous code, some JSON golden files had some elements within them shuffled around.
- The retention unit test in UISeleniumSuite was disabled because the code to throw away old stages / tasks hasn't been added yet. This is less of a problem with the new store since it doesn't use memory, but it will be added later to avoid a similar issue with unbounded disk space usage.
- The job description field in the API tries to follow the old behavior, which makes it empty most of the time, even though there's information to fill it in. For stages, a new field was added to hold the description (which is basically the job description), so that the UI can be rendered the old way.
- A new stage status ("SKIPPED") was added to account for the fact that the API couldn't represent that state before. Because of the way the new code tracks stages, those stages would otherwise show up as "PENDING" in the UI.

TODO: add UIListener unit tests for the new fields.
With the new UI store, the API resource classes have a lot less code, since there's no need for complicated translations between the UI types and the API types. The code had ended up as a bunch of files with a single method declared in them, so this change restructures the API code to use fewer classes; mainly, most sub-resources were removed, and the code to deal with single-attempt and multi-attempt apps was simplified.

The only behavioral change is the addition of a method to return a single attempt's information; that was missing in the old API, so trying to retrieve "/v1/applications/appId/attemptId" would result in a 404 even if the attempt existed (while URIs under that one would return valid data).

The streaming API resources also received the same treatment; the streaming backend is still not hooked up to the store, but once it is, the code in the remaining classes will be simplified even further.
This makes it easier for callers to control the end of iteration, and thus to write Scala code that automatically closes the underlying iterator's resources. Before, code had to use Scala's "takeWhile", convert the result to a list, and manually close the iterator; with these two parameters, that can be avoided in many cases, with iterators auto-closing when the last element is reached.
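The idea can be sketched as follows. Names like `max` and `isLast` are illustrative stand-ins for the two parameters described above, assuming they bound the element count and mark the last wanted element:

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical sketch: bound an iteration by a maximum element count
// and/or a "last element" predicate, releasing the underlying resource
// as soon as the bound is hit, instead of takeWhile + toList + close.
class BoundedIterator<T> implements Iterator<T>, AutoCloseable {
  private final Iterator<T> underlying;
  private final long max;
  private final Predicate<T> isLast;
  private final Runnable release;
  private long count = 0;
  private boolean done = false;
  private boolean closed = false;

  BoundedIterator(Iterator<T> underlying, long max, Predicate<T> isLast, Runnable release) {
    this.underlying = underlying;
    this.max = max;
    this.isLast = isLast;
    this.release = release;
  }

  @Override
  public boolean hasNext() {
    if (!done && (count >= max || !underlying.hasNext())) {
      done = true;
      close();  // the bound was reached: release the resource now
    }
    return !done;
  }

  @Override
  public T next() {
    if (!hasNext()) throw new NoSuchElementException();
    T e = underlying.next();
    count++;
    if (isLast.test(e)) {  // auto-close when the last wanted element is seen
      done = true;
      close();
    }
    return e;
  }

  @Override
  public void close() {
    if (!closed) { closed = true; release.run(); }
  }
}
```

Because the resource is released the moment the bound is reached, a caller that simply drains the bounded iterator never has to remember to close anything.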
The only remaining use of this class was the SparkStatusTracker, which was modified to use the new state store. The test code to wait for executors was moved to TestUtils and now uses the SparkStatusTracker API. As part of this change I also modified the streaming UI to read the needed data from the store, which was missed in the previous patch that made JobProgressListener redundant.
To avoid having writes to the status store stall the listener bus thread, this change adds a caching layer that also performs writes asynchronously.

The write layer avoids high memory usage by de-duplicating writes: a write operation to a key that already has a pending, not-yet-flushed write simply overwrites it. The read path is kept up to date with these operations, so that the listener doing the writes gets a consistent view of the store.

Note that the caching only works for single reads (i.e. not iteration). Because of that, the status listener needs to keep track of some extra information in memory, so that it doesn't need to rely on data having been written to the underlying store when handling certain events.

On top of that, this change adds tracking of certain metrics that can help measure how well the cache is working and whether it needs some tuning.

Tested with pretty large log files with lots of tasks; verified that the write queue doesn't grow too much.
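A minimal sketch of the de-duplication idea, assuming a pending-writes map keyed by (type, key); the names and structure here are illustrative, not the actual implementation:

```java
import java.util.*;

// Hypothetical sketch of the de-duplicating write cache: pending writes
// are keyed by (type, key), so a new write to the same key replaces the
// queued one instead of growing the queue; reads consult the pending map
// first, giving the writing thread a consistent view of the store.
class CachingStore {
  // stands in for the slow on-disk store (e.g. LevelDB)
  private final Map<Object, Object> backing = new HashMap<>();
  private final Map<Object, Object> pending = new LinkedHashMap<>();

  private static Object storeKey(Class<?> type, Object key) {
    return type.getName() + "/" + Objects.toString(key);
  }

  synchronized void write(Object key, Object value) {
    // de-duplication: overwrite any queued write for the same key
    pending.put(storeKey(value.getClass(), key), value);
  }

  synchronized <T> T read(Class<T> type, Object key) {
    Object k = storeKey(type, key);
    Object v = pending.getOrDefault(k, backing.get(k));
    return type.cast(v);
  }

  // In the real design a background thread drains the queue
  // asynchronously; here flush() is called explicitly for clarity.
  synchronized void flush() {
    backing.putAll(pending);
    pending.clear();
  }

  synchronized int queuedWrites() {
    return pending.size();
  }
}
```

The key property is that repeated updates to the same object (say, a task's metrics) cost one queue slot, not one per event, which is what keeps the write queue from growing too much under heavy listener traffic.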
vanzin force-pushed the shs-ng/M4.7 branch 2 times, most recently from f6f28c7 to 7b4cb08 on May 5, 2017 at 22:56.