* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Alchemy\Phrasea\Command\ExpiringRights; use Alchemy\Phrasea\Command\Command; use Alchemy\Phrasea\Model\Manipulator\WebhookEventManipulator; use appbox; use collection; use databox; use Doctrine\DBAL\DBALException; use PDO; use Symfony\Component\Console\Formatter\OutputFormatterStyle; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use function igorw\get_in; class AlertExpiringRightsCommand extends Command { const RIGHT_SHORTENED = 'shortened'; const RIGHT_EXTENDED = 'extended'; const RIGHT_EXPIRING = 'expiring'; /** @var InputInterface $input */ private $input; /** @var OutputInterface $output */ private $output; /** @var appbox $appbox */ private $appbox; /** @var array $databoxes */ private $databoxes; private $now = null; public function configure() { $this->setName("workflow:expiring:run") ->setDescription('alert owners and users of expiring records') ->addOption('dry', null, InputOption::VALUE_NONE, "Dry run (list alerts but don't insert in webhooks).") ->addOption('show-sql', null, InputOption::VALUE_NONE, "Show the selection sql.") ->addOption('dump-webhooks', null, InputOption::VALUE_NONE, "Show the webhooks data.") ->addOption('now', null, InputOption::VALUE_REQUIRED, "for testing : fake the 'today' date (format=YYYYMMDD).") // ->setHelp('') ; } /** * @param InputInterface $input * @param OutputInterface $output * @return int */ protected function doExecute(InputInterface $input, OutputInterface $output) { $this->input = $input; // add cool styles $style = new OutputFormatterStyle('black', 'yellow'); // , array('bold')); $output->getFormatter()->setStyle('warning', $style); $this->output = $output; // sanitize parameters if(($now = $input->getOption('now')) !== null) { if(preg_match("/^[0-9]{8}$/", $now) === 1) { $this->now = $now; } else { $this->output->writeln(sprintf("bad format for 'now' (%s) option (must be YYYYMMDD)", $now)); return -1; } } $this->appbox = $this->container['phraseanet.appbox']; // list databoxes and collections to access by id or by name $this->databoxes = []; foreach ($this->appbox->get_databoxes() as $databox) { $sbas_id = $databox->get_sbas_id(); $sbas_name = $databox->get_dbname(); $this->databoxes[$sbas_id] = [ 'dbox' => $databox, 'collections' => [] ]; $this->databoxes[$sbas_name] = &$this->databoxes[$sbas_id]; // list all collections foreach ($databox->get_collections() as $collection) { $coll_id = $collection->get_coll_id(); $coll_name = $collection->get_name(); $this->databoxes[$sbas_id]['collections'][$coll_id] = $collection; $this->databoxes[$sbas_id]['collections'][$coll_name] = &$this->databoxes[$sbas_id]['collections'][$coll_id]; } } // play jobs $ret = 0; foreach ($this->container['conf']->get(['expiring-rights', 'jobs'], []) as $jobname => &$job) { if($job['active']) { if (!$this->playJob($jobname, $job)) { $this->output->writeln(sprintf("job skipped")); $ret = -1; } } } return $ret; } /** * @param $jobname * @param $job * @return bool * @throws DBALException */ private function playJob($jobname, $job) { $this->output->writeln(sprintf("\n\n======== Playing job \"%s\" ========\n", $jobname)); // ensure that the job syntax is ok if (!$this->sanitizeJob($job)) { return false; } switch ($job['target']) { case "owners": return $this->playJobOwners($jobname, $job); break; case "downloaders": return $this->playJobDownloaders($jobname, $job); break; default: $this->output->writeln(sprintf("alert>bad target \"%s\" (should be \"owners\" or \"downloaders\")\n", $job['target'])); break; } return false; } private function playJobOwners($jobname, $job) { // ensure that the job syntax is ok if (!$this->sanitizeJobOwners($job)) { return false; } if (get_in($job, ['active'], false) === false) { return true; } // actions by api $actions = []; // build sql where clause $wheres = []; // clause on databox ? $d = $job['databox']; if (!is_string($d) && !is_int($d)) { $this->output->writeln(sprintf("bad databox clause")); return false; } if (!array_key_exists($d, $this->databoxes)) { $this->output->writeln(sprintf("unknown databox (%s)", $d)); return false; } // find the sbas_id for the databox of this job /** @var Databox $dbox */ $dbox = $this->databoxes[$d]['dbox']; $sbas_id = $dbox->get_sbas_id(); // filter on collections $collList = []; foreach (get_in($job, ['collection'], []) as $c) { /** @var collection $coll */ if (($coll = get_in($this->databoxes[$sbas_id], ['collections', $c])) !== null) { $collList[] = $dbox->get_connection()->quote($coll->get_coll_id()); } } if (!empty($collList)) { if(count($collList) === 1) { $wheres[] = "r.`coll_id`=" . $collList[0]; } else { $wheres[] = "r.`coll_id` IN(" . join(',', $collList) . ")"; } } // filter on status if (($status = get_in($job, ['status'])) !== null) { $m = preg_replace('/[^0-1]/', 'x', trim($status)); if (strlen($m) == 0 || strlen($m) > 32) { $this->output->writeln(sprintf("invalid status (%s)", $status)); return false; } $mask_xor = str_replace(' ', '0', ltrim(str_replace(['0', 'x'], [' ', ' '], $m))); $mask_and = str_replace(' ', '0', ltrim(str_replace(['x', '0'], [' ', '1'], $m))); if ($mask_xor && $mask_and) { $wheres[] = '((r.`status` ^ 0b' . $mask_xor . ') & 0b' . $mask_and . ') = 0'; } elseif ($mask_xor) { $wheres[] = '(r.`status` ^ 0b' . $mask_xor . ') = 0'; } elseif ($mask_and) { $wheres[] = '(r.`status` & 0b' . $mask_and . ') = 0'; } } // filter on negated set_status if(($set_status = get_in($job, ['set_status'])) !== null) { $m = preg_replace('/[^0-1]/', 'x', trim($set_status)); if (strlen($m) == 0 || strlen($m) > 32) { $this->output->writeln(sprintf("invalid set_status (%s)", $set_status)); return false; } $mask_xor = str_replace(' ', '0', ltrim(str_replace(['0', 'x'], [' ', ' '], $m))); $mask_and = str_replace(' ', '0', ltrim(str_replace(['x', '0'], [' ', '1'], $m))); if ($mask_xor && $mask_and) { $wheres[] = '((r.`status` ^ 0b' . $mask_xor . ') & 0b' . $mask_and . ') != 0'; } elseif ($mask_xor) { $wheres[] = '(r.`status` ^ 0b' . $mask_xor . ') != 0'; } elseif ($mask_and) { $wheres[] = '(r.`status` & 0b' . $mask_and . ') != 0'; } // set status by actions api foreach (str_split(strrev($m)) as $bit => $val) { if ($val == '0' || $val == '1') { if(!array_key_exists('status', $actions)) { $actions['status'] = []; } $actions['status'][] = ['bit' => $bit, 'state' => $val=='1']; } } } // filter on negated set_collection if($set_collection = get_in($job, ['set_collection'])) { /** @var collection $coll */ if (($coll = get_in($this->databoxes[$sbas_id], ['collections', $set_collection])) !== null) { $wheres[] = "r.`coll_id`!=" . $dbox->get_connection()->quote($coll->get_coll_id()); // change collection by actions api $actions['base_id'] = $coll->get_coll_id(); } else { $this->output->writeln(sprintf("unknown collection (%s)", $c)); return false; } } // clause on expiration date // the NOW() can be faked for testing $expire_field_id = null; foreach ($dbox->get_meta_structure() as $dbf) { if ($dbf->get_name() === $job['expire_field']) { $expire_field_id = $dbf->get_id(); break; } } if ($expire_field_id === null) { $this->output->writeln(sprintf("unknown field (%s)", $job['expire_field'])); return false; } $now = $this->now === null ? "NOW()" : $this->appbox->get_connection()->quote($this->now); $delta = (int)$job['prior_notice']; if ($delta > 0) { $value = "(`expire`+INTERVAL " . $delta . " DAY)"; } elseif ($delta < 0) { $value = "(`expire`-INTERVAL " . -$delta . " DAY)"; } else { $value = "`expire`"; } $sql_where = count($wheres) > 0 ? " WHERE " . join(" AND ", $wheres) : ""; $sql = "SELECT t.*, DATEDIFF(`expire`, " . $now . ") AS 'expire_in' FROM (\n" . " SELECT r.`record_id`, CAST(CAST(m.`value` AS DATE) AS DATETIME) AS `expire`\n" . " FROM `record` AS r INNER JOIN `metadatas` AS m\n" . " ON m.`record_id`=r.`record_id` AND m.`meta_struct_id`=" . $dbox->get_connection()->quote($expire_field_id, PDO::PARAM_INT) . "\n" . $sql_where . ") AS t WHERE " . $now . ">=" . $value; if ($this->input->getOption('show-sql')) { $this->output->writeln(sprintf("sql: %s", $sql)); } // play sql $records = []; $stmt = $dbox->get_connection()->prepare($sql); $stmt->execute(); if ($this->input->getOption('dry')) { $this->output->writeln(sprintf("dry mode: updates on %d records NOT executed", $stmt->rowCount())); } if(empty($actions)) { $this->output->writeln(sprintf("no actions defined")); return false; } while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) { $record = $dbox->get_record($row['record_id']); $row['collection'] = $record->getCollection()->get_name(); $row['title'] = $record->get_title(); $records[] = $row; // update by api if(!empty($actions)) { $js = json_encode($actions); $this->output->writeln(sprintf("on databox(%s), record(%s) :%s js=%s \n", $sbas_id, $row['record_id'], $this->input->getOption('dry') ? " [DRY]" : '', $js )); if(!$this->input->getOption('dry')) { $record->setMetadatasByActions(json_decode($js, false)); // false: setMetadatasByActions expects object, not array ! } } } $stmt->closeCursor(); $n_records = count($records); if($n_records === 0) { return true; } foreach ($job['alerts'] as $alert) { $method = get_in($alert, ['method']); switch($method) { case "webhook": $payload = [ 'job' => $jobname, 'delta' => $delta, 'email' => $alert['recipient'], 'sbas_id' => $dbox->get_sbas_id(), 'base' => $dbox->get_viewname(), 'records' => $records ]; if ($this->input->getOption('dry')) { $this->output->writeln( sprintf( "dry run : webhook about %d record(s) to [%s] NOT inserted", $n_records, join(',', $alert['recipient']) ) ); } else { $this->output->writeln( sprintf( "Inserting webhook about %d record(s) to [%s]", $n_records, join(',', $alert['recipient']) ) ); /** @var WebhookEventManipulator $manipulator */ $webhookManipulator = $this->container['manipulator.webhook-event']; $webhookManipulator->create( "Expiring.Rights.Records", "Expiring.Rights", $payload ); } if($this->input->getOption("dump-webhooks")) { $this->output->writeln("webhook: \"Expiring.Rights.Records\", \"Expiring.Rights\"\npayload="); $this->output->writeln(json_encode($payload, JSON_PRETTY_PRINT)); } break; default: $this->output->writeln(sprintf("bad or undefined alert method (%s), ignored", $method)); break; } } return true; } private function playJobDownloaders($jobname, $job) { // ensure that the job syntax is ok if (!$this->sanitizeJobDownloaders($job)) { return false; } if (get_in($job, ['active'], false) === false) { return true; } // build sql where clause $wheres = [ '`job` = ' . $this->appbox->get_connection()->quote($jobname) ]; // clause on databox $databox = $job['databox']; if(!is_string($databox) && !is_int($databox)) { $this->output->writeln(sprintf("bad databox clause")); return false; } if (!array_key_exists($databox, $this->databoxes)) { $this->output->writeln(sprintf("unknown databox (%s)", $job['databox'])); return false; } // find the sbas_id for the databox of this job /** @var Databox $dbox */ $dbox = $this->databoxes[$databox]['dbox']; $sbas_id = $dbox->get_sbas_id(); $wheres[] = "(`sbas_id`=" . $dbox->get_connection()->quote($sbas_id) . ")"; $wheres[] = "(ISNULL(`alerted`) OR !ISNULL(`new_expire`))"; // clause on expiration date // the NOW() can be faked for testing $now = $this->now === null ? "NOW()" : $this->appbox->get_connection()->quote($this->now); $delta = (int)$job['prior_notice']; if ($delta > 0) { $value = "(real_expire+INTERVAL " . $delta . " DAY)"; } elseif ($delta < 0) { $value = "(real_expire-INTERVAL " . -$delta . " DAY)"; } else { $value = "real_expire"; } // build SELECT sql $sql_where = join("\n AND", $wheres); $sql = "SELECT t.*, DATEDIFF(real_expire, " . $now . ") AS 'expire_in' FROM (\n" . " SELECT *, COALESCE(`expire`, `new_expire`) AS `real_expire` FROM `ExpiringRights`\n" . " WHERE " . $sql_where . "\n" . ") AS t\n" . "WHERE !ISNULL(real_expire) AND " . $now . ">=" . $value . "\n" . "ORDER BY `user_id`"; if($this->input->getOption('show-sql')) { $this->output->writeln(sprintf("%s", $sql)); } // play sql $usersById = []; $n_records = [ "all" => 0, self::RIGHT_EXPIRING => 0, self::RIGHT_EXTENDED => 0, self::RIGHT_SHORTENED => 0 ]; $stmt = $this->appbox->get_connection()->prepare($sql); $stmt->execute(); while ($row = $stmt->fetch(\PDO::FETCH_ASSOC)) { $type = null; if($row['new_expire'] !== null) { if($row['expire'] === null || $row['alerted'] === null) { // same thing as new expire $type = self::RIGHT_EXPIRING; } elseif($row['new_expire'] < $row['expire']) { // shortened $type = self::RIGHT_SHORTENED; } elseif($row['new_expire'] > $row['expire']) { // extended $type = self::RIGHT_EXTENDED; } else { // same date (should not happen ?) $type = self::RIGHT_EXPIRING; } } else { // new_expire === null if($row['alerted'] === null) { $type = self::RIGHT_EXPIRING; } else { // nothing to do (should not happen) } } if($type !== null) { $user_id = $row['user_id']; if(!array_key_exists($user_id, $usersById)) { $usersById[$user_id] = [ 'user_id' => $user_id, 'email' => $row['email'], 'records' => [] ]; } unset($row['id'], $row['job'], $row['user_id'], $row['email'], $row['sbas_id'], $row['base']); $row['type'] = $type; $usersById[$user_id]['records'][] = $row; $n_records[$type]++; } $n_records['all']++; } $stmt->closeCursor(); $payload = [ 'job' => $jobname, 'sbas_id' => $sbas_id, 'base' => $dbox->get_viewname(), 'delta' => $delta, 'users' => array_values($usersById) ]; unset($usersById); if($n_records['all'] > 0 && !$this->input->getOption('dry')) { // build UPDATE sql $sql = "UPDATE `ExpiringRights` SET expire=COALESCE(new_expire, expire), new_expire=NULL, alerted=" . $now . "\n" . " WHERE " . $sql_where; $stmt = $this->appbox->get_connection()->prepare($sql); $stmt->execute([]); $this->appbox->get_connection()->exec($sql); $stmt->closeCursor(); } $this->output->writeln( sprintf( "%d records selected (%s expiring, %s shortened, %s extended)", $n_records['all'], $n_records[self::RIGHT_EXPIRING], $n_records[self::RIGHT_SHORTENED], $n_records[self::RIGHT_EXTENDED] ) ); if ($n_records['all'] > 0 ) { foreach ($job['alerts'] as $alert) { switch ($alert['method']) { case 'webhook': if ($this->input->getOption('dry')) { $this->output->writeln( sprintf( "dry run : webhook about %d record(s) NOT inserted", $n_records['all'] ) ); } else { $this->output->writeln( sprintf( "webhook about %d record(s) inserted", $n_records['all'] ) ); /** @var WebhookEventManipulator $manipulator */ $webhookManipulator = $this->container['manipulator.webhook-event']; $webhookManipulator->create( "Expiring.Rights.Downloaded", "Expiring.Rights", $payload ); } if($this->input->getOption("dump-webhooks")) { $this->output->writeln("webhook: \"Expiring.Rights.Downloaded\", \"Expiring.Rights\"\npayload="); $this->output->writeln(json_encode($payload, JSON_PRETTY_PRINT)); } break; default : $this->output->writeln(sprintf("unknown alert method \"%s\"", $alert['method'])); break; } } } return true; } // ================================================================================================ /** * check that a yaml->php block is ok against rules * * @param array $object * @param array $rules * @return bool */ private function sanitize(array $object, array $rules) { $object_ok = true; foreach ($rules as $key => $fsanitize) { if (!array_key_exists($key, $object) || !($fsanitize($object[$key]))) { $this->output->writeln(sprintf("missing or bad format setting \"%s\"", $key)); $object_ok = false; } } return $object_ok; } /** * check that a job (first level block) is ok * * @param $job * @return bool */ private function sanitizeJob($job) { return $this->sanitize( $job, [ 'active' => "is_bool", 'target' => function($v) {return in_array($v, ['owners', 'downloaders']);}, 'databox' => "is_string", 'prior_notice' => 'is_int', 'expire_field' => 'is_string', 'alerts' => 'is_array' ] ); } private function sanitizeJobOwners($job) { if(get_in($job, ['set_status']) === null && get_in($job, ['set_collection']) === null) { $this->output->writeln(sprintf("missing 'set_status' or 'set_collection' setting")); return false; } return true; } private function sanitizeJobDownloaders($job) { return true; // sanitizeJob is enough } }