TPL Part 4 -- Task的协同
来源:程序员人生 发布时间:2015-06-23 08:11:38 阅读次数:2194次
简单的Continuation
Task.ContinueWith(Task): 当指定的Task履行终了时。
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
});
root Task.ContinueWith((Task previousTask)=>{
Console.WriteLine("continute task completed");
});
rootTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
Task.ContinueWhenAll(Task[]):当指定的所有Task都履行终了时,示例代码:
Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>{
foreach(Task<int> t in antecedents) {
// dosomething
}
});
TaskFactory.ContinueWhenAny(Task[]):当指定的所有Task的任意1个履行终了时,代码与ContinueWhenAll类似(以下代码中,打印出前1个Task的履行时间):
Task continuation = Task.Factory.ContinueWhenAny<int>(tasks,
(Task<int>antecedent) => {
//write out a message using the antecedent result
Console.WriteLine("The first task slept for {0} milliseconds",
antecedent.Result);
});
Continue 选项
OnlyOnRanToCompletion仅当履行完
NotOnRanToCompletion:没有履行完(被取消或出现异常)
OnlyOnFaulted:仅当出现异常
NotOnFaulted:没有出现异常
OnlyOnCancelled:仅当被取消
NotOnCancelled:没有被取消
处理异常
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
rootTask.ContinueWith((Task previousTask)=>{
Console.WriteLine("even root throw exception , I still run");
});
rootTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
以上代码中,第1个task中抛出了异常,Continue的Task依然会继续履行。可是Task被Finalized时异常就会抛出。
解决方案:
void Main()
{
Task rootTask = new Task(()=>{
Console.WriteLine("root task completed");
throw new Exception("root throwed exception");
});
var t2 = rootTask.ContinueWith((Task previousTask)=>{
//
if(previousTask.Status== TaskStatus.Faulted){
throw previousTask.Exception.InnerException;
}
Console.WriteLine("even root throw exception , I still run");
});
rootTask.Start();
try{
t2.Wait();
}
catch(AggregateException ex){
ex.Handle(inner=>{Console.WriteLine("exception handled in main thread"); return true;});
}
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在Task中冒泡抛出异常,在主线程中等待最后那个Task的履行并对AggregateException进行处理。
创建子Task
创建子Task并附加在父Task上:
void Main()
{
Task parentTask = new Task(() => {
Console.WriteLine("parent task started");
//create the first child task
Task childTask = new Task(() => {
// writeout a message and wait
Console.WriteLine("Child task running");
Thread.Sleep(1000);
Console.WriteLine("Child task throwed exception");
throw new Exception();
} ,TaskCreationOptions.AttachedToParent);
Console.WriteLine("start child task...");
childTask.Start();
Console.WriteLine("parent task ended");
});
// startthe parent task
parentTask.Start();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
1. 父Task会抛出子Task中的异常
2. 父Task的状态会遭到所附加的子Task状态的影响
Barrier的使用
class BankAccount {
public int Balance {
get;
set;
}
} ;
void Main()
{
//create the array of bank accounts
BankAccount[] accounts = new BankAccount[6];
for(int i = 0;i < accounts.Length; i++) {
accounts[i] = new BankAccount();
}
//create the total balance counter
int totalBalance = 0;
//create the barrier
Barrier barrier = new Barrier(3, (myBarrier) => {
// zerothe balance
totalBalance= 0;
// sumthe account totals
foreach(BankAccount account in accounts) {
totalBalance+= account.Balance;
}
// writeout the balance
Console.WriteLine("[From barrier :] Total balance: {0}",totalBalance);
});
//define the tasks array
Task[] tasks = new Task[3];
// loopto create the tasks
for(int i = 0;i < tasks.Length; i++) {
tasks[i]= new Task((stateObj) => {
//create a typed reference to the account
BankAccount account = (BankAccount)stateObj;
// startof phase
Random rnd = new Random();
for(int j = 0;j < 1000; j++) {
account.Balance+= 2;
}
Thread.Sleep(new Random().Next(3000));
Console.WriteLine("Task {0} waiting, phase {1} ",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
account.Balance-= 1000;
Console.WriteLine("barrier finished .");
// endof phase
Console.WriteLine("Task {0}, phase {1} ended",
Task.CurrentId,barrier.CurrentPhaseNumber);
//signal the barrier
barrier.SignalAndWait();
},
accounts[i]);
}
// startthe task
foreach(Task t in tasks) {
t.Start();
}
// waitfor all of the tasks to complete
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代码中,打开了3个barrier和3个Task,在Task中为每一个账户添加2000,然后给barrier发出同步信号,当barrier收到3个信号时,对账号进行求和并保存;当barrier完成逻辑后,控制权交给了每一个Task,此时每一个Task对account减1000,再次求和,最后结果为3000。
如果希望通过Cancel来控制barrier的行动,还可以在barrier中传入tokenSource.Token:barrier.SignalAndWait(tokenSource.Token);并在Task中履行Cancel:tokenSource.Cancel()。
可以通过调用barrier.RemoveParticipant();来减少barrier的count。
CountEventDown
作用和Barrier类似,累计信号数量,当信号量到达指定数量,set event。
void Main()
{
CountdownEvent cdevent = new CountdownEvent(5);
//create a Random that we will use to generate
// sleepintervals
Random rnd = new Random();
//create 5 tasks, each of which will wait for
// arandom period and then signal the event
Task[] tasks = new Task[6];
for(int i = 0;i < tasks.Length; i++) {
//create the new task
tasks[i]= new Task(() => {
// putthe task to sleep for a random period
// up toone second
Thread.Sleep(rnd.Next(500, 1000));
//signal the event
Console.WriteLine("Task {0} signalling event",Task.CurrentId);
cdevent.Signal();
});
};
//create the final task, which will rendezous with the other 5
// usingthe count down event
tasks[5] = new Task(()=> {
// waiton the event
Console.WriteLine("Rendezvous task waiting");
cdevent.Wait();
Console.WriteLine("CountDownEvent has been set");
});
// startthe tasks
foreach(Task t in tasks) {
t.Start();
}
Task.WaitAll(tasks);
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代码中,开启了5个Task和1个count为5的CountDownEvent对象,每一个Task中完成任务后分别对CountDownEvent发信号,当凑齐5个信号后,会打印出CountDownEvent has been set。
ManualResetEvent 和 AutoResetEvent
熟习.net之前版本的应当都对它们很熟习,用于在多线程环境中完成线程同步。区分在于,前者必须调用reset才能恢覆信号;而AutoResetEvent则会自动reset。在此不再赘述。
SemaphoreSlim
void Main()
{
SemaphoreSlim semaphore = new SemaphoreSlim(3);
//create the cancellation token source
CancellationTokenSource tokenSource
= new CancellationTokenSource();
//create and start the task that will wait on the event
for(int i = 0;i < 10; i++) {
Task.Factory.StartNew((obj)=> {
semaphore.Wait(tokenSource.Token);
// printout a message when we are released
Console.WriteLine("Task {0} released", obj);
},i,tokenSource.Token);
}
//create and start the signalling task
Task signallingTask = Task.Factory.StartNew(() => {
// loopwhile the task has not been cancelled
while(!tokenSource.Token.IsCancellationRequested) {
// go tosleep for a random period
tokenSource.Token.WaitHandle.WaitOne(500);
//signal the semaphore
semaphore.Release(3);
Console.WriteLine("Semaphore released");
}
// if wereach this point, we know the task has been cancelled
tokenSource.Token.ThrowIfCancellationRequested();
},tokenSource.Token);
// askthe user to press return before we cancel
// thetoken and bring the tasks to an end
Console.WriteLine("Press enter to cancel tasks");
Console.ReadLine();
//cancel the token source and wait for the tasks
tokenSource.Cancel();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
在以上代码中,new了1个SemaphoreSlim对象并传入3,开了10个Task线程,每当有信号从Semaphore传来时,打印Task[i]被release。同时开1个信号线程,每500毫秒release3个Task。
可见,Semaphore的作用主要是可以选择1次release多少个Task。
Producer / Consumer(生产者/消费者模式)
以下代码中,new了1个BlockingCollection,类型为Deposit。开了3个生产者Task,每一个生产者中创建20个Deposit对象并给Amount赋值为100。在主线程中等待生产者Task履行终了,调用blockingCollection.CompleteAdding()方法。以后开1个消费者Task用于操作账户对象,循环判断blockingCollection.IsCompleted属性(生产者是不是完成工作),从集合拿出存款对象,增加账户余额。
示例代码:
class BankAccount {
public int Balance {
get;
set;
}
}
class Deposit {
public int Amount {
get;
set;
}
}
void Main()
{
BlockingCollection<Deposit> blockingCollection
= new BlockingCollection<Deposit>();
var producers = new List<Task>();
for(int i = 0;i < 3; i++) {
var producer = Task.Factory.StartNew((obj) => {
//create a series of deposits
for(int j = 0;j < 20; j++) {
//create the transfer
var randAmount = new Random().Next(100);
Deposit deposit = new Deposit { Amount = randAmount};
Thread.Sleep(newRandom().Next(200));
// placethe transfer in the collection
blockingCollection.Add(deposit);
Console.WriteLine(string.Format("Amount: {0} deposit Processed, index: {1}",randAmount, int.Parse(obj.ToString()) +j));
}
}, i*20);
producers.Add(producer);
};
//create a many to one continuation that will signal
// theend of production to the consumer
Task.Factory.ContinueWhenAll(producers.ToArray(),antecedents => {
//signal that production has ended
Console.WriteLine("Signalling production end");
blockingCollection.CompleteAdding();
});
//create a bank account
BankAccount account = new BankAccount();
//create the consumer, which will update
// thebalance based on the deposits
Task consumer = Task.Factory.StartNew(() => {
while(!blockingCollection.IsCompleted) {
Deposit deposit;
// tryto take the next item
if(blockingCollection.TryTake(outdeposit)) {
//update the balance with the transfer amount
account.Balance+= deposit.Amount;
}
}
// printout the final balance
Console.WriteLine("Final Balance: {0}", account.Balance);
});
// waitfor the consumer to finish
consumer.Wait();
// waitfor input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠