-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add IoObjectStore
that uses main runtime for network requests
#248
base: main
Are you sure you want to change the base?
Add IoObjectStore
that uses main runtime for network requests
#248
Conversation
@alamb FYI |
src/object_store/io_object_store.rs
Outdated
async fn get(&self, location: &Path) -> Result<GetResult> { | ||
let location = location.clone(); | ||
let store = self.inner.clone(); | ||
spawn_io(async move { store.get(&location).await }).await | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will be following this approach for all ObjectStore
methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
YES! This is very cool
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect -- this is great @matthewmturner -- I was thinking about writing just such a thing to add to DataFusion. If you get it in here I very much plan to upstream it (will try and do so tomorrow)
Just want to add a test for this then plan to merge it |
I am making great progress on the example I als plan to yoink this code too |
@alamb do you envision having the dedicated executor eventually live in the main datafusion repo? If not (or even as an interim solution while its iterated on) I could imagine having a library for I.e. an example is great to help understand and see how this part of execution works - but exposing the functionality directly should make it easier to just pick up and use. |
I envision DedicatedExecutor and IoObjectStore being directly in DataFusion (I can't really write the example without them :) ) Stay tuned 📺 |
This is problematic, as the returned GetResult contains a stream that will then "migrate" across the runtime boundary. It is at best undocumented what the implications of this are. Other issues will be found with things like put_multipart and friends. There is some further context on here - apache/arrow-rs#4040 (comment) I would strongly encourage implementing the IO dispatch at an application boundary that makes, e.g. AsyncFileReader, the ObjectStore trait is not designed to allow it to be shimmed in this way. |
I don't fully understand this - perhaps we can discuss in the context of my example. I am pretty close to having something to show for discussion Specifically my vision is to have an example that shows how to run a DataFUsion plan with CPU on one threadpool and IO on another. |
Rather than wrapping the ObjectStore trait, you would wrap the calls within DF that interact with ObjectStore (or any other interface that performs IO), e.g. AsyncFileReader, the internal methods of ListingTable, etc... The problem arises because a number of the APIs exposed by ObjectStore are exposed as streams, in some cases, e.g. GetResult the behaviour of tokio may sort of work (apache/arrow-rs#4040 (comment)), but others like list will simply not work correctly - polling the stream will potentially initiate a new request on the calling runtime. Ultimately DF needs to separate IO based activities from CPU-based activities, and overloading this onto the ObjectStore interface won't work reliably. |
Thank you -- I have some ideas about this. Will keep working (I am thinking of some way to move the work of one stream to another runtime) |
@alamb perhaps this is relevant https://stackoverflow.com/a/78094264 (answer from Alice Rhyl) |
Or you could just spawn IO at a non-streaming application boundary... This is simpler and much easier to reason about... But I guess I am now repeating myself... Edit: TBC the TCPStream is deep in the guts of our HTTP client, there aren't APIs that would allow us to move it, nor would it make sense given the whole purpose is to keep IO off the DF threads |
thanks @tustvold, appreciate your insight on this. need to digest implications of this a bit more to figure out next steps. It's not immediately clear to me if interfaces like |
Parquet will likely need to be handled separately from CSV/JSON/Avro because of the very different IO pattern, but the IO for those three should be similar enough to allow sharing. It has been long time since I worked on it, but I at least remember extracting a common FileStream trait or something similar. I'd hope something similar would be possible for the write side. |
i am going to mark this as draft for the time being while these more fundamental issues are resolved. |
My hope/plan is to figure out a good way to properly separate IO/Compute in the context of I hope to have a chance to work on it more tomorrow / over the weekend |
I'm super interested in the outcome here. I've got a production flight server that I'm trying to replace w/ flight sql + DataFusion, but I keep hitting executor starvation issue and have been tracking the conversation around dedicated runtimes closely. I ported DedicatedExecutor and CrossRtStream from Influx last week (good timing I know) and tried the IoObjectStore approach yesterday since it was super easy to hack in. Results are better but I am still hitting starvation when data transfer passes a couple hundred MB. That is all to say, if I can help test any approaches, lmk. |
@djanderson just to confirm i understand - is the dedicated executor being starved / the IO it needs to perform cant proceed? or the main runtime is being starved? |
I believe that the main runtime is being starved because it's manifesting itself as dropped network connections, either between MinIO and the flight sql server (more often) or between the flight sql server and the python client I'm using to test. That said I just got tokio-console set up and I'm still learning how to read it. Hopefully I can use it to make sure that I'm in fact getting all my IO running on the main thread via |
Any updates, @djanderson? I haven't found an easy way to determine which tasks ran on which runtime in the console. I also took a similar approach to using the DedicatedExecutor at the ObjectStore boundary in my project, but still see periodic S3 timeouts a la: apache/arrow-rs#5882 @alamb, do you have any suggestions on the number of threads to allocate for cpu bound tasks vs IO bound ones relative to number of available cores? I will try lowering the priority of the threads in the CPU runtime to see if that makes a difference. |
I recommend N-1 cores for the CPU bound tasks (so there is always a full core available to do network requests) Also, it is important that you actually run the stream in the separate runtime. I have sketched this out in the I am still playing around with making it easier to use the DedicatedExecutor paradigm, which is why I don't have that PR up for review yet |
FWIW @adriangb reports success using this procedure here: |
@rohitrastogi no updates from me, still also seeing the periodic timeouts and also didn't get much insight via tokio-console. What I'm doing right now is trying to implement some of the file range caching ideas from https://blog.haoxp.xyz/posts/caching-datafusion/, in the hopes that if i cache part of the file before the failure a retry might get me over the line. Still keeping a close eye on this work, appreciate everyone's efforts! |
Add's a wrapper
ObjectStore
that usesspawn_io
/ the main tokio runtime for making network requests instead of theDedicatedExecutor
which should only be used for CPU bound work.