00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "syncml.h"
00023
00024 #include "syncml_internals.h"
00025 #include "sml_queue_internals.h"
00026 #include "sml_error_internals.h"
00027
00028 #ifndef DOXYGEN_SHOULD_SKIP_THIS
00029
00030 static gboolean _queue_prepare(GSource *source, gint *timeout_)
00031 {
00032 smlTrace(TRACE_INTERNAL, "%s(%p, %p)", __func__, source, timeout_);
00033 *timeout_ = 1;
00034 return FALSE;
00035 }
00036
00037
00038 void smlQueueAssert(SmlQueue *queue)
00039 {
00040 if (queue->tail) {
00041 smlAssert(queue->head);
00042 }
00043 if (queue->prio) {
00044 smlAssert(queue->head);
00045 }
00046 if (queue->head) {
00047 smlAssert(queue->tail);
00048 }
00049
00050 if (g_list_length(queue->head) == 1) {
00051 smlAssert(queue->tail == queue->head);
00052 }
00053
00054 smlAssert(g_list_last(queue->head) == queue->tail);
00055 }
00056
00057 static gboolean _queue_check(GSource *source)
00058 {
00059 SmlQueue *queue = *((SmlQueue **)(source + 1));
00060 return smlQueueCheck(queue);
00061 }
00062
00063 static gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
00064 {
00065 smlTrace(TRACE_INTERNAL, "%s(%p, %p, %p)", __func__, source, callback, user_data);
00066 SmlQueue *queue = user_data;
00067 smlAssert(queue->handler);
00068
00069 while (smlQueueCheck(queue))
00070 smlQueueDispatch(queue);
00071
00072 return TRUE;
00073 }
00074 #endif
00075
00081 SmlQueue *smlQueueNew(SmlError **error)
00082 {
00083 CHECK_ERROR_REF
00084 SmlQueue *queue = smlTryMalloc0(sizeof(SmlQueue), error);
00085 if (!queue)
00086 return NULL;
00087
00088 if (!g_thread_supported ()) g_thread_init (NULL);
00089
00090 queue->mutex = g_mutex_new();
00091
00092 return queue;
00093 }
00094
00095 void smlQueueFree(SmlQueue *queue)
00096 {
00097 if (queue->source)
00098 smlQueueDetach(queue);
00099
00100 if (queue->head)
00101 g_list_free(queue->head);
00102
00103 g_mutex_free(queue->mutex);
00104
00105 smlSafeFree((gpointer *)&queue);
00106 }
00107
00108 void smlQueueFlush(SmlQueue *queue)
00109 {
00110 g_mutex_lock(queue->mutex);
00111
00112 smlQueueAssert(queue);
00113
00114 queue->tail = NULL;
00115 g_list_free(queue->head);
00116 queue->head = NULL;
00117 queue->prio = NULL;
00118
00119 smlQueueAssert(queue);
00120
00121 g_mutex_unlock(queue->mutex);
00122 }
00123
00124 SmlBool smlQueueCheckPrio(SmlQueue *queue)
00125 {
00126 g_mutex_lock(queue->mutex);
00127 SmlBool ret = (queue->prio == NULL) ? FALSE : TRUE;
00128 g_mutex_unlock(queue->mutex);
00129
00130 return ret;
00131 }
00132
00133 SmlBool smlQueueCheck(SmlQueue *queue)
00134 {
00135 g_mutex_lock(queue->mutex);
00136 SmlBool ret = (queue->head == NULL) ? FALSE : TRUE;
00137 g_mutex_unlock(queue->mutex);
00138
00139 return ret;
00140 }
00141
00142 unsigned int smlQueueLength(SmlQueue *queue)
00143 {
00144 g_mutex_lock(queue->mutex);
00145 unsigned int ret = g_list_length(queue->head);
00146 g_mutex_unlock(queue->mutex);
00147
00148 return ret;
00149 }
00150
00151 unsigned int smlQueueLengthPrio(SmlQueue *queue)
00152 {
00153 g_mutex_lock(queue->mutex);
00154 unsigned int ret = g_list_length(queue->prio);
00155 g_mutex_unlock(queue->mutex);
00156
00157 return ret;
00158 }
00159
00160 void *smlQueueTryPop(SmlQueue *queue)
00161 {
00162 smlAssert(queue);
00163 void *message = NULL;
00164
00165 g_mutex_lock(queue->mutex);
00166
00167 smlQueueAssert(queue);
00168
00169 if (queue->head) {
00170 message = queue->head->data;
00171 if (queue->head == queue->tail)
00172 queue->tail = NULL;
00173 if (queue->prio && message == queue->prio->data)
00174 queue->prio = g_list_delete_link(queue->prio, queue->prio);
00175 queue->head = g_list_delete_link(queue->head, queue->head);
00176 }
00177
00178 smlQueueAssert(queue);
00179
00180 g_mutex_unlock(queue->mutex);
00181
00182 return message;
00183 }
00184
00185 void *smlQueueTryPopPrio(SmlQueue *queue)
00186 {
00187 smlAssert(queue);
00188 void *message = NULL;
00189
00190 g_mutex_lock(queue->mutex);
00191
00192 smlQueueAssert(queue);
00193
00194 message = queue->prio ? queue->prio->data : NULL;
00195 queue->prio = g_list_delete_link(queue->prio, queue->prio);
00196
00197 if (message) {
00198 queue->head = g_list_remove(queue->head, message);
00199 queue->tail = g_list_last(queue->head);
00200 }
00201
00202 smlQueueAssert(queue);
00203
00204 g_mutex_unlock(queue->mutex);
00205
00206 return message;
00207 }
00208
00209 void smlQueueLock(SmlQueue *queue)
00210 {
00211 smlAssert(queue);
00212 g_mutex_lock(queue->mutex);
00213 }
00214
00215 void smlQueueUnlock(SmlQueue *queue)
00216 {
00217 smlAssert(queue);
00218 g_mutex_unlock(queue->mutex);
00219 }
00220
00221 void *smlQueuePeek(SmlQueue *queue)
00222 {
00223 smlAssert(queue);
00224 return queue->head ? queue->head->data : NULL;
00225 }
00226
00227 void *smlQueuePeekPrio(SmlQueue *queue)
00228 {
00229 smlAssert(queue);
00230 void *message = NULL;
00231
00232 g_mutex_lock(queue->mutex);
00233
00234 message = queue->prio ? queue->prio->data : NULL;
00235
00236 smlQueueAssert(queue);
00237
00238 g_mutex_unlock(queue->mutex);
00239
00240 return message;
00241 }
00242
00243 void smlQueuePrint(SmlQueue *queue)
00244 {
00245 smlAssert(queue);
00246
00247 g_mutex_lock(queue->mutex);
00248
00249 GString *info = g_string_new("Contents of queue ");
00250 g_string_append_printf(info, "%p:", queue);
00251
00252 GList *m = NULL;
00253 for (m = queue->head; m; m = m->next) {
00254 g_string_append_printf(info, ", %p (list %p)", m->data, m);
00255 }
00256 smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str));
00257 g_string_free(info, TRUE);
00258
00259 info = g_string_new("Contents of prio queue:");
00260 for (m = queue->prio; m; m = m->next) {
00261 g_string_append_printf(info, ", %p (list %p)", m->data, m);
00262 }
00263 smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str));
00264 g_string_free(info, TRUE);
00265 smlTrace(TRACE_INTERNAL, "%s: Tail of queue: %p (list %p)", __func__, queue->tail ? queue->tail->data : NULL, queue->tail);
00266
00267 g_mutex_unlock(queue->mutex);
00268 }
00269
00270 GList *smlQueuePeekNext(SmlQueue *queue, GList *prev)
00271 {
00272 smlAssert(queue);
00273
00274 if (!prev)
00275 return queue->head;
00276 else
00277 return prev->next;
00278 }
00279
00280 void smlQueuePushHeadPrio(SmlQueue *queue, void *data)
00281 {
00282 smlAssert(queue);
00283 smlAssert(data);
00284
00285 g_mutex_lock(queue->mutex);
00286
00287 smlQueueAssert(queue);
00288
00289 queue->head = g_list_prepend(queue->head, data);
00290 if (!queue->tail)
00291 queue->tail = queue->head;
00292 queue->prio = g_list_prepend(queue->prio, data);
00293
00294 smlQueueAssert(queue);
00295
00296 g_mutex_unlock(queue->mutex);
00297 }
00298
00299 void smlQueuePushHead(SmlQueue *queue, void *data)
00300 {
00301 smlAssert(queue);
00302 smlAssert(data);
00303
00304 g_mutex_lock(queue->mutex);
00305
00306 smlQueueAssert(queue);
00307
00308 queue->head = g_list_prepend(queue->head, data);
00309 if (!queue->tail)
00310 queue->tail = queue->head;
00311
00312 smlQueueAssert(queue);
00313
00314 g_mutex_unlock(queue->mutex);
00315 }
00316
00323 void smlQueueSendPrio(SmlQueue *queue, void *data)
00324 {
00325 smlAssert(queue);
00326 smlAssert(data);
00327
00328 g_mutex_lock(queue->mutex);
00329
00330 smlQueueAssert(queue);
00331
00332 if (queue->tail) {
00333 queue->tail = g_list_append(queue->tail, data);
00334 queue->tail = queue->tail->next;
00335 } else {
00336 queue->head = g_list_append(queue->head, data);
00337 queue->tail = queue->head;
00338 }
00339 queue->prio = g_list_append(queue->prio, data);
00340
00341 smlQueueAssert(queue);
00342
00343 g_mutex_unlock(queue->mutex);
00344 }
00345
00346
00353 void smlQueueSend(SmlQueue *queue, void *data)
00354 {
00355 smlAssert(queue);
00356 smlAssert(data);
00357
00358 g_mutex_lock(queue->mutex);
00359
00360 smlQueueAssert(queue);
00361
00362 if (queue->tail) {
00363 queue->tail = g_list_append(queue->tail, data);
00364 queue->tail = queue->tail->next;
00365 } else {
00366 queue->head = g_list_append(queue->head, data);
00367 queue->tail = queue->head;
00368 }
00369
00370 smlQueueAssert(queue);
00371
00372 g_mutex_unlock(queue->mutex);
00373 }
00374
00384 void smlQueueSetHandler(SmlQueue *queue, SmlQueueHandler handler, void *userdata)
00385 {
00386 queue->handler = handler;
00387 queue->userdata = userdata;
00388 }
00389
00400 void smlQueueAttach(SmlQueue *queue, GMainContext *context)
00401 {
00402 smlTrace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context);
00403
00404 smlAssert(queue);
00405 smlAssert(queue->source == NULL);
00406
00407 GSourceFuncs *functions = g_malloc0(sizeof(GSourceFuncs));
00408 functions->prepare = _queue_prepare;
00409 functions->check = _queue_check;
00410 functions->dispatch = _queue_dispatch;
00411 functions->finalize = NULL;
00412
00413 GSource *source = g_source_new(functions, sizeof(GSource) + sizeof(SmlQueue *));
00414 SmlQueue **queueptr = (SmlQueue **)(source + 1);
00415 *queueptr = queue;
00416 g_source_set_callback(source, NULL, queue, NULL);
00417 queue->source = source;
00418 queue->functions = functions;
00419 g_source_attach(source, context);
00420 queue->context = context;
00421 if (context)
00422 g_main_context_ref(context);
00423
00424 smlTrace(TRACE_EXIT, "%s", __func__);
00425 }
00426
00427 void smlQueueDetach(SmlQueue *queue)
00428 {
00429 smlTrace(TRACE_ENTRY, "%s(%p)", __func__, queue);
00430
00431 smlAssert(queue);
00432 smlAssert(queue->source);
00433
00434 g_source_destroy(queue->source);
00435 g_source_unref(queue->source);
00436
00437 smlSafeFree((gpointer *)&(queue->functions));
00438
00439 queue->source = NULL;
00440
00441 if (queue->context) {
00442 g_main_context_unref(queue->context);
00443 queue->context = NULL;
00444 }
00445
00446 smlTrace(TRACE_EXIT, "%s", __func__);
00447 }
00448
00449 SmlBool smlQueueIsAttached(SmlQueue *queue)
00450 {
00451 return queue->source ? TRUE : FALSE;
00452 }
00453
00454 void smlQueueDispatch(SmlQueue *queue)
00455 {
00456 void *message = smlQueueTryPop(queue);
00457 if (message)
00458 queue->handler(message, queue->userdata);
00459 }
00460
00461 void smlQueueDispatchPrio(SmlQueue *queue)
00462 {
00463
00464 void *message = smlQueueTryPopPrio(queue);
00465 if (message)
00466 queue->handler(message, queue->userdata);
00467 }
00468