I am writing a program for an environment that uses cgroups to identify and group processes. The program measures the CPU utilization of each cgroup by sampling /sys/fs/cgroup/cpuacct/.../cpuacct.stat
twice, one second apart, repeating this a few times and averaging the deltas. It also calculates the overall CPU utilization of the system by sampling /proc/stat
in the same manner.
I want to do this in a multithreaded way because, if the cgroups were sampled serially, the one-second sampling intervals would add up and the program would take a long time to run.
I also use PCRE to distinguish between two different process types.
#include <sys/types.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/uio.h>
#include <pcre.h>
/* Number of sample pairs (taken one second apart) that each measurement averages. */
#define ITERATIONS 3
/*
 * CLK_TIME_PER_SECOND: number of clock ticks per second, used as the divisor
 * that turns a tick-counter delta into CPU-seconds per second.
 * The first definition that applies wins:
 *   1. sysconf(_SC_CLK_TCK)  - evaluated at run time
 *   2. CLK_TCK
 *   3. CLOCKS_PER_SEC
 *   4. a hard-coded 100
 * NOTE(review): CLOCKS_PER_SEC is the unit of clock(), which is not
 * necessarily the tick rate of the kernel counters being sampled; confirm
 * that fallback is ever appropriate. It also needs <time.h>, which this file
 * does not include.
 */
#if !defined(CLK_TIME_PER_SECOND) && defined(_SC_CLK_TCK)
# define CLK_TIME_PER_SECOND ((int) sysconf (_SC_CLK_TCK))
#endif
#if !defined(CLK_TIME_PER_SECOND) && defined(CLK_TCK)
# define CLK_TIME_PER_SECOND ((int) CLK_TCK)
#endif
#if !defined(CLK_TIME_PER_SECOND) && defined(CLOCKS_PER_SEC)
# define CLK_TIME_PER_SECOND ((int) CLOCKS_PER_SEC)
#endif
#if !defined(CLK_TIME_PER_SECOND)
# define CLK_TIME_PER_SECOND 100
#endif
/*
 * One job as reported by the external process lister: a cgroup id and a
 * process-type string, both stored inline as NUL-terminated text.
 * Each buffer holds at most 49 characters plus the terminator, so whoever
 * fills them must bound the write accordingly.
 */
struct job_struct {
char procid[50]; //store job information as procid,procType tuple
char procType[50];
};
/*
 * Argument bundle handed to each sampling thread.
 * The struct itself is heap-allocated in main and freed by the thread
 * (addSampletoTotal); the two pointers are borrowed from a job_struct entry
 * and are not owned by this struct, so that entry must outlive the thread.
 */
typedef struct {
char *procid; //for passing to the threads
char *procType;
} sample_struct;
/*
 * File-scope state shared between main and the worker threads.
 *
 * sumFlag - predicate associated with ok_to_add.
 *           NOTE(review): the visible code only ever assigns 1 to it, so a
 *           wait on "sumFlag == 0" can never block; total_lock alone provides
 *           the mutual exclusion.
 * clk     - clock ticks per second; assigned once in main before any thread
 *           is created and only read afterwards.
 */
int sumFlag, clk; //TODO: localize?
/*
 * sum1 / sum2       - accumulated utilization of jobs whose type does / does
 *                     not match the compiled regex; updated by the sampling
 *                     threads and read by main after joining them.
 * systemUtilization - overall system utilization from /proc/stat.
 * otherCPU          - systemUtilization minus both sums, computed in main.
 */
long double sum1, sum2, systemUtilization, otherCPU;
/* PCRE state: pattern text, compile/study diagnostics and results. */
char *aStrRegex;
const char *pcreErrorStr;
/*
 * pcreExecRet and subStrVec are scratch outputs for pcre_exec. Being global
 * they are shared by every thread, so any use must be serialized (or be
 * replaced by locals in the code that performs the match).
 */
int pcreErrorOffset, pcreExecRet;
int subStrVec[30];
/* Compiled pattern and optional study data; created in main before the
 * sampling threads start and not modified afterwards. */
pcre *reCompiled;
pcre_extra *pcreExtra;
/* total_lock guards the shared accumulators; ok_to_add is the condition
 * variable paired with it. Both are destroyed by main before it returns. */
pthread_mutex_t total_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ok_to_add = PTHREAD_COND_INITIALIZER;
/* Samples one cgroup and adds its utilization to the per-type totals.
 * Returns 0 on success, -1 on failure. */
int sampleCGroup(char procid[], char procType[]);
/* pthread start routine: args is a heap-allocated sample_struct that the
 * routine frees. Declared with a full prototype so calls and the
 * pthread_create argument are type-checked against the definition. */
void *addSampletoTotal(void *args);
/* pthread start routine for the system-wide sample. Declared without a
 * parameter list; it must stay in sync with the definition. pthread_create
 * expects a start routine taking exactly one void * argument. */
void *overallUtilization();
/* Returns the averaged system-wide CPU utilization, or -1 on failure. */
long double getSystemUtilization(void);
/* Returns the SuSE major version, or -1 if it cannot be determined. */
int getOSVersion(void);
/* Returns the number of usable CPUs (at least 1). */
int getCPUCount(void);
int main(void) {
int cpus, i, jobCount, osVersion;
FILE *fp;
char buff[1024];
cpus = getCPUCount();
clk = CLK_TIME_PER_SECOND;
osVersion = getOSVersion();
if (osVersion <= 10) {
systemUtilization = getSystemUtilization();
printf("sys=%Lf\n", systemUtilization); //cgroups are not supported on certain OS versions, so print the overall utilization and exit, otherwise, try to run
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (0);
}
struct job_struct jobs[cpus]; //initially size the array to be the number of CPUs
sumFlag = 1; //condition variable for mutex
pthread_t systr; //thread id for overall system utilization, note this should only execute on certain versions, the conditional above should have returned if not
pthread_create(&systr, NULL, overallUtilization, NULL); //create the thread
aStrRegex = "proc_type1"; //type1
reCompiled = pcre_compile(aStrRegex, 0, &pcreErrorStr, &pcreErrorOffset, NULL); //compile the regex
if (reCompiled == NULL) {
printf("ERROR: Could not compile '%s': %s\n", aStrRegex, pcreErrorStr);
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
}
// Optimize the regex
pcreExtra = pcre_study(reCompiled, 0, &pcreErrorStr);
/* pcre_study() returns NULL for both errors and when it can not optimize the regex. The last argument is how one checks for
errors (it is NULL if everything works, and points to an error string otherwise. */
if (pcreErrorStr != NULL) {
printf("ERROR: Could not study '%s': %s\n", aStrRegex, pcreErrorStr);
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
} /* end if */
fp = popen("/var/bin/getProcs", "r"); //open getProcs to list the procs, this returns the cgroup ID and the type
if (fp == NULL) {
perror("/var/bin/getProcs");
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pthread_mutex_destroy(&total_lock); //clean up mutex lock
pthread_cond_destroy(&ok_to_add);
return (-1);
}
jobCount = 0;
while (fgets(buff, sizeof(buff), fp) != NULL) {
sscanf(buff, "%[^,],%[^,]", jobs[jobCount].procid, jobs[jobCount].procType);
jobCount++;
}
if (jobCount == 0) {
pthread_join(systr, NULL); //join the system utilization thread
if (systemUtilization > (long double) cpus) {
systemUtilization = (long double) cpus;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%f\n", 0.0);
printf("cgrType1=%f\n", 0.0);
printf("cgrOther=%Lf\n", systemUtilization);
pclose(fp); //close the command
pthread_mutex_destroy(&total_lock); //destroy mutex lock
pthread_cond_destroy(&ok_to_add);
return (0);
}
pthread_t tid[jobCount]; // the thread identifiers, one for each job
for (i = 0; i < jobCount; i++) {
sample_struct *args = malloc(sizeof *args); //allocate memory for passing the argument struct to the threads
args->procid = jobs[i].procid; //assign info in job array to the arg struct
args->procType = jobs[i].procType;
pthread_create(&tid[i], NULL, addSampletoTotal, args); //create the thread
}
pthread_join(systr, NULL); //join the system utilization thread
for (i = 0; i < jobCount; i++) {
pthread_join(tid[i], NULL); //finish threads
}
pthread_mutex_destroy(&total_lock); //destroy mutex lock
pthread_cond_destroy(&ok_to_add);
if (systemUtilization > (long double) cpus) { //sanity check for HT
systemUtilization = (long double) cpus;
}
if (sum2 > (long double) cpus) {
sum2 = (long double) cpus;
}
if (sum1 > (long double) cpus) {
sum1 = (long double) cpus;
}
otherCPU = systemUtilization - sum2 - sum1;
if (otherCPU < 0.0) {
otherCPU = 0.0;
}
printf("sys=%Lf\n", systemUtilization); //print info
printf("cgrType2=%Lf\n", sum2);
printf("cgrType1=%Lf\n", sum1);
printf("cgrOther=%Lf\n", otherCPU);
pclose(fp); //close the command
return (0);
}
/*
 * Reads one cpuacct.stat sample and stores user + system ticks in *ticks.
 * `which` ("first"/"second") is only used in the diagnostic.
 * The file is closed on every path. Returns 0 on success, -1 on failure.
 */
static int readCGroupTicks(const char *path, const char *which, long double *ticks) {
    FILE *fp = fopen(path, "r");
    if (fp == NULL) {
        perror(path);
        return -1;
    }
    long double user = 0.0, sys = 0.0;
    int fields = fscanf(fp, "%*4s %Lf %*6s %Lf", &user, &sys);
    fclose(fp);
    if (fields != 2) {
        //a short parse does not set errno, so perror() would print a bogus reason
        fprintf(stderr, "Could not parse %s cgroup CPU sample from %s\n", which, path);
        return -1;
    }
    *ticks = user + sys;
    return 0;
}

/*
 * Samples the CPU usage of one cgroup and adds it to the shared totals.
 *
 * procid   - cgroup directory name below /sys/fs/cgroup/cpuacct/.
 * procType - type string matched against the global compiled regex; a match
 *            adds to sum1, anything else (including a matching error) to sum2.
 *
 * Takes ITERATIONS pairs of samples one second apart and averages the tick
 * delta, scaled by clk. Returns 0 on success; on any allocation, open or
 * parse failure returns -1 without touching the totals.
 *
 * Thread safety: everything up to the final addition works on locals. The
 * match result and offset vector are local too, and the compiled pattern is
 * only read, so the regex match runs outside the lock. total_lock is held
 * just for the update of sum1/sum2. No condition variable is needed: the
 * mutex alone serializes the additions.
 */
int sampleCGroup(char procid[], char procType[]) {
    static const char prefix[] = "/sys/fs/cgroup/cpuacct/";
    static const char suffix[] = "/cpuacct.stat";

    size_t pathSize = strlen(prefix) + strlen(procid) + strlen(suffix) + 1; //+1 for the terminator
    char *pidPath = malloc(pathSize);
    if (pidPath == NULL) {
        perror("Could not allocate memory for str");
        return (-1);
    }
    snprintf(pidPath, pathSize, "%s%s%s", prefix, procid, suffix);

    long double tickDelta = 0.0;
    for (int i = 0; i < ITERATIONS; i++) {
        long double before, after;
        if (readCGroupTicks(pidPath, "first", &before) != 0) {
            free(pidPath);
            return (-1);
        }
        sleep(1); //let 1s go by
        if (readCGroupTicks(pidPath, "second", &after) != 0) {
            free(pidPath);
            return (-1);
        }
        tickDelta += after - before;
    }
    free(pidPath);

    //delta divided by (ticks per second * iterations) = average CPUs in use
    long double loadavg = tickDelta / (clk * ITERATIONS);

    int offsets[30]; //local output vector: no sharing between threads
    int matchRet = pcre_exec(reCompiled,
                             pcreExtra,
                             procType,
                             (int) strlen(procType),
                             0,   // start offset
                             0,   // options
                             offsets,
                             30); // length of offsets
    /* >= 0 means the pattern matched (0 only says the offset vector was too
       small to hold every capture, which does not matter here). */
    int isType1 = (matchRet >= 0);
    if (!isType1 && matchRet != PCRE_ERROR_NOMATCH) {
        fprintf(stderr, "Could not determine type! (pcre_exec returned %d)\n", matchRet);
    }

    //CRITICAL SECTION BEGIN
    pthread_mutex_lock(&total_lock);
    if (isType1) {
        sum1 += loadavg;
    } else {
        sum2 += loadavg; //no match, or assume type2 if matching failed
    }
    pthread_mutex_unlock(&total_lock);
    //CRITICAL SECTION END
    return 0; //success
}
/*
 * Thread start routine for one job.
 * args is a heap-allocated sample_struct; this routine takes ownership of it,
 * samples the cgroup it describes and releases it before the thread exits.
 * The result of sampleCGroup is not propagated.
 */
void *addSampletoTotal(void *args) {
    sample_struct *job = args;
    char *id = job->procid;
    char *type = job->procType;

    sampleCGroup(id, type);
    free(job);
    pthread_exit(NULL);
}
/*
 * Thread start routine for the system-wide measurement.
 * Stores the result of getSystemUtilization() in the global
 * systemUtilization; the creator must join this thread before reading it.
 *
 * The parameter is unused but required: pthreads calls start routines through
 * a `void *(*)(void *)` pointer, and a definition with an empty parameter
 * list is not compatible with that type.
 */
void *overallUtilization(void *unused) {
    (void) unused;
    systemUtilization = getSystemUtilization(); //sample /proc/stat
    return NULL; //returning from a start routine ends the thread
}
/*
 * Reads the first line of /proc/stat and stores user + nice + system ticks
 * in *ticks. `which` ("first"/"second") is only used in the diagnostic.
 * The file is closed on every path. Returns 0 on success, -1 on failure.
 */
static int readSystemTicks(const char *which, long double *ticks) {
    FILE *fp = fopen("/proc/stat", "r");
    if (fp == NULL) {
        perror("/proc/stat");
        return -1;
    }
    long double user = 0.0, niced = 0.0, sys = 0.0;
    int fields = fscanf(fp, "%*s %Lf %Lf %Lf", &user, &niced, &sys);
    fclose(fp);
    if (fields != 3) {
        //a short parse does not set errno, so perror() would print a bogus reason
        fprintf(stderr, "Could not parse %s system CPU sample!\n", which);
        return -1;
    }
    *ticks = user + niced + sys;
    return 0;
}

/*
 * Measures overall CPU utilization from /proc/stat.
 *
 * /proc/stat columns used (first line):
 *   1st column : user   = normal processes executing in user mode
 *   2nd column : nice   = niced processes executing in user mode
 *   3rd column : system = processes executing in kernel mode
 * (idle, iowait, irq and softirq follow and are ignored.)
 *
 * Takes ITERATIONS pairs of samples one second apart, sums the tick deltas
 * and divides by clk * ITERATIONS.
 * Returns the averaged utilization, or -1 if /proc/stat cannot be opened or
 * parsed.
 */
long double getSystemUtilization(void) {
    long double tickDelta = 0.0;

    for (int i = 0; i < ITERATIONS; i++) {
        long double before, after;
        if (readSystemTicks("first", &before) != 0) {
            return (-1);
        }
        sleep(1); //let 1s go by
        if (readSystemTicks("second", &after) != 0) {
            return (-1);
        }
        tickDelta += after - before;
    }
    //delta divided by (ticks per second * iterations)
    return tickDelta / (clk * ITERATIONS);
}
/*
 * Reads the major OS version from /etc/SuSE-release.
 *
 * The file is expected to start with four words, the version number and one
 * more word, followed by "VERSION = <n>" and "PATCHLEVEL = <n>". The version
 * from the first line is cross-checked against the VERSION field.
 *
 * Returns the version when both agree, and -1 when the file cannot be
 * opened, cannot be parsed, or the two numbers differ. The file is closed on
 * every path.
 */
int getOSVersion(void) {
    int version = 0, checkVersion = 0, patchLevel = 0;

    FILE *os = fopen("/etc/SuSE-release", "r");
    if (os == NULL) {
        perror("/etc/SuSE-release");
        return (-1);
    }
    int fields = fscanf(os, "%*s %*s %*s %*s %d %*s VERSION = %d PATCHLEVEL = %d",
                        &version, &checkVersion, &patchLevel);
    fclose(os);
    (void) patchLevel; //parsed only to validate the file layout
    if (fields != 3) {
        //a short parse does not set errno, so perror() would print a bogus reason
        fprintf(stderr, "Unable to read SuSE release\n");
        return -1;
    }
    return (version == checkVersion) ? version : -1;
}
/*
 * Returns the number of CPUs available to the program.
 * Uses sysconf(_SC_NPROCESSORS_ONLN) when that name exists at compile time,
 * otherwise the size of the process affinity set when
 * HAVE_SCHED_GETAFFINITY_LIKE_GLIBC is set. Falls back to 1 whenever the
 * selected query fails or reports nothing.
 */
int getCPUCount(void) {
#if defined _SC_NPROCESSORS_ONLN
    {
        long int online = sysconf(_SC_NPROCESSORS_ONLN);
        if (online >= 1) {
            return online;
        }
    }
#elif HAVE_SCHED_GETAFFINITY_LIKE_GLIBC /* glibc >= 2.3.4 */
    {
        cpu_set_t mask;
        if (sched_getaffinity(0, sizeof (mask), &mask) == 0) {
            unsigned long usable = 0;
# ifdef CPU_COUNT
            /* glibc >= 2.6 has the CPU_COUNT macro. */
            usable = CPU_COUNT(&mask);
# else
            for (size_t cpu = 0; cpu < CPU_SETSIZE; cpu++) {
                if (CPU_ISSET(cpu, &mask)) {
                    usable++;
                }
            }
# endif
            if (usable != 0) {
                return usable;
            }
        }
    }
#endif
    return 1;
}
In particular, I want to be sure that the multithreading is safe.