[SPARK-32170][CORE] Improve the speculation for the inefficient tasks by the task metrics. #28994


Closed
wants to merge 1 commit

Conversation

Contributor

@weixiuli weixiuli commented Jul 4, 2020

What changes were proposed in this pull request?

Improve speculation for inefficient tasks by using task metrics.

Why are the changes needed?

  1. Currently, tasks are speculated once they meet certain timing conditions, regardless of whether they are actually inefficient, which can be a huge waste of cluster resources.
  2. In production, a speculative copy launched for an already-efficient task is eventually killed, which is unnecessary, wastes cluster resources, and sometimes interferes with the scheduling of other tasks.
  3. Therefore, we should first evaluate whether a task is inefficient using the metrics of successfully completed tasks, and only then decide whether to speculate it. Speculating only the inefficient tasks makes better use of cluster resources.
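
A minimal, self-contained sketch of this idea (not the PR's actual implementation): estimate each task's progress rate as records processed per millisecond, and allow speculation only for tasks whose rate falls well below the average rate of already-successful tasks. The names `TaskSnapshot`, `avgProgressRate`, `shouldSpeculate`, and the `efficiencyFraction` threshold are hypothetical.

```scala
// Hypothetical snapshot of a task's record count and elapsed run time.
case class TaskSnapshot(recordsProcessed: Long, runTimeMs: Long)

object SpeculationSketch {
  // Average records/ms across successful tasks; 0.0 if none have finished.
  def avgProgressRate(successful: Seq[TaskSnapshot]): Double = {
    val timed = successful.filter(_.runTimeMs > 0)
    if (timed.isEmpty) 0.0
    else timed.map(t => t.recordsProcessed.toDouble / t.runTimeMs).sum / timed.size
  }

  // Speculate only when the running task is markedly slower than its peers.
  // With no successful tasks to compare against, fall back to speculating
  // (the pre-change behavior).
  def shouldSpeculate(
      running: TaskSnapshot,
      successful: Seq[TaskSnapshot],
      efficiencyFraction: Double = 0.5): Boolean = {
    val avg = avgProgressRate(successful)
    if (avg <= 0.0 || running.runTimeMs <= 0) true
    else (running.recordsProcessed.toDouble / running.runTimeMs) < avg * efficiencyFraction
  }
}
```

For example, a task that processed 10 records in the time its peers processed 1000 would be speculated, while one that processed 900 would not.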

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests.

@weixiuli weixiuli marked this pull request as draft July 4, 2020 11:21
@weixiuli
Contributor Author

weixiuli commented Jul 4, 2020

@cloud-fan @dongjoon-hyun kindly review, thanks.

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented Jul 8, 2020

@venkata91 You might be interested in this.

@weixiuli
Contributor Author

@maropu @cloud-fan @gatorsmile @mridulm @dongjoon-hyun Could you help check this PR? Thanks.

}.map(_.taskMetrics).filter(_.isDefined).map(_.get).foreach { task =>
if (task.inputMetrics != null) {
sumInputRecords += task.inputMetrics.recordsRead
}
Contributor

What about recordsWritten? Should that also be considered with respect to progress, and likewise shuffleRecordsWritten?
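
A hedged sketch of what that broader accounting might look like, counting records from all four metric sources instead of input alone. The `Metrics` case class here is a simplified stand-in for the relevant fields of Spark's `TaskMetrics`, not its real API.

```scala
object RecordAccounting {
  // Simplified stand-in for a task's record-count metrics.
  case class Metrics(
      recordsRead: Long,           // cf. inputMetrics.recordsRead
      recordsWritten: Long,        // cf. outputMetrics.recordsWritten
      shuffleRecordsRead: Long,    // cf. shuffleReadMetrics.recordsRead
      shuffleRecordsWritten: Long) // cf. shuffleWriteMetrics.recordsWritten

  // Total records the task has processed across all sources, so that
  // write-heavy and shuffle-heavy tasks also register progress.
  def totalRecords(m: Metrics): Long =
    m.recordsRead + m.recordsWritten + m.shuffleRecordsRead + m.shuffleRecordsWritten
}
```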

Contributor

Even caching can take time when written to disk; does that need to be taken into consideration? Similarly, GC time, shuffle read blocked time, etc. could also impact task progress.

} else if (taskData != null && taskData.contains(tid) && taskData(tid) != null &&
taskData(tid).taskMetrics.isDefined) {
val taskMetrics = taskData(tid).taskMetrics.get
val currentTaskProgressRate = (taskMetrics.inputMetrics.recordsRead +
Contributor

Would it make sense to add taskProgress as part of taskMetrics, so that it can also be shown in the Spark UI? Although taskProgress would be hard to measure for tasks that don't involve input/output/shuffle records.

@venkata91
Contributor

venkata91 commented Jul 17, 2020

This is an interesting idea and a good start. Considering only the runTime of a task might not be useful in many cases. Thanks!

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 26, 2020
@github-actions github-actions bot closed this Oct 27, 2020