diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..64be2bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,14 @@ +*.pyc +*.sw? +*~ +/.coverage +/.coverage.* +.eggs/ +__pycache__ +*.egg-info/ +build/ +dist/ +version.txt +/.hypothesis/ +/.tox/ +.mypy_cache/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..69b3349 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,40 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.4.0 + hooks: + - id: trailing-whitespace + - id: flake8 + - id: check-json + - id: check-yaml + +- repo: https://github.com/codespell-project/codespell + rev: v1.16.0 + hooks: + - id: codespell + +- repo: local + hooks: + - id: mypy + name: mypy + entry: mypy + args: [swh] + pass_filenames: false + language: system + types: [python] + +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + +# unfortunately, we are far from being able to enable this... +# - repo: https://github.com/PyCQA/pydocstyle.git +# rev: 4.0.0 +# hooks: +# - id: pydocstyle +# name: pydocstyle +# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. +# entry: pydocstyle --convention=google +# language: python +# types: [python] + diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..2d0a34a --- /dev/null +++ b/AUTHORS @@ -0,0 +1,3 @@ +Copyright (C) 2015 The Software Heritage developers + +See http://www.softwareheritage.org/ for more information. diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..0ad22b5 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,78 @@ +# Software Heritage Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as Software +Heritage contributors and maintainers pledge to making participation in our +project and our community a harassment-free experience for everyone, regardless +of age, body size, disability, ethnicity, sex characteristics, gender identity +and expression, level of experience, education, socio-economic status, +nationality, personal appearance, race, religion, or sexual identity and +orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. 
+ +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies within all project spaces, and it also applies when +an individual is representing the project or its community in public spaces. +Examples of representing a project or community include using an official +project e-mail address, posting via an official social media account, or acting +as an appointed representative at an online or offline event. Representation of +a project may be further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at `conduct@softwareheritage.org`. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an +incident. Further details of specific enforcement policies may be posted +separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 0000000..7c3f962 --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1 @@ +Ishan Bhanuka diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..94a9ed0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU General Public License is a free, copyleft license for +software and other kinds of works. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +the GNU General Public License is intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. We, the Free Software Foundation, use the +GNU General Public License for most of our software; it applies also to +any other work released this way by its authors. You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + To protect your rights, we need to prevent others from denying you +these rights or asking you to surrender the rights. 
Therefore, you have +certain responsibilities if you distribute copies of the software, or if +you modify it: responsibilities to respect the freedom of others. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must pass on to the recipients the same +freedoms that you received. You must make sure that they, too, receive +or can get the source code. And you must show them these terms so they +know their rights. + + Developers that use the GNU GPL protect your rights with two steps: +(1) assert copyright on the software, and (2) offer you this License +giving you legal permission to copy, distribute and/or modify it. + + For the developers' and authors' protection, the GPL clearly explains +that there is no warranty for this free software. For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. +States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. 
Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. 
You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. 
+ + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. 
In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. 
+ + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. 
If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). 
To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. 
+ + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. 
+
+                     END OF TERMS AND CONDITIONS
+
+            How to Apply These Terms to Your New Programs
+
+  If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+  To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+    <one line to give the program's name and a brief idea of what it does.>
+    Copyright (C) <year> <name of author>
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+  If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+    <program> Copyright (C) <year> <name of author>
+    This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+    This is free software, and you are welcome to redistribute it
+    under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+  You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<https://www.gnu.org/licenses/>.
+
+  The GNU General Public License does not permit incorporating your program
+into proprietary programs. If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License. But first, please read
+<https://www.gnu.org/philosophy/why-not-lgpl.html>.
diff --git a/LICENSE.Celery b/LICENSE.Celery
new file mode 100644
index 0000000..06221a2
--- /dev/null
+++ b/LICENSE.Celery
@@ -0,0 +1,54 @@
+Copyright (c) 2015-2016 Ask Solem & contributors. All rights reserved.
+Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
+Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved.
+
+Celery is licensed under The BSD License (3 Clause, also known as
+the new BSD license). The license is an OSI approved Open Source
+license and is GPL-compatible(1).
+
+The license text can also be found here:
+http://www.opensource.org/licenses/BSD-3-Clause
+
+License
+=======
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Ask Solem, nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS +BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +Documentation License +===================== + +The documentation portion of Celery (the rendered contents of the +"docs" directory of a software distribution or checkout) is supplied +under the "Creative Commons Attribution-ShareAlike 4.0 +International" (CC BY-SA 4.0) License as described by +http://creativecommons.org/licenses/by-sa/4.0/ + +Footnotes +========= +(1) A GPL-compatible license makes it possible to + combine Celery with other software that is released + under the GPL, it does not mean that we're distributing + Celery under the GPL license. The BSD license, unlike the GPL, + let you distribute a modified version without making your + changes open source. diff --git a/PKG-INFO b/PKG-INFO index dc35e9d..9025937 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,30 +1,30 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.1.1 +Version: 0.2.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/bin/swh-worker-control b/bin/swh-worker-control deleted file mode 100755 index b6ff4e7..0000000 --- a/bin/swh-worker-control +++ /dev/null @@ -1,284 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -from fnmatch import fnmatch -from operator import itemgetter -import os -import sys - -import click - - -def list_remote_workers(inspect): - ping_replies = inspect.ping() - if not ping_replies: - return {} - workers = list(sorted(ping_replies)) - ret = {} - - for worker_name in workers: - if not worker_name.startswith("celery@"): - print("Unsupported worker: %s" % worker_name, file=sys.stderr) - continue - type, host = worker_name[len("celery@") :].split(".", 1) - worker = { - "name": worker_name, - "host": host, - "type": type, - } - ret[worker_name] = worker - - return ret - - -def make_filters(filter_host, filter_type): - """Parse the filters and create test functions""" - - def include(field, value): - def filter(worker, field=field, value=value): - return fnmatch(worker[field], value) - - return filter - - def exclude(field, value): - def filter(worker, field=field, value=value): - return not fnmatch(worker[field], value) - - return filter - - filters = [] - for host in filter_host: - if host.startswith("-"): - filters.append(exclude("host", host[1:])) - else: - filters.append(include("host", host)) - - for type_ in filter_type: - if type_.startswith("-"): - filters.append(exclude("type", type_[1:])) - else: - filters.append(include("type", type_)) - - return filters - - -def filter_workers(workers, filters): - """Filter workers according to the set criteria""" - return { - name: worker - for name, worker in workers.items() - if all(check(worker) for check in filters) - } - - -def get_clock_offsets(workers, inspect): - """Add a clock_offset entry for each worker""" - err_msg = "Could not get monotonic clock for {worker}" - - t = datetime.datetime.now(tz=datetime.timezone.utc) - for worker, clock in inspect._request("monotonic").items(): - monotonic = clock.get("monotonic") - if monotonic is None: - monotonic = 0 - click.echo(err_msg.format(worker=worker), err=True) - dt = datetime.timedelta(seconds=monotonic) - workers[worker]["clock_offset"] = t - dt - - -def worker_to_wallclock(worker, monotonic): - """Convert a monotonic timestamp from a worker to a wall clock time""" - dt = datetime.timedelta(seconds=monotonic) - return worker["clock_offset"] + dt - - -@click.group() -@click.option( - "--instance-config", - metavar="CONFIG", - default=None, - help="Use this worker instance configuration", -) -@click.option( - "--host", metavar="HOSTNAME_FILTER", multiple=True, help="Filter by hostname" -) -@click.option( - "--type", metavar="WORKER_TYPE_FILTER", multiple=True, help="Filter by worker type" -) -@click.option( - "--timeout", - metavar="TIMEOUT", - type=float, - default=1.0, - help="Timeout for remote control communication", -) 
-@click.option("--debug/--no-debug", default=False, help="Turn on debugging") -@click.pass_context -def cli(ctx, debug, timeout, instance_config, host, type): - """Manage the Software Heritage workers - - Filters support globs; a filter starting with a "-" excludes the - corresponding values. - - """ - if instance_config: - os.environ["SWH_WORKER_INSTANCE"] = instance_config - - from swh.scheduler.celery_backend.config import app - - full_inspect = app.control.inspect(timeout=timeout) - - workers = filter_workers( - list_remote_workers(full_inspect), make_filters(host, type) - ) - ctx.obj["workers"] = workers - - destination = list(workers) - inspect = app.control.inspect(destination=destination, timeout=timeout) - ctx.obj["inspect"] = inspect - - get_clock_offsets(workers, inspect) - - ctx.obj["control"] = app.control - ctx.obj["destination"] = destination - ctx.obj["timeout"] = timeout - ctx.obj["debug"] = debug - - -@cli.command() -@click.pass_context -def list_workers(ctx): - """List the currently running workers""" - workers = ctx.obj["workers"] - - for worker_name, worker in sorted(workers.items()): - click.echo("{type} alive on {host}".format(**worker)) - - if not workers: - sys.exit(2) - - -@cli.command() -@click.pass_context -def list_tasks(ctx): - """List the tasks currently running on workers""" - task_template = ( - "{worker} {name}" - "[{id} " - "started={started:%Y-%m-%mT%H:%M:%S} " - "pid={worker_pid}] {args} {kwargs}" - ) - inspect = ctx.obj["inspect"] - workers = ctx.obj["workers"] - active = inspect.active() - - if not active: - click.echo("No reply from workers", err=True) - sys.exit(2) - - has_tasks = False - for worker_name, tasks in sorted(active.items()): - worker = workers[worker_name] - if not tasks: - click.echo("No active tasks on {name}".format(**worker), err=True) - for task in sorted(tasks, key=itemgetter("time_start")): - task["started"] = worker_to_wallclock(worker, task["time_start"]) - click.echo(task_template.format(worker=worker_name, **task)) - has_tasks = True - - if not has_tasks: - sys.exit(2) - - -@cli.command() -@click.pass_context -def list_queues(ctx): - """List all the queues currently enabled on the workers""" - inspect = ctx.obj["inspect"] - active = inspect.active_queues() - - if not active: - click.echo("No reply from workers", err=True) - sys.exit(2) - - has_queues = False - for worker_name, queues in sorted(active.items()): - queues = sorted(queue["name"] for queue in queues) - if queues: - click.echo( - "{worker} {queues}".format(worker=worker_name, queues=" ".join(queues)) - ) - has_queues = True - else: - click.echo("No queues for {worker}".format(worker=worker_name), err=True) - - if not has_queues: - sys.exit(2) - - -@cli.command() -@click.option("--noop", is_flag=True, default=False, help="Do not proceed") -@click.argument("queues", nargs=-1) -@click.pass_context -def remove_queues(ctx, noop, queues): - """Cancel the queue for the given workers""" - msg_template = "Canceling queue {queue} on worker {worker}{noop}" - - inspect = ctx.obj["inspect"] - control = ctx.obj["control"] - timeout = ctx.obj["timeout"] - active = inspect.active_queues() - - if not queues: - queues = ["*"] - - if not active: - click.echo("No reply from workers", err=True) - sys.exit(2) - - for worker, active_queues in sorted(active.items()): - for queue in sorted(active_queues, key=itemgetter("name")): - if any(fnmatch(queue["name"], name) for name in queues): - msg = msg_template.format( - queue=queue["name"], worker=worker, noop=" (noop)" if noop else "" - ) - 
click.echo(msg, err=True)
-                if not noop:
-                    control.cancel_consumer(
-                        queue["name"], destination=[worker], timeout=timeout
-                    )
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def add_queues(ctx, noop, queues):
-    """Start the queue for the given workers"""
-    msg_template = "Starting queue {queue} on worker {worker}{noop}"
-
-    control = ctx.obj["control"]
-    timeout = ctx.obj["timeout"]
-    workers = ctx.obj["workers"]
-
-    if not workers:
-        click.echo("No reply from workers", err=True)
-        sys.exit(2)
-
-    for worker in sorted(workers):
-        for queue in queues:
-            msg = msg_template.format(
-                queue=queue, worker=worker, noop=" (noop)" if noop else ""
-            )
-            click.echo(msg, err=True)
-            if not noop:
-                ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
-                print(ret)
-
-
-if __name__ == "__main__":
-    cli(obj={})
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..eb6de3d
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,6 @@
+from hypothesis import settings
+
+# define test profiles. Full documentation is at:
+# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
+settings.register_profile("fast", max_examples=5, deadline=5000)
+settings.register_profile("slow", max_examples=20, deadline=5000)
diff --git a/data/README.md b/data/README.md
new file mode 100644
index 0000000..762c332
--- /dev/null
+++ b/data/README.md
@@ -0,0 +1,23 @@
+# Install/Update template
+
+Install the `task` template in Elasticsearch:
+
+``` shell
+INSTANCE=http://something:9200
+TEMPLATE_NAME=template_swh_tasks
+curl -i -H'Content-Type: application/json' -d@./elastic-template.json -XPUT ${INSTANCE}/_template/${TEMPLATE_NAME}
+```
+
+# Update index settings
+
+The index setup is fixed and defined by the template settings.
+
+When that setup needs to change, we need to update both the template
+and the existing indices.
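+
+Updating the template itself reuses the same `PUT` call as in the install
+section above. Before doing so, the currently installed template can be
+inspected with a plain `GET` (a quick sanity check, reusing the illustrative
+`INSTANCE` and `TEMPLATE_NAME` values defined above):
+
+``` shell
+# Show the task template as currently installed in Elasticsearch
+curl ${INSTANCE}/_template/${TEMPLATE_NAME}?pretty
+```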
+
+To update index settings:
+
+``` shell
+INDEX_NAME=swh-tasks-2017-11
+curl -i -H'Content-Type: application/json' -d@./update-index-settings.json -XPUT ${INSTANCE}/${INDEX_NAME}/_settings
+```
diff --git a/data/elastic-template.json b/data/elastic-template.json
new file mode 100644
index 0000000..3446bb7
--- /dev/null
+++ b/data/elastic-template.json
@@ -0,0 +1,53 @@
+{
+  "order": 0,
+  "index_patterns": ["swh-tasks-*"],
+  "settings": {
+    "index": {
+      "codec": "best_compression",
+      "refresh_interval": "1s",
+      "number_of_shards": 1
+    }
+  },
+  "mappings" : {
+    "task" : {
+      "_source" : { "enabled": true},
+      "properties": {
+        "task_id": {"type": "double"},
+        "task_policy": {"type": "text"},
+        "task_status": {"type": "text"},
+        "task_run_id": {"type": "double"},
+        "arguments": {
+          "type": "object",
+          "properties" : {
+            "args": {
+              "type": "nested",
+              "dynamic": false
+            },
+            "kwargs": {
+              "type": "text"
+            }
+          }
+        },
+        "type": {"type": "text"},
+        "backend_id": {"type": "text"},
+        "metadata": {
+          "type": "object",
+          "enabled" : false
+        },
+        "scheduled": {
+          "type": "date",
+          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+        },
+        "started": {
+          "type": "date",
+          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+        },
+        "ended": {
+          "type": "date",
+          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+        }
+      }
+    }
+  },
+  "aliases": {}
+}
diff --git a/data/update-index-settings.json b/data/update-index-settings.json
new file mode 100644
index 0000000..ce3deab
--- /dev/null
+++ b/data/update-index-settings.json
@@ -0,0 +1,5 @@
+{
+  "index": {
+    "refresh_interval": "1s"
+  }
+}
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..58a761e
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1,3 @@
+_build/
+apidoc/
+*-stamp
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..c30c50a
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1 @@
+include ../../swh-docs/Makefile.sphinx
diff --git a/docs/_static/.placeholder b/docs/_static/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/_templates/.placeholder b/docs/_templates/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/conf.py b/docs/conf.py
new file mode 100644
index 0000000..190deb7
--- /dev/null
+++ b/docs/conf.py
@@ -0,0 +1 @@
+from swh.docs.sphinx.conf import *  # NoQA
diff --git a/docs/index.rst b/docs/index.rst
new file mode 100644
index 0000000..5935f97
--- /dev/null
+++ b/docs/index.rst
@@ -0,0 +1,168 @@
+.. _swh-scheduler:
+
+Software Heritage - Job scheduler
+=================================
+
+Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
+listing a forge, loading new stuff from a Git repository) and one-off
+activities (e.g., loading a specific version of a source package).
+
+
+Description
+-----------
+
+This module provides a scheduler service for the Software Heritage platform. It
+allows defining tasks with a number of properties. In this documentation, we
+will call these swh-tasks to prevent confusion. These swh-tasks are stored in
+a database, and an HTTP-based RPC service is provided to create or find existing
+swh-task declarations.
+
+The execution model for these swh-tasks uses Celery. Thus, each swh-task
+type defined in the database must have one or more Celery workers capable of
+executing such swh-tasks.
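+
+For instance, such a worker could be started with the plain Celery command
+line, pointing it at the scheduler's Celery application (a sketch: the
+application path mirrors the `swh.scheduler.celery_backend.config` module used
+elsewhere in this changeset, and the pool/concurrency values are illustrative
+only)::
+
+  $ celery worker --app=swh.scheduler.celery_backend.config.app \
+                  --pool=prefork --events \
+                  --concurrency=10 --loglevel=info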
+
+A number of services are then provided to manage the scheduling of these
+swh-tasks as Celery tasks.
+
+The `scheduler-runner` service is a daemon that regularly looks for swh-tasks
+in the database that should be scheduled. For each selected swh-task, a
+Celery task is instantiated.
+
+The `scheduler-listener` service is a daemon that listens to the Celery event
+bus and maintains the workflow status of scheduled swh-tasks.
+
+
+SWH Task Model
+~~~~~~~~~~~~~~
+
+Each swh-task-type is the declaration of a type of swh-task. Each swh-task-type
+has the following fields:
+
+- `type`: Name of the swh-task type; can be anything but must be unique,
+- `description`: Human-readable task description,
+- `backend_name`: Name of the task in the job-running backend,
+- `default_interval`: Default interval for newly scheduled tasks,
+- `min_interval`: Minimum interval between two runs of a task,
+- `max_interval`: Maximum interval between two runs of a task,
+- `backoff_factor`: Adjustment factor for the backoff between two task runs,
+- `max_queue_length`: Maximum length of the queue for this type of tasks,
+- `num_retries`: Default number of retries on transient failures,
+- `retry_delay`: Retry delay for the task.
+
+Existing swh-task-types can be listed using the `swh scheduler` command line
+tool::
+
+  $ swh scheduler task-type list
+  Known task types:
+  check-deposit:
+    Pre-checking deposit step before loading into swh archive
+  index-fossology-license:
+    Fossology license indexer task
+  load-git:
+    Update an origin of type git
+  load-hg:
+    Update an origin of type mercurial
+
+You can see the details of a swh-task-type::
+
+  $ swh scheduler task-type list -v -t load-git
+  Known task types:
+  load-git: swh.loader.git.tasks.UpdateGitRepository
+    Update an origin of type git
+    interval: 64 days, 0:00:00 [12:00:00, 64 days, 0:00:00]
+    backoff_factor: 2.0
+    max_queue_length: 5000
+    num_retries: None
+    retry_delay: None
+
+
+A swh-task is an 'instance' of such a swh-task-type, and consists of:
+
+- `arguments`: Arguments passed to the underlying job scheduler,
+- `next_run`: Time on or after which the next run of this task should be
+  scheduled,
+- `current_interval`: Interval between two runs of this task, taking into
+  account the backoff factor,
+- `policy`: Whether the task is "one-shot" or "recurring",
+- `retries_left`: Number of "short delay" retries of the task in case of
+  transient failure,
+- `priority`: Priority of the task,
+- `id`: Internal task identifier,
+- `type`: References task_type table,
+- `status`: Task status (among "next_run_not_scheduled", "next_run_scheduled",
+  "completed", "disabled").
+
+So a swh-task basically consists of:
+
+- a set of parameters defining how the scheduling of the
+  swh-task is handled,
+- a set of parameters to specify the retry policy in case of transient failure
+  upon execution,
+- a set of parameters that defines the job to be done (`backend_name` +
+  `arguments`).
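+
+For example, a one-shot swh-task could be declared with the `swh scheduler`
+command line tool (a sketch: the origin URL below is hypothetical)::
+
+  $ swh scheduler task add --policy oneshot load-git url=https://example.org/repo.git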
+
+
+You can list pending swh-tasks (tasks that are to be scheduled ASAP)::
+
+    $ swh scheduler task list-pending load-git --limit 2
+    Found 1 load-git tasks
+
+      Task 9052257
+        Next run: 15 days ago (2019-06-25 10:35:10+00:00)
+        Interval: 2 days, 0:00:00
+        Type: load-git
+        Policy: recurring
+        Args:
+          'https://github.com/turtl/mobile'
+        Keyword args:
+
+
+Looking for existing swh-tasks can be done via the command line tool::
+
+    $ swh scheduler task list -t load-hg --limit 2
+    Found 2 tasks
+
+      Task 168802702
+        Next run: in 4 hours (2019-07-10 17:56:48+00:00)
+        Interval: 1 day, 0:00:00
+        Type: load-hg
+        Policy: recurring
+        Status: next_run_not_scheduled
+        Priority:
+        Args:
+          'https://bitbucket.org/kepung/pypy'
+        Keyword args:
+
+      Task 169800445
+        Next run: in a month (2019-08-10 17:54:24+00:00)
+        Interval: 32 days, 0:00:00
+        Type: load-hg
+        Policy: recurring
+        Status: next_run_not_scheduled
+        Priority:
+        Args:
+          'https://bitbucket.org/lunixbochs/pypy-1'
+        Keyword args:
+
+
+
+Writing a new worker for a new swh-task-type
+--------------------------------------------
+
+When you want to add a new swh-task-type, you need a Celery worker backend
+capable of executing instances of this new task-type.
+
+Celery workers for swh-scheduler based tasks should be started using the Celery
+app in `swh.scheduler.celery_backend.config`. The latter, among other things,
+provides a loading mechanism for task types based on pkg_resources declared
+plugins under the `[swh.workers]` entry point.
+
+TODO: add a fully working example of a dumb task.
+
+
+Reference Documentation
+-----------------------
+
+.. toctree::
+   :maxdepth: 2
+
+   /apidoc/swh.scheduler
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 0000000..328b66e
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,33 @@
+[mypy]
+namespace_packages = True
+warn_unused_ignores = True
+
+
+# 3rd party libraries without stubs (yet)
+
+[mypy-arrow.*]
+ignore_missing_imports = True
+
+[mypy-celery.*]
+ignore_missing_imports = True
+
+[mypy-elasticsearch.*]
+ignore_missing_imports = True
+
+[mypy-kombu.*]
+ignore_missing_imports = True
+
+[mypy-pika.*]
+ignore_missing_imports = True
+
+[mypy-pkg_resources.*]
+ignore_missing_imports = True
+
+[mypy-psycopg2.*]
+ignore_missing_imports = True
+
+[mypy-pytest.*]
+ignore_missing_imports = True
+
+[mypy-pytest_postgresql.*]
+ignore_missing_imports = True
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..f1cacd8
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,4 @@
+[pytest]
+norecursedirs = docs
+filterwarnings =
+    once
diff --git a/requirements.txt b/requirements.txt
index 7e5d4c6..f2c29f5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,19 +1,21 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names.
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html arrow +attrs +attrs-strict celery >= 4 # Not a direct dependency: missing from celery 4.4.4 future >= 0.18.0 Click elasticsearch > 5.4 flask pika >= 1.1.0 psycopg2 pyyaml vcversioner setuptools # test dependencies # hypothesis diff --git a/setup.py b/setup.py index d47fc8b..fe21851 100755 --- a/setup.py +++ b/setup.py @@ -1,73 +1,72 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.scheduler", description="Software Heritage Scheduler", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSCH/", packages=find_packages(), - scripts=["bin/swh-worker-control"], - setup_requires=["vcversioner"], + setup_requires=["setuptools-scm"], + use_scm_version=True, install_requires=parse_requirements() + parse_requirements("swh"), extras_require={"testing": parse_requirements("test")}, - vcversioner={}, include_package_data=True, entry_points=""" [console_scripts] swh-scheduler=swh.scheduler.cli:main [swh.cli.subcommands] scheduler=swh.scheduler.cli:cli """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-scheduler", "Documentation": "https://docs.softwareheritage.org/devel/swh-scheduler/", }, ) diff --git a/sql/.gitignore b/sql/.gitignore new file mode 100644 index 0000000..d501764 --- /dev/null +++ b/sql/.gitignore @@ -0,0 +1,3 @@ +*-stamp +autodoc/ +swh-scheduler.dump diff --git a/sql/Makefile b/sql/Makefile new file mode 100644 index 0000000..cc836d5 --- /dev/null +++ b/sql/Makefile @@ -0,0 +1,73 @@ +# Depends: postgresql-client, postgresql-autodoc + +DBNAME = softwareheritage-scheduler-dev +DOCDIR = autodoc + +SQL_SCHEMA = 30-swh-schema.sql +SQL_FUNC = 40-swh-func.sql +SQL_DATA = 50-swh-data.sql +SQL_INDEXES = 60-swh-indexes.sql +SQLS = $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA) $(SQL_INDEXES) +SQL_FILES = $(abspath $(addprefix $(CURDIR)/../swh/scheduler/sql/,$(SQLS))) + +PSQL_BIN = psql +PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1 +PSQL = $(PSQL_BIN) $(PSQL_FLAGS) + +PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) + +all: + +createdb: createdb-stamp +createdb-stamp: $(SQL_FILES) +ifeq ($(PIFPAF),) + -dropdb $(DBNAME) 
+endif + createdb $(DBNAME) +ifeq ($(PIFPAF),) + touch $@ +else + rm -f $@ +endif + +filldb: filldb-stamp +filldb-stamp: createdb-stamp + cat $(SQL_FILES) | $(PSQL) $(DBNAME) +ifeq ($(PIFPAF),) + touch $@ +else + rm -f $@ +endif + +dropdb: + -dropdb $(DBNAME) + +dumpdb: swh-scheduler.dump +swh-scheduler.dump: filldb-stamp + pg_dump -Fc $(DBNAME) > $@ + +$(DOCDIR): + test -d $(DOCDIR)/ || mkdir $(DOCDIR) + +doc: autodoc-stamp $(DOCDIR)/swh-scheduler.pdf +autodoc-stamp: filldb-stamp $(DOCDIR) + postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler + cp -a $(DOCDIR)/swh-scheduler.dot $(DOCDIR)/swh-scheduler.dot.orig +ifeq ($(PIFPAF),) + touch $@ +else + rm -f $@ +endif + +$(DOCDIR)/swh-scheduler.pdf: $(DOCDIR)/swh-scheduler.dot autodoc-stamp + dot -T pdf $< > $@ +$(DOCDIR)/swh-scheduler.svg: $(DOCDIR)/swh-scheduler.dot autodoc-stamp + dot -T svg $< > $@ + +clean: + rm -rf *-stamp $(DOCDIR)/ + +distclean: clean dropdb + rm -f swh-scheduler.dump + +.PHONY: all initdb createdb dropdb doc clean diff --git a/sql/updates/02.sql b/sql/updates/02.sql new file mode 100644 index 0000000..04fc040 --- /dev/null +++ b/sql/updates/02.sql @@ -0,0 +1,66 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 01 +-- to_version: 02 +-- description: Allow mass-creation of task runs + +begin; + +insert into dbversion (version, release, description) + values (2, now(), 'Work In Progress'); + +create or replace function swh_scheduler_mktemp_task_run () + returns void + language sql +as $$ + create temporary table tmp_task_run ( + like task_run excluding indexes + ) on commit drop; + alter table tmp_task_run + drop column id, + drop column status; +$$; + +comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling'; + +create or replace function swh_scheduler_schedule_task_run_from_temp () + returns void + language plpgsql +as $$ +begin + insert into task_run (task, backend_id, metadata, scheduled, status) + select task, backend_id, metadata, scheduled, 'scheduled' + from tmp_task_run; + return; +end; +$$; + +create or replace function swh_scheduler_start_task_run (backend_id text, + metadata jsonb default '{}'::jsonb, + ts timestamptz default now()) + returns task_run + language sql +as $$ + update task_run + set started = ts, + status = 'started', + metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata + where task_run.backend_id = swh_scheduler_start_task_run.backend_id + returning *; +$$; + +create or replace function swh_scheduler_end_task_run (backend_id text, + status task_run_status, + metadata jsonb default '{}'::jsonb, + ts timestamptz default now()) + returns task_run + language sql +as $$ + update task_run + set ended = ts, + status = swh_scheduler_end_task_run.status, + metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata + where task_run.backend_id = swh_scheduler_end_task_run.backend_id + returning *; +$$; + +commit; diff --git a/sql/updates/03.sql b/sql/updates/03.sql new file mode 100644 index 0000000..5ec51d5 --- /dev/null +++ b/sql/updates/03.sql @@ -0,0 +1,26 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 02 +-- to_version: 03 +-- description: Fix a bug in handling default intervals for task types + +begin; + +insert into dbversion (version, release, description) + values (3, now(), 'Work In Progress'); + +create or replace function swh_scheduler_create_tasks_from_temp () + returns setof task + language plpgsql +as $$ +begin + return query + insert into task 
(type, arguments, next_run, status, current_interval)
+    select type, arguments, next_run, 'next_run_not_scheduled',
+           (select default_interval from task_type tt where tt.type = tmp_task.type)
+      from tmp_task
+  returning task.*;
+end;
+$$;
+
+
+commit;
diff --git a/sql/updates/04.sql b/sql/updates/04.sql
new file mode 100644
index 0000000..a051388
--- /dev/null
+++ b/sql/updates/04.sql
@@ -0,0 +1,53 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 03
+-- to_version: 04
+-- description: Add a maximum queue length to the task types in the scheduler
+
+begin;
+
+insert into dbversion (version, release, description)
+values (4, now(), 'Work In Progress');
+
+alter table task_type add column max_queue_length bigint;
+comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks';
+
+
+drop function swh_scheduler_peek_ready_tasks (timestamptz, bigint);
+drop function swh_scheduler_grab_ready_tasks (timestamptz, bigint);
+
+create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
+                                                           num_tasks bigint default NULL)
+  returns setof task
+  language sql
+  stable
+as $$
+select * from task
+  where next_run <= ts
+        and type = task_type
+        and status = 'next_run_not_scheduled'
+  order by next_run
+  limit num_tasks;
+$$;
+
+create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
+                                                           num_tasks bigint default NULL)
+  returns setof task
+  language sql
+as $$
+  update task
+    set status='next_run_scheduled'
+    from (
+    select id from task
+      where next_run <= ts
+            and type = task_type
+            and status='next_run_not_scheduled'
+      order by next_run
+      limit num_tasks
+      for update skip locked
+    ) next_tasks
+    where task.id = next_tasks.id
+  returning task.*;
+$$;
+
+
+commit;
diff --git a/sql/updates/05.sql b/sql/updates/05.sql
new file mode 100644
index 0000000..b55a533
--- /dev/null
+++ b/sql/updates/05.sql
@@ -0,0 +1,144 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 04
+-- to_version: 05
+-- description: Add recurrence logic for temporary failures and one-shot tasks
+
+alter type task_status add value if not exists 'completed' before 'disabled';
+alter type task_run_status add value if not exists 'permfailed' after 'failed';
+
+begin;
+
+insert into dbversion (version, release, description)
+values (5, now(), 'Work In Progress');
+
+alter table task_type add column num_retries bigint;
+alter table task_type add column retry_delay interval;
+
+comment on column task_type.num_retries is 'Default number of retries on transient failures';
+comment on column task_type.retry_delay is 'Retry delay for the task';
+
+create type task_policy as enum ('recurring', 'oneshot');
+comment on type task_policy is 'Recurrence policy of the given task';
+
+alter table task add column policy task_policy not null default 'recurring';
+alter table task add column retries_left bigint not null default 0;
+
+comment on column task.policy is 'Whether the task is one-shot or recurring';
+comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
+                                       'transient failure';
+
+
+create or replace function swh_scheduler_mktemp_task ()
+  returns void
+  language sql
+as $$
+  create temporary table tmp_task (
+    like task excluding indexes
+  ) on commit drop;
+  alter table tmp_task
+    drop column id,
+    drop column current_interval,
+    drop column status,
+    alter column policy drop not null,
+    alter column retries_left drop not null;
+$$;
+
+comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+  returns setof task
+  language plpgsql
+as $$
+begin
+  return query
+  insert into task (type, arguments, next_run, status, current_interval, policy, retries_left)
+    select type, arguments, next_run, 'next_run_not_scheduled',
+           (select default_interval from task_type tt where tt.type = tmp_task.type),
+           coalesce(policy, 'recurring'),
+           coalesce(retries_left, (select num_retries from task_type tt where tt.type = tmp_task.type), 0)
+      from tmp_task
+  returning task.*;
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
+
+drop trigger update_interval_on_task_end on task_run;
+drop function swh_scheduler_compute_new_task_interval (text, interval, task_run_status) cascade;
+drop function swh_scheduler_update_task_interval () cascade;
+
+create or replace function swh_scheduler_update_task_on_task_end ()
+  returns trigger
+  language plpgsql
+as $$
+declare
+  cur_task task%rowtype;
+  cur_task_type task_type%rowtype;
+  adjustment_factor float;
+  new_interval interval;
+begin
+  select * from task where id = new.task into cur_task;
+  select * from task_type where type = cur_task.type into cur_task_type;
+
+  case
+    when new.status = 'permfailed' then
+      update task
+        set status = 'disabled'
+        where id = cur_task.id;
+    when new.status in ('eventful', 'uneventful') then
+      case
+        when cur_task.policy = 'oneshot' then
+          update task
+            set status = 'completed'
+            where id = cur_task.id;
+        when cur_task.policy = 'recurring' then
+          if new.status = 'uneventful' then
+            adjustment_factor := cur_task_type.backoff_factor;
+          else
+            adjustment_factor := 1/cur_task_type.backoff_factor;
+          end if;
+          new_interval := greatest(
+            cur_task_type.min_interval,
+            least(
+              cur_task_type.max_interval,
+              adjustment_factor * cur_task.current_interval));
+          update task
+            set status = 'next_run_not_scheduled',
+                next_run = now() + new_interval,
+                current_interval = new_interval,
+                retries_left = coalesce(cur_task_type.num_retries, 0)
+            where id = cur_task.id;
+      end case;
+    else -- new.status in 'failed', 'lost'
+      if cur_task.retries_left > 0 then
+        update task
+          set status = 'next_run_not_scheduled',
+              next_run = now() + cur_task_type.retry_delay,
+              retries_left = cur_task.retries_left - 1
+          where id = cur_task.id;
+      else -- no retries left
+        case
+          when cur_task.policy = 'oneshot' then
+            update task
+              set status = 'disabled'
+              where id = cur_task.id;
+          when cur_task.policy = 'recurring' then
+            update task
+              set status = 'next_run_not_scheduled',
+                  next_run = now() + cur_task.current_interval,
+                  retries_left = coalesce(cur_task_type.num_retries, 0)
+              where id = cur_task.id;
+        end case;
+      end if; -- retries
+  end case;
+  return null;
+end;
+$$;
+
+create trigger update_task_on_task_end
+  after update of status on task_run
+  for each row
+  when (new.status NOT IN ('scheduled', 'started'))
+  execute procedure swh_scheduler_update_task_on_task_end ();
+
+commit;
diff --git a/sql/updates/06.sql b/sql/updates/06.sql
new file mode 100644
index 0000000..8aa1d9f
--- /dev/null
+++ b/sql/updates/06.sql
@@ -0,0 +1,23 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 05
+-- to_version: 06
+-- description: relax constraints on intervals for one-shot tasks
+
+begin;
+
+insert into dbversion (version, release, description)
+  values (6, now(), 'Work In Progress');
+
+
+alter table task_type
+  alter column default_interval
drop not null, + alter column min_interval drop not null, + alter column max_interval drop not null, + alter column backoff_factor drop not null; + +alter table task + alter column current_interval drop not null, + add constraint task_check check (policy <> 'recurring' or current_interval is not null); + +commit; + diff --git a/sql/updates/07.sql b/sql/updates/07.sql new file mode 100644 index 0000000..6082e0a --- /dev/null +++ b/sql/updates/07.sql @@ -0,0 +1,54 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 06 +-- to_version: 07 +-- description: Archive 'oneshot' and disabled 'recurring' tasks (status = 'disabled') + +insert into dbversion (version, release, description) +values (7, now(), 'Work In Progress'); + +create type task_record as ( + task_id bigint, + task_policy task_policy, + task_status task_status, + task_run_id bigint, + arguments jsonb, + type text, + backend_id text, + metadata jsonb, + scheduled timestamptz, + started timestamptz, + ended timestamptz +); + +create index task_run_id_asc_idx on task_run(task asc, ended asc); + +create or replace function swh_scheduler_task_to_archive( + ts timestamptz, last_id bigint default -1, lim bigint default 10) + returns setof task_record + language sql stable +as $$ + select t.id as task_id, t.policy as task_policy, + t.status as task_status, tr.id as task_run_id, + t.arguments, t.type, tr.backend_id, tr.metadata, + tr.scheduled, tr.started, tr.ended + from task_run tr inner join task t on tr.task=t.id + where ((t.policy = 'oneshot' and t.status ='completed') or + (t.policy = 'recurring' and t.status ='disabled')) and + tr.ended < ts and + t.id > last_id + order by tr.task, tr.ended + limit lim; +$$; + +comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function'; + +create or replace function swh_scheduler_delete_archive_tasks( + task_ids bigint[]) + returns void + language sql +as $$ + delete from task_run where task in (select * from unnest(task_ids)); + delete from task where id in (select * from unnest(task_ids)); +$$; + +comment on function swh_scheduler_delete_archive_tasks is 'Clean up archived tasks function'; diff --git a/sql/updates/08.sql b/sql/updates/08.sql new file mode 100644 index 0000000..b9ac8fb --- /dev/null +++ b/sql/updates/08.sql @@ -0,0 +1,51 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 07 +-- to_version: 08 +-- description: Improve task to archive filtering function + +insert into dbversion (version, release, description) +values (8, now(), 'Work In Progress'); + +drop function swh_scheduler_task_to_archive(timestamptz, bigint, bigint); + +create or replace function swh_scheduler_task_to_archive( + ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, + lim bigint default 10) + returns setof task_record + language sql stable +as $$ + select t.id as task_id, t.policy as task_policy, + t.status as task_status, tr.id as task_run_id, + t.arguments, t.type, tr.backend_id, tr.metadata, + tr.scheduled, tr.started, tr.ended + from task_run tr inner join task t on tr.task=t.id + where ((t.policy = 'oneshot' and t.status ='completed') or + (t.policy = 'recurring' and t.status ='disabled')) and + ts_after <= tr.ended and tr.ended < ts_before and + t.id > last_id + order by tr.task, tr.ended + limit lim; +$$; + +comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function'; + +drop function swh_scheduler_delete_archive_tasks(bigint[]); + +create or replace function swh_scheduler_delete_archived_tasks( + task_ids bigint[], task_run_ids 
bigint[])
+  returns void
+  language sql
+as $$
+  -- clean up task_run_ids
+  delete from task_run where id in (select * from unnest(task_run_ids));
+  -- clean up only tasks whose associated task_run are all cleaned up.
+  -- Remaining tasks will stay there and will be cleaned up when
+  -- remaining data have been indexed
+  delete from task
+    where id in (select t.id
+                 from task t left outer join task_run tr on t.id=tr.task
+                 where t.id in (select * from unnest(task_ids))
+                 and tr.task is null);
+$$;
+
+comment on function swh_scheduler_delete_archived_tasks is 'Clean up archived tasks function';
diff --git a/sql/updates/09.sql b/sql/updates/09.sql
new file mode 100644
index 0000000..7fe97ca
--- /dev/null
+++ b/sql/updates/09.sql
@@ -0,0 +1,205 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 08
+-- to_version: 09
+-- description: Schedule task with priority
+
+insert into dbversion (version, release, description)
+values (9, now(), 'Work In Progress');
+
+create type task_priority as enum('high', 'normal', 'low');
+comment on type task_priority is 'Priority of the given task';
+
+create table priority_ratio(
+  id task_priority primary key,
+  ratio float not null
+);
+
+comment on table priority_ratio is 'Oneshot task''s reading ratio per priority';
+comment on column priority_ratio.id is 'Task priority id';
+comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority';
+
+insert into priority_ratio (id, ratio) values ('high', 0.5);
+insert into priority_ratio (id, ratio) values ('normal', 0.3);
+insert into priority_ratio (id, ratio) values ('low', 0.2);
+
+alter table task add column priority task_priority references priority_ratio(id);
+comment on column task.priority is 'Priority of the given task';
+
+drop function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint);
+drop function swh_scheduler_grab_ready_tasks(text, timestamptz, bigint);
+
+create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(),
+                                                                 num_tasks bigint default NULL)
+  returns setof task
+  language sql
+  stable
+as $$
+select * from task
+  where next_run <= ts
+        and type = task_type
+        and status = 'next_run_not_scheduled'
+        and priority is null
+  order by next_run
+  limit num_tasks
+  for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint)
+is 'Retrieve tasks without priority';
+
+create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority)
+  returns numeric
+  language sql stable
+as $$
+  select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric
+$$;
+
+comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority)
+is 'Given a task priority and a total number, compute the number of tasks to read';
+
+create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(),
+                                                                   num_tasks_priority bigint default NULL,
+                                                                   task_priority task_priority default 'normal')
+  returns setof task
+  language sql
+  stable
+as $$
+  select *
+  from task t
+  where t.next_run <= ts
+        and t.type = task_type
+        and t.status = 'next_run_not_scheduled'
+        and t.priority = task_priority
+  order by t.next_run
+  limit num_tasks_priority
+  for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority)
+is 'Retrieve tasks with a given priority';
+
+create or replace function
swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(), + num_tasks_priority bigint default NULL) + returns setof task + language plpgsql +as $$ +declare + r record; + count_row bigint; + nb_diff bigint; + nb_high bigint; + nb_normal bigint; + nb_low bigint; +begin + -- expected values to fetch + select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high; + select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal; + select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low; + nb_diff := 0; + count_row := 0; + + for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high') + loop + count_row := count_row + 1; + return next r; + end loop; + + if count_row < nb_high then + nb_normal := nb_normal + nb_high - count_row; + end if; + + count_row := 0; + for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal') + loop + count_row := count_row + 1; + return next r; + end loop; + + if count_row < nb_normal then + nb_low := nb_low + nb_normal - count_row; + end if; + + return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low'); +end +$$; + +comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint) +is 'Retrieve priority tasks'; + +create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(), + num_tasks bigint default NULL, num_tasks_priority bigint default NULL) + returns setof task + language plpgsql +as $$ +declare + r record; + count_row bigint; + nb_diff bigint; + nb_tasks bigint; +begin + count_row := 0; + + for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority) + order by priority, next_run + loop + count_row := count_row + 1; + return next r; + end loop; + + if count_row < num_tasks_priority then + nb_tasks := num_tasks + num_tasks_priority - count_row; + else + nb_tasks := num_tasks; + end if; + + for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks) + order by priority, next_run + loop + return next r; + end loop; + + return; +end +$$; + +comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint) +is 'Retrieve tasks with/without priority in order'; + +create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(), + num_tasks bigint default NULL, + num_tasks_priority bigint default NULL) + returns setof task + language sql +as $$ + update task + set status='next_run_scheduled' + from ( + select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority) + ) next_tasks + where task.id = next_tasks.id + returning task.*; +$$; + +comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint) +is 'Grab tasks ready for scheduling and change their status'; + +create index on task(priority); + +create or replace function swh_scheduler_create_tasks_from_temp () + returns setof task + language plpgsql +as $$ +begin + return query + insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority) + select type, arguments, next_run, 'next_run_not_scheduled', + (select default_interval from task_type tt where tt.type = tmp_task.type), + coalesce(policy, 'recurring'), + coalesce(retries_left, (select num_retries from task_type tt where tt.type = tmp_task.type), 0), + coalesce(priority, null) + from tmp_task + returning 
task.*; +end; +$$; + +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; diff --git a/sql/updates/10.sql b/sql/updates/10.sql new file mode 100644 index 0000000..bd57768 --- /dev/null +++ b/sql/updates/10.sql @@ -0,0 +1,47 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 09 +-- to_version: 10 +-- description: Schedule task with priority + +insert into dbversion (version, release, description) +values (10, now(), 'Work In Progress'); + +drop type task_record cascade; +create type task_record as ( + task_id bigint, + task_policy task_policy, + task_status task_status, + task_run_id bigint, + arguments jsonb, + type text, + backend_id text, + metadata jsonb, + scheduled timestamptz, + started timestamptz, + ended timestamptz, + task_run_status task_run_status +); + +drop index task_run_id_asc_idx; +create index task_run_id_started_asc_idx on task_run(task asc, started asc); + +create or replace function swh_scheduler_task_to_archive( + ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, + lim bigint default 10) + returns setof task_record + language sql stable +as $$ + select t.id as task_id, t.policy as task_policy, + t.status as task_status, tr.id as task_run_id, + t.arguments, t.type, tr.backend_id, tr.metadata, + tr.scheduled, tr.started, tr.ended, tr.status as task_run_status + from task_run tr inner join task t on tr.task=t.id + where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or + (t.policy = 'recurring' and t.status = 'disabled')) and + ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and + t.id > last_id + order by tr.task, tr.started + limit lim; +$$; + +comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function'; diff --git a/sql/updates/11.sql b/sql/updates/11.sql new file mode 100644 index 0000000..75edc4f --- /dev/null +++ b/sql/updates/11.sql @@ -0,0 +1,64 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 10 +-- to_version: 11 +-- description: Upgrade scheduler create_tasks routine + +insert into dbversion (version, release, description) +values (11, now(), 'Work In Progress'); + +create or replace function swh_scheduler_mktemp_task () + returns void + language sql +as $$ + create temporary table tmp_task ( + like task excluding indexes + ) on commit drop; + alter table tmp_task + alter column retries_left drop not null, + drop column id; +$$; + +comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation'; + +create or replace function swh_scheduler_create_tasks_from_temp () + returns setof task + language plpgsql +as $$ +begin + -- update the default values in one go + -- this is separated from the insert/select to avoid too much + -- juggling + update tmp_task t + set current_interval = tt.default_interval, + retries_left = coalesce(retries_left, tt.num_retries, 0) + from task_type tt + where tt.type=t.type; + + insert into task (type, arguments, next_run, status, current_interval, policy, + retries_left, priority) + select type, arguments, next_run, status, current_interval, policy, + retries_left, priority + from tmp_task t + where not exists(select 1 + from task + where type = t.type and + arguments = t.arguments and + policy = t.policy and + ((priority is null and t.priority is null) + or priority = t.priority) and + status = t.status); + + return query + select distinct t.* + from tmp_task tt inner join task t on ( + t.type = tt.type and + t.arguments = tt.arguments 
and + t.status = tt.status and + ((t.priority is null and tt.priority is null) + or t.priority=tt.priority) and + t.policy=tt.policy + ); +end; +$$; + +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; diff --git a/sql/updates/12.sql b/sql/updates/12.sql new file mode 100644 index 0000000..3ab8c42 --- /dev/null +++ b/sql/updates/12.sql @@ -0,0 +1,51 @@ +-- SWH Scheduler Schema upgrade +-- from_version: 11 +-- to_version: 12 +-- description: Upgrade scheduler create_tasks routine + +insert into dbversion (version, release, description) + values (12, now(), 'Work In Progress'); + +create or replace function swh_scheduler_create_tasks_from_temp () + returns setof task + language plpgsql +as $$ +begin + -- update the default values in one go + -- this is separated from the insert/select to avoid too much + -- juggling + update tmp_task t + set current_interval = tt.default_interval, + retries_left = coalesce(retries_left, tt.num_retries, 0) + from task_type tt + where tt.type=t.type; + + insert into task (type, arguments, next_run, status, current_interval, policy, + retries_left, priority) + select type, arguments, next_run, status, current_interval, policy, + retries_left, priority + from tmp_task t + where not exists(select 1 + from task + where type = t.type and + arguments->'args' = t.arguments->'args' and + arguments->'kwargs' = t.arguments->'kwargs' and + policy = t.policy and + priority is not distinct from t.priority and + status = t.status); + + return query + select distinct t.* + from tmp_task tt inner join task t on ( + tt.type = t.type and + tt.arguments->'args' = t.arguments->'args' and + tt.arguments->'kwargs' = t.arguments->'kwargs' and + tt.policy = t.policy and + tt.priority is not distinct from t.priority and + tt.status = t.status + ); +end; +$$; + +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; + diff --git a/sql/updates/13.sql b/sql/updates/13.sql new file mode 100644 index 0000000..9f7279d --- /dev/null +++ b/sql/updates/13.sql @@ -0,0 +1,19 @@ +insert into dbversion (version, release, description) + values (13, now(), 'Work In Progress'); + +-- comments for columns of table dbversion +comment on column dbversion.version is 'SQL schema version'; +comment on column dbversion.release is 'Version deployment timestamp'; +comment on column dbversion.description is 'Version description'; + +-- comments for columns of table task +comment on column task.id is 'Task Identifier'; +comment on column task.type is 'References task_type table'; +comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; + +-- comments for columns of table task_run +comment on column task_run.id is 'Task run identifier'; +comment on column task_run.task is 'References task table'; +comment on column task_run.scheduled is 'Scheduled run time for task'; +comment on column task_run.started is 'Task starting time'; +comment on column task_run.ended is 'Task ending time'; diff --git a/sql/updates/14.sql b/sql/updates/14.sql new file mode 100644 index 0000000..98f8184 --- /dev/null +++ b/sql/updates/14.sql @@ -0,0 +1,50 @@ +insert into dbversion (version, release, description) + values (14, now(), 'Work In Progress'); + +drop index task_args; +drop index task_kwargs; + +create index on task using btree(type, md5(arguments::text)); + +create or replace function swh_scheduler_create_tasks_from_temp () + returns 
setof task + language plpgsql +as $$ +begin + -- update the default values in one go + -- this is separated from the insert/select to avoid too much + -- juggling + update tmp_task t + set current_interval = tt.default_interval, + retries_left = coalesce(retries_left, tt.num_retries, 0) + from task_type tt + where tt.type=t.type; + + insert into task (type, arguments, next_run, status, current_interval, policy, + retries_left, priority) + select type, arguments, next_run, status, current_interval, policy, + retries_left, priority + from tmp_task t + where not exists(select 1 + from task + where type = t.type and + md5(arguments::text) = md5(t.arguments::text) and + arguments = t.arguments and + policy = t.policy and + priority is not distinct from t.priority and + status = t.status); + + return query + select distinct t.* + from tmp_task tt inner join task t on ( + tt.type = t.type and + md5(tt.arguments::text) = md5(t.arguments::text) and + tt.arguments = t.arguments and + tt.policy = t.policy and + tt.priority is not distinct from t.priority and + tt.status = t.status + ); +end; +$$; + +comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table'; diff --git a/sql/updates/15.sql b/sql/updates/15.sql new file mode 100644 index 0000000..5dd088d --- /dev/null +++ b/sql/updates/15.sql @@ -0,0 +1,24 @@ +insert into dbversion (version, release, description) + values (15, now(), 'Work In Progress'); + +create or replace function swh_scheduler_task_to_archive( + ts_after timestamptz, ts_before timestamptz, last_id bigint default -1, + lim bigint default 10) + returns setof task_record + language sql stable +as $$ + select t.id as task_id, t.policy as task_policy, + t.status as task_status, tr.id as task_run_id, + t.arguments, t.type, tr.backend_id, tr.metadata, + tr.scheduled, tr.started, tr.ended, tr.status as task_run_status + from task_run tr inner join task t on tr.task=t.id + where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or + (t.policy = 'recurring' and t.status = 'disabled')) and + ((ts_after <= tr.started and tr.started < ts_before) or + (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and + t.id >= last_id + order by tr.task, tr.started + limit lim; +$$; + +comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function'; diff --git a/sql/updates/16.sql b/sql/updates/16.sql new file mode 100644 index 0000000..7213eee --- /dev/null +++ b/sql/updates/16.sql @@ -0,0 +1,53 @@ +insert into dbversion (version, release, description) + values (16, now(), 'Work In Progress'); + +create table if not exists listers ( + id uuid primary key default uuid_generate_v4(), + name text not null, + instance_name text not null, + created timestamptz not null default now(), -- auto_now_add in the model + current_state jsonb not null, + updated timestamptz not null +); + +comment on table listers is 'Lister instances known to the origin visit scheduler'; +comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; +comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. 
framagit, bitbucket, ...)'; +comment on column listers.created is 'Timestamp at which the lister was originally created'; +comment on column listers.current_state is 'Known current state of this lister'; +comment on column listers.updated is 'Timestamp at which the lister state was last updated'; + +-- lister schema +create unique index on listers (name, instance_name); + +create table if not exists listed_origins ( + -- Basic information + lister_id uuid not null references listers(id), + url text not null, + visit_type text not null, + extra_loader_arguments jsonb not null, + + -- Whether this origin still exists or not + enabled boolean not null, + + -- time-based information + first_seen timestamptz not null default now(), + last_seen timestamptz not null, + + -- potentially provided by the lister + last_update timestamptz, + + primary key (lister_id, url, visit_type) +); + +comment on table listed_origins is 'Origins known to the origin visit scheduler'; +comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; +comment on column listed_origins.url is 'URL of the origin listed'; +comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; +comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; + +comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; +comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; +comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; + +comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO index dc35e9d..9025937 100644 --- a/swh.scheduler.egg-info/PKG-INFO +++ b/swh.scheduler.egg-info/PKG-INFO @@ -1,30 +1,30 @@ Metadata-Version: 2.1 Name: swh.scheduler -Version: 0.1.1 +Version: 0.2.0 Summary: Software Heritage Scheduler Home-page: https://forge.softwareheritage.org/diffusion/DSCH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/ Description: swh-scheduler ============= Job scheduler for the Software Heritage project. Task manager for asynchronous/delayed tasks, used for both recurrent (e.g., listing a forge, loading new stuff from a Git repository) and one-off activities (e.g., loading a specific version of a source package). 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index fa7fef9..38ebe79 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,60 +1,104 @@ +.gitignore +.pre-commit-config.yaml +AUTHORS +CODE_OF_CONDUCT.md +CONTRIBUTORS +LICENSE +LICENSE.Celery MANIFEST.in Makefile README.md +conftest.py +mypy.ini pyproject.toml +pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py +tox.ini version.txt -bin/swh-worker-control +data/README.md +data/elastic-template.json +data/update-index-settings.json +docs/.gitignore +docs/Makefile +docs/conf.py +docs/index.rst +docs/_static/.placeholder +docs/_templates/.placeholder +sql/.gitignore +sql/Makefile +sql/updates/02.sql +sql/updates/03.sql +sql/updates/04.sql +sql/updates/05.sql +sql/updates/06.sql +sql/updates/07.sql +sql/updates/08.sql +sql/updates/09.sql +sql/updates/10.sql +sql/updates/11.sql +sql/updates/12.sql +sql/updates/13.sql +sql/updates/14.sql +sql/updates/15.sql +sql/updates/16.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/elasticsearch_memory.py +swh/scheduler/exc.py +swh/scheduler/interface.py +swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py +swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/pika_listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py +swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py +swh/scheduler/sql/10-swh-init.sql swh/scheduler/sql/30-swh-schema.sql swh/scheduler/sql/40-swh-func.sql swh/scheduler/sql/50-swh-data.sql swh/scheduler/sql/60-swh-indexes.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py +swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py +swh/scheduler/tests/test_model.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/es/__init__.py swh/scheduler/tests/es/conftest.py swh/scheduler/tests/es/test_backend_es.py swh/scheduler/tests/es/test_cli_task.py swh/scheduler/tests/es/test_elasticsearch_memory.py \ No newline at end of file diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt index fb74eb5..0c3f81b 100644 --- a/swh.scheduler.egg-info/requires.txt +++ 
b/swh.scheduler.egg-info/requires.txt @@ -1,21 +1,23 @@ arrow +attrs +attrs-strict celery>=4 future>=0.18.0 Click elasticsearch>5.4 flask pika>=1.1.0 psycopg2 pyyaml vcversioner setuptools swh.core[db,http]>=0.0.65 swh.storage>=0.0.182 [testing] pytest pytest-mock pytest-postgresql>=2.1.0 celery>=4.3 hypothesis>=3.11.0 swh.lister diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py index 4cfe0df..15d8e54 100644 --- a/swh/scheduler/api/client.py +++ b/swh/scheduler/api/client.py @@ -1,142 +1,24 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient +from .serializers import ENCODERS, DECODERS +from .. import exc +from ..interface import SchedulerInterface + class RemoteScheduler(RPCClient): """Proxy to a remote scheduler API """ - def close_connection(self): - return self.post("close_connection", {}) - - def set_status_tasks(self, task_ids, status="disabled", next_run=None): - return self.post( - "set_status_tasks", - dict(task_ids=task_ids, status=status, next_run=next_run), - ) - - def create_task_type(self, task_type): - return self.post("create_task_type", {"task_type": task_type}) - - def get_task_type(self, task_type_name): - return self.post("get_task_type", {"task_type_name": task_type_name}) - - def get_task_types(self): - return self.post("get_task_types", {}) - - def create_tasks(self, tasks): - return self.post("create_tasks", {"tasks": tasks}) - - def disable_tasks(self, task_ids): - return self.post("disable_tasks", {"task_ids": task_ids}) - - def get_tasks(self, task_ids): - return self.post("get_tasks", {"task_ids": task_ids}) - - def get_task_runs(self, task_ids, limit=None): - return self.post("get_task_runs", {"task_ids": task_ids, "limit": limit}) - - def search_tasks( - self, - task_id=None, - task_type=None, - status=None, - priority=None, - policy=None, - before=None, - after=None, - limit=None, - ): - return self.post( - "search_tasks", - dict( - task_id=task_id, - task_type=task_type, - status=status, - priority=priority, - policy=policy, - before=before, - after=after, - limit=limit, - ), - ) - - def peek_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None - ): - return self.post( - "peek_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def grab_ready_tasks( - self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None - ): - return self.post( - "grab_ready_tasks", - { - "task_type": task_type, - "timestamp": timestamp, - "num_tasks": num_tasks, - "num_tasks_priority": num_tasks_priority, - }, - ) - - def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): - return self.post( - "schedule_task_run", - { - "task_id": task_id, - "backend_id": backend_id, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def mass_schedule_task_runs(self, task_runs): - return self.post("mass_schedule_task_runs", {"task_runs": task_runs}) - - def start_task_run(self, backend_id, metadata=None, timestamp=None): - return self.post( - "start_task_run", - {"backend_id": backend_id, "metadata": metadata, "timestamp": timestamp,}, - ) - - def end_task_run(self, backend_id, status, metadata=None, timestamp=None): - return self.post( - "end_task_run", - { - 
"backend_id": backend_id, - "status": status, - "metadata": metadata, - "timestamp": timestamp, - }, - ) - - def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None): - return self.post( - "filter_task_to_archive", - { - "after_ts": after_ts, - "before_ts": before_ts, - "limit": limit, - "page_token": page_token, - }, - ) + backend_class = SchedulerInterface - def delete_archived_tasks(self, task_ids): - return self.post("delete_archived_tasks", {"task_ids": task_ids}) + reraise_exceptions = [getattr(exc, a) for a in exc.__all__] - def get_priority_ratios(self): - return self.get("get_priority_ratios") + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS diff --git a/swh/scheduler/api/serializers.py b/swh/scheduler/api/serializers.py new file mode 100644 index 0000000..930c700 --- /dev/null +++ b/swh/scheduler/api/serializers.py @@ -0,0 +1,28 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Decoder and encoders for swh.scheduler.model objects.""" + +from typing import Callable, Dict, List, Tuple + +import attr + +import swh.scheduler.model as model + + +def _encode_model_object(obj): + d = attr.asdict(obj) + d["__type__"] = type(obj).__name__ + return d + + +ENCODERS: List[Tuple[type, str, Callable]] = [ + (model.BaseSchedulerModel, "scheduler_model", _encode_model_object), +] + + +DECODERS: Dict[str, Callable] = { + "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d) +} diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 02ad19f..03b424f 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,266 +1,140 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os import logging - -from flask import request, Flask +import os from swh.core import config -from swh.core.api import ( - decode_request, - error_handler, - encode_data_server as encode_data, -) +from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp +from swh.core.api import encode_data_server as encode_data +from swh.core.api import error_handler, negotiate -from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter -from swh.scheduler import get_scheduler as get_scheduler_from +from swh.scheduler import get_scheduler +from swh.scheduler.exc import SchedulerException +from swh.scheduler.interface import SchedulerInterface +from .serializers import ENCODERS, DECODERS -app = Flask(__name__) scheduler = None -@app.errorhandler(Exception) -def my_error_handler(exception): - return error_handler(exception, encode_data) - - -def get_sched(): +def get_global_scheduler(): global scheduler if not scheduler: - scheduler = get_scheduler_from(**app.config["scheduler"]) + scheduler = get_scheduler(**app.config["scheduler"]) return scheduler -def has_no_empty_params(rule): - return len(rule.defaults or ()) >= len(rule.arguments or ()) - - -@app.route("/") -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def index(): - return "SWH Scheduler API server" - - -@app.route("/close_connection", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def close_connection(): - return 
get_sched().close_connection() - - -@app.route("/set_status_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def set_status_tasks(): - return get_sched().set_status_tasks(**decode_request(request)) +class SchedulerServerApp(RPCServerApp): + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS -@app.route("/create_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_task_type(): - return get_sched().create_task_type(**decode_request(request)) - - -@app.route("/get_task_type", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_type(): - return get_sched().get_task_type(**decode_request(request)) - - -@app.route("/get_task_types", methods=["GET", "POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_types(): - return get_sched().get_task_types(**decode_request(request)) - - -@app.route("/create_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def create_tasks(): - return get_sched().create_tasks(**decode_request(request)) - - -@app.route("/disable_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def disable_tasks(): - return get_sched().disable_tasks(**decode_request(request)) - - -@app.route("/get_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_tasks(): - return get_sched().get_tasks(**decode_request(request)) - - -@app.route("/get_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def get_task_runs(): - return get_sched().get_task_runs(**decode_request(request)) - - -@app.route("/search_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def search_tasks(): - return get_sched().search_tasks(**decode_request(request)) - - -@app.route("/peek_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def peek_ready_tasks(): - return get_sched().peek_ready_tasks(**decode_request(request)) - - -@app.route("/grab_ready_tasks", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def grab_ready_tasks(): - return get_sched().grab_ready_tasks(**decode_request(request)) - - -@app.route("/schedule_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def schedule_task_run(): - return get_sched().schedule_task_run(**decode_request(request)) - - -@app.route("/mass_schedule_task_runs", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def mass_schedule_task_runs(): - return get_sched().mass_schedule_task_runs(**decode_request(request)) - - -@app.route("/start_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def start_task_run(): - return get_sched().start_task_run(**decode_request(request)) - - -@app.route("/end_task_run", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def end_task_run(): - return get_sched().end_task_run(**decode_request(request)) +app = SchedulerServerApp( + __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler +) -@app.route("/filter_task_to_archive", methods=["POST"]) -@negotiate(MsgpackFormatter) -@negotiate(JSONFormatter) -def filter_task_to_archive(): - return get_sched().filter_task_to_archive(**decode_request(request)) +@app.errorhandler(SchedulerException) +def argument_error_handler(exception): + return error_handler(exception, encode_data, 
status_code=400)


-@app.route("/delete_archived_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def delete_archived_tasks():
-    return get_sched().delete_archived_tasks(**decode_request(request))
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+    return error_handler(exception, encode_data)


-@app.route("/get_priority_ratios", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_priority_ratios():
-    return get_sched().get_priority_ratios(**decode_request(request))
+def has_no_empty_params(rule):
+    return len(rule.defaults or ()) >= len(rule.arguments or ())


@app.route("/site-map")
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def site_map():
    links = []
-    sched = get_sched()
    for rule in app.url_map.iter_rules():
-        if has_no_empty_params(rule) and hasattr(sched, rule.endpoint):
+        if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint):
            links.append(
-                dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__)
+                dict(
+                    rule=rule.rule,
+                    description=getattr(SchedulerInterface, rule.endpoint).__doc__,
+                )
            )
    # links is now a list of url, endpoint tuples
    return links


def load_and_check_config(config_file, type="local"):
    """Check that the minimal configuration is set to run the API, or raise an
       explanatory error.

    Args:
        config_file (str): Path to the configuration file to load
        type (str): configuration type. For 'local' type, more checks are done.

    Raises:
        Error if the setup is not as expected

    Returns:
        configuration as a dict

    """
    if not config_file:
        raise EnvironmentError("Configuration file must be defined")

    if not os.path.exists(config_file):
        raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))

    cfg = config.read(config_file)

    vcfg = cfg.get("scheduler")
    if not vcfg:
        raise KeyError("Missing 'scheduler' configuration")

    if type == "local":
        cls = vcfg.get("cls")
        if cls != "local":
            raise ValueError(
                "The scheduler backend can only be started with a 'local' "
                "configuration"
            )

        args = vcfg.get("args")
        if not args:
            raise KeyError("Invalid configuration; missing 'args' config entry")

        db = args.get("db")
        if not db:
            raise KeyError("Invalid configuration; missing 'db' config entry")

    return cfg


api_cfg = None


def make_app_from_configfile():
    """Run the WSGI app from the webserver, loading the configuration from
       a configuration file.

       SWH_CONFIG_FILENAME environment variable defines the
       configuration path to load.
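
       A minimal configuration file, matching the checks done by
       load_and_check_config above, could look like this (an illustrative
       sketch only; adapt the database connection string to your
       deployment)::

           scheduler:
             cls: local
             args:
               db: dbname=softwareheritage-scheduler-dev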
""" global api_cfg if not api_cfg: config_file = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_file) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py index 5ae47dc..cfbdff2 100644 --- a/swh/scheduler/backend.py +++ b/swh/scheduler/backend.py @@ -1,571 +1,674 @@ -# Copyright (C) 2015-2019 The Software Heritage developers +# Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from arrow import Arrow, utcnow +import attr import psycopg2.pool import psycopg2.extras -from typing import Any, Dict, Optional +from typing import Any, Dict, Iterable, List, Optional from psycopg2.extensions import AsIs from swh.core.db import BaseDb from swh.core.db.common import db_transaction +from .exc import StaleData +from .model import Lister, ListedOrigin logger = logging.getLogger(__name__) def adapt_arrow(arrow): return AsIs("'%s'::timestamptz" % arrow.isoformat()) psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) psycopg2.extensions.register_adapter(Arrow, adapt_arrow) +psycopg2.extras.register_uuid() def format_query(query, keys): """Format a query with the given keys""" query_keys = ", ".join(keys) placeholders = ", ".join(["%s"] * len(keys)) return query.format(keys=query_keys, placeholders=placeholders) class SchedulerBackend: """Backend for the Software Heritage scheduling database. """ def __init__(self, db, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection """ if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = BaseDb(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() task_type_keys = [ "type", "description", "backend_name", "default_interval", "min_interval", "max_interval", "backoff_factor", "max_queue_length", "num_retries", "retry_delay", ] @db_transaction() def create_task_type(self, task_type, db=None, cur=None): """Create a new task type ready for scheduling. 
Args: task_type (dict): a dictionary with the following keys: - type (str): an identifier for the task type - description (str): a human-readable description of what the task does - backend_name (str): the name of the task in the job-scheduling backend - default_interval (datetime.timedelta): the default interval between two task runs - min_interval (datetime.timedelta): the minimum interval between two task runs - max_interval (datetime.timedelta): the maximum interval between two task runs - backoff_factor (float): the factor by which the interval changes at each run - max_queue_length (int): the maximum length of the task queue for this task type - num_retries (int): the default number of retries on transient failures - retry_delay (datetime.timedelta): the delay between two retries """ keys = [key for key in self.task_type_keys if key in task_type] query = format_query( """insert into task_type ({keys}) values ({placeholders}) on conflict do nothing""", keys, ) cur.execute(query, [task_type[key] for key in keys]) @db_transaction() def get_task_type(self, task_type_name, db=None, cur=None): """Retrieve the task type with id task_type_name""" query = format_query( "select {keys} from task_type where type=%s", self.task_type_keys, ) cur.execute(query, (task_type_name,)) return cur.fetchone() @db_transaction() def get_task_types(self, db=None, cur=None): """Retrieve all registered task types""" query = format_query("select {keys} from task_type", self.task_type_keys,) cur.execute(query) return cur.fetchall() + @db_transaction() + def get_or_create_lister( + self, name: str, instance_name: Optional[str] = None, db=None, cur=None + ) -> Lister: + """Retrieve information about the given instance of the lister from the + database, or create the entry if it did not exist. + """ + + if instance_name is None: + instance_name = "" + + select_cols = ", ".join(Lister.select_columns()) + insert_cols, insert_meta = ( + ", ".join(tup) for tup in Lister.insert_columns_and_metavars() + ) + + query = f""" + with added as ( + insert into listers ({insert_cols}) values ({insert_meta}) + on conflict do nothing + returning {select_cols} + ) + select {select_cols} from added + union all + select {select_cols} from listers + where (name, instance_name) = (%(name)s, %(instance_name)s); + """ + + cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name))) + + return Lister(**cur.fetchone()) + + @db_transaction() + def update_lister(self, lister: Lister, db=None, cur=None) -> Lister: + """Update the state for the given lister instance in the database. + + Returns: + a new Lister object, with all fields updated from the database + + Raises: + StaleData if the `updated` timestamp for the lister instance in + the database doesn't match the one passed by the user. + """ + + select_cols = ", ".join(Lister.select_columns()) + set_vars = ", ".join( + f"{col} = {meta}" + for col, meta in zip(*Lister.insert_columns_and_metavars()) + ) + + query = f"""update listers + set {set_vars} + where id=%(id)s and updated=%(updated)s + returning {select_cols}""" + + cur.execute(query, attr.asdict(lister)) + updated = cur.fetchone() + + if not updated: + raise StaleData("Stale data; Lister state not updated") + + return Lister(**updated) + + @db_transaction() + def record_listed_origins( + self, listed_origins: Iterable[ListedOrigin], db=None, cur=None + ) -> List[ListedOrigin]: + """Record a set of origins that a lister has listed. + + This performs an "upsert": origins with the same (lister_id, url, + visit_type) values are updated with new values for + extra_loader_arguments, last_update and last_seen. 
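+
+        A minimal usage sketch (illustrative only; the lister name and
+        origin URL are made up):
+
+            lister = backend.get_or_create_lister(name="gitlab")
+            backend.record_listed_origins(
+                [
+                    ListedOrigin(
+                        lister_id=lister.id,
+                        url="https://example.com/repo.git",
+                        visit_type="git",
+                    )
+                ]
+            )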
+ """ + + pk_cols = ListedOrigin.primary_key_columns() + select_cols = ListedOrigin.select_columns() + insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars() + + upsert_cols = [col for col in insert_cols if col not in pk_cols] + upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols) + + query = f"""INSERT into listed_origins ({", ".join(insert_cols)}) + VALUES %s + ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE + SET {upsert_set} + RETURNING {", ".join(select_cols)} + """ + + ret = psycopg2.extras.execute_values( + cur=cur, + sql=query, + argslist=(attr.asdict(origin) for origin in listed_origins), + template=f"({', '.join(insert_meta)})", + page_size=1000, + fetch=True, + ) + + return [ListedOrigin(**d) for d in ret] + task_create_keys = [ "type", "arguments", "next_run", "policy", "status", "retries_left", "priority", ] task_keys = task_create_keys + ["id", "current_interval"] @db_transaction() def create_tasks(self, tasks, policy="recurring", db=None, cur=None): """Create new tasks. Args: tasks (list): each task is a dictionary with the following keys: - type (str): the task type - arguments (dict): the arguments for the task runner, keys: - args (list of str): arguments - kwargs (dict str -> str): keyword arguments - next_run (datetime.datetime): the next scheduled run for the task Returns: a list of created tasks. """ cur.execute("select swh_scheduler_mktemp_task()") db.copy_to( tasks, "tmp_task", self.task_create_keys, default_values={"policy": policy, "status": "next_run_not_scheduled"}, cur=cur, ) query = format_query( "select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys, ) cur.execute(query) return cur.fetchall() @db_transaction() def set_status_tasks( self, task_ids, status="disabled", next_run=None, db=None, cur=None ): """Set the tasks' status whose ids are listed. If given, also set the next_run date. 
""" if not task_ids: return query = ["UPDATE task SET status = %s"] args = [status] if next_run: query.append(", next_run = %s") args.append(next_run) query.append(" WHERE id IN %s") args.append(tuple(task_ids)) cur.execute("".join(query), args) @db_transaction() def disable_tasks(self, task_ids, db=None, cur=None): """Disable the tasks whose ids are listed.""" return self.set_status_tasks(task_ids, db=db, cur=cur) @db_transaction() def search_tasks( self, task_id=None, task_type=None, status=None, priority=None, policy=None, before=None, after=None, limit=None, db=None, cur=None, ): """Search tasks from selected criterions""" where = [] args = [] if task_id: if isinstance(task_id, (str, int)): where.append("id = %s") else: where.append("id in %s") task_id = tuple(task_id) args.append(task_id) if task_type: if isinstance(task_type, str): where.append("type = %s") else: where.append("type in %s") task_type = tuple(task_type) args.append(task_type) if status: if isinstance(status, str): where.append("status = %s") else: where.append("status in %s") status = tuple(status) args.append(status) if priority: if isinstance(priority, str): where.append("priority = %s") else: priority = tuple(priority) where.append("priority in %s") args.append(priority) if policy: where.append("policy = %s") args.append(policy) if before: where.append("next_run <= %s") args.append(before) if after: where.append("next_run >= %s") args.append(after) query = "select * from task" if where: query += " where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_tasks(self, task_ids, db=None, cur=None): """Retrieve the info of tasks whose ids are listed.""" query = format_query("select {keys} from task where id in %s", self.task_keys) cur.execute(query, (tuple(task_ids),)) return cur.fetchall() @db_transaction() def peek_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None, ): """Fetch the list of ready tasks Args: task_type (str): filtering task per their type timestamp (datetime.datetime): peek tasks that need to be executed before that timestamp num_tasks (int): only peek at num_tasks tasks (with no priority) num_tasks_priority (int): only peek at num_tasks_priority tasks (with priority) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_peek_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)""", (task_type, timestamp, num_tasks, num_tasks_priority), ) logger.debug("PEEK %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() @db_transaction() def grab_ready_tasks( self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, db=None, cur=None, ): """Fetch the list of ready tasks, and mark them as scheduled Args: task_type (str): filtering task per their type timestamp (datetime.datetime): grab tasks that need to be executed before that timestamp num_tasks (int): only grab num_tasks tasks (with no priority) num_tasks_priority (int): only grab oneshot num_tasks tasks (with priorities) Returns: a list of tasks """ if timestamp is None: timestamp = utcnow() cur.execute( """select * from swh_scheduler_grab_ready_tasks( %s, %s, %s :: bigint, %s :: bigint)""", (task_type, timestamp, num_tasks, num_tasks_priority), ) logger.debug("GRAB %s => %s" % (task_type, cur.rowcount)) return cur.fetchall() task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"] @db_transaction() def 
schedule_task_run( self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as scheduled, adding a task_run entry in the database. Args: task_id (int): the identifier for the task being scheduled backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: a fresh task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)", (task_id, backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def mass_schedule_task_runs(self, task_runs, db=None, cur=None): """Schedule a bunch of task runs. Args: task_runs (list): a list of dicts with keys: - task (int): the identifier for the task being scheduled - backend_id (str): the identifier of the job in the backend - metadata (dict): metadata to add to the task_run entry - scheduled (datetime.datetime): the instant the event occurred Returns: None """ cur.execute("select swh_scheduler_mktemp_task_run()") db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur) cur.execute("select swh_scheduler_schedule_task_run_from_temp()") @db_transaction() def start_task_run( self, backend_id, metadata=None, timestamp=None, db=None, cur=None ): """Mark a given task as started, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_start_task_run(%s, %s, %s)", (backend_id, metadata, timestamp), ) return cur.fetchone() @db_transaction() def end_task_run( self, backend_id, status, metadata=None, timestamp=None, result=None, db=None, cur=None, ): """Mark a given task as ended, updating the corresponding task_run entry in the database. Args: backend_id (str): the identifier of the job in the backend status (str): how the task ended; one of: 'eventful', 'uneventful', 'failed' metadata (dict): metadata to add to the task_run entry timestamp (datetime.datetime): the instant the event occurred Returns: the updated task_run entry """ if metadata is None: metadata = {} if timestamp is None: timestamp = utcnow() cur.execute( "select * from swh_scheduler_end_task_run(%s, %s, %s, %s)", (backend_id, status, metadata, timestamp), ) return cur.fetchone() @db_transaction() def filter_task_to_archive( self, after_ts: str, before_ts: str, limit: int = 10, page_token: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: """Compute the tasks to archive within the datetime interval [after_ts, before_ts[. The method returns a paginated result. Returns: dict with the following keys: - **next_page_token**: opaque token to be used as `page_token` to retrieve the next page of results. If absent, there are no more pages to gather. - **tasks**: list of task dictionaries with the following keys: **id** (str): origin task id **started** (Optional[datetime]): started date **scheduled** (datetime): scheduled date **arguments** (json dict): task's arguments ... 
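        A pagination sketch (illustrative only; the timestamps and the
        `process` helper are made up):

            page_token = None
            while True:
                result = backend.filter_task_to_archive(
                    "2020-01-01", "2020-06-01", limit=100, page_token=page_token
                )
                process(result["tasks"])
                page_token = result.get("next_page_token")
                if page_token is None:
                    break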
""" assert not page_token or isinstance(page_token, str) last_id = -1 if page_token is None else int(page_token) tasks = [] cur.execute( "select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)", (after_ts, before_ts, last_id, limit + 1), ) for row in cur: task = dict(row) # nested type index does not accept bare values # transform it as a dict to comply with this task["arguments"]["args"] = { i: v for i, v in enumerate(task["arguments"]["args"]) } kwargs = task["arguments"]["kwargs"] task["arguments"]["kwargs"] = json.dumps(kwargs) tasks.append(task) if len(tasks) >= limit + 1: # remains data, add pagination information result = { "tasks": tasks[:limit], "next_page_token": str(tasks[-1]["task_id"]), } else: result = {"tasks": tasks} return result @db_transaction() def delete_archived_tasks(self, task_ids, db=None, cur=None): """Delete archived tasks as much as possible. Only the task_ids whose complete associated task_run have been cleaned up will be. """ _task_ids = _task_run_ids = [] for task_id in task_ids: _task_ids.append(task_id["task_id"]) _task_run_ids.append(task_id["task_run_id"]) cur.execute( "select * from swh_scheduler_delete_archived_tasks(%s, %s)", (_task_ids, _task_run_ids), ) task_run_keys = [ "id", "task", "backend_id", "scheduled", "started", "ended", "metadata", "status", ] @db_transaction() def get_task_runs(self, task_ids, limit=None, db=None, cur=None): """Search task run for a task id""" where = [] args = [] if task_ids: if isinstance(task_ids, (str, int)): where.append("task = %s") else: where.append("task in %s") task_ids = tuple(task_ids) args.append(task_ids) else: return () query = "select * from task_run where " + " and ".join(where) if limit: query += " limit %s :: bigint" args.append(limit) cur.execute(query, args) return cur.fetchall() @db_transaction() def get_priority_ratios(self, db=None, cur=None): cur.execute("select id, ratio from priority_ratio") return {row["id"]: row["ratio"] for row in cur.fetchall()} diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index 34f33dd..57e26a4 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,93 +1,91 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup @click.group(name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--database", "-d", default=None, help="Scheduling database DSN (imply cls is 'local')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). 
""" + try: + from psycopg2 import OperationalError + except ImportError: + + class OperationalError(Exception): + pass + from swh.core import config - from swh.scheduler.celery_backend.config import setup_log_handler from swh.scheduler import get_scheduler, DEFAULT_CONFIG ctx.ensure_object(dict) - log_level = ctx.obj.get("log_level", logging.INFO) - - setup_log_handler( - loglevel=log_level, - colorize=False, - format="[%(levelname)s] %(name)s -- %(message)s", - log_console=not no_stdout, - ) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf["scheduler"]["cls"] = "local" conf["scheduler"]["args"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["args"] = {"url": url} sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s" % (sched_conf)) scheduler = get_scheduler(**sched_conf) - except ValueError: + except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf -from . import admin, task, task_type # noqa +from . import admin, celery_monitor, task, task_type # noqa def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/celery_monitor.py b/swh/scheduler/cli/celery_monitor.py new file mode 100644 index 0000000..7204ead --- /dev/null +++ b/swh/scheduler/cli/celery_monitor.py @@ -0,0 +1,157 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from ast import literal_eval +import csv +import logging +import sys +import time +from typing import Any, Dict, Optional + +import click + +from . 
import cli + + logger = logging.getLogger(__name__) + + + def destination_from_pattern(ctx: click.Context, pattern: Optional[str]): + """Configure the Celery remote-control destination(s) from the given pattern""" + if pattern is None: + logger.debug("Matching all workers") + elif "*" in pattern: + ctx.obj["inspect"].pattern = pattern + ctx.obj["inspect"].matcher = "glob" + logger.debug("Using glob pattern %s", pattern) + else: + destination = pattern.split(",") + ctx.obj["inspect"].destination = destination + logger.debug("Using destinations %s", ", ".join(destination)) + + +@cli.group("celery-monitor") +@click.option( + "--timeout", type=float, default=3.0, help="Timeout for celery remote control" +) +@click.option("--pattern", help="Celery destination pattern", default=None) +@click.pass_context +def celery_monitor(ctx: click.Context, timeout: float, pattern: Optional[str]) -> None: + """Monitoring of Celery""" + from swh.scheduler.celery_backend.config import app + + ctx.obj["timeout"] = timeout + ctx.obj["inspect"] = app.control.inspect(timeout=timeout) + + destination_from_pattern(ctx, pattern) + + +@celery_monitor.command("ping-workers") +@click.pass_context +def ping_workers(ctx: click.Context) -> None: + """Check which workers respond to the celery remote control""" + + response_times = {} + + def ping_callback(response): + rtt = time.monotonic() - ping_time + for destination in response: + logger.debug("Got ping response from %s: %r", destination, response) + response_times[destination] = rtt + + ctx.obj["inspect"].callback = ping_callback + + ping_time = time.monotonic() + ret = ctx.obj["inspect"].ping() + + if not ret: + logger.info("No response in %f seconds", time.monotonic() - ping_time) + ctx.exit(1) + + for destination in ret: + logger.info( + "Got response from %s in %f seconds", + destination, + response_times[destination], + ) + + ctx.exit(0) + + +@celery_monitor.command("list-running") +@click.option( + "--format", + help="Output format", + default="pretty", + type=click.Choice(["pretty", "csv"]), +) +@click.pass_context +def list_running(ctx: click.Context, format: str): + """List the tasks currently running on the selected workers""" + response_times = {} + + def active_callback(response): + rtt = time.monotonic() - active_time + for destination in response: + response_times[destination] = rtt + + ctx.obj["inspect"].callback = active_callback + + active_time = time.monotonic() + ret = ctx.obj["inspect"].active() + + if not ret: + logger.info("No response in %f seconds", time.monotonic() - active_time) + ctx.exit(1) + + def pretty_task_arguments(task: Dict[str, Any]) -> str: + arg_list = [] + for arg in task["args"]: + arg_list.append(repr(arg)) + for k, v in task["kwargs"].items(): + arg_list.append(f"{k}={v!r}") + + return f'{task["name"]}({", ".join(arg_list)})' + + def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]: + duration = time.time() - task["time_start"] + return { + "worker": worker, + "name": task["name"], + "args": literal_eval(task["args"]), + "kwargs": literal_eval(task["kwargs"]), + "duration": duration, + "worker_pid": task["worker_pid"], + } + + if format == "csv": + writer = csv.DictWriter( + sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"] + ) + writer.writeheader() + + def output(data: Dict[str, Any]): + writer.writerow(data) + + elif format == "pretty": + + def output(data: Dict[str, Any]): + print( + f"{data['worker']}: {pretty_task_arguments(data)} " + f"[for {data['duration']:f}s, pid={data['worker_pid']}]" + ) + + else: 
logger.error("Unknown format %s", format) + ctx.exit(127) + + for worker, active in sorted(ret.items()): + if not active: + logger.info("%s: no active tasks", worker) + continue + + for task in sorted(active, key=lambda t: t["time_start"]): + output(get_task_data(worker, task)) + + ctx.exit(0) diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py new file mode 100644 index 0000000..0c92e43 --- /dev/null +++ b/swh/scheduler/exc.py @@ -0,0 +1,17 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +__all__ = [ + "SchedulerException", + "StaleData", +] + + +class SchedulerException(Exception): + pass + + +class StaleData(SchedulerException): + pass diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py new file mode 100644 index 0000000..0ff5311 --- /dev/null +++ b/swh/scheduler/interface.py @@ -0,0 +1,290 @@ +# Copyright (C) 2015-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, Iterable, List, Optional + +from swh.core.api import remote_api_endpoint + +from swh.scheduler.model import ListedOrigin, Lister + + +class SchedulerInterface: + @remote_api_endpoint("task_type/create") + def create_task_type(self, task_type): + """Create a new task type ready for scheduling. + + Args: + task_type (dict): a dictionary with the following keys: + + - type (str): an identifier for the task type + - description (str): a human-readable description of what the + task does + - backend_name (str): the name of the task in the + job-scheduling backend + - default_interval (datetime.timedelta): the default interval + between two task runs + - min_interval (datetime.timedelta): the minimum interval + between two task runs + - max_interval (datetime.timedelta): the maximum interval + between two task runs + - backoff_factor (float): the factor by which the interval + changes at each run + - max_queue_length (int): the maximum length of the task queue + for this task type + - num_retries (int): the default number of retries on transient + failures + - retry_delay (datetime.timedelta): the delay between two + retries + + """ + ... + + @remote_api_endpoint("task_type/get") + def get_task_type(self, task_type_name): + """Retrieve the task type with id task_type_name""" + ... + + @remote_api_endpoint("task_type/get_all") + def get_task_types(self): + """Retrieve all registered task types""" + ... + + @remote_api_endpoint("task/create") + def create_tasks(self, tasks, policy="recurring"): + """Create new tasks. + + Args: + tasks (list): each task is a dictionary with the following keys: + + - type (str): the task type + - arguments (dict): the arguments for the task runner, keys: + + - args (list of str): arguments + - kwargs (dict str -> str): keyword arguments + + - next_run (datetime.datetime): the next scheduled run for the + task + + Returns: + a list of created tasks. + + """ + ... + + @remote_api_endpoint("task/set_status") + def set_status_tasks(self, task_ids, status="disabled", next_run=None): + """Set the tasks' status whose ids are listed. + + If given, also set the next_run date. + """ + ... + + @remote_api_endpoint("task/disable") + def disable_tasks(self, task_ids): + """Disable the tasks whose ids are listed.""" + ... 
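+
+    # Each `remote_api_endpoint` above maps a method to an RPC route. A
+    # client-side sketch (illustrative only; the URL and constructor
+    # arguments are assumptions):
+    #
+    #   from swh.scheduler.api.client import RemoteScheduler
+    #   sched = RemoteScheduler(url="http://localhost:5008/")
+    #   sched.disable_tasks([42])  # POSTs to <url>/task/disable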
+ + @remote_api_endpoint("task/search") + def search_tasks( + self, + task_id=None, + task_type=None, + status=None, + priority=None, + policy=None, + before=None, + after=None, + limit=None, + ): + """Search tasks from selected criteria""" + ... + + @remote_api_endpoint("task/get") + def get_tasks(self, task_ids): + """Retrieve the info of tasks whose ids are listed.""" + ... + + @remote_api_endpoint("task/peek_ready") + def peek_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks + + Args: + task_type (str): filter tasks by their type + timestamp (datetime.datetime): peek tasks that need to be executed + before that timestamp + num_tasks (int): only peek at num_tasks tasks (with no priority) + num_tasks_priority (int): only peek at num_tasks_priority + tasks (with priority) + + Returns: + a list of tasks + + """ + ... + + @remote_api_endpoint("task/grab_ready") + def grab_ready_tasks( + self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None, + ): + """Fetch the list of ready tasks, and mark them as scheduled + + Args: + task_type (str): filter tasks by their type + timestamp (datetime.datetime): grab tasks that need to be executed + before that timestamp + num_tasks (int): only grab num_tasks tasks (with no priority) + num_tasks_priority (int): only grab num_tasks_priority tasks (with + priority) + + Returns: + a list of tasks + + """ + ... + + @remote_api_endpoint("task_run/schedule_one") + def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None): + """Mark a given task as scheduled, adding a task_run entry in the database. + + Args: + task_id (int): the identifier for the task being scheduled + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + a fresh task_run entry + + """ + ... + + @remote_api_endpoint("task_run/schedule") + def mass_schedule_task_runs(self, task_runs): + """Schedule a bunch of task runs. + + Args: + task_runs (list): a list of dicts with keys: + + - task (int): the identifier for the task being scheduled + - backend_id (str): the identifier of the job in the backend + - metadata (dict): metadata to add to the task_run entry + - scheduled (datetime.datetime): the instant the event occurred + + Returns: + None + """ + ... + + @remote_api_endpoint("task_run/start") + def start_task_run(self, backend_id, metadata=None, timestamp=None): + """Mark a given task as started, updating the corresponding task_run + entry in the database. + + Args: + backend_id (str): the identifier of the job in the backend + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... + + @remote_api_endpoint("task_run/end") + def end_task_run( + self, backend_id, status, metadata=None, timestamp=None, result=None, + ): + """Mark a given task as ended, updating the corresponding task_run entry in the + database. + + Args: + backend_id (str): the identifier of the job in the backend + status (str): how the task ended; one of: 'eventful', 'uneventful', + 'failed' + metadata (dict): metadata to add to the task_run entry + timestamp (datetime.datetime): the instant the event occurred + + Returns: + the updated task_run entry + + """ + ... 
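+
+    # Typical task-run lifecycle, tying the three endpoints above together
+    # (illustrative only; the identifiers are made up):
+    #
+    #   sched.schedule_task_run(task_id=42, backend_id="celery-task-uuid")
+    #   sched.start_task_run("celery-task-uuid")
+    #   sched.end_task_run("celery-task-uuid", status="eventful")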
+ + @remote_api_endpoint("task/filter_for_archive") + def filter_task_to_archive( + self, + after_ts: str, + before_ts: str, + limit: int = 10, + page_token: Optional[str] = None, + ) -> Dict[str, Any]: + """Compute the tasks to archive within the datetime interval + [after_ts, before_ts[. The method returns a paginated result. + + Returns: + dict with the following keys: + - **next_page_token**: opaque token to be used as + `page_token` to retrieve the next page of results. If absent, + there are no more pages to gather. + - **tasks**: list of task dictionaries with the following keys: + + **id** (str): origin task id + **started** (Optional[datetime]): started date + **scheduled** (datetime): scheduled date + **arguments** (json dict): task's arguments + ... + + """ + ... + + @remote_api_endpoint("task/delete_archived") + def delete_archived_tasks(self, task_ids): + """Delete archived tasks as much as possible. Only the task_ids whose + associated task_run entries have all been cleaned up will be deleted. + + """ + ... + + @remote_api_endpoint("task_run/get") + def get_task_runs(self, task_ids, limit=None): + """Search task runs for the given task ids""" + ... + + @remote_api_endpoint("lister/get_or_create") + def get_or_create_lister( + self, name: str, instance_name: Optional[str] = None + ) -> Lister: + """Retrieve information about the given instance of the lister from the + database, or create the entry if it did not exist. + """ + ... + + @remote_api_endpoint("lister/update") + def update_lister(self, lister: Lister) -> Lister: + """Update the state for the given lister instance in the database. + + Returns: + a new Lister object, with all fields updated from the database + + Raises: + StaleData if the `updated` timestamp for the lister instance in + the database doesn't match the one passed by the user. + """ + ... + + @remote_api_endpoint("origins/record") + def record_listed_origins( + self, listed_origins: Iterable[ListedOrigin] + ) -> List[ListedOrigin]: + """Record a set of origins that a lister has listed. + + This performs an "upsert": origins with the same (lister_id, url, + visit_type) values are updated with new values for + extra_loader_arguments, last_update and last_seen. + """ + ... + + @remote_api_endpoint("priority_ratios/get") + def get_priority_ratios(self): + """Retrieve the ratio of tasks to read per priority""" + ... diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py new file mode 100644 index 0000000..211e769 --- /dev/null +++ b/swh/scheduler/model.py @@ -0,0 +1,162 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +from uuid import UUID +from typing import Any, Dict, List, Optional, Tuple + +import attr +import attr.converters +from attrs_strict import type_validator + + +@attr.s +class BaseSchedulerModel: + """Base class for database-backed objects. + + These database-backed objects are defined through attrs-based attributes + that match the columns of the database 1:1. This is a (very) lightweight + ORM. + + These attrs-based attributes have metadata specific to the functionality + expected from these fields in the database: + + - `primary_key`: the column is a primary key; it should be filtered out + when doing an `update` of the object + - `auto_primary_key`: the column is a primary key, which is automatically handled + by the database. No value will be inserted for it. 
This must be matched with a + database-side default value. + - `auto_now_add`: the column is a timestamp that is set to the current time when + the object is inserted, and never updated afterwards. This must be matched with + a database-side default value. + - `auto_now`: the column is a timestamp that is set to the current time when + the object is inserted or updated. + + """ + + _pk_cols: Optional[Tuple[str, ...]] = None + _select_cols: Optional[Tuple[str, ...]] = None + _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None + + @classmethod + def primary_key_columns(cls) -> Tuple[str, ...]: + """Get the primary key columns for this object type""" + if cls._pk_cols is None: + columns: List[str] = [] + for field in attr.fields(cls): + if any( + field.metadata.get(flag) + for flag in ("auto_primary_key", "primary_key") + ): + columns.append(field.name) + cls._pk_cols = tuple(sorted(columns)) + + return cls._pk_cols + + @classmethod + def select_columns(cls) -> Tuple[str, ...]: + """Get all the database columns needed for a `select` on this object type""" + if cls._select_cols is None: + columns: List[str] = [] + for field in attr.fields(cls): + columns.append(field.name) + cls._select_cols = tuple(sorted(columns)) + + return cls._select_cols + + @classmethod + def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: + """Get the database columns and metavars needed for an `insert` or `update` on + this object type. + + This implements support for the `auto_*` field metadata attributes. + """ + if cls._insert_cols_and_metavars is None: + zipped_cols_and_metavars: List[Tuple[str, str]] = [] + + for field in attr.fields(cls): + if any( + field.metadata.get(flag) + for flag in ("auto_now_add", "auto_primary_key") + ): + continue + elif field.metadata.get("auto_now"): + zipped_cols_and_metavars.append((field.name, "now()")) + else: + zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) + + zipped_cols_and_metavars.sort() + + cols, metavars = zip(*zipped_cols_and_metavars) + cls._insert_cols_and_metavars = cols, metavars + + return cls._insert_cols_and_metavars + + +@attr.s +class Lister(BaseSchedulerModel): + name = attr.ib(type=str, validator=[type_validator()]) + instance_name = attr.ib(type=str, validator=[type_validator()]) + + # Populated by database + id = attr.ib( + type=Optional[UUID], + validator=type_validator(), + default=None, + metadata={"auto_primary_key": True}, + ) + + current_state = attr.ib( + type=Dict[str, Any], validator=[type_validator()], factory=dict + ) + created = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now_add": True}, + ) + updated = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now": True}, + ) + + +@attr.s +class ListedOrigin(BaseSchedulerModel): + """Basic information about a listed origin, output by a lister""" + + lister_id = attr.ib( + type=UUID, validator=[type_validator()], metadata={"primary_key": True} + ) + url = attr.ib( + type=str, validator=[type_validator()], metadata={"primary_key": True} + ) + visit_type = attr.ib( + type=str, validator=[type_validator()], metadata={"primary_key": True} + ) + extra_loader_arguments = attr.ib( + type=Dict[str, str], validator=[type_validator()], factory=dict + ) + + last_update = attr.ib( + type=Optional[datetime.datetime], validator=[type_validator()], default=None, + ) + + enabled = attr.ib(type=bool, 
validator=[type_validator()], default=True) + + first_seen = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now_add": True}, + ) + last_seen = attr.ib( + type=Optional[datetime.datetime], + validator=[type_validator()], + default=None, + metadata={"auto_now": True}, + ) diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql new file mode 100644 index 0000000..d159cc5 --- /dev/null +++ b/swh/scheduler/sql/10-swh-init.sql @@ -0,0 +1 @@ +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql index a5f0b0e..0dfed9e 100644 --- a/swh/scheduler/sql/30-swh-schema.sql +++ b/swh/scheduler/sql/30-swh-schema.sql @@ -1,112 +1,161 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Version description'; insert into dbversion (version, release, description) - values (15, now(), 'Work In Progress'); + values (16, now(), 'Work In Progress'); create table task_type ( type text primary key, description text not null, backend_name text not null, default_interval interval, min_interval interval, max_interval interval, backoff_factor float, max_queue_length bigint, num_retries bigint, retry_delay interval ); comment on table task_type is 'Types of schedulable tasks'; comment on column task_type.type is 'Short identifier for the task type'; comment on column task_type.description is 'Human-readable task description'; comment on column task_type.backend_name is 'Name of the task in the job-running backend'; comment on column task_type.default_interval is 'Default interval for newly scheduled tasks'; comment on column task_type.min_interval is 'Minimum interval between two runs of a task'; comment on column task_type.max_interval is 'Maximum interval between two runs of a task'; comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs'; comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks'; comment on column task_type.num_retries is 'Default number of retries on transient failures'; comment on column task_type.retry_delay is 'Retry delay for the task'; create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled'); comment on type task_status is 'Status of a given task'; create type task_policy as enum ('recurring', 'oneshot'); comment on type task_policy is 'Recurrence policy of the given task'; create type task_priority as enum('high', 'normal', 'low'); comment on type task_priority is 'Priority of the given task'; create table priority_ratio( id task_priority primary key, ratio float not null ); comment on table priority_ratio is 'Oneshot task''s reading ratio per priority'; comment on column priority_ratio.id is 'Task priority id'; comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority'; insert into priority_ratio (id, ratio) values ('high', 0.5); insert into priority_ratio (id, ratio) values ('normal', 0.3); insert into priority_ratio (id, ratio) values ('low', 0.2); create table task ( id bigserial primary key, type text not null references task_type(type), arguments jsonb not null, next_run 
timestamptz not null, current_interval interval, status task_status not null, policy task_policy not null default 'recurring', retries_left bigint not null default 0, priority task_priority references priority_ratio(id), check (policy <> 'recurring' or current_interval is not null) ); comment on table task is 'Schedule of recurring tasks'; comment on column task.arguments is 'Arguments passed to the underlying job scheduler. ' 'Contains two keys, ''args'' (list) and ''kwargs'' (object).'; comment on column task.next_run is 'The next run of this task should be run on or after that time'; comment on column task.current_interval is 'The interval between two runs of this task, ' 'taking into account the backoff factor'; comment on column task.policy is 'Whether the task is one-shot or recurring'; comment on column task.retries_left is 'The number of "short delay" retries of the task in case of ' 'transient failure'; comment on column task.priority is 'Priority of the given task'; comment on column task.id is 'Task Identifier'; comment on column task.type is 'References task_type table'; comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')'; create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost'); comment on type task_run_status is 'Status of a given task run'; create table task_run ( id bigserial primary key, task bigint not null references task(id), backend_id text, scheduled timestamptz, started timestamptz, ended timestamptz, metadata jsonb, status task_run_status not null default 'scheduled' ); comment on table task_run is 'History of task runs sent to the job-running backend'; comment on column task_run.backend_id is 'id of the task run in the job-running backend'; comment on column task_run.metadata is 'Useful metadata for the given task run. ' 'For instance, the worker that took on the job, ' 'or the logs for the run.'; comment on column task_run.id is 'Task run identifier'; comment on column task_run.task is 'References task table'; comment on column task_run.scheduled is 'Scheduled run time for task'; comment on column task_run.started is 'Task starting time'; comment on column task_run.ended is 'Task ending time'; + +create table if not exists listers ( + id uuid primary key default uuid_generate_v4(), + name text not null, + instance_name text not null, + created timestamptz not null default now(), -- auto_now_add in the model + current_state jsonb not null, + updated timestamptz not null +); + +comment on table listers is 'Lister instances known to the origin visit scheduler'; +comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)'; +comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. 
framagit, bitbucket, ...)'; +comment on column listers.created is 'Timestamp at which the lister was originally created'; +comment on column listers.current_state is 'Known current state of this lister'; +comment on column listers.updated is 'Timestamp at which the lister state was last updated'; + + +create table if not exists listed_origins ( + -- Basic information + lister_id uuid not null references listers(id), + url text not null, + visit_type text not null, + extra_loader_arguments jsonb not null, + + -- Whether this origin still exists or not + enabled boolean not null, + + -- time-based information + first_seen timestamptz not null default now(), + last_seen timestamptz not null, + + -- potentially provided by the lister + last_update timestamptz, + + primary key (lister_id, url, visit_type) +); + +comment on table listed_origins is 'Origins known to the origin visit scheduler'; +comment on column listed_origins.lister_id is 'Lister instance which owns this origin'; +comment on column listed_origins.url is 'URL of the origin listed'; +comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url'; +comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin'; + +comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.'; +comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister'; +comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister'; + +comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote'; diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql index 7da9519..690541c 100644 --- a/swh/scheduler/sql/60-swh-indexes.sql +++ b/swh/scheduler/sql/60-swh-indexes.sql @@ -1,13 +1,16 @@ create index on task(type); create index on task(next_run); -- used for quick equality checking create index on task using btree(type, md5(arguments::text)); create index on task(priority); create index on task_run(task); create index on task_run(backend_id); create index task_run_id_asc_idx on task_run(task asc, started asc); + +-- lister schema +create unique index on listers (name, instance_name); diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py index 258c80f..9c2c463 100644 --- a/swh/scheduler/tests/common.py +++ b/swh/scheduler/tests/common.py @@ -1,95 +1,104 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime TEMPLATES = { "git": { "type": "update-git", "arguments": {"args": [], "kwargs": {},}, "next_run": None, }, "hg": { "type": "update-hg", "arguments": {"args": [], "kwargs": {},}, "next_run": None, "policy": "oneshot", }, } TASK_TYPES = { "git": { "type": "update-git", "description": "Update a git repository", "backend_name": "swh.loader.git.tasks.UpdateGitRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, "hg": { "type": "update-hg", "description": "Update a mercurial 
repository", "backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository", "default_interval": datetime.timedelta(days=64), "min_interval": datetime.timedelta(hours=12), "max_interval": datetime.timedelta(days=64), "backoff_factor": 2, "max_queue_length": None, "num_retries": 7, "retry_delay": datetime.timedelta(hours=2), }, } def tasks_from_template(template, max_timestamp, num, num_priority=0, priorities=None): """Build tasks from template """ def _task_from_template(template, next_run, priority, *args, **kwargs): ret = copy.deepcopy(template) ret["next_run"] = next_run if priority: ret["priority"] = priority if args: ret["arguments"]["args"] = list(args) if kwargs: ret["arguments"]["kwargs"] = kwargs return ret def _pop_priority(priorities): if not priorities: return None for priority, remains in priorities.items(): if remains > 0: priorities[priority] = remains - 1 return priority return None if num_priority and priorities: priorities = { priority: ratio * num_priority for priority, ratio in priorities.items() } tasks = [] for i in range(num + num_priority): priority = _pop_priority(priorities) tasks.append( _task_from_template( template, max_timestamp - datetime.timedelta(microseconds=i), priority, "argument-%03d" % i, - **{"kwarg%03d" % i: "bogus-kwarg"} + **{"kwarg%03d" % i: "bogus-kwarg"}, ) ) return tasks + + +LISTERS = ( + {"name": "github"}, + {"name": "gitlab", "instance_name": "gitlab"}, + {"name": "gitlab", "instance_name": "freedesktop"}, + {"name": "npm"}, + {"name": "pypi"}, +) diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index b3be2e4..9aeb208 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,112 +1,134 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - import os import pytest import glob -from datetime import timedelta +from datetime import datetime, timedelta, timezone import pkg_resources +from typing import List from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR +from swh.scheduler.model import ListedOrigin, Lister +from swh.scheduler.tests.common import LISTERS # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith("CELERY")]: os.environ.pop(var) # test_cli tests depends on a en/C locale, so ensure it os.environ["LC_ALL"] = "C.UTF-8" DUMP_FILES = os.path.join(SQL_DIR, "*.sql") # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ["ping", "multiping", "add", "error", "echo"] @pytest.fixture(scope="session") def celery_enable_logging(): return True @pytest.fixture(scope="session") def celery_includes(): task_modules = [ "swh.scheduler.tests.tasks", ] for entrypoint in pkg_resources.iter_entry_points("swh.workers"): task_modules.extend(entrypoint.load()().get("task_modules", [])) return task_modules @pytest.fixture(scope="session") def celery_parameters(): return { "task_cls": "swh.scheduler.task:SWHTask", } @pytest.fixture(scope="session") def celery_config(): return { "accept_content": ["application/x-msgpack", "application/json"], "task_serializer": "msgpack", "result_serializer": "json", } # use the celery_session_app fixture to monkeypatch the 'main' # 
swh.scheduler.celery_backend.config.app Celery application # with the test application @pytest.fixture(scope="session") def swh_app(celery_session_app): from swh.scheduler.celery_backend import config config.app = celery_session_app yield celery_session_app @pytest.fixture def swh_scheduler_config(request, postgresql): scheduler_config = { "db": postgresql.dsn, } all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) cursor = postgresql.cursor() for fname in all_dump_files: with open(fname) as fobj: cursor.execute(fobj.read()) postgresql.commit() return scheduler_config @pytest.fixture def swh_scheduler(swh_scheduler_config): scheduler = get_scheduler("local", swh_scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type( { "type": "swh-test-{}".format(taskname), "description": "The {} testing task".format(taskname), "backend_name": "swh.scheduler.tests.tasks.{}".format(taskname), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler + + +@pytest.fixture +def stored_lister(swh_scheduler) -> Lister: + """Store a lister in the scheduler and return its information""" + return swh_scheduler.get_or_create_lister(**LISTERS[0]) + + +@pytest.fixture +def listed_origins(stored_lister) -> List[ListedOrigin]: + """Return a (fixed) set of 1000 listed origins""" + return [ + ListedOrigin( + lister_id=stored_lister.id, + url=f"https://example.com/{i:04d}.git", + visit_type="git", + last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), + ) + for i in range(1000) + ] diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py index 73b3373..fa0dd62 100644 --- a/swh/scheduler/tests/test_api_client.py +++ b/swh/scheduler/tests/test_api_client.py @@ -1,64 +1,68 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from flask import url_for import swh.scheduler.api.server as server from swh.scheduler.api.client import RemoteScheduler from swh.scheduler.tests.test_scheduler import TestScheduler # noqa # tests are executed using imported class (TestScheduler) using overloaded # swh_scheduler fixture below # the Flask app used as server in these tests @pytest.fixture def app(swh_db_scheduler): + assert hasattr(server, "scheduler") server.scheduler = swh_db_scheduler yield server.app # the RPCClient class used as client used in these tests @pytest.fixture def swh_rpc_client_class(): return RemoteScheduler @pytest.fixture def swh_scheduler(swh_rpc_client, app): yield swh_rpc_client def test_site_map(flask_app_client): sitemap = flask_app_client.get(url_for("site_map")) assert sitemap.headers["Content-Type"] == "application/json" rules = set(x["rule"] for x in sitemap.json) # we expect at least these rules expected_rules = set( "/" + rule for rule in ( - "set_status_tasks", - "create_task_type", - "get_task_type", - "get_task_types", - "create_tasks", - "disable_tasks", - "get_tasks", - "search_tasks", - "get_task_runs", - "peek_ready_tasks", - "grab_ready_tasks", - "schedule_task_run", - "mass_schedule_task_runs", - "start_task_run", - "end_task_run", - "filter_task_to_archive", - "delete_archived_tasks", - 
"get_priority_ratios", + "lister/get_or_create", + "lister/update", + "origins/record", + "priority_ratios/get", + "task/create", + "task/delete_archived", + "task/disable", + "task/filter_for_archive", + "task/get", + "task/grab_ready", + "task/peek_ready", + "task/search", + "task/set_status", + "task_run/end", + "task_run/get", + "task_run/schedule", + "task_run/schedule_one", + "task_run/start", + "task_type/create", + "task_type/get", + "task_type/get_all", ) ) assert rules == expected_rules diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index 8edabbd..ed9539d 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,167 +1,164 @@ from time import sleep from itertools import count from celery.result import GroupResult from celery.result import AsyncResult import pytest from swh.scheduler.utils import create_task_dict from swh.scheduler.celery_backend.runner import run_ready_tasks def test_ping(swh_app, celery_session_worker): res = swh_app.send_task("swh.scheduler.tests.tasks.ping") assert res res.wait() assert res.successful() assert res.result == "OK" def test_ping_with_kw(swh_app, celery_session_worker): res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1}) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" def test_multiping(swh_app, celery_session_worker): "Test that a task that spawns subtasks (group) works" res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5}) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] assert len(results) == 5 for i in range(5): assert ("OK (kw={'i': %s})" % i) in results -@pytest.mark.db def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type("swh-test-ping") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping" swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task["backend_id"]).get() -@pytest.mark.db def test_task_return_value(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type("swh-test-add") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add" swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] value = AsyncResult(id=task["backend_id"]).get() assert value == 42 -@pytest.mark.db def test_task_exception(swh_app, celery_session_worker, swh_scheduler): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error" swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task["backend_id"]) with pytest.raises(NotImplementedError): result.get() def 
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
index 8edabbd..ed9539d 100644
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,167 +1,164 @@
 from time import sleep
 from itertools import count

 from celery.result import GroupResult
 from celery.result import AsyncResult

 import pytest

 from swh.scheduler.utils import create_task_dict
 from swh.scheduler.celery_backend.runner import run_ready_tasks


 def test_ping(swh_app, celery_session_worker):
     res = swh_app.send_task("swh.scheduler.tests.tasks.ping")
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK"


 def test_ping_with_kw(swh_app, celery_session_worker):
     res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1})
     assert res
     res.wait()
     assert res.successful()
     assert res.result == "OK (kw={'a': 1})"


 def test_multiping(swh_app, celery_session_worker):
     "Test that a task that spawns subtasks (group) works"
     res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5})
     assert res

     res.wait()
     assert res.successful()

     # retrieve the GroupResult for this task and wait for all the subtasks
     # to complete
     promise_id = res.result
     assert promise_id
     promise = GroupResult.restore(promise_id, app=swh_app)
     for i in range(5):
         if promise.ready():
             break
         sleep(1)

     results = [x.get() for x in promise.results]
     assert len(results) == 5
     for i in range(5):
         assert ("OK (kw={'i': %s})" % i) in results


-@pytest.mark.db
 def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler):
     "Test that the scheduler fixture works properly"
     task_type = swh_scheduler.get_task_type("swh-test-ping")

     assert task_type
     assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping"

     swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
     assert backend_tasks
     for task in backend_tasks:
         # Make sure the task completed
         AsyncResult(id=task["backend_id"]).get()


-@pytest.mark.db
 def test_task_return_value(swh_app, celery_session_worker, swh_scheduler):
     task_type = swh_scheduler.get_task_type("swh-test-add")
     assert task_type
     assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add"

     swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
     assert len(backend_tasks) == 1
     task = backend_tasks[0]
     value = AsyncResult(id=task["backend_id"]).get()
     assert value == 42


-@pytest.mark.db
 def test_task_exception(swh_app, celery_session_worker, swh_scheduler):
     task_type = swh_scheduler.get_task_type("swh-test-error")
     assert task_type
     assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error"

     swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])

     backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
     assert len(backend_tasks) == 1

     task = backend_tasks[0]
     result = AsyncResult(id=task["backend_id"])
     with pytest.raises(NotImplementedError):
         result.get()


 def test_statsd(swh_app, celery_session_worker, mocker):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
     res = swh_app.send_task("swh.scheduler.tests.tasks.echo")
     assert res
     res.wait()
     assert res.successful()
     assert res.result == {}

     m.assert_any_call(
         "swh_task_called_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_start_ts:0|g|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_end_ts:1|g|"
         "#status:uneventful,task:swh.scheduler.tests.tasks.echo,"
         "worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_duration_seconds:1000|ms|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_success_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )


 def test_statsd_with_status(swh_app, celery_session_worker, mocker):
     m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
     mocker.patch("swh.scheduler.task.ts", side_effect=count())
     mocker.patch("swh.core.statsd.monotonic", side_effect=count())
     res = swh_app.send_task(
         "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
     )
     assert res
     res.wait()
     assert res.successful()
     assert res.result == {"status": "eventful"}

     m.assert_any_call(
         "swh_task_called_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_start_ts:0|g|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_end_ts:1|g|"
         "#status:eventful,task:swh.scheduler.tests.tasks.echo,"
         "worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_duration_seconds:1000|ms|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
     m.assert_any_call(
         "swh_task_success_count:1|c|"
         "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
     )
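Aside (not part of the patch): the strings asserted above follow the DogStatsD-style wire format, `name:value|type|#tag:value,...`. A small hypothetical parser makes the structure explicit:

    # Illustration only: decomposing one of the asserted metric payloads.
    def parse_metric(payload: str):
        name_value, mtype, tags = payload.split("|")
        name, value = name_value.split(":")
        tag_dict = dict(t.split(":", 1) for t in tags.lstrip("#").split(","))
        return name, value, mtype, tag_dict

    name, value, mtype, tags = parse_metric(
        "swh_task_called_count:1|c|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    assert (name, mtype) == ("swh_task_called_count", "c")
    assert tags["worker"] == "unknown worker"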
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
new file mode 100644
index 0000000..5a0ed24
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+
+def invoke(*args, catch_exceptions=False):
+    result = CliRunner(mix_stderr=False).invoke(
+        cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions,
+    )
+
+    return result
+
+
+def test_celery_monitor():
+    """Check that celery-monitor returns its help text"""
+
+    result = invoke()
+
+    assert "Commands:" in result.stdout
+    assert "Options:" in result.stdout
+
+
+def test_celery_monitor_ping(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--pattern", celery_session_worker.hostname, "ping-workers")
+
+    assert result.exit_code == 0
+
+    assert len(caplog.records) == 1
+
+    (record,) = caplog.records
+
+    assert record.levelname == "INFO"
+    assert f"response from {celery_session_worker.hostname}" in record.message
+
+
+@pytest.mark.parametrize(
+    "filter_args,filter_message,exit_code",
+    [
+        ((), "Matching all workers", 0),
+        (
+            ("--pattern", "celery@*.test-host"),
+            "Using glob pattern celery@*.test-host",
+            1,
+        ),
+        (
+            ("--pattern", "celery@test-type.test-host"),
+            "Using destinations celery@test-type.test-host",
+            1,
+        ),
+        (
+            ("--pattern", "celery@test-type.test-host,celery@test-type2.test-host"),
+            (
+                "Using destinations "
+                "celery@test-type.test-host, celery@test-type2.test-host"
+            ),
+            1,
+        ),
+    ],
+)
+def test_celery_monitor_ping_filter(
+    caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
+
+    assert result.exit_code == exit_code, result.stdout
+
+    got_no_response_message = False
+    got_filter_message = False
+
+    for record in caplog.records:
+        # Check the proper filter has been generated
+        if record.levelname == "DEBUG":
+            if filter_message in record.message:
+                got_filter_message = True
+        # Check that no worker responded
+        if record.levelname == "INFO":
+            if "No response in" in record.message:
+                got_no_response_message = True
+
+    assert got_filter_message
+
+    if filter_args:
+        assert got_no_response_message
+
+
+def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke("--pattern", celery_session_worker.hostname, "list-running")
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+
+@pytest.mark.parametrize("format", ["csv", "pretty"])
+def test_celery_monitor_list_running_format(
+    caplog, swh_app, celery_session_worker, format
+):
+    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+    result = invoke(
+        "--pattern", celery_session_worker.hostname, "list-running", "--format", format
+    )
+
+    assert result.exit_code == 0, result.stdout
+
+    for record in caplog.records:
+        if record.levelname != "INFO":
+            continue
+        assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+    if format == "csv":
+        lines = result.stdout.splitlines()
+        assert lines == ["worker,name,args,kwargs,duration,worker_pid"]
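Aside (not part of the patch): judging from the log messages the parametrized test matches on ("Matching all workers", "Using glob pattern ...", "Using destinations ..."), the `--pattern` option presumably dispatches on whether the value contains glob metacharacters. A hypothetical restatement of that rule:

    # Illustration only: the dispatch implied by the asserted log messages.
    def classify_pattern(pattern):
        if pattern is None:
            return ("all", None)
        if any(c in pattern for c in "*?["):
            return ("glob", pattern)
        return ("destinations", pattern.split(","))

    assert classify_pattern("celery@*.test-host") == ("glob", "celery@*.test-host")
    assert classify_pattern("celery@test-type.test-host")[0] == "destinations"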
diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py
new file mode 100644
index 0000000..47bb618
--- /dev/null
+++ b/swh/scheduler/tests/test_model.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+
+from swh.scheduler import model
+
+
+def test_select_columns():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str)
+        test1 = attr.ib(type=str)
+        a_first_attr = attr.ib(type=str)
+
+        @property
+        def test2(self):
+            """This property should not show up in the extracted columns"""
+            return self.test1
+
+    assert TestModel.select_columns() == ("a_first_attr", "id", "test1")
+
+
+def test_insert_columns():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str)
+        test1 = attr.ib(type=str)
+
+        @property
+        def test2(self):
+            """This property should not show up in the extracted columns"""
+            return self.test1
+
+    assert TestModel.insert_columns_and_metavars() == (
+        ("id", "test1"),
+        ("%(id)s", "%(test1)s"),
+    )
+
+
+def test_insert_columns_auto_now_add():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str)
+        test1 = attr.ib(type=str)
+        added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True})
+
+    assert TestModel.insert_columns_and_metavars() == (
+        ("id", "test1"),
+        ("%(id)s", "%(test1)s"),
+    )
+
+
+def test_insert_columns_auto_now():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str)
+        test1 = attr.ib(type=str)
+        updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True})
+
+    assert TestModel.insert_columns_and_metavars() == (
+        ("id", "test1", "updated"),
+        ("%(id)s", "%(test1)s", "now()"),
+    )
+
+
+def test_insert_columns_primary_key():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str, metadata={"auto_primary_key": True})
+        test1 = attr.ib(type=str)
+
+    assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",))
+
+
+def test_insert_primary_key():
+    @attr.s
+    class TestModel(model.BaseSchedulerModel):
+        id = attr.ib(type=str, metadata={"auto_primary_key": True})
+        test1 = attr.ib(type=str)
+
+    assert TestModel.primary_key_columns() == ("id",)
+
+    @attr.s
+    class TestModel2(model.BaseSchedulerModel):
+        col1 = attr.ib(type=str, metadata={"primary_key": True})
+        col2 = attr.ib(type=str, metadata={"primary_key": True})
+        test1 = attr.ib(type=str)
+
+    assert TestModel2.primary_key_columns() == ("col1", "col2")
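Aside (not part of the patch): the `(columns, metavars)` pair returned by `insert_columns_and_metavars()` is shaped to slot directly into a psycopg2-style named-parameter INSERT. A sketch, using the hypothetical table name `test_model`:

    # Illustration only: building an INSERT from the pair asserted above.
    columns, metavars = ("id", "test1"), ("%(id)s", "%(test1)s")
    query = "INSERT INTO test_model ({}) VALUES ({})".format(
        ", ".join(columns), ", ".join(metavars)
    )
    assert query == "INSERT INTO test_model (id, test1) VALUES (%(id)s, %(test1)s)"
    # cursor.execute(query, {"id": "foo", "test1": "bar"}) would then bind by name,
    # while auto_now columns are filled server-side with now().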
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index c2cd1a4..6d58e8c 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,587 +1,678 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import copy
 import datetime
 import random
 import uuid
 from collections import defaultdict
+import inspect
 from typing import Any, Dict

 from arrow import utcnow
-
+import attr
 import pytest

-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from swh.scheduler.exc import StaleData
+from swh.scheduler.interface import SchedulerInterface
+
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS


 def subdict(d, keys=None, excl=()):
     if keys is None:
         keys = [k for k in d.keys()]
     return {k: d[k] for k in keys if k not in excl}


-@pytest.mark.db
 class TestScheduler:
+    def test_interface(self, swh_scheduler):
+        """Checks all methods of SchedulerInterface are implemented by this
+        backend, and that they have the same signature."""
+        # Create an instance of the protocol (which cannot be instantiated
+        # directly, so this creates a subclass, then instantiates it)
+        interface = type("_", (SchedulerInterface,), {})()
+
+        assert "create_task_type" in dir(interface)
+
+        missing_methods = []
+
+        for meth_name in dir(interface):
+            if meth_name.startswith("_"):
+                continue
+            interface_meth = getattr(interface, meth_name)
+            try:
+                concrete_meth = getattr(swh_scheduler, meth_name)
+            except AttributeError:
+                if not getattr(interface_meth, "deprecated_endpoint", False):
+                    # The backend is missing a (non-deprecated) endpoint
+                    missing_methods.append(meth_name)
+                continue
+
+            expected_signature = inspect.signature(interface_meth)
+            actual_signature = inspect.signature(concrete_meth)
+
+            assert expected_signature == actual_signature, meth_name
+
+        assert missing_methods == []
+
     def test_get_priority_ratios(self, swh_scheduler):
         assert swh_scheduler.get_priority_ratios() == {
             "high": 0.5,
             "normal": 0.3,
             "low": 0.2,
         }

     def test_add_task_type(self, swh_scheduler):
         tt = TASK_TYPES["git"]
         swh_scheduler.create_task_type(tt)
         assert tt == swh_scheduler.get_task_type(tt["type"])
         tt2 = TASK_TYPES["hg"]
         swh_scheduler.create_task_type(tt2)
         assert tt == swh_scheduler.get_task_type(tt["type"])
         assert tt2 == swh_scheduler.get_task_type(tt2["type"])

     def test_create_task_type_idempotence(self, swh_scheduler):
         tt = TASK_TYPES["git"]
         swh_scheduler.create_task_type(tt)
         swh_scheduler.create_task_type(tt)
         assert tt == swh_scheduler.get_task_type(tt["type"])

     def test_get_task_types(self, swh_scheduler):
         tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"]
         swh_scheduler.create_task_type(tt)
         swh_scheduler.create_task_type(tt2)
         actual_task_types = swh_scheduler.get_task_types()
         assert tt in actual_task_types
         assert tt2 in actual_task_types

     def test_create_tasks(self, swh_scheduler):
         priority_ratio = self._priority_ratio(swh_scheduler)
         self._create_task_types(swh_scheduler)
         num_tasks_priority = 100
         tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100)
         tasks_2 = tasks_from_template(
             TEMPLATES["hg"],
             utcnow(),
             100,
             num_tasks_priority,
             priorities=priority_ratio,
         )
         tasks = tasks_1 + tasks_2

         # tasks are returned only once with their ids
         ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2)
         set_ret1 = set([t["id"] for t in ret1])

         # creating the same set results in the same ids
         ret = swh_scheduler.create_tasks(tasks)
         set_ret = set([t["id"] for t in ret])

         # Idempotence results
         assert set_ret == set_ret1
         assert len(ret) == len(ret1)

         ids = set()
         actual_priorities = defaultdict(int)

         for task, orig_task in zip(ret, tasks):
             task = copy.deepcopy(task)
             task_type = TASK_TYPES[orig_task["type"].split("-")[-1]]
             assert task["id"] not in ids
             assert task["status"] == "next_run_not_scheduled"
             assert task["current_interval"] == task_type["default_interval"]
             assert task["policy"] == orig_task.get("policy", "recurring")
             priority = task.get("priority")
             if priority:
                 actual_priorities[priority] += 1

             assert task["retries_left"] == (task_type["num_retries"] or 0)
             ids.add(task["id"])
             del task["id"]
             del task["status"]
             del task["current_interval"]
             del task["retries_left"]
             if "policy" not in orig_task:
                 del task["policy"]
             if "priority" not in orig_task:
                 del task["priority"]
             assert task == orig_task

         assert dict(actual_priorities) == {
             priority: int(ratio * num_tasks_priority)
             for priority, ratio in priority_ratio.items()
         }
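Aside (not part of the patch): the final assertion of test_create_tasks is just the ratio arithmetic from the fixture. With 100 priority tasks and the ratios asserted in test_get_priority_ratios, the expected split is 50/30/20:

    # Worked example of the priority bookkeeping asserted above.
    priority_ratio = {"high": 0.5, "normal": 0.3, "low": 0.2}
    num_tasks_priority = 100
    expected = {p: int(r * num_tasks_priority) for p, r in priority_ratio.items()}
    assert expected == {"high": 50, "normal": 30, "low": 20}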
     def test_peek_ready_tasks_no_priority(self, swh_scheduler):
         self._create_task_types(swh_scheduler)
         t = utcnow()
         task_type = TEMPLATES["git"]["type"]
         tasks = tasks_from_template(TEMPLATES["git"], t, 100)
         random.shuffle(tasks)
         swh_scheduler.create_tasks(tasks)

         ready_tasks = swh_scheduler.peek_ready_tasks(task_type)
         assert len(ready_tasks) == len(tasks)
         for i in range(len(ready_tasks) - 1):
             assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"]

         # Only get the first few ready tasks
         limit = random.randrange(5, 5 + len(tasks) // 2)
         ready_tasks_limited = swh_scheduler.peek_ready_tasks(
             task_type, num_tasks=limit
         )
         assert len(ready_tasks_limited) == limit
         assert ready_tasks_limited == ready_tasks[:limit]

         # Limit by timestamp
         max_ts = tasks[limit - 1]["next_run"]
         ready_tasks_timestamped = swh_scheduler.peek_ready_tasks(
             task_type, timestamp=max_ts
         )

         for ready_task in ready_tasks_timestamped:
             assert ready_task["next_run"] <= max_ts

         # Make sure we get proper behavior for the first ready tasks
         assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped

         # Limit by both
         ready_tasks_both = swh_scheduler.peek_ready_tasks(
             task_type, timestamp=max_ts, num_tasks=limit // 3
         )
         assert len(ready_tasks_both) <= limit // 3
         for ready_task in ready_tasks_both:
             assert ready_task["next_run"] <= max_ts
             assert ready_task in ready_tasks[: limit // 3]

     def _priority_ratio(self, swh_scheduler):
         return swh_scheduler.get_priority_ratios()

     def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler):
         priority_ratio = self._priority_ratio(swh_scheduler)
         self._create_task_types(swh_scheduler)
         t = utcnow()
         task_type = TEMPLATES["git"]["type"]
         num_tasks_priority = 100
         num_tasks_no_priority = 100
         # Create tasks with and without priorities
         tasks = tasks_from_template(
             TEMPLATES["git"],
             t,
             num=num_tasks_no_priority,
             num_priority=num_tasks_priority,
             priorities=priority_ratio,
         )

         random.shuffle(tasks)
         swh_scheduler.create_tasks(tasks)

         # take all available tasks
         ready_tasks = swh_scheduler.peek_ready_tasks(task_type)

         assert len(ready_tasks) == len(tasks)
         assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks)

         count_tasks_per_priority = defaultdict(int)
         for task in ready_tasks:
             priority = task.get("priority")
             if priority:
                 count_tasks_per_priority[priority] += 1

         assert dict(count_tasks_per_priority) == {
             priority: int(ratio * num_tasks_priority)
             for priority, ratio in priority_ratio.items()
         }

         # Only get some ready tasks
         num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2)
         num_tasks_priority = random.randrange(5, num_tasks_priority // 2)
         ready_tasks_limited = swh_scheduler.peek_ready_tasks(
             task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority
         )

         count_tasks_per_priority = defaultdict(int)
         for task in ready_tasks_limited:
             priority = task.get("priority")
             count_tasks_per_priority[priority] += 1

         import math

         for priority, ratio in priority_ratio.items():
             expected_count = math.ceil(ratio * num_tasks_priority)
             actual_prio = count_tasks_per_priority[priority]
             assert actual_prio == expected_count or actual_prio == expected_count + 1

         assert count_tasks_per_priority[None] == num_tasks

     def test_grab_ready_tasks(self, swh_scheduler):
         priority_ratio = self._priority_ratio(swh_scheduler)
         self._create_task_types(swh_scheduler)
         t = utcnow()
         task_type = TEMPLATES["git"]["type"]
         num_tasks_priority = 100
         num_tasks_no_priority = 100
         # Create tasks with and without priorities
         tasks = tasks_from_template(
             TEMPLATES["git"],
             t,
             num=num_tasks_no_priority,
             num_priority=num_tasks_priority,
             priorities=priority_ratio,
         )
         random.shuffle(tasks)
         swh_scheduler.create_tasks(tasks)

         first_ready_tasks = swh_scheduler.peek_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10
         )
         grabbed_tasks = swh_scheduler.grab_ready_tasks(
             task_type, num_tasks=10, num_tasks_priority=10
         )

         for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
             assert peeked["status"] == "next_run_not_scheduled"
             del peeked["status"]
             assert grabbed["status"] == "next_run_scheduled"
             del grabbed["status"]
             assert peeked == grabbed
             assert peeked["priority"] == grabbed["priority"]

     def test_get_tasks(self, swh_scheduler):
         self._create_task_types(swh_scheduler)
         t = utcnow()
         tasks = tasks_from_template(TEMPLATES["git"], t, 100)
         tasks = swh_scheduler.create_tasks(tasks)
         random.shuffle(tasks)
         while len(tasks) > 1:
             length = random.randrange(1, len(tasks))
             cur_tasks = sorted(tasks[:length], key=lambda x: x["id"])
             tasks[:length] = []

             ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks)
             # result is not guaranteed to be sorted
             ret.sort(key=lambda x: x["id"])
             assert ret == cur_tasks
     def test_search_tasks(self, swh_scheduler):
         def make_real_dicts(lst):
             """RealDictRow is not a real dict."""
             return [dict(d.items()) for d in lst]

         self._create_task_types(swh_scheduler)
         t = utcnow()
         tasks = tasks_from_template(TEMPLATES["git"], t, 100)
         tasks = swh_scheduler.create_tasks(tasks)
         assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks)

     def assert_filtered_task_ok(
         self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime
     ) -> None:
         """Ensure filtered tasks have the expected properties
         (within the range, recurring disabled, etc.)

         """
         started = task["started"]
         date = started if started is not None else task["scheduled"]
         assert after <= date and date <= before
         if task["task_policy"] == "oneshot":
             assert task["task_status"] in ["completed", "disabled"]
         if task["task_policy"] == "recurring":
             assert task["task_status"] in ["disabled"]
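Aside (not part of the patch): the helper above encodes the archival rule that a task may only be archived once it can no longer run. A hypothetical restatement as a predicate:

    # Illustration only: the eligibility rule checked by assert_filtered_task_ok.
    def is_archivable(policy: str, status: str) -> bool:
        if policy == "oneshot":
            return status in ("completed", "disabled")
        if policy == "recurring":
            return status == "disabled"
        return False

    assert is_archivable("oneshot", "completed")
    assert not is_archivable("recurring", "completed")  # may still be rescheduled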
     def test_filter_task_to_archive(self, swh_scheduler):
         """Filtering should only list disabled recurring or completed oneshot tasks

         """
         self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
         oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
         total_tasks = len(recurring) + len(oneshots)

         # simulate scheduling tasks
         pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [
             {
                 "task": task["id"],
                 "backend_id": str(uuid.uuid4()),
                 "scheduled": utcnow(),
             }
             for task in pending_tasks
         ]
         swh_scheduler.mass_schedule_task_runs(backend_tasks)

         # we simulate that the tasks are being done
         _tasks = []
         for task in backend_tasks:
             t = swh_scheduler.end_task_run(task["backend_id"], status="eventful")
             _tasks.append(t)

         # Randomly update task's status per policy
         status_per_policy = {"recurring": 0, "oneshot": 0}
         status_choice = {
             # policy: [tuple (1-for-filtering, 'associated-status')]
             "recurring": [
                 (1, "disabled"),
                 (0, "completed"),
                 (0, "next_run_not_scheduled"),
             ],
             "oneshot": [
                 (0, "next_run_not_scheduled"),
                 (1, "disabled"),
                 (1, "completed"),
             ],
         }

         tasks_to_update = defaultdict(list)
         _task_ids = defaultdict(list)
         # randomly 'disable' recurring tasks or 'complete' oneshot tasks
         for task in pending_tasks:
             policy = task["policy"]
             _task_ids[policy].append(task["id"])
             status = random.choice(status_choice[policy])
             if status[0] != 1:
                 continue
             # elected for filtering
             status_per_policy[policy] += status[0]
             tasks_to_update[policy].append(task["id"])

         swh_scheduler.disable_tasks(tasks_to_update["recurring"])
         # hack: change the status to something other than completed/disabled
         swh_scheduler.set_status_tasks(
             _task_ids["oneshot"], status="next_run_not_scheduled"
         )
         # complete the tasks to update
         swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed")

         total_tasks_filtered = (
             status_per_policy["recurring"] + status_per_policy["oneshot"]
         )

         # no pagination scenario

         # retrieve tasks to archive
         after = _time.shift(days=-1)
         after_ts = after.format("YYYY-MM-DD")
         before = utcnow().shift(days=1)
         before_ts = before.format("YYYY-MM-DD")
         tasks_result = swh_scheduler.filter_task_to_archive(
             after_ts=after_ts, before_ts=before_ts, limit=total_tasks
         )

         tasks_to_archive = tasks_result["tasks"]

         assert len(tasks_to_archive) == total_tasks_filtered
         assert tasks_result.get("next_page_token") is None

         actual_filtered_per_status = {"recurring": 0, "oneshot": 0}
         for task in tasks_to_archive:
             self.assert_filtered_task_ok(task, after, before)
             actual_filtered_per_status[task["task_policy"]] += 1

         assert actual_filtered_per_status == status_per_policy

         # pagination scenario

         nb_tasks = 3
         tasks_result = swh_scheduler.filter_task_to_archive(
             after_ts=after_ts, before_ts=before_ts, limit=nb_tasks
         )

         tasks_to_archive2 = tasks_result["tasks"]

         assert len(tasks_to_archive2) == nb_tasks
         next_page_token = tasks_result["next_page_token"]
         assert next_page_token is not None

         all_tasks = tasks_to_archive2
         while next_page_token is not None:
             # Retrieve paginated results
             tasks_result = swh_scheduler.filter_task_to_archive(
                 after_ts=after_ts,
                 before_ts=before_ts,
                 limit=nb_tasks,
                 page_token=next_page_token,
             )
             tasks_to_archive2 = tasks_result["tasks"]
             assert len(tasks_to_archive2) <= nb_tasks
             all_tasks.extend(tasks_to_archive2)
             next_page_token = tasks_result.get("next_page_token")

         actual_filtered_per_status = {"recurring": 0, "oneshot": 0}
         for task in all_tasks:
             self.assert_filtered_task_ok(task, after, before)
             actual_filtered_per_status[task["task_policy"]] += 1

         assert actual_filtered_per_status == status_per_policy

     def test_delete_archived_tasks(self, swh_scheduler):
         self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
         oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
         total_tasks = len(recurring) + len(oneshots)
         pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
         backend_tasks = [
             {
                 "task": task["id"],
                 "backend_id": str(uuid.uuid4()),
                 "scheduled": utcnow(),
             }
             for task in pending_tasks
         ]
         swh_scheduler.mass_schedule_task_runs(backend_tasks)

         _tasks = []
         percent = random.randint(0, 100)  # random election removal boundary
         for task in backend_tasks:
             t = swh_scheduler.end_task_run(task["backend_id"], status="eventful")
             c = random.randint(0, 100)
             if c <= percent:
                 _tasks.append({"task_id": t["task"], "task_run_id": t["id"]})

         swh_scheduler.delete_archived_tasks(_tasks)

         all_tasks = [task["id"] for task in swh_scheduler.search_tasks()]
         tasks_count = len(all_tasks)
         tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks))

         assert tasks_count == total_tasks - len(_tasks)
         assert tasks_run_count == total_tasks - len(_tasks)

     def test_get_task_runs_no_task(self, swh_scheduler):
         """No task exists in the scheduler's db, get_task_runs() should always return
         an empty list.

         """
         assert not swh_scheduler.get_task_runs(task_ids=())
         assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
         assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)

     def test_get_task_runs_no_task_executed(self, swh_scheduler):
         """No task has been executed yet, get_task_runs() should always return an
         empty list.

         """
         self._create_task_types(swh_scheduler)
         _time = utcnow()
         recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
         oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
         swh_scheduler.create_tasks(recurring + oneshots)

         assert not swh_scheduler.get_task_runs(task_ids=())
         assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
         assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)
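Aside (not part of the patch): the next two tests walk a task run through its life cycle. A run is created in status "scheduled", start_task_run flips it to "started", and end_task_run records the final status while merging metadata, as a small illustration:

    # Illustration only: the run state transitions exercised below
    # ("t0"/"t1" stand in for real timestamps).
    run = {"status": "scheduled", "started": None, "ended": None, "metadata": None}
    run.update(status="started", started="t0", metadata={"something": "stupid"})
    run.update(
        status="eventful", ended="t1", metadata={**run["metadata"], "other": "stuff"}
    )
    assert run["metadata"] == {"something": "stupid", "other": "stuff"}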
""" self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) total_tasks = len(recurring) + len(oneshots) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1]) btask = backend_tasks[0] runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 run = runs[0] assert subdict(run, excl=("id",)) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": None, "ended": None, "metadata": None, "status": "scheduled", } runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks], limit=2 ) assert len(runs) == 2 runs = swh_scheduler.get_task_runs( task_ids=[bt["task"] for bt in backend_tasks] ) assert len(runs) == total_tasks keys = ("task", "backend_id", "scheduled") assert ( sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"]) == backend_tasks ) def test_get_task_runs_with_executed(self, swh_scheduler): """Some tasks have been executed, get_task_runs() should not return an empty list. limit should behave as expected. """ self._create_task_types(swh_scheduler) _time = utcnow() recurring = tasks_from_template(TEMPLATES["git"], _time, 12) oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12) pending_tasks = swh_scheduler.create_tasks(recurring + oneshots) backend_tasks = [ { "task": task["id"], "backend_id": str(uuid.uuid4()), "scheduled": utcnow(), } for task in pending_tasks ] swh_scheduler.mass_schedule_task_runs(backend_tasks) btask = backend_tasks[0] ts = utcnow() swh_scheduler.start_task_run( btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": None, "metadata": {"something": "stupid"}, "status": "started", } ts2 = utcnow() swh_scheduler.end_task_run( btask["backend_id"], metadata={"other": "stuff"}, timestamp=ts2, status="eventful", ) runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]]) assert len(runs) == 1 assert subdict(runs[0], excl=("id")) == { "task": btask["task"], "backend_id": btask["backend_id"], "scheduled": btask["scheduled"], "started": ts, "ended": ts2, "metadata": {"something": "stupid", "other": "stuff"}, "status": "eventful", } + def test_get_or_create_lister(self, swh_scheduler): + db_listers = [] + for lister_args in LISTERS: + db_listers.append(swh_scheduler.get_or_create_lister(**lister_args)) + + for lister, lister_args in zip(db_listers, LISTERS): + assert lister.name == lister_args["name"] + assert lister.instance_name == lister_args.get("instance_name", "") + + lister_get_again = swh_scheduler.get_or_create_lister( + lister.name, lister.instance_name + ) + + assert lister == lister_get_again + + def test_update_lister(self, swh_scheduler, stored_lister): + lister = attr.evolve(stored_lister, current_state={"updated": "now"}) + + updated_lister = swh_scheduler.update_lister(lister) + + assert updated_lister.updated > lister.updated + assert updated_lister == attr.evolve(lister, updated=updated_lister.updated) + + def 
+    def test_update_lister_stale(self, swh_scheduler, stored_lister):
+        swh_scheduler.update_lister(stored_lister)
+
+        with pytest.raises(StaleData) as exc:
+            swh_scheduler.update_lister(stored_lister)
+        assert "state not updated" in exc.value.args[0]
+
+    def test_record_listed_origins(self, swh_scheduler, listed_origins):
+        ret = swh_scheduler.record_listed_origins(listed_origins)
+
+        assert set(returned.url for returned in ret) == set(
+            origin.url for origin in listed_origins
+        )
+
+        assert all(origin.first_seen == origin.last_seen for origin in ret)
+
+    def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins):
+        # First, insert `cutoff` origins
+        cutoff = 100
+        assert cutoff < len(listed_origins)
+
+        ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff])
+        assert len(ret) == cutoff
+
+        # Then, insert all origins, including the `cutoff` first.
+        ret = swh_scheduler.record_listed_origins(listed_origins)
+
+        assert len(ret) == len(listed_origins)
+
+        # Two different "first seen" values
+        assert len(set(origin.first_seen for origin in ret)) == 2
+
+        # But a single "last seen" value
+        assert len(set(origin.last_seen for origin in ret)) == 1
+
     def _create_task_types(self, scheduler):
         for tt in TASK_TYPES.values():
             scheduler.create_task_type(tt)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..b6252bd
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,42 @@
+[tox]
+envlist=black,flake8,mypy,py3
+
+[testenv]
+extras =
+  testing
+deps =
+  pytest-cov
+  dev: ipdb
+setenv =
+  LC_ALL=C.UTF-8
+  LC_CTYPE=C.UTF-8
+  LANG=C.UTF-8
+commands =
+  pytest --doctest-modules \
+  !slow: --hypothesis-profile=fast \
+  slow:  --hypothesis-profile=slow \
+         --cov={envsitepackagesdir}/swh/scheduler \
+         {envsitepackagesdir}/swh/scheduler \
+         --cov-branch {posargs}
+
+[testenv:black]
+skip_install = true
+deps =
+  black
+commands =
+  {envpython} -m black --check swh
+
+[testenv:flake8]
+skip_install = true
+deps =
+  flake8
+commands =
+  {envpython} -m flake8
+
+[testenv:mypy]
+extras =
+  testing
+deps =
+  mypy
+commands =
+  mypy swh