{ Nc_read++; if (Nc_read>= Nc) Nc_read = 0;
}
publicstringMyDequeue() // первый обратившийся к ресурсу поток, прочитает
{ if (Nc_read == 0) //сообщение, удалит его из ресурса, сохранит в
{ if (this.Count>0) last_res = this.Dequeue(); //статическойпеременнойlast_rec. Все остальные
Console.WriteLine ("MyDequeue: " + last_res); //потоки будут обращаться уже к переменнойlast_rec.
}
}
};
ClassSupNConsM
{ objectlock_obj = new object(); // объектсинхронизации
publicintNsup = 0; // числопоставщиков
publicintNcons = 0; // числопотребителей
publicintNmsg = 0; // число всех сообщений
MyQueueQchange = new MyQueue(); // объекточереди
publicstring[] res_str; // массив результирующих строк (результирующая строка –
// список сообщений, полученных потребителем)
int N1_sup = 5; // число сообщений для каждого поставщика
|
|
voidCons(objectn) // методпотребителя, аргумент - № потребителя
{ intncons_n = Convert.ToInt32(n); // преобразование аргумента к нужному типу
stringres = "Потребитель " + ncons_n; // строка с результатами работы потока
stringres0 = res;
stringstr = ""; // в str помещается текст, читаемый потоком
stringstr_prev = ""; // в str_prevсохраняется текст, прочитанный ранее
intnread = 0; // число прочитанных потоком сообщений
bool t;
intT = 0; // Общее время ожидания в потоке
while (nread<Nmsg) //Цикл чтения сообщений
{ t = false;
lock (lock_obj)
{ if (Qchange.Count>= 0) { str = Qchange.MyDequeue(); // очередноесообщение
t = (str!= str_prev); // если сообщение новое для потока, if (t) Qchange.Num_Read();// увеличивается счетчик потребителей,
} // прочитавших данное сообщение
}
if (t == false) { T += 50; // если сообщение не новое, то
Thread.Sleep(50); // задержка, после чего опять выполняется
|
|
if (T> 5000) break; // попытка чтения
}
else{str_prev = str; // новое сообщение запоминается как nread++; // предыдущее, счетчик сообщений в потоке
res += "\n" + nread.ToString() + ". " + str; // увеличивается, формируется строка
Console.WriteLine("{0}: {1} {2} {3}", res0, str, t, nread); // выходного сообщения
T = 0;
} // конец цикла чтения сообщений
res_str[ncons_n] = res;
System.Windows.Forms.MessageBox.Show(res_str[ncons_n]); // выводрез-овработыпотока
}
void Sup(object num) //методпоставщика, аргумент - № поставщика
{string n = Convert.ToString(num);
Random r = new Random();
intqn;
for (inti = 0; i<N1_sup; i++)
{lock (lock_obj) { Qchange.Enqueue("сообщение № " + (i + 1).ToString() + " от " + n);
//qn = Qchange.Count;
}
//Console.WriteLine("Qchange.Enqueue {0}", qn);
int del = Convert.ToInt32(r.NextDouble()) * 3 + 1;
Thread.Sleep(del);
}
}
Public void Test()
{this.N1_sup = 5;
this.Nmsg = 3 * N1_sup;
this.Qchange.Nc = 5;
res_str = new string[6];
Thread ts1 = new Thread(Sup);
Thread ts2 = new Thread(Sup);
Thread ts3 = new Thread(Sup);
Thread tc1 = new Thread(Cons);
Thread tc2 = new Thread(Cons);
Thread tc3 = new Thread(Cons);
Thread tc4 = new Thread(Cons);
Thread tc5 = new Thread(Cons);
ts2.Start(2);
ts1.Start(1);
tc2.Start(2);
tc1.Start(1);
tc3.Start(3);
tc4.Start(4);
tc5.Start(5);
ts3.Start(3);
tc1.Join(); tc2.Join(); tc3.Join(); tc4.Join(); tc5.Join();
}
} // конец класса
}//конец пр-ва имен
Пулпотоков http://www.intuit.ru/studies/courses/5938/1074/lecture/16447?page=4
Пул (pool – лужа, бильярд, объединение) потоков п редназначен для упрощения многопоточной обработки. Программист выделяет фрагменты кода (рабочие элементы), которые можно выполнять параллельно.Преимуществами использования пула потоков считают:
Планировщик (среда выполнения) оптимальным образом распределяет рабочие элементы по рабочим потокам пула. Т.е. вопросы эффективной загрузки, оптимального числа потоков решаются не программистом, а планировщиком (исполняющей средой).
При применении пула уменьшаются накладные расходы, связанные с ручным созданием и завершением потоков.
Пул потоков используется для обработки задач типа Task. Объект Taskобладают рядом полезных встроенных механизмов (ожидание наступления события, продолжениеработы и т.д.). Поэтому для распараллеливания рабочих элементов рекомендуется использовать именно задачи или шаблоны класса Parallel. Сам пул потоков представлен классом ThreadPool.
Для добавления рабочего элемента в пул используется метод QueueUserWorkItem:
public static bool QueueUserWorkItem
( WaitCallbackcallBack
)
public static bool QueueUserWorkItem
( WaitCallbackcallBack,
objectdata
)
Данный метод помещает код (метод, рабочий элемент) в очередь на выполнение. Код(метод) выполняется, когда становится доступен поток из пула потоков.
Параметры
Callback – тип - System.Threading.WaitCallback, делегатWaitCallback, представляющий метод,
который требуется выполнить.
data тип -System.Object, объект, содержащий данные, используемые методом.
Возвращаемое значение true, если метод успешно помещен в очередь, иначе false. Если рабочий элемент не может быть помещен в очередь, создается исключение NotSupportedException.
Делегат System.Threading.WaitCallback в пространстве System.Treading объявлен как
[ ComVisibleAttribute (true) ]
public delegate void WaitCallback (object data)
Аргумент есть объект, используемый как аргумент метода, который исполняется делегатом.
Делегат WaitCallback может указывать на любой метод, имеющий один параметр System.Object и не возвращающий ничего.
|
|
Замечание.COM – это стандарт для взаимодействия модулей на разных языка х. Сборки C# также поддерживают этот стандарт по желанию (разработчик может включить их в свойствах сборки), так что.NET-объекты могут быть использованы как COM-объекты, и таким образом — различными нативными приложениями, а также скриптовыми языками (например, javascript).
Атрибут [ComVisible(true)] означает, что интерфейс будет виден самой подсистеме COM. Если программа не экспортирует COM-объекты, и не работает со скриптами, то можно эти атрибуты игнорировать (если ваша программа полностью на.NET, COM-атрибуты не важны для нее).
В следующем примере в пул будут помещены Nрабочих элементов, каждый элемент будет формировать текстовую строку созначениями переданного параметра и параметрами текущего потока. В примере будут использованы свойства и методы классов Thread и ThreadPool:
Thread.CurrentThread.ManagedThreadId – идентификатортекущегопотока,
Thread.CurrentThread.IsThreadPoolThread – признак исполнения потока в пуле(true) или нет (false).
ThreadPool.GetMaxThreads (outmaxThreads, outoiasThreads) – метод определяет максимально возможное число потоков в пуле и максимально возможное число асинхронных потоков ввода-вывода в пуле;
ThreadPool.GetAvailableThreads (outavThreads, outoiasThreads) – метод определяет доступное число потоков в пуле и доступное число асинхронных потоков ввода-вывода в пуле.
Число потоков, находящихся в данный момент в пуле можно определить как разность
maxThreads - avThreads
privateintThreadsInPool (bool arg=false)
{ intmaxThreads = 0; //макс. возм.число потоков в пуле
intoiasThreads= 0; //макс. возм.число асинхр.потоков ввода-вывода в пуле
intavThreads = 0; //число доступных потоков в пуле
ThreadPool.GetMaxThreads(outmaxThreads, out oiasThreads);
ThreadPool.GetAvailableThreads(outavThreads, out oiasThreads);
if (arg == true)
Console.WriteLine("Мак. число потоков в пуле {0}, число доступ. потоков {1}, " +
"число ассинх. пот. ввода-вывода {2}", maxThreads, avThreads, oiasThreads);
returnmaxThreads - avThreads;
}
constintN = 10; //числорабочих потоков
string[] res = newstring[N]; //результатывыполненияраб. элементов
|
|
privatevoidF(objectobj) //функция, исполняемая рабочим элементом
{intind = (int)obj;
res[ind] += string.Format("i: {0}, ThreadID: {1,-3}, IsPoolThread:{2}, ThreadsInPool: {3} ",
ind, Thread.CurrentThread.ManagedThreadId,
Thread.CurrentThread.IsThreadPoolThread, ThreadsInPool());
);
Thread.Sleep(100); //задержкавтелепотока
}
PublicvoidTest1()
{ ThreadsInPool(true); //информация о числе потоков в пуле
intt0 = Environment.TickCount; // фиксация момента времени
for (inti=0; i<N; i++)
{ res[i] = "";
ThreadPool.QueueUserWorkItem(F, i); // запускэлементавпуле
}
ThreadsInPool(true); //информация сразу после потоков
intn = -1;
while (n!=0)
{ Thread.Sleep(20); // ожидание завершения потоков в пуле
n = ThreadsInPool();
}
ThreadsInPool(true); //информация после завершения потоков
Console.WriteLine("Времявыполнения = {0}", Environment.TickCount - t0);
for (inti = 0; i< N; i++) Console.WriteLine(res[i]);
}
Результаты работы
Макс. число потоков в пуле 2047, число доступ.потоков 2047, число ассинх. пот. ввода-вывода 1000
Макс. число потоков в пуле 2047, число доступ. потоков 2043, число ассинх. пот. ввода-вывода 1000
Макс. число потоков в пуле 2047, число доступ. потоков 2047, число ассинх. пот. ввода-вывода 1000
Времявыполнения 10 потоков 344
i: 0, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4
i: 1, ThreadID: 4, IsPoolThread:True, ThreadsInPool: 4
i: 2, ThreadID: 5, IsPoolThread:True, ThreadsInPool: 4
i: 3, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4
i: 4, ThreadID: 5, IsPoolThread:True, ThreadsInPool: 4
i: 5, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4
i: 6, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4
i: 7, ThreadID: 4, IsPoolThread:True, ThreadsInPool: 4
i: 8, ThreadID: 3, IsPoolThread:True, ThreadsInPool: 4
i: 9, ThreadID: 6, IsPoolThread:True, ThreadsInPool: 4