Пул буферов


Неожиданно много комментируемый пост "Пул потоков" заставил меня взглянуть пристальнее на некоторые модули, из недавно написанных, и кое-что подправить. Собственно, эти прогулки по исходникам стали причиной этого поста...

Когда говорят о многопоточных прогаммах, действительно (склоняю голову в согласии с Максимом Сохацким, автором Open BeOS, который все время пытается напомнить об этом), чаще всего идет речь о операциях ввода-вывода которые выносятся в отдельный поток или вызываются асинхронно. А там где есть ввод и вывод, обязательно есть БУФЕР. То есть, кусок памяти, куда или откуда этот самый ввод/вывод происходит. "Ну и что?", - скажите Вы! А то, что выделение памяти, это тоже ресурсоемкая операция! Это в обычной "куче". В "куче", которой правляет сборщик мусора выделение дешевое (если верить Sun-у и Microsoft-у), но дорого обходится освобождение памяти. Так или иначе, работа с памятью - штука дорогая. Особенно, когда речь идет о выденении буфера для операций ввода/вывода, так как обычно он достаточно большой. Давайте подумаем, где тут можно сэкономить? Тут Вы должны вспомнить "Пул потоков" и заорать:

Пул буферов


Пока не вдаваясь в подробности - исходники класса:


public class BufferPool : MarshalByRefObject, IDisposable {
public class Buffer : IDisposable {
BufferPool pool;
byte[] buffer;
int usedFlag;

public Buffer(BufferPool pool, int size) {
this.pool = pool;
buffer =
new byte[size];
}

public bool Allocate() {
int flag = Interlocked.CompareExchange(ref usedFlag, 1, 0);
return (flag == 0);
}

public void Release() {
Interlocked.Exchange(ref usedFlag,
0);
if (pool != null) pool.mrEvent.Set();
}

public byte[] Array { get { return buffer; } }

public int Size { get { return buffer.Length; } }

public bool InUse { get { return (usedFlag != 0); } }

void IDisposable.Dispose() {
Release();
}
}

const
int blockSize = 4096;
int maxBufferCount;
ArrayList buffers =
new ArrayList();
ReaderWriterLock rwLock =
new ReaderWriterLock();
ManualResetEvent mrEvent =
new ManualResetEvent(false);

public BufferPool():this(8) {}
public BufferPool(int maxBufferCount) {
this.maxBufferCount = maxBufferCount;
}

public void Dispose() {
Dispose(
true);
}

protected virtual void Dispose(bool disposing) {
if (disposing) {
GC.SuppressFinalize(
this);
if (mrEvent != null) {
mrEvent.Close();
}
}
}

public Buffer Allocate(int blockCount) {
return Allocate(blockCount, Timeout.Infinite);
}

public Buffer Allocate(int blockCount, int timeOut) {
if (mrEvent == null)
throw new ObjectDisposedException(this.GetType().Name);
Buffer result =
null;
int reqSize = blockCount * blockSize;
bool wait = false;
DateTime deadLine = (timeOut == Timeout.Infinite)
? DateTime.MaxValue
: DateTime.Now + TimeSpan.FromMilliseconds(timeOut);
do {
if (wait) {
if (mrEvent.WaitOne(
(
int)(deadLine - DateTime.Now).TotalMilliseconds, false ))
mrEvent.Reset();
else
throw new ApplicationException(ILSR.GetString("ErrTimeout"));
}
rwLock.AcquireReaderLock(
(
int)(deadLine - DateTime.Now).TotalMilliseconds);
if (!rwLock.IsReaderLockHeld)
break; // ApplicationException will be throwed
try {
foreach (WeakReference wr in buffers) {
if (wr.IsAlive) {
Buffer b = wr.Target as Buffer;
if (b.Size >= reqSize && b.Allocate()) {
result = b;
break;
}
}
else {
wr.Target = NewAllocatedBuffer(reqSize);
break;
}
}

if (result == null && buffers.Count < maxBufferCount) {
rwLock.UpgradeToWriterLock(
(
int)(deadLine - DateTime.Now).TotalMilliseconds);
if (!rwLock.IsWriterLockHeld)
break; // ApplicationException will be throwed
if (buffers.Count < maxBufferCount) {
result = NewAllocatedBuffer(reqSize);
buffers.Add(
new WeakReference(result));
}
}
}
finally {
rwLock.ReleaseReaderLock();
}
wait =
true;
}
while (result == null && DateTime.Now < deadLine);
if (result == null)
throw new ApplicationException(ILSR.GetString("ErrTimeout"));
return result;
}

public Buffer NewAllocatedBuffer(int size) {
if (buffers.Count >= maxBufferCount) return null;
Buffer r =
new Buffer(this, size);
r.Allocate();
return r;
}

public void FreeUnusedBuffers() {
rwLock.AcquireWriterLock(Timeout.Infinite);
if (rwLock.IsWriterLockHeld) try {
int index = 0;
while (index < buffers.Count) {
WeakReference wr = buffers[index] as WeakReference;
if (!wr.IsAlive)
buffers.RemoveAt(index);
else {
if ( ((Buffer)wr.Target).InUse )
index++;
else
buffers.RemoveAt(index);
}
}
}
finally {
rwLock.ReleaseWriterLock();
}
}

public int ActiveBuffers {
get {
rwLock.AcquireReaderLock(Timeout.Infinite);
int cnt = buffers.Count;
rwLock.ReleaseReaderLock();
return cnt;
}
}

public int UsedMemory {
get {
int size = 0;
rwLock.AcquireReaderLock(Timeout.Infinite);
if (rwLock.IsReaderLockHeld) try {
foreach (WeakReference wr in buffers)
if (wr.IsAlive)
size += ((Buffer)wr.Target).Size;
}
finally {
rwLock.ReleaseReaderLock();
}
return size;
}
}

public int MaxBufferCount {
get { return maxBufferCount; }
set { maxBufferCount = value; }
}

public override string ToString() {
return String.Format(
"{0}: Allocated {1} buffers, {2} Kb used.",
this.GetType().Name, ActiveBuffers, UsedMemory / 1024);
}

static object managerLock = new object();
static BufferPool defaultPool;
public static BufferPool DefaultPool {
get {
if (defaultPool == null) {
Debug.Assert(managerLock !=
null);
lock (managerLock) {
if (defaultPool == null)
defaultPool =
new BufferPool(8);
managerLock =
null;
}
}
return defaultPool;
}
}
}


Пример использования:

void CopyStream(Stream src,
Stream dst, BufferPool bf, int blockCount)
{
using (BufferPool.Buffer buffer = bf.Allocate(blockCount, 0)) {
int readed;
while ( (readed = src.Read(buffer.Array, 0, buffer.Size)) > 0 )
dst.Write(buffer.Array, 0, readed);
}
}

Коментарі

Unknown каже…
Будут подробности! Обещаю!

Популярні дописи з цього блогу

OAuth-аутентификация через ВКонтакте

Українська мова