use new ControlList::add() argument
[ardour.git] / libs / pbd / system_exec.cc
1 /*
2     Copyright (C) 2010 Paul Davis
3     Copyright (C) 2010-2014 Robin Gareus <robin@gareus.org>
4     Copyright (C) 2005-2008 Lennart Poettering
5
6     This program is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     This program is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with this program; if not, write to the Free Software
18     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19
20 */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <unistd.h>
26
27 #include <assert.h>
28
29 #ifndef COMPILER_MSVC
30 #include <dirent.h>
31 #endif
32
33 #ifdef PLATFORM_WINDOWS
34 #include <windows.h>
35 #else
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <sys/socket.h>
40 #include <sys/ioctl.h>
41 #include <sys/time.h>
42 #include <sys/resource.h>
43 #endif
44
45
46 #include "pbd/system_exec.h"
47
48 using namespace std;
49 using namespace PBD;
50
51 static void * interposer_thread (void *arg);
52 static void close_fd (int& fd) { if (fd >= 0) ::close (fd); fd = -1; }
53
54 #ifndef PLATFORM_WINDOWS
55 /*
56  * This function was part of libasyncns.
57  * LGPL v2.1
58  * Copyright 2005-2008 Lennart Poettering
59  */
60 static int close_allv(const int except_fds[]) {
61         struct rlimit rl;
62         int fd;
63
64 #ifdef __linux__
65
66         DIR *d;
67
68         assert(except_fds);
69
70         if ((d = opendir("/proc/self/fd"))) {
71                 struct dirent *de;
72
73                 while ((de = readdir(d))) {
74                         int found;
75                         long l;
76                         char *e = NULL;
77                         int i;
78
79                         if (de->d_name[0] == '.')
80                                         continue;
81
82                         errno = 0;
83                         l = strtol(de->d_name, &e, 10);
84                         if (errno != 0 || !e || *e) {
85                                 closedir(d);
86                                 errno = EINVAL;
87                                 return -1;
88                         }
89
90                         fd = (int) l;
91
92                         if ((long) fd != l) {
93                                 closedir(d);
94                                 errno = EINVAL;
95                                 return -1;
96                         }
97
98                         if (fd < 3)
99                                 continue;
100
101                         if (fd == dirfd(d))
102                                 continue;
103
104                         found = 0;
105                         for (i = 0; except_fds[i] >= 0; i++)
106                                 if (except_fds[i] == fd) {
107                                                 found = 1;
108                                                 break;
109                                 }
110
111                         if (found) continue;
112
113                         if (close(fd) < 0) {
114                                 int saved_errno;
115
116                                 saved_errno = errno;
117                                 closedir(d);
118                                 errno = saved_errno;
119
120                                 return -1;
121                         }
122                 }
123
124                 closedir(d);
125                 return 0;
126         }
127
128 #endif
129
130         if (getrlimit(RLIMIT_NOFILE, &rl) < 0)
131                 return -1;
132
133         for (fd = 0; fd < (int) rl.rlim_max; fd++) {
134                 int i;
135
136                 if (fd <= 3)
137                                 continue;
138
139                 for (i = 0; except_fds[i] >= 0; i++)
140                         if (except_fds[i] == fd)
141                                 continue;
142
143                 if (close(fd) < 0 && errno != EBADF)
144                         return -1;
145         }
146
147         return 0;
148 }
149 #endif /* not on windows */
150
151
152 SystemExec::SystemExec (std::string c, std::string a)
153         : cmd(c)
154 {
155         pthread_mutex_init(&write_lock, NULL);
156         thread_active=false;
157         pid = 0;
158         pin[1] = -1;
159         nicelevel = 0;
160         envp = NULL;
161         argp = NULL;
162 #ifdef PLATFORM_WINDOWS
163         stdinP[0] = stdinP[1] = INVALID_HANDLE_VALUE;
164         stdoutP[0] = stdoutP[1] = INVALID_HANDLE_VALUE;
165         stderrP[0] = stderrP[1] = INVALID_HANDLE_VALUE;
166 #endif
167         make_envp();
168         make_argp(a);
169 }
170
171 SystemExec::SystemExec (std::string c, char **a)
172         : cmd(c) , argp(a)
173 {
174         pthread_mutex_init(&write_lock, NULL);
175         thread_active=false;
176         pid = 0;
177         pin[1] = -1;
178         nicelevel = 0;
179         envp = NULL;
180 #ifdef PLATFORM_WINDOWS
181         stdinP[0] = stdinP[1] = INVALID_HANDLE_VALUE;
182         stdoutP[0] = stdoutP[1] = INVALID_HANDLE_VALUE;
183         stderrP[0] = stderrP[1] = INVALID_HANDLE_VALUE;
184         make_wargs(a);
185 #endif
186         make_envp();
187 }
188
189 SystemExec::~SystemExec ()
190 {
191         terminate ();
192         if (envp) {
193                 for (int i=0;envp[i];++i) {
194                   free(envp[i]);
195                 }
196                 free (envp);
197         }
198         if (argp) {
199                 for (int i=0;argp[i];++i) {
200                   free(argp[i]);
201                 }
202                 free (argp);
203         }
204 #ifdef PLATFORM_WINDOWS
205         if (w_args) free(w_args);
206 #endif
207         pthread_mutex_destroy(&write_lock);
208 }
209
210 static void *
211 interposer_thread (void *arg) {
212         SystemExec *sex = static_cast<SystemExec *>(arg);
213         sex->output_interposer();
214         pthread_exit(0);
215         return 0;
216 }
217
218 #ifdef PLATFORM_WINDOWS /* Windows Process */
219
220 /* HELPER FUNCTIONS */
221
222 static void create_pipe (HANDLE *pipe, bool in) {
223         SECURITY_ATTRIBUTES secAtt = { sizeof( SECURITY_ATTRIBUTES ), NULL, TRUE };
224         HANDLE tmpHandle;
225         if (in) {
226                 if (!CreatePipe(&pipe[0], &tmpHandle, &secAtt, 1024 * 1024)) return;
227                 if (!DuplicateHandle(GetCurrentProcess(), tmpHandle, GetCurrentProcess(), &pipe[1], 0, FALSE, DUPLICATE_SAME_ACCESS)) return;
228         } else {
229                 if (!CreatePipe(&tmpHandle, &pipe[1], &secAtt, 1024 * 1024)) return;
230                 if (!DuplicateHandle(GetCurrentProcess(), tmpHandle, GetCurrentProcess(), &pipe[0], 0, FALSE, DUPLICATE_SAME_ACCESS)) return;
231         }
232         CloseHandle(tmpHandle);
233 }
234
235 static void destroy_pipe (HANDLE pipe[2]) {
236         if (pipe[0] != INVALID_HANDLE_VALUE) {
237                 CloseHandle(pipe[0]);
238                 pipe[0] = INVALID_HANDLE_VALUE;
239         }
240         if (pipe[1] != INVALID_HANDLE_VALUE) {
241                 CloseHandle(pipe[1]);
242                 pipe[1] = INVALID_HANDLE_VALUE;
243         }
244 }
245
246 static BOOL CALLBACK my_terminateApp(HWND hwnd, LPARAM procId)
247 {
248         DWORD currentProcId = 0;
249         GetWindowThreadProcessId(hwnd, &currentProcId);
250         if (currentProcId == (DWORD)procId)
251                 PostMessage(hwnd, WM_CLOSE, 0, 0);
252         return TRUE;
253 }
254
255 /* PROCESS API */
256
257 void
258 SystemExec::make_envp() {
259         ;/* environemt is copied over with CreateProcess(...,env=0 ,..) */
260 }
261
262 void
263 SystemExec::make_wargs(char **a) {
264         std::string wa = cmd;
265         if (cmd[0] != '"' && cmd[cmd.size()] != '"' && strchr(cmd.c_str(), ' ')) { wa = "\"" + cmd + "\""; }
266         std::replace(cmd.begin(), cmd.end(), '/', '\\' );
267         char **tmp = a;
268         while (tmp && *tmp) {
269                 wa.append(" \"");
270                 wa.append(*tmp);
271                 wa.append("\"");
272                 tmp++;
273         }
274         w_args = strdup(wa.c_str());
275 }
276
277 void
278 SystemExec::make_argp(std::string args) {
279         std::string wa = cmd;
280         if (cmd[0] != '"' && cmd[cmd.size()] != '"' && strchr(cmd.c_str(), ' ')) { wa = "\"" + cmd + "\""; }
281         std::replace(cmd.begin(), cmd.end(), '/', '\\' );
282         wa.append(" ");
283         wa.append(args);
284         w_args = strdup(wa.c_str());
285 }
286
287 void
288 SystemExec::terminate ()
289 {
290         ::pthread_mutex_lock(&write_lock);
291         if (pid) {
292                 /* terminate */
293                 EnumWindows(my_terminateApp, (LPARAM)pid->dwProcessId);
294                 PostThreadMessage(pid->dwThreadId, WM_CLOSE, 0, 0);
295
296                 /* kill ! */
297                 TerminateProcess(pid->hProcess, 0xf291);
298
299                 CloseHandle(pid->hThread);
300                 CloseHandle(pid->hProcess);
301                 destroy_pipe(stdinP);
302                 destroy_pipe(stdoutP);
303                 destroy_pipe(stderrP);
304                 delete pid;
305                 pid=0;
306         }
307         ::pthread_mutex_unlock(&write_lock);
308 }
309
310 int
311 SystemExec::wait (int options)
312 {
313         while (is_running()) {
314                 WaitForSingleObject(pid->hProcess, INFINITE);
315                 Sleep(20);
316         }
317         return 0;
318 }
319
320 bool
321 SystemExec::is_running ()
322 {
323         return pid?true:false;
324 }
325
326 int
327 SystemExec::start (int stderr_mode)
328 {
329         char* working_dir = 0;
330
331         if (pid) { return 0; }
332
333         pid = new PROCESS_INFORMATION;
334         memset(pid, 0, sizeof(PROCESS_INFORMATION));
335
336         create_pipe(stdinP, true);
337         create_pipe(stdoutP, false);
338
339         if (stderr_mode == 2) {
340         /* merge stout & stderr */
341                 DuplicateHandle(GetCurrentProcess(), stdoutP[1], GetCurrentProcess(), &stderrP[1], 0, TRUE, DUPLICATE_SAME_ACCESS);
342         } else if (stderr_mode == 1) {
343                 //TODO read/flush this pipe or close it...
344                 create_pipe(stderrP, false);
345         } else {
346                 //TODO: keep stderr of this process mode.
347         }
348
349         bool success = false;
350         STARTUPINFOA startupInfo = { sizeof( STARTUPINFO ), 0, 0, 0,
351                 (unsigned long)CW_USEDEFAULT, (unsigned long)CW_USEDEFAULT,
352                 (unsigned long)CW_USEDEFAULT, (unsigned long)CW_USEDEFAULT,
353                 0, 0, 0,
354                 STARTF_USESTDHANDLES,
355                 0, 0, 0,
356                 stdinP[0], stdoutP[1], stderrP[1]
357         };
358
359         success = CreateProcess(0, w_args,
360                 0, 0, /* bInheritHandles = */ TRUE,
361                 (CREATE_NO_WINDOW&0) | CREATE_UNICODE_ENVIRONMENT | (0&CREATE_NEW_CONSOLE),
362                 /*env = */ 0,
363                 working_dir,
364                 &startupInfo, pid);
365
366         if (stdinP[0] != INVALID_HANDLE_VALUE) {
367                 CloseHandle(stdinP[0]);
368                 stdinP[0] = INVALID_HANDLE_VALUE;
369         }
370         if (stdoutP[1] != INVALID_HANDLE_VALUE) {
371                 CloseHandle(stdoutP[1]);
372                 stdoutP[1] = INVALID_HANDLE_VALUE;
373         }
374         if (stderrP[1] != INVALID_HANDLE_VALUE) {
375                 CloseHandle(stderrP[1]);
376                 stderrP[1] = INVALID_HANDLE_VALUE;
377         }
378
379         if (!success) {
380                 CloseHandle(pid->hThread);
381                 CloseHandle(pid->hProcess);
382                 destroy_pipe(stdinP);
383                 destroy_pipe(stdoutP);
384                 destroy_pipe(stderrP);
385                 delete pid;
386                 pid=0;
387                 return -1;
388         }
389
390         int rv = pthread_create(&thread_id_tt, NULL, interposer_thread, this);
391         thread_active=true;
392         if (rv) {
393                 thread_active=false;
394                 terminate();
395                 return -2;
396         }
397         Sleep(20);
398         return 0;
399 }
400
401 void
402 SystemExec::output_interposer()
403 {
404         DWORD bytesRead = 0;
405         char data[BUFSIZ];
406 #if 0 // untested code to set up nonblocking
407         unsigned long l = 1;
408         ioctlsocket(stdoutP[0], FIONBIO, &l);
409 #endif
410         while(1) {
411 #if 0 // for non-blocking pipes..
412                 DWORD bytesAvail = 0;
413                 PeekNamedPipe(stdoutP[0], 0, 0, 0, &bytesAvail, 0);
414                 if (bytesAvail < 1) {Sleep(500); printf("N/A\n"); continue;}
415 #endif
416                 if (stdoutP[0] == INVALID_HANDLE_VALUE) break;
417                 if (!ReadFile(stdoutP[0], data, BUFSIZ, &bytesRead, 0)) break;
418                 if (bytesRead < 1) continue; /* actually not needed; but this is safe. */
419                 data[bytesRead] = 0;
420                 ReadStdout(data, bytesRead);/* EMIT SIGNAL */
421         }
422         Terminated();/* EMIT SIGNAL */
423 }
424
425 void
426 SystemExec::close_stdin()
427 {
428         if (stdinP[0]!= INVALID_HANDLE_VALUE)  FlushFileBuffers(stdinP[0]);
429         if (stdinP[1]!= INVALID_HANDLE_VALUE)  FlushFileBuffers(stdinP[1]);
430         Sleep(200);
431         destroy_pipe(stdinP);
432 }
433
434 int
435 SystemExec::write_to_stdin(std::string d, size_t len)
436 {
437         const char *data;
438         DWORD r,c;
439
440         ::pthread_mutex_lock(&write_lock);
441
442         data=d.c_str();
443         if (len == 0) {
444                 len=(d.length());
445         }
446         c=0;
447         while (c < len) {
448                 if (!WriteFile(stdinP[1], data+c, len-c, &r, NULL)) {
449                         if (GetLastError() == 0xE8 /*NT_STATUS_INVALID_USER_BUFFER*/) {
450                                 Sleep(100);
451                                 continue;
452                         } else {
453                                 fprintf(stderr, "SYSTEM-EXEC: stdin write error.\n");
454                                 break;
455                         }
456                 }
457                 c += r;
458         }
459         ::pthread_mutex_unlock(&write_lock);
460         return c;
461 }
462
463
464 /* end windows process */
465 #else
466 /* UNIX/POSIX process */
467
468 extern char **environ;
469 void
470 SystemExec::make_envp() {
471         int i=0;
472         envp = (char **) calloc(1, sizeof(char*));
473         /* copy current environment */
474         for (i=0;environ[i];++i) {
475           envp[i] = strdup(environ[i]);
476           envp = (char **) realloc(envp, (i+2) * sizeof(char*));
477         }
478         envp[i] = 0;
479 }
480
481 void
482 SystemExec::make_argp(std::string args) {
483         int argn = 1;
484         char *cp1;
485         char *cp2;
486
487         char *carg = strdup(args.c_str());
488
489         argp = (char **) malloc((argn + 1) * sizeof(char *));
490         if (argp == (char **) 0) {
491                 free(carg);
492                 return; // FATAL
493         }
494
495         argp[0] = strdup(cmd.c_str());
496
497         /* TODO: quotations and escapes
498          * http://stackoverflow.com/questions/1511797/convert-string-to-argv-in-c
499          *
500          * It's actually not needed. All relevant invocations specify 'argp' directly.
501          * Only 'xjadeo -L -R' uses this function and that uses neither quotations
502          * nor arguments with white-space.
503          */
504         for (cp1 = cp2 = carg; *cp2 != '\0'; ++cp2) {
505                 if (*cp2 == ' ') {
506                         *cp2 = '\0';
507                         argp[argn++] = strdup(cp1);
508                         cp1 = cp2 + 1;
509             argp = (char **) realloc(argp, (argn + 1) * sizeof(char *));
510                 }
511         }
512         if (cp2 != cp1) {
513                 argp[argn++] = strdup(cp1);
514                 argp = (char **) realloc(argp, (argn + 1) * sizeof(char *));
515         }
516         argp[argn] = (char *) 0;
517         free(carg);
518 }
519
520
521
522 void
523 SystemExec::terminate ()
524 {
525         ::pthread_mutex_lock(&write_lock);
526
527         /* close stdin in an attempt to get the child to exit cleanly.
528          */
529
530         close_stdin();
531
532         if (pid) {
533                 ::usleep(50000);
534                 sched_yield();
535                 wait(WNOHANG);
536         }
537
538         /* if pid is non-zero, the child task is still executing (i.e. it did
539          * not exit in response to stdin being closed). try to kill it.
540          */
541         
542         if (pid) {
543                 ::kill(pid, SIGTERM);
544                 usleep(50000);
545                 sched_yield();
546                 wait(WNOHANG);
547         }
548
549         /* if pid is non-zero, the child task is STILL executing after being
550          * sent SIGTERM. Act tough ... send SIGKILL
551          */
552
553         if (pid) {
554                 ::fprintf(stderr, "Process is still running! trying SIGKILL\n");
555                 ::kill(pid, SIGKILL);
556         }
557
558         wait();
559         if (thread_active) pthread_join(thread_id_tt, NULL);
560         thread_active = false;
561         ::pthread_mutex_unlock(&write_lock);
562 }
563
564 int
565 SystemExec::wait (int options)
566 {
567         int status=0;
568         int ret;
569
570         if (pid==0) return -1;
571
572         ret = waitpid (pid, &status, options);
573
574         if (ret == pid) {
575                 if (WEXITSTATUS(status) || WIFSIGNALED(status)) {
576                         pid=0;
577                 }
578         } else {
579                 if (ret != 0) {
580                         if (errno == ECHILD) {
581                                 /* no currently running children, reset pid */
582                                 pid=0;
583                         }
584                 } /* else the process is still running */
585         }
586         return status;
587 }
588
589 bool
590 SystemExec::is_running ()
591 {
592         int status=0;
593         if (pid==0) return false;
594         if (::waitpid(pid, &status, WNOHANG)==0) return true;
595         return false;
596 }
597
598 int
599 SystemExec::start (int stderr_mode)
600 {
601         if (is_running()) {
602                 return 0; // mmh what to return here?
603         }
604         int r;
605
606         if (::pipe(pin) < 0 || ::pipe(pout) < 0 || ::pipe(pok) < 0) {
607                 /* Something unexpected went wrong creating a pipe. */
608                 return -1;
609         }
610
611         r = ::fork();
612         if (r < 0) {
613                 /* failed to fork */
614                 return -2;
615         }
616
617         if (r > 0) {
618                 /* main */
619                 pid=r;
620
621                 /* check if execve was successful. */
622                 close_fd(pok[1]);
623                 char buf;
624                 for ( ;; ) {
625                         ssize_t n = ::read(pok[0], &buf, 1 );
626                         if ( n==1 ) {
627                                 /* child process returned from execve */
628                                 pid=0;
629                                 close_fd(pok[0]);
630                                 close_fd(pin[1]);
631                                 close_fd(pin[0]);
632                                 close_fd(pout[1]);
633                                 close_fd(pout[0]);
634                                 pin[1] = -1;
635                                 return -3;
636                         } else if ( n==-1 ) {
637                                  if ( errno==EAGAIN || errno==EINTR )
638                                          continue;
639                         }
640                         break;
641                 }
642                 close_fd(pok[0]);
643                 /* child started successfully */
644
645 #if 0
646 /* use fork for output-interposer
647  * it will run in a separated process
648  */
649                 /* catch stdout thread */
650                 r = ::fork();
651                 if (r < 0) {
652                         // failed to fork
653                         terminate();
654                         return -2;
655                 }
656                 if (r == 0) {
657                         /* 2nd child process - catch stdout */
658                         close_fd(pin[1]);
659                         close_fd(pout[1]);
660                         output_interposer();
661                         exit(0);
662                 }
663                 close_fd(pout[1]);
664                 close_fd(pin[0]);
665                 close_fd(pout[0]);
666 #else /* use pthread */
667                 close_fd(pout[1]);
668                 close_fd(pin[0]);
669                 int rv = pthread_create(&thread_id_tt, NULL, interposer_thread, this);
670
671                 thread_active=true;
672                 if (rv) {
673                         thread_active=false;
674                         terminate();
675                         return -2;
676                 }
677 #endif
678                 return 0; /* all systems go - return to main */
679         }
680
681         /* child process - exec external process */
682         close_fd(pok[0]);
683         ::fcntl(pok[1], F_SETFD, FD_CLOEXEC);
684
685         close_fd(pin[1]);
686         if (pin[0] != STDIN_FILENO) {
687           ::dup2(pin[0], STDIN_FILENO);
688         }
689         close_fd(pin[0]);
690         close_fd(pout[0]);
691         if (pout[1] != STDOUT_FILENO) {
692                 ::dup2(pout[1], STDOUT_FILENO);
693         }
694
695         if (stderr_mode == 2) {
696                 /* merge STDERR into output */
697                 if (pout[1] != STDERR_FILENO) {
698                         ::dup2(pout[1], STDERR_FILENO);
699                 }
700         } else if (stderr_mode == 1) {
701                 /* ignore STDERR */
702                 ::close(STDERR_FILENO);
703         } else {
704                 /* keep STDERR */
705         }
706
707         if (pout[1] != STDOUT_FILENO && pout[1] != STDERR_FILENO) {
708                 close_fd(pout[1]);
709         }
710
711         if (nicelevel !=0) {
712                 ::nice(nicelevel);
713         }
714
715 #if 0
716         /* chdir to executable dir */
717         char *directory;
718         directory = strdup(cmd.c_str());
719         if (strrchr(directory, '/') != (char *) 0) {
720                 ::chdir(directory);
721         }
722         free(directory);
723 #endif
724
725 #ifdef HAVE_SIGSET
726         sigset(SIGPIPE, SIG_DFL);
727 #else
728         signal(SIGPIPE, SIG_DFL);
729 #endif
730
731         int good_fds[1] = { -1 };
732         close_allv(good_fds);
733
734         ::execve(argp[0], argp, envp);
735         /* if we reach here something went wrong.. */
736         char buf = 0;
737         (void) ::write(pok[1], &buf, 1 );
738         close_fd(pok[1]);
739         exit(-1);
740         return -1;
741 }
742
743 void
744 SystemExec::output_interposer()
745 {
746         int rfd=pout[0];
747         char buf[BUFSIZ];
748         ssize_t r;
749         unsigned long l = 1;
750
751         ioctl(rfd, FIONBIO, &l); // set non-blocking I/O
752
753         for (;fcntl(rfd, F_GETFL)!=-1;) {
754                 r = read(rfd, buf, sizeof(buf));
755                 if (r < 0 && (errno == EINTR || errno == EAGAIN)) {
756                         ::usleep(1000);
757                         continue;
758                 }
759                 if (r <= 0) {
760                         break;
761                 }
762                 buf[r]=0;
763                 std::string rv = std::string(buf,r); // TODO: check allocation strategy
764                 ReadStdout(rv, r);/* EMIT SIGNAL */
765         }
766         Terminated();/* EMIT SIGNAL */
767 }
768
769 void
770 SystemExec::close_stdin()
771 {
772         if (pin[1]<0) return;
773         close_fd(pin[0]);
774         close_fd(pin[1]);
775         close_fd(pout[0]);
776         close_fd(pout[1]);
777 }
778
779 int
780 SystemExec::write_to_stdin(std::string d, size_t len)
781 {
782         const char *data;
783         ssize_t r;
784         size_t c;
785         ::pthread_mutex_lock(&write_lock);
786
787         data=d.c_str();
788         if (len == 0) {
789                 len=(d.length());
790         }
791         c=0;
792         while (c < len) {
793                 for (;;) {
794                         r=::write(pin[1], data+c, len-c);
795                         if (r < 0 && (errno == EINTR || errno == EAGAIN)) {
796                                 sleep(1);
797                                 continue;
798                         }
799                         if ((size_t) r != (len-c)) {
800                                 ::pthread_mutex_unlock(&write_lock);
801                                 return c;
802                         }
803                         break;
804                 }
805                 c += r;
806         }
807         fsync(pin[1]);
808         ::pthread_mutex_unlock(&write_lock);
809         return c;
810 }
811
812 #endif // end UNIX process