Пул буферов
Неожиданно много комментируемый пост "Пул потоков" заставил меня взглянуть пристальнее на некоторые модули, из недавно написанных, и кое-что подправить. Собственно, эти прогулки по исходникам стали причиной этого поста...
Когда говорят о многопоточных прогаммах, действительно (склоняю голову в согласии с Максимом Сохацким, автором Open BeOS, который все время пытается напомнить об этом), чаще всего идет речь о операциях ввода-вывода которые выносятся в отдельный поток или вызываются асинхронно. А там где есть ввод и вывод, обязательно есть БУФЕР. То есть, кусок памяти, куда или откуда этот самый ввод/вывод происходит. "Ну и что?", - скажите Вы! А то, что выделение памяти, это тоже ресурсоемкая операция! Это в обычной "куче". В "куче", которой правляет сборщик мусора выделение дешевое (если верить Sun-у и Microsoft-у), но дорого обходится освобождение памяти. Так или иначе, работа с памятью - штука дорогая. Особенно, когда речь идет о выденении буфера для операций ввода/вывода, так как обычно он достаточно большой. Давайте подумаем, где тут можно сэкономить? Тут Вы должны вспомнить "Пул потоков" и заорать:
Пул буферов
Пока не вдаваясь в подробности - исходники класса:
public class BufferPool : MarshalByRefObject, IDisposable {
public class Buffer : IDisposable {
BufferPool pool;
byte[] buffer;
int usedFlag;
public Buffer(BufferPool pool, int size) {
this.pool = pool;
buffer = new byte[size];
}
public bool Allocate() {
int flag = Interlocked.CompareExchange(ref usedFlag, 1, 0);
return (flag == 0);
}
public void Release() {
Interlocked.Exchange(ref usedFlag, 0);
if (pool != null) pool.mrEvent.Set();
}
public byte[] Array { get { return buffer; } }
public int Size { get { return buffer.Length; } }
public bool InUse { get { return (usedFlag != 0); } }
void IDisposable.Dispose() {
Release();
}
}
const int blockSize = 4096;
int maxBufferCount;
ArrayList buffers = new ArrayList();
ReaderWriterLock rwLock = new ReaderWriterLock();
ManualResetEvent mrEvent = new ManualResetEvent(false);
public BufferPool():this(8) {}
public BufferPool(int maxBufferCount) {
this.maxBufferCount = maxBufferCount;
}
public void Dispose() {
Dispose(true);
}
protected virtual void Dispose(bool disposing) {
if (disposing) {
GC.SuppressFinalize(this);
if (mrEvent != null) {
mrEvent.Close();
}
}
}
public Buffer Allocate(int blockCount) {
return Allocate(blockCount, Timeout.Infinite);
}
public Buffer Allocate(int blockCount, int timeOut) {
if (mrEvent == null)
throw new ObjectDisposedException(this.GetType().Name);
Buffer result = null;
int reqSize = blockCount * blockSize;
bool wait = false;
DateTime deadLine = (timeOut == Timeout.Infinite)
? DateTime.MaxValue
: DateTime.Now + TimeSpan.FromMilliseconds(timeOut);
do {
if (wait) {
if (mrEvent.WaitOne(
(int)(deadLine - DateTime.Now).TotalMilliseconds, false ))
mrEvent.Reset();
else
throw new ApplicationException(ILSR.GetString("ErrTimeout"));
}
rwLock.AcquireReaderLock(
(int)(deadLine - DateTime.Now).TotalMilliseconds);
if (!rwLock.IsReaderLockHeld)
break; // ApplicationException will be throwed
try {
foreach (WeakReference wr in buffers) {
if (wr.IsAlive) {
Buffer b = wr.Target as Buffer;
if (b.Size >= reqSize && b.Allocate()) {
result = b;
break;
}
} else {
wr.Target = NewAllocatedBuffer(reqSize);
break;
}
}
if (result == null && buffers.Count < maxBufferCount) {
rwLock.UpgradeToWriterLock(
(int)(deadLine - DateTime.Now).TotalMilliseconds);
if (!rwLock.IsWriterLockHeld)
break; // ApplicationException will be throwed
if (buffers.Count < maxBufferCount) {
result = NewAllocatedBuffer(reqSize);
buffers.Add(new WeakReference(result));
}
}
} finally {
rwLock.ReleaseReaderLock();
}
wait = true;
} while (result == null && DateTime.Now < deadLine);
if (result == null)
throw new ApplicationException(ILSR.GetString("ErrTimeout"));
return result;
}
public Buffer NewAllocatedBuffer(int size) {
if (buffers.Count >= maxBufferCount) return null;
Buffer r = new Buffer(this, size);
r.Allocate();
return r;
}
public void FreeUnusedBuffers() {
rwLock.AcquireWriterLock(Timeout.Infinite);
if (rwLock.IsWriterLockHeld) try {
int index = 0;
while (index < buffers.Count) {
WeakReference wr = buffers[index] as WeakReference;
if (!wr.IsAlive)
buffers.RemoveAt(index);
else {
if ( ((Buffer)wr.Target).InUse )
index++;
else
buffers.RemoveAt(index);
}
}
} finally {
rwLock.ReleaseWriterLock();
}
}
public int ActiveBuffers {
get {
rwLock.AcquireReaderLock(Timeout.Infinite);
int cnt = buffers.Count;
rwLock.ReleaseReaderLock();
return cnt;
}
}
public int UsedMemory {
get {
int size = 0;
rwLock.AcquireReaderLock(Timeout.Infinite);
if (rwLock.IsReaderLockHeld) try {
foreach (WeakReference wr in buffers)
if (wr.IsAlive)
size += ((Buffer)wr.Target).Size;
} finally {
rwLock.ReleaseReaderLock();
}
return size;
}
}
public int MaxBufferCount {
get { return maxBufferCount; }
set { maxBufferCount = value; }
}
public override string ToString() {
return String.Format(
"{0}: Allocated {1} buffers, {2} Kb used.",
this.GetType().Name, ActiveBuffers, UsedMemory / 1024);
}
static object managerLock = new object();
static BufferPool defaultPool;
public static BufferPool DefaultPool {
get {
if (defaultPool == null) {
Debug.Assert(managerLock != null);
lock (managerLock) {
if (defaultPool == null)
defaultPool = new BufferPool(8);
managerLock = null;
}
}
return defaultPool;
}
}
}
Пример использования:
void CopyStream(Stream src,
Stream dst, BufferPool bf, int blockCount)
{
using (BufferPool.Buffer buffer = bf.Allocate(blockCount, 0)) {
int readed;
while ( (readed = src.Read(buffer.Array, 0, buffer.Size)) > 0 )
dst.Write(buffer.Array, 0, readed);
}
}
Коментарі