diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..64be2bf
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+*.pyc
+*.sw?
+*~
+/.coverage
+/.coverage.*
+.eggs/
+__pycache__
+*.egg-info/
+build/
+dist/
+version.txt
+/.hypothesis/
+/.tox/
+.mypy_cache/
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..69b3349
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,40 @@
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v2.4.0
+ hooks:
+ - id: trailing-whitespace
+ - id: flake8
+ - id: check-json
+ - id: check-yaml
+
+- repo: https://github.com/codespell-project/codespell
+ rev: v1.16.0
+ hooks:
+ - id: codespell
+
+- repo: local
+ hooks:
+ - id: mypy
+ name: mypy
+ entry: mypy
+ args: [swh]
+ pass_filenames: false
+ language: system
+ types: [python]
+
+- repo: https://github.com/python/black
+ rev: 19.10b0
+ hooks:
+ - id: black
+
+# unfortunately, we are far from being able to enable this...
+# - repo: https://github.com/PyCQA/pydocstyle.git
+# rev: 4.0.0
+# hooks:
+# - id: pydocstyle
+# name: pydocstyle
+# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions.
+# entry: pydocstyle --convention=google
+# language: python
+# types: [python]
+
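+# To run these hooks locally (assuming the pre-commit tool is installed,
+# e.g. via `pip install pre-commit`):
+#   pre-commit install        # register the hooks as a git pre-commit hook
+#   pre-commit run --all-files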
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..2d0a34a
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,3 @@
+Copyright (C) 2015 The Software Heritage developers
+
+See http://www.softwareheritage.org/ for more information.
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..0ad22b5
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,78 @@
+# Software Heritage Code of Conduct
+
+## Our Pledge
+
+In the interest of fostering an open and welcoming environment, we as Software
+Heritage contributors and maintainers pledge to making participation in our
+project and our community a harassment-free experience for everyone, regardless
+of age, body size, disability, ethnicity, sex characteristics, gender identity
+and expression, level of experience, education, socio-economic status,
+nationality, personal appearance, race, religion, or sexual identity and
+orientation.
+
+## Our Standards
+
+Examples of behavior that contributes to creating a positive environment
+include:
+
+* Using welcoming and inclusive language
+* Being respectful of differing viewpoints and experiences
+* Gracefully accepting constructive criticism
+* Focusing on what is best for the community
+* Showing empathy towards other community members
+
+Examples of unacceptable behavior by participants include:
+
+* The use of sexualized language or imagery and unwelcome sexual attention or
+ advances
+* Trolling, insulting/derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or electronic
+ address, without explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+
+## Our Responsibilities
+
+Project maintainers are responsible for clarifying the standards of acceptable
+behavior and are expected to take appropriate and fair corrective action in
+response to any instances of unacceptable behavior.
+
+Project maintainers have the right and responsibility to remove, edit, or
+reject comments, commits, code, wiki edits, issues, and other contributions
+that are not aligned to this Code of Conduct, or to ban temporarily or
+permanently any contributor for other behaviors that they deem inappropriate,
+threatening, offensive, or harmful.
+
+## Scope
+
+This Code of Conduct applies within all project spaces, and it also applies when
+an individual is representing the project or its community in public spaces.
+Examples of representing a project or community include using an official
+project e-mail address, posting via an official social media account, or acting
+as an appointed representative at an online or offline event. Representation of
+a project may be further defined and clarified by project maintainers.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported by contacting the project team at `conduct@softwareheritage.org`. All
+complaints will be reviewed and investigated and will result in a response that
+is deemed necessary and appropriate to the circumstances. The project team is
+obligated to maintain confidentiality with regard to the reporter of an
+incident. Further details of specific enforcement policies may be posted
+separately.
+
+Project maintainers who do not follow or enforce the Code of Conduct in good
+faith may face temporary or permanent repercussions as determined by other
+members of the project's leadership.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
+available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
+
+[homepage]: https://www.contributor-covenant.org
+
+For answers to common questions about this code of conduct, see
+https://www.contributor-covenant.org/faq
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
new file mode 100644
index 0000000..7c3f962
--- /dev/null
+++ b/CONTRIBUTORS
@@ -0,0 +1 @@
+Ishan Bhanuka
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..94a9ed0
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,674 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The GNU General Public License is a free, copyleft license for
+software and other kinds of works.
+
+ The licenses for most software and other practical works are designed
+to take away your freedom to share and change the works. By contrast,
+the GNU General Public License is intended to guarantee your freedom to
+share and change all versions of a program--to make sure it remains free
+software for all its users. We, the Free Software Foundation, use the
+GNU General Public License for most of our software; it applies also to
+any other work released this way by its authors. You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+them if you wish), that you receive source code or can get it if you
+want it, that you can change the software or use pieces of it in new
+free programs, and that you know you can do these things.
+
+ To protect your rights, we need to prevent others from denying you
+these rights or asking you to surrender the rights. Therefore, you have
+certain responsibilities if you distribute copies of the software, or if
+you modify it: responsibilities to respect the freedom of others.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must pass on to the recipients the same
+freedoms that you received. You must make sure that they, too, receive
+or can get the source code. And you must show them these terms so they
+know their rights.
+
+ Developers that use the GNU GPL protect your rights with two steps:
+(1) assert copyright on the software, and (2) offer you this License
+giving you legal permission to copy, distribute and/or modify it.
+
+ For the developers' and authors' protection, the GPL clearly explains
+that there is no warranty for this free software. For both users' and
+authors' sake, the GPL requires that modified versions be marked as
+changed, so that their problems will not be attributed erroneously to
+authors of previous versions.
+
+ Some devices are designed to deny users access to install or run
+modified versions of the software inside them, although the manufacturer
+can do so. This is fundamentally incompatible with the aim of
+protecting users' freedom to change the software. The systematic
+pattern of such abuse occurs in the area of products for individuals to
+use, which is precisely where it is most unacceptable. Therefore, we
+have designed this version of the GPL to prohibit the practice for those
+products. If such problems arise substantially in other domains, we
+stand ready to extend this provision to those domains in future versions
+of the GPL, as needed to protect the freedom of users.
+
+ Finally, every program is threatened constantly by software patents.
+States should not allow patents to restrict development and use of
+software on general-purpose computers, but in those that do, we wish to
+avoid the special danger that patents applied to a free program could
+make it effectively proprietary. To prevent this, the GPL assures that
+patents cannot be used to render the program non-free.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ TERMS AND CONDITIONS
+
+ 0. Definitions.
+
+ "This License" refers to version 3 of the GNU General Public License.
+
+ "Copyright" also means copyright-like laws that apply to other kinds of
+works, such as semiconductor masks.
+
+ "The Program" refers to any copyrightable work licensed under this
+License. Each licensee is addressed as "you". "Licensees" and
+"recipients" may be individuals or organizations.
+
+ To "modify" a work means to copy from or adapt all or part of the work
+in a fashion requiring copyright permission, other than the making of an
+exact copy. The resulting work is called a "modified version" of the
+earlier work or a work "based on" the earlier work.
+
+ A "covered work" means either the unmodified Program or a work based
+on the Program.
+
+ To "propagate" a work means to do anything with it that, without
+permission, would make you directly or secondarily liable for
+infringement under applicable copyright law, except executing it on a
+computer or modifying a private copy. Propagation includes copying,
+distribution (with or without modification), making available to the
+public, and in some countries other activities as well.
+
+ To "convey" a work means any kind of propagation that enables other
+parties to make or receive copies. Mere interaction with a user through
+a computer network, with no transfer of a copy, is not conveying.
+
+ An interactive user interface displays "Appropriate Legal Notices"
+to the extent that it includes a convenient and prominently visible
+feature that (1) displays an appropriate copyright notice, and (2)
+tells the user that there is no warranty for the work (except to the
+extent that warranties are provided), that licensees may convey the
+work under this License, and how to view a copy of this License. If
+the interface presents a list of user commands or options, such as a
+menu, a prominent item in the list meets this criterion.
+
+ 1. Source Code.
+
+ The "source code" for a work means the preferred form of the work
+for making modifications to it. "Object code" means any non-source
+form of a work.
+
+ A "Standard Interface" means an interface that either is an official
+standard defined by a recognized standards body, or, in the case of
+interfaces specified for a particular programming language, one that
+is widely used among developers working in that language.
+
+ The "System Libraries" of an executable work include anything, other
+than the work as a whole, that (a) is included in the normal form of
+packaging a Major Component, but which is not part of that Major
+Component, and (b) serves only to enable use of the work with that
+Major Component, or to implement a Standard Interface for which an
+implementation is available to the public in source code form. A
+"Major Component", in this context, means a major essential component
+(kernel, window system, and so on) of the specific operating system
+(if any) on which the executable work runs, or a compiler used to
+produce the work, or an object code interpreter used to run it.
+
+ The "Corresponding Source" for a work in object code form means all
+the source code needed to generate, install, and (for an executable
+work) run the object code and to modify the work, including scripts to
+control those activities. However, it does not include the work's
+System Libraries, or general-purpose tools or generally available free
+programs which are used unmodified in performing those activities but
+which are not part of the work. For example, Corresponding Source
+includes interface definition files associated with source files for
+the work, and the source code for shared libraries and dynamically
+linked subprograms that the work is specifically designed to require,
+such as by intimate data communication or control flow between those
+subprograms and other parts of the work.
+
+ The Corresponding Source need not include anything that users
+can regenerate automatically from other parts of the Corresponding
+Source.
+
+ The Corresponding Source for a work in source code form is that
+same work.
+
+ 2. Basic Permissions.
+
+ All rights granted under this License are granted for the term of
+copyright on the Program, and are irrevocable provided the stated
+conditions are met. This License explicitly affirms your unlimited
+permission to run the unmodified Program. The output from running a
+covered work is covered by this License only if the output, given its
+content, constitutes a covered work. This License acknowledges your
+rights of fair use or other equivalent, as provided by copyright law.
+
+ You may make, run and propagate covered works that you do not
+convey, without conditions so long as your license otherwise remains
+in force. You may convey covered works to others for the sole purpose
+of having them make modifications exclusively for you, or provide you
+with facilities for running those works, provided that you comply with
+the terms of this License in conveying all material for which you do
+not control copyright. Those thus making or running the covered works
+for you must do so exclusively on your behalf, under your direction
+and control, on terms that prohibit them from making any copies of
+your copyrighted material outside their relationship with you.
+
+ Conveying under any other circumstances is permitted solely under
+the conditions stated below. Sublicensing is not allowed; section 10
+makes it unnecessary.
+
+ 3. Protecting Users' Legal Rights From Anti-Circumvention Law.
+
+ No covered work shall be deemed part of an effective technological
+measure under any applicable law fulfilling obligations under article
+11 of the WIPO copyright treaty adopted on 20 December 1996, or
+similar laws prohibiting or restricting circumvention of such
+measures.
+
+ When you convey a covered work, you waive any legal power to forbid
+circumvention of technological measures to the extent such circumvention
+is effected by exercising rights under this License with respect to
+the covered work, and you disclaim any intention to limit operation or
+modification of the work as a means of enforcing, against the work's
+users, your or third parties' legal rights to forbid circumvention of
+technological measures.
+
+ 4. Conveying Verbatim Copies.
+
+ You may convey verbatim copies of the Program's source code as you
+receive it, in any medium, provided that you conspicuously and
+appropriately publish on each copy an appropriate copyright notice;
+keep intact all notices stating that this License and any
+non-permissive terms added in accord with section 7 apply to the code;
+keep intact all notices of the absence of any warranty; and give all
+recipients a copy of this License along with the Program.
+
+ You may charge any price or no price for each copy that you convey,
+and you may offer support or warranty protection for a fee.
+
+ 5. Conveying Modified Source Versions.
+
+ You may convey a work based on the Program, or the modifications to
+produce it from the Program, in the form of source code under the
+terms of section 4, provided that you also meet all of these conditions:
+
+ a) The work must carry prominent notices stating that you modified
+ it, and giving a relevant date.
+
+ b) The work must carry prominent notices stating that it is
+ released under this License and any conditions added under section
+ 7. This requirement modifies the requirement in section 4 to
+ "keep intact all notices".
+
+ c) You must license the entire work, as a whole, under this
+ License to anyone who comes into possession of a copy. This
+ License will therefore apply, along with any applicable section 7
+ additional terms, to the whole of the work, and all its parts,
+ regardless of how they are packaged. This License gives no
+ permission to license the work in any other way, but it does not
+ invalidate such permission if you have separately received it.
+
+ d) If the work has interactive user interfaces, each must display
+ Appropriate Legal Notices; however, if the Program has interactive
+ interfaces that do not display Appropriate Legal Notices, your
+ work need not make them do so.
+
+ A compilation of a covered work with other separate and independent
+works, which are not by their nature extensions of the covered work,
+and which are not combined with it such as to form a larger program,
+in or on a volume of a storage or distribution medium, is called an
+"aggregate" if the compilation and its resulting copyright are not
+used to limit the access or legal rights of the compilation's users
+beyond what the individual works permit. Inclusion of a covered work
+in an aggregate does not cause this License to apply to the other
+parts of the aggregate.
+
+ 6. Conveying Non-Source Forms.
+
+ You may convey a covered work in object code form under the terms
+of sections 4 and 5, provided that you also convey the
+machine-readable Corresponding Source under the terms of this License,
+in one of these ways:
+
+ a) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by the
+ Corresponding Source fixed on a durable physical medium
+ customarily used for software interchange.
+
+ b) Convey the object code in, or embodied in, a physical product
+ (including a physical distribution medium), accompanied by a
+ written offer, valid for at least three years and valid for as
+ long as you offer spare parts or customer support for that product
+ model, to give anyone who possesses the object code either (1) a
+ copy of the Corresponding Source for all the software in the
+ product that is covered by this License, on a durable physical
+ medium customarily used for software interchange, for a price no
+ more than your reasonable cost of physically performing this
+ conveying of source, or (2) access to copy the
+ Corresponding Source from a network server at no charge.
+
+ c) Convey individual copies of the object code with a copy of the
+ written offer to provide the Corresponding Source. This
+ alternative is allowed only occasionally and noncommercially, and
+ only if you received the object code with such an offer, in accord
+ with subsection 6b.
+
+ d) Convey the object code by offering access from a designated
+ place (gratis or for a charge), and offer equivalent access to the
+ Corresponding Source in the same way through the same place at no
+ further charge. You need not require recipients to copy the
+ Corresponding Source along with the object code. If the place to
+ copy the object code is a network server, the Corresponding Source
+ may be on a different server (operated by you or a third party)
+ that supports equivalent copying facilities, provided you maintain
+ clear directions next to the object code saying where to find the
+ Corresponding Source. Regardless of what server hosts the
+ Corresponding Source, you remain obligated to ensure that it is
+ available for as long as needed to satisfy these requirements.
+
+ e) Convey the object code using peer-to-peer transmission, provided
+ you inform other peers where the object code and Corresponding
+ Source of the work are being offered to the general public at no
+ charge under subsection 6d.
+
+ A separable portion of the object code, whose source code is excluded
+from the Corresponding Source as a System Library, need not be
+included in conveying the object code work.
+
+ A "User Product" is either (1) a "consumer product", which means any
+tangible personal property which is normally used for personal, family,
+or household purposes, or (2) anything designed or sold for incorporation
+into a dwelling. In determining whether a product is a consumer product,
+doubtful cases shall be resolved in favor of coverage. For a particular
+product received by a particular user, "normally used" refers to a
+typical or common use of that class of product, regardless of the status
+of the particular user or of the way in which the particular user
+actually uses, or expects or is expected to use, the product. A product
+is a consumer product regardless of whether the product has substantial
+commercial, industrial or non-consumer uses, unless such uses represent
+the only significant mode of use of the product.
+
+ "Installation Information" for a User Product means any methods,
+procedures, authorization keys, or other information required to install
+and execute modified versions of a covered work in that User Product from
+a modified version of its Corresponding Source. The information must
+suffice to ensure that the continued functioning of the modified object
+code is in no case prevented or interfered with solely because
+modification has been made.
+
+ If you convey an object code work under this section in, or with, or
+specifically for use in, a User Product, and the conveying occurs as
+part of a transaction in which the right of possession and use of the
+User Product is transferred to the recipient in perpetuity or for a
+fixed term (regardless of how the transaction is characterized), the
+Corresponding Source conveyed under this section must be accompanied
+by the Installation Information. But this requirement does not apply
+if neither you nor any third party retains the ability to install
+modified object code on the User Product (for example, the work has
+been installed in ROM).
+
+ The requirement to provide Installation Information does not include a
+requirement to continue to provide support service, warranty, or updates
+for a work that has been modified or installed by the recipient, or for
+the User Product in which it has been modified or installed. Access to a
+network may be denied when the modification itself materially and
+adversely affects the operation of the network or violates the rules and
+protocols for communication across the network.
+
+ Corresponding Source conveyed, and Installation Information provided,
+in accord with this section must be in a format that is publicly
+documented (and with an implementation available to the public in
+source code form), and must require no special password or key for
+unpacking, reading or copying.
+
+ 7. Additional Terms.
+
+ "Additional permissions" are terms that supplement the terms of this
+License by making exceptions from one or more of its conditions.
+Additional permissions that are applicable to the entire Program shall
+be treated as though they were included in this License, to the extent
+that they are valid under applicable law. If additional permissions
+apply only to part of the Program, that part may be used separately
+under those permissions, but the entire Program remains governed by
+this License without regard to the additional permissions.
+
+ When you convey a copy of a covered work, you may at your option
+remove any additional permissions from that copy, or from any part of
+it. (Additional permissions may be written to require their own
+removal in certain cases when you modify the work.) You may place
+additional permissions on material, added by you to a covered work,
+for which you have or can give appropriate copyright permission.
+
+ Notwithstanding any other provision of this License, for material you
+add to a covered work, you may (if authorized by the copyright holders of
+that material) supplement the terms of this License with terms:
+
+ a) Disclaiming warranty or limiting liability differently from the
+ terms of sections 15 and 16 of this License; or
+
+ b) Requiring preservation of specified reasonable legal notices or
+ author attributions in that material or in the Appropriate Legal
+ Notices displayed by works containing it; or
+
+ c) Prohibiting misrepresentation of the origin of that material, or
+ requiring that modified versions of such material be marked in
+ reasonable ways as different from the original version; or
+
+ d) Limiting the use for publicity purposes of names of licensors or
+ authors of the material; or
+
+ e) Declining to grant rights under trademark law for use of some
+ trade names, trademarks, or service marks; or
+
+ f) Requiring indemnification of licensors and authors of that
+ material by anyone who conveys the material (or modified versions of
+ it) with contractual assumptions of liability to the recipient, for
+ any liability that these contractual assumptions directly impose on
+ those licensors and authors.
+
+ All other non-permissive additional terms are considered "further
+restrictions" within the meaning of section 10. If the Program as you
+received it, or any part of it, contains a notice stating that it is
+governed by this License along with a term that is a further
+restriction, you may remove that term. If a license document contains
+a further restriction but permits relicensing or conveying under this
+License, you may add to a covered work material governed by the terms
+of that license document, provided that the further restriction does
+not survive such relicensing or conveying.
+
+ If you add terms to a covered work in accord with this section, you
+must place, in the relevant source files, a statement of the
+additional terms that apply to those files, or a notice indicating
+where to find the applicable terms.
+
+ Additional terms, permissive or non-permissive, may be stated in the
+form of a separately written license, or stated as exceptions;
+the above requirements apply either way.
+
+ 8. Termination.
+
+ You may not propagate or modify a covered work except as expressly
+provided under this License. Any attempt otherwise to propagate or
+modify it is void, and will automatically terminate your rights under
+this License (including any patent licenses granted under the third
+paragraph of section 11).
+
+ However, if you cease all violation of this License, then your
+license from a particular copyright holder is reinstated (a)
+provisionally, unless and until the copyright holder explicitly and
+finally terminates your license, and (b) permanently, if the copyright
+holder fails to notify you of the violation by some reasonable means
+prior to 60 days after the cessation.
+
+ Moreover, your license from a particular copyright holder is
+reinstated permanently if the copyright holder notifies you of the
+violation by some reasonable means, this is the first time you have
+received notice of violation of this License (for any work) from that
+copyright holder, and you cure the violation prior to 30 days after
+your receipt of the notice.
+
+ Termination of your rights under this section does not terminate the
+licenses of parties who have received copies or rights from you under
+this License. If your rights have been terminated and not permanently
+reinstated, you do not qualify to receive new licenses for the same
+material under section 10.
+
+ 9. Acceptance Not Required for Having Copies.
+
+ You are not required to accept this License in order to receive or
+run a copy of the Program. Ancillary propagation of a covered work
+occurring solely as a consequence of using peer-to-peer transmission
+to receive a copy likewise does not require acceptance. However,
+nothing other than this License grants you permission to propagate or
+modify any covered work. These actions infringe copyright if you do
+not accept this License. Therefore, by modifying or propagating a
+covered work, you indicate your acceptance of this License to do so.
+
+ 10. Automatic Licensing of Downstream Recipients.
+
+ Each time you convey a covered work, the recipient automatically
+receives a license from the original licensors, to run, modify and
+propagate that work, subject to this License. You are not responsible
+for enforcing compliance by third parties with this License.
+
+ An "entity transaction" is a transaction transferring control of an
+organization, or substantially all assets of one, or subdividing an
+organization, or merging organizations. If propagation of a covered
+work results from an entity transaction, each party to that
+transaction who receives a copy of the work also receives whatever
+licenses to the work the party's predecessor in interest had or could
+give under the previous paragraph, plus a right to possession of the
+Corresponding Source of the work from the predecessor in interest, if
+the predecessor has it or can get it with reasonable efforts.
+
+ You may not impose any further restrictions on the exercise of the
+rights granted or affirmed under this License. For example, you may
+not impose a license fee, royalty, or other charge for exercise of
+rights granted under this License, and you may not initiate litigation
+(including a cross-claim or counterclaim in a lawsuit) alleging that
+any patent claim is infringed by making, using, selling, offering for
+sale, or importing the Program or any portion of it.
+
+ 11. Patents.
+
+ A "contributor" is a copyright holder who authorizes use under this
+License of the Program or a work on which the Program is based. The
+work thus licensed is called the contributor's "contributor version".
+
+ A contributor's "essential patent claims" are all patent claims
+owned or controlled by the contributor, whether already acquired or
+hereafter acquired, that would be infringed by some manner, permitted
+by this License, of making, using, or selling its contributor version,
+but do not include claims that would be infringed only as a
+consequence of further modification of the contributor version. For
+purposes of this definition, "control" includes the right to grant
+patent sublicenses in a manner consistent with the requirements of
+this License.
+
+ Each contributor grants you a non-exclusive, worldwide, royalty-free
+patent license under the contributor's essential patent claims, to
+make, use, sell, offer for sale, import and otherwise run, modify and
+propagate the contents of its contributor version.
+
+ In the following three paragraphs, a "patent license" is any express
+agreement or commitment, however denominated, not to enforce a patent
+(such as an express permission to practice a patent or covenant not to
+sue for patent infringement). To "grant" such a patent license to a
+party means to make such an agreement or commitment not to enforce a
+patent against the party.
+
+ If you convey a covered work, knowingly relying on a patent license,
+and the Corresponding Source of the work is not available for anyone
+to copy, free of charge and under the terms of this License, through a
+publicly available network server or other readily accessible means,
+then you must either (1) cause the Corresponding Source to be so
+available, or (2) arrange to deprive yourself of the benefit of the
+patent license for this particular work, or (3) arrange, in a manner
+consistent with the requirements of this License, to extend the patent
+license to downstream recipients. "Knowingly relying" means you have
+actual knowledge that, but for the patent license, your conveying the
+covered work in a country, or your recipient's use of the covered work
+in a country, would infringe one or more identifiable patents in that
+country that you have reason to believe are valid.
+
+ If, pursuant to or in connection with a single transaction or
+arrangement, you convey, or propagate by procuring conveyance of, a
+covered work, and grant a patent license to some of the parties
+receiving the covered work authorizing them to use, propagate, modify
+or convey a specific copy of the covered work, then the patent license
+you grant is automatically extended to all recipients of the covered
+work and works based on it.
+
+ A patent license is "discriminatory" if it does not include within
+the scope of its coverage, prohibits the exercise of, or is
+conditioned on the non-exercise of one or more of the rights that are
+specifically granted under this License. You may not convey a covered
+work if you are a party to an arrangement with a third party that is
+in the business of distributing software, under which you make payment
+to the third party based on the extent of your activity of conveying
+the work, and under which the third party grants, to any of the
+parties who would receive the covered work from you, a discriminatory
+patent license (a) in connection with copies of the covered work
+conveyed by you (or copies made from those copies), or (b) primarily
+for and in connection with specific products or compilations that
+contain the covered work, unless you entered into that arrangement,
+or that patent license was granted, prior to 28 March 2007.
+
+ Nothing in this License shall be construed as excluding or limiting
+any implied license or other defenses to infringement that may
+otherwise be available to you under applicable patent law.
+
+ 12. No Surrender of Others' Freedom.
+
+ If conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot convey a
+covered work so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you may
+not convey it at all. For example, if you agree to terms that obligate you
+to collect a royalty for further conveying from those to whom you convey
+the Program, the only way you could satisfy both those terms and this
+License would be to refrain entirely from conveying the Program.
+
+ 13. Use with the GNU Affero General Public License.
+
+ Notwithstanding any other provision of this License, you have
+permission to link or combine any covered work with a work licensed
+under version 3 of the GNU Affero General Public License into a single
+combined work, and to convey the resulting work. The terms of this
+License will continue to apply to the part which is the covered work,
+but the special requirements of the GNU Affero General Public License,
+section 13, concerning interaction through a network will apply to the
+combination as such.
+
+ 14. Revised Versions of this License.
+
+ The Free Software Foundation may publish revised and/or new versions of
+the GNU General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+ Each version is given a distinguishing version number. If the
+Program specifies that a certain numbered version of the GNU General
+Public License "or any later version" applies to it, you have the
+option of following the terms and conditions either of that numbered
+version or of any later version published by the Free Software
+Foundation. If the Program does not specify a version number of the
+GNU General Public License, you may choose any version ever published
+by the Free Software Foundation.
+
+ If the Program specifies that a proxy can decide which future
+versions of the GNU General Public License can be used, that proxy's
+public statement of acceptance of a version permanently authorizes you
+to choose that version for the Program.
+
+ Later license versions may give you additional or different
+permissions. However, no additional obligations are imposed on any
+author or copyright holder as a result of your choosing to follow a
+later version.
+
+ 15. Disclaimer of Warranty.
+
+ THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY
+APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT
+HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY
+OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM
+IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF
+ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. Limitation of Liability.
+
+ IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
+THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
+GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
+USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
+DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
+PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
+EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGES.
+
+ 17. Interpretation of Sections 15 and 16.
+
+ If the disclaimer of warranty and limitation of liability provided
+above cannot be given local legal effect according to their terms,
+reviewing courts shall apply local law that most closely approximates
+an absolute waiver of all civil liability in connection with the
+Program, unless a warranty or assumption of liability accompanies a
+copy of the Program in return for a fee.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+state the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year>  <name of author>
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+Also add information on how to contact you by electronic and paper mail.
+
+ If the program does terminal interaction, make it output a short
+notice like this when it starts in an interactive mode:
+
+ <program>  Copyright (C) <year>  <name of author>
+ This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, your program's commands
+might be different; for a GUI interface, you would use an "about box".
+
+ You should also get your employer (if you work as a programmer) or school,
+if any, to sign a "copyright disclaimer" for the program, if necessary.
+For more information on this, and how to apply and follow the GNU GPL, see
+<https://www.gnu.org/licenses/>.
+
+ The GNU General Public License does not permit incorporating your program
+into proprietary programs. If your program is a subroutine library, you
+may consider it more useful to permit linking proprietary applications with
+the library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License. But first, please read
+<https://www.gnu.org/licenses/why-not-lgpl.html>.
diff --git a/LICENSE.Celery b/LICENSE.Celery
new file mode 100644
index 0000000..06221a2
--- /dev/null
+++ b/LICENSE.Celery
@@ -0,0 +1,54 @@
+Copyright (c) 2015-2016 Ask Solem & contributors. All rights reserved.
+Copyright (c) 2012-2014 GoPivotal, Inc. All rights reserved.
+Copyright (c) 2009, 2010, 2011, 2012 Ask Solem, and individual contributors. All rights reserved.
+
+Celery is licensed under The BSD License (3 Clause, also known as
+the new BSD license). The license is an OSI approved Open Source
+license and is GPL-compatible(1).
+
+The license text can also be found here:
+http://www.opensource.org/licenses/BSD-3-Clause
+
+License
+=======
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of Ask Solem, nor the
+ names of its contributors may be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Ask Solem OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+Documentation License
+=====================
+
+The documentation portion of Celery (the rendered contents of the
+"docs" directory of a software distribution or checkout) is supplied
+under the "Creative Commons Attribution-ShareAlike 4.0
+International" (CC BY-SA 4.0) License as described by
+http://creativecommons.org/licenses/by-sa/4.0/
+
+Footnotes
+=========
+(1) A GPL-compatible license makes it possible to
+ combine Celery with other software that is released
+ under the GPL, it does not mean that we're distributing
+ Celery under the GPL license. The BSD license, unlike the GPL,
+ let you distribute a modified version without making your
+ changes open source.
diff --git a/PKG-INFO b/PKG-INFO
index dc35e9d..9025937 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,30 +1,30 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.1.1
+Version: 0.2.0
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/
Description: swh-scheduler
=============
Job scheduler for the Software Heritage project.
Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
listing a forge, loading new stuff from a Git repository) and one-off
activities (e.g., loading a specific version of a source package).
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/bin/swh-worker-control b/bin/swh-worker-control
deleted file mode 100755
index b6ff4e7..0000000
--- a/bin/swh-worker-control
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import datetime
-from fnmatch import fnmatch
-from operator import itemgetter
-import os
-import sys
-
-import click
-
-
-def list_remote_workers(inspect):
- ping_replies = inspect.ping()
- if not ping_replies:
- return {}
- workers = list(sorted(ping_replies))
- ret = {}
-
- for worker_name in workers:
- if not worker_name.startswith("celery@"):
- print("Unsupported worker: %s" % worker_name, file=sys.stderr)
- continue
- type, host = worker_name[len("celery@") :].split(".", 1)
- worker = {
- "name": worker_name,
- "host": host,
- "type": type,
- }
- ret[worker_name] = worker
-
- return ret
-
-
-def make_filters(filter_host, filter_type):
- """Parse the filters and create test functions"""
-
- def include(field, value):
- def filter(worker, field=field, value=value):
- return fnmatch(worker[field], value)
-
- return filter
-
- def exclude(field, value):
- def filter(worker, field=field, value=value):
- return not fnmatch(worker[field], value)
-
- return filter
-
- filters = []
- for host in filter_host:
- if host.startswith("-"):
- filters.append(exclude("host", host[1:]))
- else:
- filters.append(include("host", host))
-
- for type_ in filter_type:
- if type_.startswith("-"):
- filters.append(exclude("type", type_[1:]))
- else:
- filters.append(include("type", type_))
-
- return filters
-
-
-def filter_workers(workers, filters):
- """Filter workers according to the set criteria"""
- return {
- name: worker
- for name, worker in workers.items()
- if all(check(worker) for check in filters)
- }
-
-
-def get_clock_offsets(workers, inspect):
- """Add a clock_offset entry for each worker"""
- err_msg = "Could not get monotonic clock for {worker}"
-
- t = datetime.datetime.now(tz=datetime.timezone.utc)
- for worker, clock in inspect._request("monotonic").items():
- monotonic = clock.get("monotonic")
- if monotonic is None:
- monotonic = 0
- click.echo(err_msg.format(worker=worker), err=True)
- dt = datetime.timedelta(seconds=monotonic)
- workers[worker]["clock_offset"] = t - dt
-
-
-def worker_to_wallclock(worker, monotonic):
- """Convert a monotonic timestamp from a worker to a wall clock time"""
- dt = datetime.timedelta(seconds=monotonic)
- return worker["clock_offset"] + dt
-
-
-@click.group()
-@click.option(
- "--instance-config",
- metavar="CONFIG",
- default=None,
- help="Use this worker instance configuration",
-)
-@click.option(
- "--host", metavar="HOSTNAME_FILTER", multiple=True, help="Filter by hostname"
-)
-@click.option(
- "--type", metavar="WORKER_TYPE_FILTER", multiple=True, help="Filter by worker type"
-)
-@click.option(
- "--timeout",
- metavar="TIMEOUT",
- type=float,
- default=1.0,
- help="Timeout for remote control communication",
-)
-@click.option("--debug/--no-debug", default=False, help="Turn on debugging")
-@click.pass_context
-def cli(ctx, debug, timeout, instance_config, host, type):
- """Manage the Software Heritage workers
-
- Filters support globs; a filter starting with a "-" excludes the
- corresponding values.
-
- """
- if instance_config:
- os.environ["SWH_WORKER_INSTANCE"] = instance_config
-
- from swh.scheduler.celery_backend.config import app
-
- full_inspect = app.control.inspect(timeout=timeout)
-
- workers = filter_workers(
- list_remote_workers(full_inspect), make_filters(host, type)
- )
- ctx.obj["workers"] = workers
-
- destination = list(workers)
- inspect = app.control.inspect(destination=destination, timeout=timeout)
- ctx.obj["inspect"] = inspect
-
- get_clock_offsets(workers, inspect)
-
- ctx.obj["control"] = app.control
- ctx.obj["destination"] = destination
- ctx.obj["timeout"] = timeout
- ctx.obj["debug"] = debug
-
-
-@cli.command()
-@click.pass_context
-def list_workers(ctx):
- """List the currently running workers"""
- workers = ctx.obj["workers"]
-
- for worker_name, worker in sorted(workers.items()):
- click.echo("{type} alive on {host}".format(**worker))
-
- if not workers:
- sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_tasks(ctx):
- """List the tasks currently running on workers"""
- task_template = (
- "{worker} {name}"
- "[{id} "
- "started={started:%Y-%m-%mT%H:%M:%S} "
- "pid={worker_pid}] {args} {kwargs}"
- )
- inspect = ctx.obj["inspect"]
- workers = ctx.obj["workers"]
- active = inspect.active()
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- has_tasks = False
- for worker_name, tasks in sorted(active.items()):
- worker = workers[worker_name]
- if not tasks:
- click.echo("No active tasks on {name}".format(**worker), err=True)
- for task in sorted(tasks, key=itemgetter("time_start")):
- task["started"] = worker_to_wallclock(worker, task["time_start"])
- click.echo(task_template.format(worker=worker_name, **task))
- has_tasks = True
-
- if not has_tasks:
- sys.exit(2)
-
-
-@cli.command()
-@click.pass_context
-def list_queues(ctx):
- """List all the queues currently enabled on the workers"""
- inspect = ctx.obj["inspect"]
- active = inspect.active_queues()
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- has_queues = False
- for worker_name, queues in sorted(active.items()):
- queues = sorted(queue["name"] for queue in queues)
- if queues:
- click.echo(
- "{worker} {queues}".format(worker=worker_name, queues=" ".join(queues))
- )
- has_queues = True
- else:
- click.echo("No queues for {worker}".format(worker=worker_name), err=True)
-
- if not has_queues:
- sys.exit(2)
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def remove_queues(ctx, noop, queues):
- """Cancel the queue for the given workers"""
- msg_template = "Canceling queue {queue} on worker {worker}{noop}"
-
- inspect = ctx.obj["inspect"]
- control = ctx.obj["control"]
- timeout = ctx.obj["timeout"]
- active = inspect.active_queues()
-
- if not queues:
- queues = ["*"]
-
- if not active:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- for worker, active_queues in sorted(active.items()):
- for queue in sorted(active_queues, key=itemgetter("name")):
- if any(fnmatch(queue["name"], name) for name in queues):
- msg = msg_template.format(
- queue=queue["name"], worker=worker, noop=" (noop)" if noop else ""
- )
- click.echo(msg, err=True)
- if not noop:
- control.cancel_consumer(
- queue["name"], destination=[worker], timeout=timeout
- )
-
-
-@cli.command()
-@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
-@click.argument("queues", nargs=-1)
-@click.pass_context
-def add_queues(ctx, noop, queues):
- """Start the queue for the given workers"""
- msg_template = "Starting queue {queue} on worker {worker}{noop}"
-
- control = ctx.obj["control"]
- timeout = ctx.obj["timeout"]
- workers = ctx.obj["workers"]
-
- if not workers:
- click.echo("No reply from workers", err=True)
- sys.exit(2)
-
- for worker in sorted(workers):
- for queue in queues:
- msg = msg_template.format(
- queue=queue, worker=worker, noop=" (noop)" if noop else ""
- )
- click.echo(msg, err=True)
- if not noop:
- ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
- print(ret)
-
-
-if __name__ == "__main__":
- cli(obj={})
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..eb6de3d
--- /dev/null
+++ b/conftest.py
@@ -0,0 +1,6 @@
+from hypothesis import settings
+
+# define tests profile. Full documentation is at:
+# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
+settings.register_profile("fast", max_examples=5, deadline=5000)
+settings.register_profile("slow", max_examples=20, deadline=5000)
diff --git a/data/README.md b/data/README.md
new file mode 100644
index 0000000..762c332
--- /dev/null
+++ b/data/README.md
@@ -0,0 +1,23 @@
+# Install/Update template
+
+Install the `task` template in elasticsearch:
+
+``` shell
+INSTANCE=http://something:9200
+TEMPLATE_NAME=template_swh_tasks
+curl -i -H'Content-Type: application/json' -d@./elastic-template.json -XPUT ${INSTANCE}/_template/${TEMPLATE_NAME}
+```
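+
+You can check that the template has been registered by reading it back
+(standard Elasticsearch template endpoint, same variables as above):
+
+``` shell
+curl -s ${INSTANCE}/_template/${TEMPLATE_NAME}
+```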
+
+# Update index settings
+
+The index setup is fixed and defined by the template settings.
+
+When that setup needs to change, we need to update both the template
+and the existing indices.
+
+To update index settings:
+
+``` shell
+INDEX_NAME=swh-tasks-2017-11
+curl -i -H'Content-Type: application/json' -d@./update-index-settings.json -XPUT ${INSTANCE}/${INDEX_NAME}/_settings
+```
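+
+The live settings can then be read back to confirm the update:
+
+``` shell
+curl -s ${INSTANCE}/${INDEX_NAME}/_settings
+```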
diff --git a/data/elastic-template.json b/data/elastic-template.json
new file mode 100644
index 0000000..3446bb7
--- /dev/null
+++ b/data/elastic-template.json
@@ -0,0 +1,53 @@
+{
+ "order": 0,
+ "index_patterns": ["swh-tasks-*"],
+ "settings": {
+ "index": {
+ "codec": "best_compression",
+ "refresh_interval": "1s",
+ "number_of_shards": 1
+ }
+ },
+ "mappings" : {
+ "task" : {
+ "_source" : { "enabled": true},
+ "properties": {
+ "task_id": {"type": "double"},
+ "task_policy": {"type": "text"},
+ "task_status": {"type": "text"},
+ "task_run_id": {"type": "double"},
+ "arguments": {
+ "type": "object",
+ "properties" : {
+ "args": {
+ "type": "nested",
+ "dynamic": false
+ },
+ "kwargs": {
+ "type": "text"
+ }
+ }
+ },
+ "type": {"type": "text"},
+ "backend_id": {"type": "text"},
+ "metadata": {
+ "type": "object",
+ "enabled" : false
+ },
+ "scheduled": {
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+ },
+ "started": {
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+ },
+ "ended": {
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||strict_date_optional_time||epoch_millis"
+ }
+ }
+ }
+ },
+ "aliases": {}
+}
diff --git a/data/update-index-settings.json b/data/update-index-settings.json
new file mode 100644
index 0000000..ce3deab
--- /dev/null
+++ b/data/update-index-settings.json
@@ -0,0 +1,5 @@
+{
+ "index": {
+ "refresh_interval": "1s"
+ }
+}
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..58a761e
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1,3 @@
+_build/
+apidoc/
+*-stamp
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..c30c50a
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1 @@
+include ../../swh-docs/Makefile.sphinx
diff --git a/docs/_static/.placeholder b/docs/_static/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/_templates/.placeholder b/docs/_templates/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/conf.py b/docs/conf.py
new file mode 100644
index 0000000..190deb7
--- /dev/null
+++ b/docs/conf.py
@@ -0,0 +1 @@
+from swh.docs.sphinx.conf import * # NoQA
diff --git a/docs/index.rst b/docs/index.rst
new file mode 100644
index 0000000..5935f97
--- /dev/null
+++ b/docs/index.rst
@@ -0,0 +1,168 @@
+.. _swh-scheduler:
+
+Software Heritage - Job scheduler
+=================================
+
+Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
+listing a forge, loading new stuff from a Git repository) and one-off
+activities (e.g., loading a specific version of a source package).
+
+
+Description
+-----------
+
+This module provides a scheduler service for the Software Heritage platform. It
+allows one to define tasks with a number of properties. In this documentation,
+we will call these swh-tasks to prevent confusion. These swh-tasks are stored
+in a database, and an HTTP-based RPC service is provided to create or find
+existing swh-task declarations.
+
+The execution model for these swh-tasks relies on Celery. Thus, each swh-task
+type defined in the database must have a (series of) Celery workers capable of
+executing such swh-tasks.
+
+A number of services are then provided to manage the scheduling of these
+swh-tasks as Celery tasks.
+
+The `scheduler-runner` service is a daemon that regularly looks for swh-tasks
+in the database that should be scheduled. For each selected swh-task, a Celery
+task is instantiated.
+
+The `scheduler-listener` service is a daemon that listens to the Celery event
+bus and maintains the workflow status of scheduled swh-tasks.
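+
+Both daemons can be run through the `swh scheduler` command line tool (a
+sketch using the `start-runner` and `start-listener` subcommands from
+`swh.scheduler.cli.admin`; exact options may differ)::
+
+  $ swh scheduler --config-file scheduler.yml start-runner
+  $ swh scheduler --config-file scheduler.yml start-listener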
+
+
+SWH Task Model
+~~~~~~~~~~~~~~
+
+Each swh-task-type is the declaration of a type of swh-task. Each
+swh-task-type has the following fields:
+
+- `type`: Name of the swh-task type; can be anything but must be unique,
+- `description`: Human-readable task description,
+- `backend_name`: Name of the task in the job-running backend,
+- `default_interval`: Default interval for newly scheduled tasks,
+- `min_interval`: Minimum interval between two runs of a task,
+- `max_interval`: Maximum interval between two runs of a task,
+- `backoff_factor`: Adjustment factor for the backoff between two task runs,
+- `max_queue_length`: Maximum length of the queue for this type of tasks,
+- `num_retries`: Default number of retries on transient failures,
+- `retry_delay`: Retry delay for the task.
+
+Existing swh-task-types can be listed using the `swh scheduler` command line
+tool::
+
+ $ swh scheduler task-type list
+ Known task types:
+ check-deposit:
+ Pre-checking deposit step before loading into swh archive
+ index-fossology-license:
+ Fossology license indexer task
+ load-git:
+ Update an origin of type git
+ load-hg:
+ Update an origin of type mercurial
+
+You can see the details of a swh-task-type::
+
+ $ swh scheduler task-type list -v -t load-git
+ Known task types:
+ load-git: swh.loader.git.tasks.UpdateGitRepository
+ Update an origin of type git
+ interval: 64 days, 0:00:00 [12:00:00, 64 days, 0:00:00]
+ backoff_factor: 2.0
+ max_queue_length: 5000
+ num_retries: None
+ retry_delay: None
+
+
+A swh-task is an 'instance' of such a swh-task-type, and consists of:
+
+- `arguments`: Arguments passed to the underlying job scheduler,
+- `next_run`: Time on or after which the next run of this task should be scheduled,
+- `current_interval`: Interval between two runs of this task, taking into
+ account the backoff factor,
+- `policy`: Whether the task is "one-shot" or "recurring",
+- `retries_left`: Number of "short delay" retries of the task in case of
+ transient failure,
+- `priority`: Priority of the task,
+- `id`: Internal task identifier,
+- `type`: References task_type table,
+- `status`: Task status (among "next_run_not_scheduled", "next_run_scheduled",
+ "completed", "disabled").
+
+So a swh-task basically consists of:
+
+- a set of parameters defining how the scheduling of the
+ swh-task is handled,
+- a set of parameters to specify the retry policy in case of transient failure
+ upon execution,
+- a set of parameters that defines the job to be done (`backend_name` +
+ `arguments`).
+
+
+You can list pending swh-tasks (tasks that are to be scheduled ASAP)::
+
+ $ swh scheduler task list-pending load-git --limit 2
+ Found 1 load-git tasks
+
+ Task 9052257
+ Next run: 15 days ago (2019-06-25 10:35:10+00:00)
+ Interval: 2 days, 0:00:00
+ Type: load-git
+ Policy: recurring
+ Args:
+ 'https://github.com/turtl/mobile'
+ Keyword args:
+
+
+Looking for existing swh-tasks can be done via the command line tool::
+
+ $ swh scheduler task list -t load-hg --limit 2
+ Found 2 tasks
+
+ Task 168802702
+ Next run: in 4 hours (2019-07-10 17:56:48+00:00)
+ Interval: 1 day, 0:00:00
+ Type: load-hg
+ Policy: recurring
+ Status: next_run_not_scheduled
+ Priority:
+ Args:
+ 'https://bitbucket.org/kepung/pypy'
+ Keyword args:
+
+ Task 169800445
+ Next run: in a month (2019-08-10 17:54:24+00:00)
+ Interval: 32 days, 0:00:00
+ Type: load-hg
+ Policy: recurring
+ Status: next_run_not_scheduled
+ Priority:
+ Args:
+ 'https://bitbucket.org/lunixbochs/pypy-1'
+ Keyword args:
+
+
+
+Writing a new worker for a new swh-task-type
+--------------------------------------------
+
+When you want to add a new swh-task-type, you need a Celery worker backend
+capable of executing instances of this new task-type.
+
+Celery workers for swh-scheduler based tasks should be started using the
+Celery app in `swh.scheduler.celery_config`. The latter, among other things,
+provides a loading mechanism for task types based on plugins declared via
+pkg_resources under the `[swh.workers]` entry point.
+
+TODO: add a fully working example of a dumb task.
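+
+In the meantime, here is a minimal sketch of what such a worker could look
+like (the `swh.dumb` module path, the task name and the `register` contract
+are hypothetical; the authoritative plugin loading logic is the one
+implemented by the Celery app mentioned above)::
+
+  # swh/dumb/tasks.py -- a hypothetical, minimal swh-task
+  from celery import shared_task
+
+  @shared_task(name="swh.dumb.tasks.Ping")
+  def ping(**kwargs):
+      # do the actual work here; the return value ends up in the
+      # metadata of the corresponding task run
+      return {"status": "eventful"}
+
+  # swh/dumb/__init__.py -- declare the task modules to load
+  def register():
+      return {"task_modules": ["swh.dumb.tasks"]}
+
+with, in the package's `setup.py`, an entry point declaration such as::
+
+  entry_points="""
+      [swh.workers]
+      worker.dumb=swh.dumb:register
+  """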
+
+
+Reference Documentation
+-----------------------
+
+.. toctree::
+ :maxdepth: 2
+
+ /apidoc/swh.scheduler
diff --git a/mypy.ini b/mypy.ini
new file mode 100644
index 0000000..328b66e
--- /dev/null
+++ b/mypy.ini
@@ -0,0 +1,33 @@
+[mypy]
+namespace_packages = True
+warn_unused_ignores = True
+
+
+# 3rd party libraries without stubs (yet)
+
+[mypy-arrow.*]
+ignore_missing_imports = True
+
+[mypy-celery.*]
+ignore_missing_imports = True
+
+[mypy-elasticsearch.*]
+ignore_missing_imports = True
+
+[mypy-kombu.*]
+ignore_missing_imports = True
+
+[mypy-pika.*]
+ignore_missing_imports = True
+
+[mypy-pkg_resources.*]
+ignore_missing_imports = True
+
+[mypy-psycopg2.*]
+ignore_missing_imports = True
+
+[mypy-pytest.*]
+ignore_missing_imports = True
+
+[mypy-pytest_postgresql.*]
+ignore_missing_imports = True
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..f1cacd8
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,4 @@
+[pytest]
+norecursedirs = docs
+filterwarnings =
+ once
diff --git a/requirements.txt b/requirements.txt
index 7e5d4c6..f2c29f5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,19 +1,21 @@
# Add here external Python modules dependencies, one per line. Module names
# should match https://pypi.python.org/pypi names. For the full spec of
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
arrow
+attrs
+attrs-strict
celery >= 4
# Not a direct dependency: missing from celery 4.4.4
future >= 0.18.0
Click
elasticsearch > 5.4
flask
pika >= 1.1.0
psycopg2
pyyaml
vcversioner
setuptools
# test dependencies
# hypothesis
diff --git a/setup.py b/setup.py
index d47fc8b..fe21851 100755
--- a/setup.py
+++ b/setup.py
@@ -1,73 +1,72 @@
#!/usr/bin/env python3
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from setuptools import setup, find_packages
from os import path
from io import open
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
setup(
name="swh.scheduler",
description="Software Heritage Scheduler",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DSCH/",
packages=find_packages(),
- scripts=["bin/swh-worker-control"],
- setup_requires=["vcversioner"],
+ setup_requires=["setuptools-scm"],
+ use_scm_version=True,
install_requires=parse_requirements() + parse_requirements("swh"),
extras_require={"testing": parse_requirements("test")},
- vcversioner={},
include_package_data=True,
entry_points="""
[console_scripts]
swh-scheduler=swh.scheduler.cli:main
[swh.cli.subcommands]
scheduler=swh.scheduler.cli:cli
""",
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-scheduler",
"Documentation": "https://docs.softwareheritage.org/devel/swh-scheduler/",
},
)
diff --git a/sql/.gitignore b/sql/.gitignore
new file mode 100644
index 0000000..d501764
--- /dev/null
+++ b/sql/.gitignore
@@ -0,0 +1,3 @@
+*-stamp
+autodoc/
+swh-scheduler.dump
diff --git a/sql/Makefile b/sql/Makefile
new file mode 100644
index 0000000..cc836d5
--- /dev/null
+++ b/sql/Makefile
@@ -0,0 +1,73 @@
+# Depends: postgresql-client, postgresql-autodoc
+
+DBNAME = softwareheritage-scheduler-dev
+DOCDIR = autodoc
+
+SQL_SCHEMA = 30-swh-schema.sql
+SQL_FUNC = 40-swh-func.sql
+SQL_DATA = 50-swh-data.sql
+SQL_INDEXES = 60-swh-indexes.sql
+SQLS = $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA) $(SQL_INDEXES)
+SQL_FILES = $(abspath $(addprefix $(CURDIR)/../swh/scheduler/sql/,$(SQLS)))
+
+PSQL_BIN = psql
+PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1
+PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
+
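+# Non-empty when the database is provisioned by pifpaf (which exports
+# PIFPAF_URLS); stamp files are then disabled so each run starts afresh.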
+PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS))
+
+all:
+
+createdb: createdb-stamp
+createdb-stamp: $(SQL_FILES)
+ifeq ($(PIFPAF),)
+ -dropdb $(DBNAME)
+endif
+ createdb $(DBNAME)
+ifeq ($(PIFPAF),)
+ touch $@
+else
+ rm -f $@
+endif
+
+filldb: filldb-stamp
+filldb-stamp: createdb-stamp
+ cat $(SQL_FILES) | $(PSQL) $(DBNAME)
+ifeq ($(PIFPAF),)
+ touch $@
+else
+ rm -f $@
+endif
+
+dropdb:
+ -dropdb $(DBNAME)
+
+dumpdb: swh-scheduler.dump
+swh-scheduler.dump: filldb-stamp
+ pg_dump -Fc $(DBNAME) > $@
+
+$(DOCDIR):
+ test -d $(DOCDIR)/ || mkdir $(DOCDIR)
+
+doc: autodoc-stamp $(DOCDIR)/swh-scheduler.pdf
+autodoc-stamp: filldb-stamp $(DOCDIR)
+ postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler
+ cp -a $(DOCDIR)/swh-scheduler.dot $(DOCDIR)/swh-scheduler.dot.orig
+ifeq ($(PIFPAF),)
+ touch $@
+else
+ rm -f $@
+endif
+
+$(DOCDIR)/swh-scheduler.pdf: $(DOCDIR)/swh-scheduler.dot autodoc-stamp
+ dot -T pdf $< > $@
+$(DOCDIR)/swh-scheduler.svg: $(DOCDIR)/swh-scheduler.dot autodoc-stamp
+ dot -T svg $< > $@
+
+clean:
+ rm -rf *-stamp $(DOCDIR)/
+
+distclean: clean dropdb
+ rm -f swh-scheduler.dump
+
+.PHONY: all createdb filldb dropdb dumpdb doc clean distclean
diff --git a/sql/updates/02.sql b/sql/updates/02.sql
new file mode 100644
index 0000000..04fc040
--- /dev/null
+++ b/sql/updates/02.sql
@@ -0,0 +1,66 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 01
+-- to_version: 02
+-- description: Allow mass-creation of task runs
+
+begin;
+
+insert into dbversion (version, release, description)
+ values (2, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_mktemp_task_run ()
+ returns void
+ language sql
+as $$
+ create temporary table tmp_task_run (
+ like task_run excluding indexes
+ ) on commit drop;
+ alter table tmp_task_run
+ drop column id,
+ drop column status;
+$$;
+
+comment on function swh_scheduler_mktemp_task_run () is 'Create a temporary table for bulk task run scheduling';
+
+create or replace function swh_scheduler_schedule_task_run_from_temp ()
+ returns void
+ language plpgsql
+as $$
+begin
+ insert into task_run (task, backend_id, metadata, scheduled, status)
+ select task, backend_id, metadata, scheduled, 'scheduled'
+ from tmp_task_run;
+ return;
+end;
+$$;
+
+create or replace function swh_scheduler_start_task_run (backend_id text,
+ metadata jsonb default '{}'::jsonb,
+ ts timestamptz default now())
+ returns task_run
+ language sql
+as $$
+ update task_run
+ set started = ts,
+ status = 'started',
+ metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_start_task_run.metadata
+ where task_run.backend_id = swh_scheduler_start_task_run.backend_id
+ returning *;
+$$;
+
+create or replace function swh_scheduler_end_task_run (backend_id text,
+ status task_run_status,
+ metadata jsonb default '{}'::jsonb,
+ ts timestamptz default now())
+ returns task_run
+ language sql
+as $$
+ update task_run
+ set ended = ts,
+ status = swh_scheduler_end_task_run.status,
+ metadata = coalesce(task_run.metadata, '{}'::jsonb) || swh_scheduler_end_task_run.metadata
+ where task_run.backend_id = swh_scheduler_end_task_run.backend_id
+ returning *;
+$$;
+
+commit;
diff --git a/sql/updates/03.sql b/sql/updates/03.sql
new file mode 100644
index 0000000..5ec51d5
--- /dev/null
+++ b/sql/updates/03.sql
@@ -0,0 +1,26 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 02
+-- to_version: 03
+-- description: Fix a bug in handling default intervals for task types
+
+begin;
+
+insert into dbversion (version, release, description)
+ values (3, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ return query
+ insert into task (type, arguments, next_run, status, current_interval)
+ select type, arguments, next_run, 'next_run_not_scheduled',
+ (select default_interval from task_type tt where tt.type = tmp_task.type)
+ from tmp_task
+ returning task.*;
+end;
+$$;
+
+
+commit;
diff --git a/sql/updates/04.sql b/sql/updates/04.sql
new file mode 100644
index 0000000..a051388
--- /dev/null
+++ b/sql/updates/04.sql
@@ -0,0 +1,53 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 03
+-- to_version: 04
+-- description: Add a maximum queue length to the task types in the scheduler
+
+begin;
+
+insert into dbversion (version, release, description)
+values (4, now(), 'Work In Progress');
+
+alter table task_type add column max_queue_length bigint;
+comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks';
+
+
+drop function swh_scheduler_peek_ready_tasks (timestamptz, bigint);
+drop function swh_scheduler_grab_ready_tasks (timestamptz, bigint);
+
+create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
+ num_tasks bigint default NULL)
+ returns setof task
+ language sql
+ stable
+as $$
+select * from task
+ where next_run <= ts
+ and type = task_type
+ and status = 'next_run_not_scheduled'
+ order by next_run
+ limit num_tasks;
+$$;
+
+create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
+ num_tasks bigint default NULL)
+ returns setof task
+ language sql
+as $$
+ update task
+ set status='next_run_scheduled'
+ from (
+ select id from task
+ where next_run <= ts
+ and type = task_type
+ and status='next_run_not_scheduled'
+ order by next_run
+ limit num_tasks
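+ -- skip rows already locked by a concurrent grab, so that parallel
+ -- runners neither block each other nor schedule the same task twice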
+ for update skip locked
+ ) next_tasks
+ where task.id = next_tasks.id
+ returning task.*;
+$$;
+
+
+commit;
diff --git a/sql/updates/05.sql b/sql/updates/05.sql
new file mode 100644
index 0000000..b55a533
--- /dev/null
+++ b/sql/updates/05.sql
@@ -0,0 +1,144 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 04
+-- to_version: 05
+-- description: Add recurrence logic for temporary failures and one-shot tasks
+
+alter type task_status add value if not exists 'completed' before 'disabled';
+alter type task_run_status add value if not exists 'permfailed' after 'failed';
+
+begin;
+
+insert into dbversion (version, release, description)
+values (5, now(), 'Work In Progress');
+
+alter table task_type add column num_retries bigint;
+alter table task_type add column retry_delay interval;
+
+comment on column task_type.num_retries is 'Default number of retries on transient failures';
+comment on column task_type.retry_delay is 'Retry delay for the task';
+
+create type task_policy as enum ('recurring', 'oneshot');
+comment on type task_policy is 'Recurrence policy of the given task';
+
+alter table task add column policy task_policy not null default 'recurring';
+alter table task add column retries_left bigint not null default 0;
+
+comment on column task.policy is 'Whether the task is one-shot or recurring';
+comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
+ 'transient failure';
+
+
+create or replace function swh_scheduler_mktemp_task ()
+ returns void
+ language sql
+as $$
+ create temporary table tmp_task (
+ like task excluding indexes
+ ) on commit drop;
+ alter table tmp_task
+ drop column id,
+ drop column current_interval,
+ drop column status,
+ alter column policy drop not null,
+ alter column retries_left drop not null;
+$$;
+
+comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ return query
+ insert into task (type, arguments, next_run, status, current_interval, policy, retries_left)
+ select type, arguments, next_run, 'next_run_not_scheduled',
+ (select default_interval from task_type tt where tt.type = tmp_task.type),
+ coalesce(policy, 'recurring'),
+ coalesce(retries_left, (select num_retries from task_type tt where tt.type = tmp_task.type), 0)
+ from tmp_task
+ returning task.*;
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
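+
+-- typical bulk-creation flow (sketch):
+--   select swh_scheduler_mktemp_task();                    -- 1. temp table
+--   copy tmp_task (type, arguments, next_run) from stdin;  -- 2. bulk load
+--   select * from swh_scheduler_create_tasks_from_temp();  -- 3. insert and return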
+
+drop trigger update_interval_on_task_end on task_run;
+drop function swh_scheduler_compute_new_task_interval (text, interval, task_run_status) cascade;
+drop function swh_scheduler_update_task_interval () cascade;
+
+create or replace function swh_scheduler_update_task_on_task_end ()
+ returns trigger
+ language plpgsql
+as $$
+declare
+ cur_task task%rowtype;
+ cur_task_type task_type%rowtype;
+ adjustment_factor float;
+ new_interval interval;
+begin
+ select * from task where id = new.task into cur_task;
+ select * from task_type where type = cur_task.type into cur_task_type;
+
+ case
+ when new.status = 'permfailed' then
+ update task
+ set status = 'disabled'
+ where id = cur_task.id;
+ when new.status in ('eventful', 'uneventful') then
+ case
+ when cur_task.policy = 'oneshot' then
+ update task
+ set status = 'completed'
+ where id = cur_task.id;
+ when cur_task.policy = 'recurring' then
+ if new.status = 'uneventful' then
+ -- nothing new happened: back off, stretching the interval by the backoff factor
+ adjustment_factor := cur_task_type.backoff_factor;
+ else
+ -- the run was eventful: tighten the interval by the same factor
+ adjustment_factor := 1/cur_task_type.backoff_factor;
+ end if;
+ new_interval := greatest(
+ cur_task_type.min_interval,
+ least(
+ cur_task_type.max_interval,
+ adjustment_factor * cur_task.current_interval));
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = now() + new_interval,
+ current_interval = new_interval,
+ retries_left = coalesce(cur_task_type.num_retries, 0)
+ where id = cur_task.id;
+ end case;
+ else -- new.status in 'failed', 'lost'
+ if cur_task.retries_left > 0 then
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = now() + cur_task_type.retry_delay,
+ retries_left = cur_task.retries_left - 1
+ where id = cur_task.id;
+ else -- no retries left
+ case
+ when cur_task.policy = 'oneshot' then
+ update task
+ set status = 'disabled'
+ where id = cur_task.id;
+ when cur_task.policy = 'recurring' then
+ update task
+ set status = 'next_run_not_scheduled',
+ next_run = now() + cur_task.current_interval,
+ retries_left = coalesce(cur_task_type.num_retries, 0)
+ where id = cur_task.id;
+ end case;
+ end if; -- retries
+ end case;
+ return null;
+end;
+$$;
+
+create trigger update_task_on_task_end
+ after update of status on task_run
+ for each row
+ when (new.status NOT IN ('scheduled', 'started'))
+ execute procedure swh_scheduler_update_task_on_task_end ();
+
+commit;
diff --git a/sql/updates/06.sql b/sql/updates/06.sql
new file mode 100644
index 0000000..8aa1d9f
--- /dev/null
+++ b/sql/updates/06.sql
@@ -0,0 +1,23 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 05
+-- to_version: 06
+-- description: relax constraints on intervals for one-shot tasks
+
+begin;
+
+insert into dbversion (version, release, description)
+ values (6, now(), 'Work In Progress');
+
+
+alter table task_type
+ alter column default_interval drop not null,
+ alter column min_interval drop not null,
+ alter column max_interval drop not null,
+ alter column backoff_factor drop not null;
+
+alter table task
+ alter column current_interval drop not null,
+ add constraint task_check check (policy <> 'recurring' or current_interval is not null);
+
+commit;
+
diff --git a/sql/updates/07.sql b/sql/updates/07.sql
new file mode 100644
index 0000000..6082e0a
--- /dev/null
+++ b/sql/updates/07.sql
@@ -0,0 +1,54 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 06
+-- to_version: 07
+-- description: Archive 'oneshot' and disabled 'recurring' tasks (status = 'disabled')
+
+insert into dbversion (version, release, description)
+values (7, now(), 'Work In Progress');
+
+create type task_record as (
+ task_id bigint,
+ task_policy task_policy,
+ task_status task_status,
+ task_run_id bigint,
+ arguments jsonb,
+ type text,
+ backend_id text,
+ metadata jsonb,
+ scheduled timestamptz,
+ started timestamptz,
+ ended timestamptz
+);
+
+create index task_run_id_asc_idx on task_run(task asc, ended asc);
+
+create or replace function swh_scheduler_task_to_archive(
+ ts timestamptz, last_id bigint default -1, lim bigint default 10)
+ returns setof task_record
+ language sql stable
+as $$
+ select t.id as task_id, t.policy as task_policy,
+ t.status as task_status, tr.id as task_run_id,
+ t.arguments, t.type, tr.backend_id, tr.metadata,
+ tr.scheduled, tr.started, tr.ended
+ from task_run tr inner join task t on tr.task=t.id
+ where ((t.policy = 'oneshot' and t.status ='completed') or
+ (t.policy = 'recurring' and t.status ='disabled')) and
+ tr.ended < ts and
+ t.id > last_id
+ order by tr.task, tr.ended
+ limit lim;
+$$;
+
+comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function';
+
+create or replace function swh_scheduler_delete_archive_tasks(
+ task_ids bigint[])
+ returns void
+ language sql
+as $$
+ delete from task_run where task in (select * from unnest(task_ids));
+ delete from task where id in (select * from unnest(task_ids));
+$$;
+
+comment on function swh_scheduler_delete_archive_tasks is 'Clean up archived tasks function';
diff --git a/sql/updates/08.sql b/sql/updates/08.sql
new file mode 100644
index 0000000..b9ac8fb
--- /dev/null
+++ b/sql/updates/08.sql
@@ -0,0 +1,51 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 07
+-- to_version: 08
+-- description: Improve task to archive filtering function
+
+insert into dbversion (version, release, description)
+values (8, now(), 'Work In Progress');
+
+drop function swh_scheduler_task_to_archive(timestamptz, bigint, bigint);
+
+create or replace function swh_scheduler_task_to_archive(
+ ts_after timestamptz, ts_before timestamptz, last_id bigint default -1,
+ lim bigint default 10)
+ returns setof task_record
+ language sql stable
+as $$
+ select t.id as task_id, t.policy as task_policy,
+ t.status as task_status, tr.id as task_run_id,
+ t.arguments, t.type, tr.backend_id, tr.metadata,
+ tr.scheduled, tr.started, tr.ended
+ from task_run tr inner join task t on tr.task=t.id
+ where ((t.policy = 'oneshot' and t.status ='completed') or
+ (t.policy = 'recurring' and t.status ='disabled')) and
+ ts_after <= tr.ended and tr.ended < ts_before and
+ t.id > last_id
+ order by tr.task, tr.ended
+ limit lim;
+$$;
+
+comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function';
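+
+-- example call (hypothetical window): fetch up to 100 archivable tasks whose
+-- runs ended between 60 and 30 days ago:
+--   select * from swh_scheduler_task_to_archive(
+--     now() - interval '60 days', now() - interval '30 days', -1, 100);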
+
+drop function swh_scheduler_delete_archive_tasks(bigint[]);
+
+create or replace function swh_scheduler_delete_archived_tasks(
+ task_ids bigint[], task_run_ids bigint[])
+ returns void
+ language sql
+as $$
+ -- clean up task_run_ids
+ delete from task_run where id in (select * from unnest(task_run_ids));
+ -- clean up only tasks whose associated task_run are all cleaned up.
+ -- Remaining tasks will stay there and will be cleaned up when
+ -- remaining data have been indexed
+ delete from task
+ where id in (select t.id
+ from task t left outer join task_run tr on t.id=tr.task
+ where t.id in (select * from unnest(task_ids))
+ and tr.task is null);
+$$;
+
+comment on function swh_scheduler_delete_archived_tasks is 'Clean up archived tasks function';
diff --git a/sql/updates/09.sql b/sql/updates/09.sql
new file mode 100644
index 0000000..7fe97ca
--- /dev/null
+++ b/sql/updates/09.sql
@@ -0,0 +1,205 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 08
+-- to_version: 09
+-- description: Schedule task with priority
+
+insert into dbversion (version, release, description)
+values (9, now(), 'Work In Progress');
+
+create type task_priority as enum('high', 'normal', 'low');
+comment on type task_priority is 'Priority of the given task';
+
+create table priority_ratio(
+ id task_priority primary key,
+ ratio float not null
+);
+
+comment on table priority_ratio is 'Oneshot task''s reading ratio per priority';
+comment on column priority_ratio.id is 'Task priority id';
+comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority';
+
+insert into priority_ratio (id, ratio) values ('high', 0.5);
+insert into priority_ratio (id, ratio) values ('normal', 0.3);
+insert into priority_ratio (id, ratio) values ('low', 0.2);
+
+alter table task add column priority task_priority references priority_ratio(id);
+comment on column task.priority is 'Priority of the given task';
+
+drop function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint);
+drop function swh_scheduler_grab_ready_tasks(text, timestamptz, bigint);
+
+create or replace function swh_scheduler_peek_no_priority_tasks (task_type text, ts timestamptz default now(),
+ num_tasks bigint default NULL)
+ returns setof task
+ language sql
+ stable
+as $$
+select * from task
+ where next_run <= ts
+ and type = task_type
+ and status = 'next_run_not_scheduled'
+ and priority is null
+ order by next_run
+ limit num_tasks
+ for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_no_priority_tasks (text, timestamptz, bigint)
+is 'Retrieve tasks without priority';
+
+create or replace function swh_scheduler_nb_priority_tasks(num_tasks_priority bigint, task_priority task_priority)
+ returns numeric
+ language sql stable
+as $$
+ select ceil(num_tasks_priority * (select ratio from priority_ratio where id = task_priority)) :: numeric
+$$;
+
+comment on function swh_scheduler_nb_priority_tasks (bigint, task_priority)
+is 'Given a total number of tasks and a priority, compute the number of tasks to read for that priority';
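+
+-- e.g. with the ratios inserted above and num_tasks_priority = 20:
+--   high: ceil(20 * 0.5) = 10, normal: ceil(20 * 0.3) = 6, low: ceil(20 * 0.2) = 4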
+
+create or replace function swh_scheduler_peek_tasks_with_priority (task_type text, ts timestamptz default now(),
+ num_tasks_priority bigint default NULL,
+ task_priority task_priority default 'normal')
+ returns setof task
+ language sql
+ stable
+as $$
+ select *
+ from task t
+ where t.next_run <= ts
+ and t.type = task_type
+ and t.status = 'next_run_not_scheduled'
+ and t.priority = task_priority
+ order by t.next_run
+ limit num_tasks_priority
+ for update skip locked;
+$$;
+
+comment on function swh_scheduler_peek_tasks_with_priority(text, timestamptz, bigint, task_priority)
+is 'Retrieve tasks with a given priority';
+
+create or replace function swh_scheduler_peek_priority_tasks (task_type text, ts timestamptz default now(),
+ num_tasks_priority bigint default NULL)
+ returns setof task
+ language plpgsql
+as $$
+declare
+ r record;
+ count_row bigint;
+ nb_diff bigint;
+ nb_high bigint;
+ nb_normal bigint;
+ nb_low bigint;
+begin
+ -- expected values to fetch
+ select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'high') into nb_high;
+ select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'normal') into nb_normal;
+ select swh_scheduler_nb_priority_tasks(num_tasks_priority, 'low') into nb_low;
+ nb_diff := 0;
+ count_row := 0;
+
+ for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_high, 'high')
+ loop
+ count_row := count_row + 1;
+ return next r;
+ end loop;
+
+ if count_row < nb_high then
+ nb_normal := nb_normal + nb_high - count_row;
+ end if;
+
+ count_row := 0;
+ for r in select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_normal, 'normal')
+ loop
+ count_row := count_row + 1;
+ return next r;
+ end loop;
+
+ if count_row < nb_normal then
+ nb_low := nb_low + nb_normal - count_row;
+ end if;
+
+ return query select * from swh_scheduler_peek_tasks_with_priority(task_type, ts, nb_low, 'low');
+end
+$$;
+
+comment on function swh_scheduler_peek_priority_tasks(text, timestamptz, bigint)
+is 'Retrieve priority tasks';
+
+create or replace function swh_scheduler_peek_ready_tasks (task_type text, ts timestamptz default now(),
+ num_tasks bigint default NULL, num_tasks_priority bigint default NULL)
+ returns setof task
+ language plpgsql
+as $$
+declare
+ r record;
+ count_row bigint;
+ nb_diff bigint;
+ nb_tasks bigint;
+begin
+ count_row := 0;
+
+ for r in select * from swh_scheduler_peek_priority_tasks(task_type, ts, num_tasks_priority)
+ order by priority, next_run
+ loop
+ count_row := count_row + 1;
+ return next r;
+ end loop;
+
+ if count_row < num_tasks_priority then
+ nb_tasks := num_tasks + num_tasks_priority - count_row;
+ else
+ nb_tasks := num_tasks;
+ end if;
+
+ for r in select * from swh_scheduler_peek_no_priority_tasks(task_type, ts, nb_tasks)
+ order by priority, next_run
+ loop
+ return next r;
+ end loop;
+
+ return;
+end
+$$;
+
+comment on function swh_scheduler_peek_ready_tasks(text, timestamptz, bigint, bigint)
+is 'Retrieve tasks with/without priority in order';
+
+create or replace function swh_scheduler_grab_ready_tasks (task_type text, ts timestamptz default now(),
+ num_tasks bigint default NULL,
+ num_tasks_priority bigint default NULL)
+ returns setof task
+ language sql
+as $$
+ update task
+ set status='next_run_scheduled'
+ from (
+ select id from swh_scheduler_peek_ready_tasks(task_type, ts, num_tasks, num_tasks_priority)
+ ) next_tasks
+ where task.id = next_tasks.id
+ returning task.*;
+$$;
+
+comment on function swh_scheduler_grab_ready_tasks (text, timestamptz, bigint, bigint)
+is 'Grab tasks ready for scheduling and change their status';
+
+create index on task(priority);
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ return query
+ insert into task (type, arguments, next_run, status, current_interval, policy, retries_left, priority)
+ select type, arguments, next_run, 'next_run_not_scheduled',
+ (select default_interval from task_type tt where tt.type = tmp_task.type),
+ coalesce(policy, 'recurring'),
+ coalesce(retries_left, (select num_retries from task_type tt where tt.type = tmp_task.type), 0),
+ priority
+ from tmp_task
+ returning task.*;
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
diff --git a/sql/updates/10.sql b/sql/updates/10.sql
new file mode 100644
index 0000000..bd57768
--- /dev/null
+++ b/sql/updates/10.sql
@@ -0,0 +1,47 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 09
+-- to_version: 10
+-- description: Rework task archiving (track the task run status, filter on started timestamps)
+
+insert into dbversion (version, release, description)
+values (10, now(), 'Work In Progress');
+
+drop type task_record cascade;
+create type task_record as (
+ task_id bigint,
+ task_policy task_policy,
+ task_status task_status,
+ task_run_id bigint,
+ arguments jsonb,
+ type text,
+ backend_id text,
+ metadata jsonb,
+ scheduled timestamptz,
+ started timestamptz,
+ ended timestamptz,
+ task_run_status task_run_status
+);
+
+drop index task_run_id_asc_idx;
+create index task_run_id_started_asc_idx on task_run(task asc, started asc);
+
+create or replace function swh_scheduler_task_to_archive(
+ ts_after timestamptz, ts_before timestamptz, last_id bigint default -1,
+ lim bigint default 10)
+ returns setof task_record
+ language sql stable
+as $$
+ select t.id as task_id, t.policy as task_policy,
+ t.status as task_status, tr.id as task_run_id,
+ t.arguments, t.type, tr.backend_id, tr.metadata,
+ tr.scheduled, tr.started, tr.ended, tr.status as task_run_status
+ from task_run tr inner join task t on tr.task=t.id
+ where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
+ (t.policy = 'recurring' and t.status = 'disabled')) and
+ ((ts_after <= tr.started and tr.started < ts_before) or tr.started is null) and
+ t.id > last_id
+ order by tr.task, tr.started
+ limit lim;
+$$;
+
+comment on function swh_scheduler_task_to_archive is 'Read archivable tasks function';
diff --git a/sql/updates/11.sql b/sql/updates/11.sql
new file mode 100644
index 0000000..75edc4f
--- /dev/null
+++ b/sql/updates/11.sql
@@ -0,0 +1,64 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 10
+-- to_version: 11
+-- description: Upgrade scheduler create_tasks routine
+
+insert into dbversion (version, release, description)
+values (11, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_mktemp_task ()
+ returns void
+ language sql
+as $$
+ create temporary table tmp_task (
+ like task excluding indexes
+ ) on commit drop;
+ alter table tmp_task
+ alter column retries_left drop not null,
+ drop column id;
+$$;
+
+comment on function swh_scheduler_mktemp_task () is 'Create a temporary table for bulk task creation';
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ -- update the default values in one go
+ -- this is separated from the insert/select to avoid too much
+ -- juggling
+ update tmp_task t
+ set current_interval = tt.default_interval,
+ retries_left = coalesce(retries_left, tt.num_retries, 0)
+ from task_type tt
+ where tt.type=t.type;
+
+ insert into task (type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority)
+ select type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority
+ from tmp_task t
+ where not exists(select 1
+ from task
+ where type = t.type and
+ arguments = t.arguments and
+ policy = t.policy and
+ ((priority is null and t.priority is null)
+ or priority = t.priority) and
+ status = t.status);
+
+ return query
+ select distinct t.*
+ from tmp_task tt inner join task t on (
+ t.type = tt.type and
+ t.arguments = tt.arguments and
+ t.status = tt.status and
+ ((t.priority is null and tt.priority is null)
+ or t.priority=tt.priority) and
+ t.policy=tt.policy
+ );
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
diff --git a/sql/updates/12.sql b/sql/updates/12.sql
new file mode 100644
index 0000000..3ab8c42
--- /dev/null
+++ b/sql/updates/12.sql
@@ -0,0 +1,51 @@
+-- SWH Scheduler Schema upgrade
+-- from_version: 11
+-- to_version: 12
+-- description: Upgrade scheduler create_tasks routine
+
+insert into dbversion (version, release, description)
+ values (12, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ -- update the default values in one go
+ -- this is separated from the insert/select to avoid too much
+ -- juggling
+ update tmp_task t
+ set current_interval = tt.default_interval,
+ retries_left = coalesce(retries_left, tt.num_retries, 0)
+ from task_type tt
+ where tt.type=t.type;
+
+ insert into task (type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority)
+ select type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority
+ from tmp_task t
+ where not exists(select 1
+ from task
+ where type = t.type and
+ arguments->'args' = t.arguments->'args' and
+ arguments->'kwargs' = t.arguments->'kwargs' and
+ policy = t.policy and
+ priority is not distinct from t.priority and
+ status = t.status);
+
+ return query
+ select distinct t.*
+ from tmp_task tt inner join task t on (
+ tt.type = t.type and
+ tt.arguments->'args' = t.arguments->'args' and
+ tt.arguments->'kwargs' = t.arguments->'kwargs' and
+ tt.policy = t.policy and
+ tt.priority is not distinct from t.priority and
+ tt.status = t.status
+ );
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
+
diff --git a/sql/updates/13.sql b/sql/updates/13.sql
new file mode 100644
index 0000000..9f7279d
--- /dev/null
+++ b/sql/updates/13.sql
@@ -0,0 +1,19 @@
+insert into dbversion (version, release, description)
+ values (13, now(), 'Work In Progress');
+
+-- comments for columns of table dbversion
+comment on column dbversion.version is 'SQL schema version';
+comment on column dbversion.release is 'Version deployment timestamp';
+comment on column dbversion.description is 'Version description';
+
+-- comments for columns of table task
+comment on column task.id is 'Task Identifier';
+comment on column task.type is 'References task_type table';
+comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
+
+-- comments for columns of table task_run
+comment on column task_run.id is 'Task run identifier';
+comment on column task_run.task is 'References task table';
+comment on column task_run.scheduled is 'Scheduled run time for task';
+comment on column task_run.started is 'Task starting time';
+comment on column task_run.ended is 'Task ending time';
diff --git a/sql/updates/14.sql b/sql/updates/14.sql
new file mode 100644
index 0000000..98f8184
--- /dev/null
+++ b/sql/updates/14.sql
@@ -0,0 +1,50 @@
+insert into dbversion (version, release, description)
+ values (14, now(), 'Work In Progress');
+
+drop index task_args;
+drop index task_kwargs;
+
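+-- index the md5 digest of the arguments rather than the jsonb value itself:
+-- the digest is a short, fixed-size key suited to btree equality lookups,
+-- while a large arguments document could exceed the btree index row size limit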
+create index on task using btree(type, md5(arguments::text));
+
+create or replace function swh_scheduler_create_tasks_from_temp ()
+ returns setof task
+ language plpgsql
+as $$
+begin
+ -- update the default values in one go
+ -- this is separated from the insert/select to avoid too much
+ -- juggling
+ update tmp_task t
+ set current_interval = tt.default_interval,
+ retries_left = coalesce(retries_left, tt.num_retries, 0)
+ from task_type tt
+ where tt.type=t.type;
+
+ insert into task (type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority)
+ select type, arguments, next_run, status, current_interval, policy,
+ retries_left, priority
+ from tmp_task t
+ where not exists(select 1
+ from task
+ where type = t.type and
+ md5(arguments::text) = md5(t.arguments::text) and
+ arguments = t.arguments and
+ policy = t.policy and
+ priority is not distinct from t.priority and
+ status = t.status);
+
+ return query
+ select distinct t.*
+ from tmp_task tt inner join task t on (
+ tt.type = t.type and
+ md5(tt.arguments::text) = md5(t.arguments::text) and
+ tt.arguments = t.arguments and
+ tt.policy = t.policy and
+ tt.priority is not distinct from t.priority and
+ tt.status = t.status
+ );
+end;
+$$;
+
+comment on function swh_scheduler_create_tasks_from_temp () is 'Create tasks in bulk from the temporary table';
diff --git a/sql/updates/15.sql b/sql/updates/15.sql
new file mode 100644
index 0000000..5dd088d
--- /dev/null
+++ b/sql/updates/15.sql
@@ -0,0 +1,24 @@
+insert into dbversion (version, release, description)
+ values (15, now(), 'Work In Progress');
+
+create or replace function swh_scheduler_task_to_archive(
+ ts_after timestamptz, ts_before timestamptz, last_id bigint default -1,
+ lim bigint default 10)
+ returns setof task_record
+ language sql stable
+as $$
+ select t.id as task_id, t.policy as task_policy,
+ t.status as task_status, tr.id as task_run_id,
+ t.arguments, t.type, tr.backend_id, tr.metadata,
+ tr.scheduled, tr.started, tr.ended, tr.status as task_run_status
+ from task_run tr inner join task t on tr.task=t.id
+ where ((t.policy = 'oneshot' and t.status in ('completed', 'disabled')) or
+ (t.policy = 'recurring' and t.status = 'disabled')) and
+ ((ts_after <= tr.started and tr.started < ts_before) or
+ (tr.started is null and (ts_after <= tr.scheduled and tr.scheduled < ts_before))) and
+ t.id >= last_id
+ order by tr.task, tr.started
+ limit lim;
+$$;
+
+comment on function swh_scheduler_task_to_archive (timestamptz, timestamptz, bigint, bigint) is 'Read archivable tasks function';
diff --git a/sql/updates/16.sql b/sql/updates/16.sql
new file mode 100644
index 0000000..7213eee
--- /dev/null
+++ b/sql/updates/16.sql
@@ -0,0 +1,53 @@
+insert into dbversion (version, release, description)
+ values (16, now(), 'Work In Progress');
+
+create table if not exists listers (
+ id uuid primary key default uuid_generate_v4(),
+ name text not null,
+ instance_name text not null,
+ created timestamptz not null default now(), -- auto_now_add in the model
+ current_state jsonb not null,
+ updated timestamptz not null
+);
+
+comment on table listers is 'Lister instances known to the origin visit scheduler';
+comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)';
+comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
+
+-- lister schema
+create unique index on listers (name, instance_name);
+
+create table if not exists listed_origins (
+ -- Basic information
+ lister_id uuid not null references listers(id),
+ url text not null,
+ visit_type text not null,
+ extra_loader_arguments jsonb not null,
+
+ -- Whether this origin still exists or not
+ enabled boolean not null,
+
+ -- time-based information
+ first_seen timestamptz not null default now(),
+ last_seen timestamptz not null,
+
+ -- potentially provided by the lister
+ last_update timestamptz,
+
+ primary key (lister_id, url, visit_type)
+);
+
+comment on table listed_origins is 'Origins known to the origin visit scheduler';
+comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
+comment on column listed_origins.url is 'URL of the origin listed';
+comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
+comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
+
+comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
+comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
+comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
+
+comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index dc35e9d..9025937 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,30 +1,30 @@
Metadata-Version: 2.1
Name: swh.scheduler
-Version: 0.1.1
+Version: 0.2.0
Summary: Software Heritage Scheduler
Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/
Description: swh-scheduler
=============
Job scheduler for the Software Heritage project.
Task manager for asynchronous/delayed tasks, used for both recurrent (e.g.,
listing a forge, loading new stuff from a Git repository) and one-off
activities (e.g., loading a specific version of a source package).
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Provides-Extra: testing
diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt
index fa7fef9..38ebe79 100644
--- a/swh.scheduler.egg-info/SOURCES.txt
+++ b/swh.scheduler.egg-info/SOURCES.txt
@@ -1,60 +1,104 @@
+.gitignore
+.pre-commit-config.yaml
+AUTHORS
+CODE_OF_CONDUCT.md
+CONTRIBUTORS
+LICENSE
+LICENSE.Celery
MANIFEST.in
Makefile
README.md
+conftest.py
+mypy.ini
pyproject.toml
+pytest.ini
requirements-swh.txt
requirements-test.txt
requirements.txt
setup.cfg
setup.py
+tox.ini
version.txt
-bin/swh-worker-control
+data/README.md
+data/elastic-template.json
+data/update-index-settings.json
+docs/.gitignore
+docs/Makefile
+docs/conf.py
+docs/index.rst
+docs/_static/.placeholder
+docs/_templates/.placeholder
+sql/.gitignore
+sql/Makefile
+sql/updates/02.sql
+sql/updates/03.sql
+sql/updates/04.sql
+sql/updates/05.sql
+sql/updates/06.sql
+sql/updates/07.sql
+sql/updates/08.sql
+sql/updates/09.sql
+sql/updates/10.sql
+sql/updates/11.sql
+sql/updates/12.sql
+sql/updates/13.sql
+sql/updates/14.sql
+sql/updates/15.sql
+sql/updates/16.sql
swh/__init__.py
swh.scheduler.egg-info/PKG-INFO
swh.scheduler.egg-info/SOURCES.txt
swh.scheduler.egg-info/dependency_links.txt
swh.scheduler.egg-info/entry_points.txt
swh.scheduler.egg-info/requires.txt
swh.scheduler.egg-info/top_level.txt
swh/scheduler/__init__.py
swh/scheduler/backend.py
swh/scheduler/backend_es.py
swh/scheduler/cli_utils.py
swh/scheduler/elasticsearch_memory.py
+swh/scheduler/exc.py
+swh/scheduler/interface.py
+swh/scheduler/model.py
swh/scheduler/py.typed
swh/scheduler/task.py
swh/scheduler/utils.py
swh/scheduler/api/__init__.py
swh/scheduler/api/client.py
+swh/scheduler/api/serializers.py
swh/scheduler/api/server.py
swh/scheduler/celery_backend/__init__.py
swh/scheduler/celery_backend/config.py
swh/scheduler/celery_backend/listener.py
swh/scheduler/celery_backend/pika_listener.py
swh/scheduler/celery_backend/runner.py
swh/scheduler/cli/__init__.py
swh/scheduler/cli/admin.py
+swh/scheduler/cli/celery_monitor.py
swh/scheduler/cli/task.py
swh/scheduler/cli/task_type.py
swh/scheduler/cli/utils.py
+swh/scheduler/sql/10-swh-init.sql
swh/scheduler/sql/30-swh-schema.sql
swh/scheduler/sql/40-swh-func.sql
swh/scheduler/sql/50-swh-data.sql
swh/scheduler/sql/60-swh-indexes.sql
swh/scheduler/tests/__init__.py
swh/scheduler/tests/common.py
swh/scheduler/tests/conftest.py
swh/scheduler/tests/tasks.py
swh/scheduler/tests/test_api_client.py
swh/scheduler/tests/test_celery_tasks.py
swh/scheduler/tests/test_cli.py
+swh/scheduler/tests/test_cli_celery_monitor.py
swh/scheduler/tests/test_cli_task_type.py
swh/scheduler/tests/test_common.py
+swh/scheduler/tests/test_model.py
swh/scheduler/tests/test_scheduler.py
swh/scheduler/tests/test_server.py
swh/scheduler/tests/test_utils.py
swh/scheduler/tests/es/__init__.py
swh/scheduler/tests/es/conftest.py
swh/scheduler/tests/es/test_backend_es.py
swh/scheduler/tests/es/test_cli_task.py
swh/scheduler/tests/es/test_elasticsearch_memory.py
\ No newline at end of file
diff --git a/swh.scheduler.egg-info/requires.txt b/swh.scheduler.egg-info/requires.txt
index fb74eb5..0c3f81b 100644
--- a/swh.scheduler.egg-info/requires.txt
+++ b/swh.scheduler.egg-info/requires.txt
@@ -1,21 +1,23 @@
arrow
+attrs
+attrs-strict
celery>=4
future>=0.18.0
Click
elasticsearch>5.4
flask
pika>=1.1.0
psycopg2
pyyaml
vcversioner
setuptools
swh.core[db,http]>=0.0.65
swh.storage>=0.0.182
[testing]
pytest
pytest-mock
pytest-postgresql>=2.1.0
celery>=4.3
hypothesis>=3.11.0
swh.lister
diff --git a/swh/scheduler/api/client.py b/swh/scheduler/api/client.py
index 4cfe0df..15d8e54 100644
--- a/swh/scheduler/api/client.py
+++ b/swh/scheduler/api/client.py
@@ -1,142 +1,24 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core.api import RPCClient
+from .serializers import ENCODERS, DECODERS
+from .. import exc
+from ..interface import SchedulerInterface
+
class RemoteScheduler(RPCClient):
"""Proxy to a remote scheduler API
"""
- def close_connection(self):
- return self.post("close_connection", {})
-
- def set_status_tasks(self, task_ids, status="disabled", next_run=None):
- return self.post(
- "set_status_tasks",
- dict(task_ids=task_ids, status=status, next_run=next_run),
- )
-
- def create_task_type(self, task_type):
- return self.post("create_task_type", {"task_type": task_type})
-
- def get_task_type(self, task_type_name):
- return self.post("get_task_type", {"task_type_name": task_type_name})
-
- def get_task_types(self):
- return self.post("get_task_types", {})
-
- def create_tasks(self, tasks):
- return self.post("create_tasks", {"tasks": tasks})
-
- def disable_tasks(self, task_ids):
- return self.post("disable_tasks", {"task_ids": task_ids})
-
- def get_tasks(self, task_ids):
- return self.post("get_tasks", {"task_ids": task_ids})
-
- def get_task_runs(self, task_ids, limit=None):
- return self.post("get_task_runs", {"task_ids": task_ids, "limit": limit})
-
- def search_tasks(
- self,
- task_id=None,
- task_type=None,
- status=None,
- priority=None,
- policy=None,
- before=None,
- after=None,
- limit=None,
- ):
- return self.post(
- "search_tasks",
- dict(
- task_id=task_id,
- task_type=task_type,
- status=status,
- priority=priority,
- policy=policy,
- before=before,
- after=after,
- limit=limit,
- ),
- )
-
- def peek_ready_tasks(
- self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None
- ):
- return self.post(
- "peek_ready_tasks",
- {
- "task_type": task_type,
- "timestamp": timestamp,
- "num_tasks": num_tasks,
- "num_tasks_priority": num_tasks_priority,
- },
- )
-
- def grab_ready_tasks(
- self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None
- ):
- return self.post(
- "grab_ready_tasks",
- {
- "task_type": task_type,
- "timestamp": timestamp,
- "num_tasks": num_tasks,
- "num_tasks_priority": num_tasks_priority,
- },
- )
-
- def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
- return self.post(
- "schedule_task_run",
- {
- "task_id": task_id,
- "backend_id": backend_id,
- "metadata": metadata,
- "timestamp": timestamp,
- },
- )
-
- def mass_schedule_task_runs(self, task_runs):
- return self.post("mass_schedule_task_runs", {"task_runs": task_runs})
-
- def start_task_run(self, backend_id, metadata=None, timestamp=None):
- return self.post(
- "start_task_run",
- {"backend_id": backend_id, "metadata": metadata, "timestamp": timestamp,},
- )
-
- def end_task_run(self, backend_id, status, metadata=None, timestamp=None):
- return self.post(
- "end_task_run",
- {
- "backend_id": backend_id,
- "status": status,
- "metadata": metadata,
- "timestamp": timestamp,
- },
- )
-
- def filter_task_to_archive(self, after_ts, before_ts, limit=10, page_token=None):
- return self.post(
- "filter_task_to_archive",
- {
- "after_ts": after_ts,
- "before_ts": before_ts,
- "limit": limit,
- "page_token": page_token,
- },
- )
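+ # All SchedulerInterface methods are automatically exposed as RPC calls
+ # by RPCClient, which makes the hand-written proxy methods above obsolete.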
+ backend_class = SchedulerInterface
- def delete_archived_tasks(self, task_ids):
- return self.post("delete_archived_tasks", {"task_ids": task_ids})
+ reraise_exceptions = [getattr(exc, a) for a in exc.__all__]
- def get_priority_ratios(self):
- return self.get("get_priority_ratios")
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
diff --git a/swh/scheduler/api/serializers.py b/swh/scheduler/api/serializers.py
new file mode 100644
index 0000000..930c700
--- /dev/null
+++ b/swh/scheduler/api/serializers.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Decoder and encoders for swh.scheduler.model objects."""
+
+from typing import Callable, Dict, List, Tuple
+
+import attr
+
+import swh.scheduler.model as model
+
+
+def _encode_model_object(obj):
+ d = attr.asdict(obj)
+ d["__type__"] = type(obj).__name__
+ return d
+
+
+ENCODERS: List[Tuple[type, str, Callable]] = [
+ (model.BaseSchedulerModel, "scheduler_model", _encode_model_object),
+]
+
+
+DECODERS: Dict[str, Callable] = {
+ "scheduler_model": lambda d: getattr(model, d.pop("__type__"))(**d)
+}
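+
+# Round-trip sketch: the encoder tags the attrs dict with the concrete class
+# name, and the decoder looks that class up again on the model module:
+#   d = _encode_model_object(obj)            # plain dict, with d["__type__"] set
+#   obj2 = DECODERS["scheduler_model"](d)    # rebuilds an equal model instance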
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
index 02ad19f..03b424f 100644
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -1,266 +1,140 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
import logging
-
-from flask import request, Flask
+import os
from swh.core import config
-from swh.core.api import (
- decode_request,
- error_handler,
- encode_data_server as encode_data,
-)
+from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp
+from swh.core.api import encode_data_server as encode_data
+from swh.core.api import error_handler, negotiate
-from swh.core.api import negotiate, JSONFormatter, MsgpackFormatter
-from swh.scheduler import get_scheduler as get_scheduler_from
+from swh.scheduler import get_scheduler
+from swh.scheduler.exc import SchedulerException
+from swh.scheduler.interface import SchedulerInterface
+from .serializers import ENCODERS, DECODERS
-app = Flask(__name__)
scheduler = None
-@app.errorhandler(Exception)
-def my_error_handler(exception):
- return error_handler(exception, encode_data)
-
-
-def get_sched():
+def get_global_scheduler():
global scheduler
if not scheduler:
- scheduler = get_scheduler_from(**app.config["scheduler"])
+ scheduler = get_scheduler(**app.config["scheduler"])
return scheduler
-def has_no_empty_params(rule):
- return len(rule.defaults or ()) >= len(rule.arguments or ())
-
-
-@app.route("/")
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def index():
- return "SWH Scheduler API server"
-
-
-@app.route("/close_connection", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def close_connection():
- return get_sched().close_connection()
-
-
-@app.route("/set_status_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def set_status_tasks():
- return get_sched().set_status_tasks(**decode_request(request))
+class SchedulerServerApp(RPCServerApp):
+ extra_type_decoders = DECODERS
+ extra_type_encoders = ENCODERS
-@app.route("/create_task_type", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def create_task_type():
- return get_sched().create_task_type(**decode_request(request))
-
-
-@app.route("/get_task_type", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_type():
- return get_sched().get_task_type(**decode_request(request))
-
-
-@app.route("/get_task_types", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_types():
- return get_sched().get_task_types(**decode_request(request))
-
-
-@app.route("/create_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def create_tasks():
- return get_sched().create_tasks(**decode_request(request))
-
-
-@app.route("/disable_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def disable_tasks():
- return get_sched().disable_tasks(**decode_request(request))
-
-
-@app.route("/get_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_tasks():
- return get_sched().get_tasks(**decode_request(request))
-
-
-@app.route("/get_task_runs", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_task_runs():
- return get_sched().get_task_runs(**decode_request(request))
-
-
-@app.route("/search_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def search_tasks():
- return get_sched().search_tasks(**decode_request(request))
-
-
-@app.route("/peek_ready_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def peek_ready_tasks():
- return get_sched().peek_ready_tasks(**decode_request(request))
-
-
-@app.route("/grab_ready_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def grab_ready_tasks():
- return get_sched().grab_ready_tasks(**decode_request(request))
-
-
-@app.route("/schedule_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def schedule_task_run():
- return get_sched().schedule_task_run(**decode_request(request))
-
-
-@app.route("/mass_schedule_task_runs", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def mass_schedule_task_runs():
- return get_sched().mass_schedule_task_runs(**decode_request(request))
-
-
-@app.route("/start_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def start_task_run():
- return get_sched().start_task_run(**decode_request(request))
-
-
-@app.route("/end_task_run", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def end_task_run():
- return get_sched().end_task_run(**decode_request(request))
+app = SchedulerServerApp(
+ __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler
+)
-@app.route("/filter_task_to_archive", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def filter_task_to_archive():
- return get_sched().filter_task_to_archive(**decode_request(request))
+@app.errorhandler(SchedulerException)
+def argument_error_handler(exception):
+ return error_handler(exception, encode_data, status_code=400)
-@app.route("/delete_archived_tasks", methods=["POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def delete_archived_tasks():
- return get_sched().delete_archived_tasks(**decode_request(request))
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
-@app.route("/get_priority_ratios", methods=["GET", "POST"])
-@negotiate(MsgpackFormatter)
-@negotiate(JSONFormatter)
-def get_priority_ratios():
- return get_sched().get_priority_ratios(**decode_request(request))
+def has_no_empty_params(rule):
+ return len(rule.defaults or ()) >= len(rule.arguments or ())
@app.route("/site-map")
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def site_map():
links = []
- sched = get_sched()
for rule in app.url_map.iter_rules():
- if has_no_empty_params(rule) and hasattr(sched, rule.endpoint):
+ if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint):
links.append(
- dict(rule=rule.rule, description=getattr(sched, rule.endpoint).__doc__)
+ dict(
+ rule=rule.rule,
+ description=getattr(SchedulerInterface, rule.endpoint).__doc__,
+ )
)
    # links is now a list of rule/description dicts
return links
def load_and_check_config(config_file, type="local"):
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_file (str): Path to the configuration file to load
type (str): configuration type. For 'local' type, more
checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if not config_file:
raise EnvironmentError("Configuration file must be defined")
if not os.path.exists(config_file):
raise FileNotFoundError("Configuration file %s does not exist" % (config_file,))
cfg = config.read(config_file)
vcfg = cfg.get("scheduler")
if not vcfg:
raise KeyError("Missing '%scheduler' configuration")
if type == "local":
cls = vcfg.get("cls")
if cls != "local":
raise ValueError(
"The scheduler backend can only be started with a 'local' "
"configuration"
)
args = vcfg.get("args")
if not args:
raise KeyError("Invalid configuration; missing 'args' config entry")
db = args.get("db")
if not db:
raise KeyError("Invalid configuration; missing 'db' config entry")
return cfg
api_cfg = None
def make_app_from_configfile():
"""Run the WSGI app from the webserver, loading the configuration from
a configuration file.
    The SWH_CONFIG_FILENAME environment variable defines the
    configuration path to load.
"""
global api_cfg
if not api_cfg:
config_file = os.environ.get("SWH_CONFIG_FILENAME")
api_cfg = load_and_check_config(config_file)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print('Please use the "swh-scheduler api-server" command')
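For reference, a minimal sketch of serving this WSGI app by hand rather than
through the "swh-scheduler api-server" command; the config path, port and the
use of werkzeug are illustrative assumptions, not part of this patch:

    import os
    from werkzeug.serving import run_simple
    from swh.scheduler.api.server import make_app_from_configfile

    # assumed location; any YAML file with a 'scheduler' section works
    os.environ["SWH_CONFIG_FILENAME"] = "/etc/softwareheritage/scheduler.yml"
    run_simple("127.0.0.1", 5008, make_app_from_configfile())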
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
index 5ae47dc..cfbdff2 100644
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -1,571 +1,674 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from arrow import Arrow, utcnow
+import attr
import psycopg2.pool
import psycopg2.extras
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Iterable, List, Optional
from psycopg2.extensions import AsIs
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
+from .exc import StaleData
+from .model import Lister, ListedOrigin
logger = logging.getLogger(__name__)
def adapt_arrow(arrow):
return AsIs("'%s'::timestamptz" % arrow.isoformat())
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(Arrow, adapt_arrow)
+psycopg2.extras.register_uuid()
def format_query(query, keys):
"""Format a query with the given keys"""
query_keys = ", ".join(keys)
placeholders = ", ".join(["%s"] * len(keys))
return query.format(keys=query_keys, placeholders=placeholders)
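As a quick illustration of the helper just above: {keys} expands to the
comma-separated key list and {placeholders} to one %s per key:

    q = format_query(
        "insert into task_type ({keys}) values ({placeholders})",
        ["type", "description"],
    )
    assert q == "insert into task_type (type, description) values (%s, %s)"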
class SchedulerBackend:
"""Backend for the Software Heritage scheduling database.
"""
def __init__(self, db, min_pool_conns=1, max_pool_conns=10):
"""
Args:
            db: either a libpq connection string, or a psycopg2 connection
"""
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
self._db = BaseDb(db)
else:
self._pool = psycopg2.pool.ThreadedConnectionPool(
min_pool_conns,
max_pool_conns,
db,
cursor_factory=psycopg2.extras.RealDictCursor,
)
self._db = None
def get_db(self):
if self._db:
return self._db
return BaseDb.from_pool(self._pool)
def put_db(self, db):
if db is not self._db:
db.put_conn()
task_type_keys = [
"type",
"description",
"backend_name",
"default_interval",
"min_interval",
"max_interval",
"backoff_factor",
"max_queue_length",
"num_retries",
"retry_delay",
]
@db_transaction()
def create_task_type(self, task_type, db=None, cur=None):
"""Create a new task type ready for scheduling.
Args:
task_type (dict): a dictionary with the following keys:
- type (str): an identifier for the task type
- description (str): a human-readable description of what the
task does
- backend_name (str): the name of the task in the
job-scheduling backend
- default_interval (datetime.timedelta): the default interval
between two task runs
- min_interval (datetime.timedelta): the minimum interval
between two task runs
- max_interval (datetime.timedelta): the maximum interval
between two task runs
- backoff_factor (float): the factor by which the interval
changes at each run
- max_queue_length (int): the maximum length of the task queue
for this task type
"""
keys = [key for key in self.task_type_keys if key in task_type]
query = format_query(
"""insert into task_type ({keys}) values ({placeholders})
on conflict do nothing""",
keys,
)
cur.execute(query, [task_type[key] for key in keys])
@db_transaction()
def get_task_type(self, task_type_name, db=None, cur=None):
"""Retrieve the task type with id task_type_name"""
query = format_query(
"select {keys} from task_type where type=%s", self.task_type_keys,
)
cur.execute(query, (task_type_name,))
return cur.fetchone()
@db_transaction()
def get_task_types(self, db=None, cur=None):
"""Retrieve all registered task types"""
query = format_query("select {keys} from task_type", self.task_type_keys,)
cur.execute(query)
return cur.fetchall()
+ @db_transaction()
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None, db=None, cur=None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist.
+ """
+
+ if instance_name is None:
+ instance_name = ""
+
+ select_cols = ", ".join(Lister.select_columns())
+ insert_cols, insert_meta = (
+ ", ".join(tup) for tup in Lister.insert_columns_and_metavars()
+ )
+
+ query = f"""
+ with added as (
+ insert into listers ({insert_cols}) values ({insert_meta})
+ on conflict do nothing
+ returning {select_cols}
+ )
+ select {select_cols} from added
+ union all
+ select {select_cols} from listers
+ where (name, instance_name) = (%(name)s, %(instance_name)s);
+ """
+
+ cur.execute(query, attr.asdict(Lister(name=name, instance_name=instance_name)))
+
+ return Lister(**cur.fetchone())
+
+ @db_transaction()
+ def update_lister(self, lister: Lister, db=None, cur=None) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ the database doesn't match the one passed by the user.
+ """
+
+ select_cols = ", ".join(Lister.select_columns())
+ set_vars = ", ".join(
+ f"{col} = {meta}"
+ for col, meta in zip(*Lister.insert_columns_and_metavars())
+ )
+
+ query = f"""update listers
+ set {set_vars}
+ where id=%(id)s and updated=%(updated)s
+ returning {select_cols}"""
+
+ cur.execute(query, attr.asdict(lister))
+ updated = cur.fetchone()
+
+ if not updated:
+ raise StaleData("Stale data; Lister state not updated")
+
+ return Lister(**updated)
+
+ @db_transaction()
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin], db=None, cur=None
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins with the same (lister_id, url,
+ visit_type) values are updated with new values for
+ extra_loader_arguments, last_update and last_seen.
+ """
+
+ pk_cols = ListedOrigin.primary_key_columns()
+ select_cols = ListedOrigin.select_columns()
+ insert_cols, insert_meta = ListedOrigin.insert_columns_and_metavars()
+
+ upsert_cols = [col for col in insert_cols if col not in pk_cols]
+ upsert_set = ", ".join(f"{col} = EXCLUDED.{col}" for col in upsert_cols)
+
+ query = f"""INSERT into listed_origins ({", ".join(insert_cols)})
+ VALUES %s
+ ON CONFLICT ({", ".join(pk_cols)}) DO UPDATE
+ SET {upsert_set}
+ RETURNING {", ".join(select_cols)}
+ """
+
+ ret = psycopg2.extras.execute_values(
+ cur=cur,
+ sql=query,
+ argslist=(attr.asdict(origin) for origin in listed_origins),
+ template=f"({', '.join(insert_meta)})",
+ page_size=1000,
+ fetch=True,
+ )
+
+ return [ListedOrigin(**d) for d in ret]
+
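Taken together, the three methods above implement the lister lifecycle:
register the lister, update its checkpoint state, and upsert the origins it
found. A hedged usage sketch (the DSN, lister name and state are illustrative):

    import attr

    from swh.scheduler.backend import SchedulerBackend
    from swh.scheduler.exc import StaleData
    from swh.scheduler.model import ListedOrigin

    backend = SchedulerBackend(db="dbname=softwareheritage-scheduler")  # assumed DSN
    lister = backend.get_or_create_lister(name="gitlab", instance_name="freedesktop")

    # optimistic concurrency: update_lister raises StaleData when the stored
    # `updated` timestamp no longer matches the one we read
    try:
        lister = backend.update_lister(attr.evolve(lister, current_state={"page": 42}))
    except StaleData:
        lister = backend.get_or_create_lister(name="gitlab", instance_name="freedesktop")

    backend.record_listed_origins(
        [
            ListedOrigin(
                lister_id=lister.id,
                url="https://example.com/repo.git",
                visit_type="git",
            )
        ]
    )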
task_create_keys = [
"type",
"arguments",
"next_run",
"policy",
"status",
"retries_left",
"priority",
]
task_keys = task_create_keys + ["id", "current_interval"]
@db_transaction()
def create_tasks(self, tasks, policy="recurring", db=None, cur=None):
"""Create new tasks.
Args:
tasks (list): each task is a dictionary with the following keys:
- type (str): the task type
- arguments (dict): the arguments for the task runner, keys:
- args (list of str): arguments
- kwargs (dict str -> str): keyword arguments
- next_run (datetime.datetime): the next scheduled run for the
task
Returns:
a list of created tasks.
"""
cur.execute("select swh_scheduler_mktemp_task()")
db.copy_to(
tasks,
"tmp_task",
self.task_create_keys,
default_values={"policy": policy, "status": "next_run_not_scheduled"},
cur=cur,
)
query = format_query(
"select {keys} from swh_scheduler_create_tasks_from_temp()", self.task_keys,
)
cur.execute(query)
return cur.fetchall()
@db_transaction()
def set_status_tasks(
self, task_ids, status="disabled", next_run=None, db=None, cur=None
):
"""Set the tasks' status whose ids are listed.
If given, also set the next_run date.
"""
if not task_ids:
return
query = ["UPDATE task SET status = %s"]
args = [status]
if next_run:
query.append(", next_run = %s")
args.append(next_run)
query.append(" WHERE id IN %s")
args.append(tuple(task_ids))
cur.execute("".join(query), args)
@db_transaction()
def disable_tasks(self, task_ids, db=None, cur=None):
"""Disable the tasks whose ids are listed."""
return self.set_status_tasks(task_ids, db=db, cur=cur)
@db_transaction()
def search_tasks(
self,
task_id=None,
task_type=None,
status=None,
priority=None,
policy=None,
before=None,
after=None,
limit=None,
db=None,
cur=None,
):
"""Search tasks from selected criterions"""
where = []
args = []
if task_id:
if isinstance(task_id, (str, int)):
where.append("id = %s")
else:
where.append("id in %s")
task_id = tuple(task_id)
args.append(task_id)
if task_type:
if isinstance(task_type, str):
where.append("type = %s")
else:
where.append("type in %s")
task_type = tuple(task_type)
args.append(task_type)
if status:
if isinstance(status, str):
where.append("status = %s")
else:
where.append("status in %s")
status = tuple(status)
args.append(status)
if priority:
if isinstance(priority, str):
where.append("priority = %s")
else:
priority = tuple(priority)
where.append("priority in %s")
args.append(priority)
if policy:
where.append("policy = %s")
args.append(policy)
if before:
where.append("next_run <= %s")
args.append(before)
if after:
where.append("next_run >= %s")
args.append(after)
query = "select * from task"
if where:
query += " where " + " and ".join(where)
if limit:
query += " limit %s :: bigint"
args.append(limit)
cur.execute(query, args)
return cur.fetchall()
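Each criterion accepts either a scalar (compared with "=") or a collection
(compared with "in"). A sketch call, reusing the backend instance from the
earlier sketch (the values are illustrative):

    tasks = backend.search_tasks(
        task_type=["update-git", "update-hg"],
        status="next_run_scheduled",
        limit=10,
    )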
@db_transaction()
def get_tasks(self, task_ids, db=None, cur=None):
"""Retrieve the info of tasks whose ids are listed."""
query = format_query("select {keys} from task where id in %s", self.task_keys)
cur.execute(query, (tuple(task_ids),))
return cur.fetchall()
@db_transaction()
def peek_ready_tasks(
self,
task_type,
timestamp=None,
num_tasks=None,
num_tasks_priority=None,
db=None,
cur=None,
):
"""Fetch the list of ready tasks
Args:
            task_type (str): filter tasks by their type
timestamp (datetime.datetime): peek tasks that need to be executed
before that timestamp
num_tasks (int): only peek at num_tasks tasks (with no priority)
num_tasks_priority (int): only peek at num_tasks_priority
tasks (with priority)
Returns:
a list of tasks
"""
if timestamp is None:
timestamp = utcnow()
cur.execute(
"""select * from swh_scheduler_peek_ready_tasks(
%s, %s, %s :: bigint, %s :: bigint)""",
(task_type, timestamp, num_tasks, num_tasks_priority),
)
logger.debug("PEEK %s => %s" % (task_type, cur.rowcount))
return cur.fetchall()
@db_transaction()
def grab_ready_tasks(
self,
task_type,
timestamp=None,
num_tasks=None,
num_tasks_priority=None,
db=None,
cur=None,
):
"""Fetch the list of ready tasks, and mark them as scheduled
Args:
            task_type (str): filter tasks by their type
timestamp (datetime.datetime): grab tasks that need to be executed
before that timestamp
num_tasks (int): only grab num_tasks tasks (with no priority)
            num_tasks_priority (int): only grab num_tasks_priority oneshot tasks
            (with priority)
Returns:
a list of tasks
"""
if timestamp is None:
timestamp = utcnow()
cur.execute(
"""select * from swh_scheduler_grab_ready_tasks(
%s, %s, %s :: bigint, %s :: bigint)""",
(task_type, timestamp, num_tasks, num_tasks_priority),
)
logger.debug("GRAB %s => %s" % (task_type, cur.rowcount))
return cur.fetchall()
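peek_ready_tasks is the read-only counterpart of grab_ready_tasks; a sketch of
how a job runner might combine them (task type and batch size are assumed,
`backend` as in the earlier sketch):

    ready = backend.peek_ready_tasks("update-git", num_tasks=100)
    if ready:
        # atomically mark up to the same number of tasks as scheduled
        grabbed = backend.grab_ready_tasks("update-git", num_tasks=100)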
task_run_create_keys = ["task", "backend_id", "scheduled", "metadata"]
@db_transaction()
def schedule_task_run(
self, task_id, backend_id, metadata=None, timestamp=None, db=None, cur=None
):
"""Mark a given task as scheduled, adding a task_run entry in the database.
Args:
task_id (int): the identifier for the task being scheduled
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
a fresh task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cur.execute(
"select * from swh_scheduler_schedule_task_run(%s, %s, %s, %s)",
(task_id, backend_id, metadata, timestamp),
)
return cur.fetchone()
@db_transaction()
def mass_schedule_task_runs(self, task_runs, db=None, cur=None):
"""Schedule a bunch of task runs.
Args:
task_runs (list): a list of dicts with keys:
- task (int): the identifier for the task being scheduled
- backend_id (str): the identifier of the job in the backend
- metadata (dict): metadata to add to the task_run entry
- scheduled (datetime.datetime): the instant the event occurred
Returns:
None
"""
cur.execute("select swh_scheduler_mktemp_task_run()")
db.copy_to(task_runs, "tmp_task_run", self.task_run_create_keys, cur=cur)
cur.execute("select swh_scheduler_schedule_task_run_from_temp()")
@db_transaction()
def start_task_run(
self, backend_id, metadata=None, timestamp=None, db=None, cur=None
):
"""Mark a given task as started, updating the corresponding task_run
entry in the database.
Args:
backend_id (str): the identifier of the job in the backend
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
the updated task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cur.execute(
"select * from swh_scheduler_start_task_run(%s, %s, %s)",
(backend_id, metadata, timestamp),
)
return cur.fetchone()
@db_transaction()
def end_task_run(
self,
backend_id,
status,
metadata=None,
timestamp=None,
result=None,
db=None,
cur=None,
):
"""Mark a given task as ended, updating the corresponding task_run entry in the
database.
Args:
backend_id (str): the identifier of the job in the backend
status (str): how the task ended; one of: 'eventful', 'uneventful',
'failed'
metadata (dict): metadata to add to the task_run entry
timestamp (datetime.datetime): the instant the event occurred
Returns:
the updated task_run entry
"""
if metadata is None:
metadata = {}
if timestamp is None:
timestamp = utcnow()
cur.execute(
"select * from swh_scheduler_end_task_run(%s, %s, %s, %s)",
(backend_id, status, metadata, timestamp),
)
return cur.fetchone()
@db_transaction()
def filter_task_to_archive(
self,
after_ts: str,
before_ts: str,
limit: int = 10,
page_token: Optional[str] = None,
db=None,
cur=None,
) -> Dict[str, Any]:
"""Compute the tasks to archive within the datetime interval
        [after_ts, before_ts). The method returns a paginated result.
Returns:
dict with the following keys:
- **next_page_token**: opaque token to be used as
              `page_token` to retrieve the next page of results. If absent,
              there are no more pages to gather.
- **tasks**: list of task dictionaries with the following keys:
**id** (str): origin task id
**started** (Optional[datetime]): started date
**scheduled** (datetime): scheduled date
**arguments** (json dict): task's arguments
...
"""
assert not page_token or isinstance(page_token, str)
last_id = -1 if page_token is None else int(page_token)
tasks = []
cur.execute(
"select * from swh_scheduler_task_to_archive(%s, %s, %s, %s)",
(after_ts, before_ts, last_id, limit + 1),
)
for row in cur:
task = dict(row)
            # the nested type index does not accept bare values;
            # transform the args list into a dict to comply with this
task["arguments"]["args"] = {
i: v for i, v in enumerate(task["arguments"]["args"])
}
kwargs = task["arguments"]["kwargs"]
task["arguments"]["kwargs"] = json.dumps(kwargs)
tasks.append(task)
if len(tasks) >= limit + 1: # remains data, add pagination information
result = {
"tasks": tasks[:limit],
"next_page_token": str(tasks[-1]["task_id"]),
}
else:
result = {"tasks": tasks}
return result
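The pagination contract: keep calling with the returned next_page_token until
it disappears. A minimal sketch (the timestamps are illustrative):

    page_token = None
    while True:
        result = backend.filter_task_to_archive(
            after_ts="2019-01-01",
            before_ts="2020-01-01",
            limit=500,
            page_token=page_token,
        )
        for task in result["tasks"]:
            ...  # hand the task over to the archiving process
        page_token = result.get("next_page_token")
        if page_token is None:
            break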
@db_transaction()
def delete_archived_tasks(self, task_ids, db=None, cur=None):
"""Delete archived tasks as much as possible. Only the task_ids whose
complete associated task_run have been cleaned up will be.
"""
        # two separate lists: a chained assignment would make both names alias
        # the same list
        _task_ids, _task_run_ids = [], []
for task_id in task_ids:
_task_ids.append(task_id["task_id"])
_task_run_ids.append(task_id["task_run_id"])
cur.execute(
"select * from swh_scheduler_delete_archived_tasks(%s, %s)",
(_task_ids, _task_run_ids),
)
task_run_keys = [
"id",
"task",
"backend_id",
"scheduled",
"started",
"ended",
"metadata",
"status",
]
@db_transaction()
def get_task_runs(self, task_ids, limit=None, db=None, cur=None):
"""Search task run for a task id"""
where = []
args = []
if task_ids:
if isinstance(task_ids, (str, int)):
where.append("task = %s")
else:
where.append("task in %s")
task_ids = tuple(task_ids)
args.append(task_ids)
else:
return ()
query = "select * from task_run where " + " and ".join(where)
if limit:
query += " limit %s :: bigint"
args.append(limit)
cur.execute(query, args)
return cur.fetchall()
@db_transaction()
def get_priority_ratios(self, db=None, cur=None):
cur.execute("select id, ratio from priority_ratio")
return {row["id"]: row["ratio"] for row in cur.fetchall()}
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
index 34f33dd..57e26a4 100644
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -1,93 +1,91 @@
# Copyright (C) 2016-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
@click.group(name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
help="Configuration file.",
)
@click.option(
"--database",
"-d",
default=None,
help="Scheduling database DSN (imply cls is 'local')",
)
@click.option(
"--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')"
)
@click.option(
"--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console"
)
@click.pass_context
def cli(ctx, config_file, database, url, no_stdout):
"""Software Heritage Scheduler tools.
    Use a local scheduler instance by default (plugged into the
    main scheduler db).
"""
+ try:
+ from psycopg2 import OperationalError
+ except ImportError:
+
+ class OperationalError(Exception):
+ pass
+
from swh.core import config
- from swh.scheduler.celery_backend.config import setup_log_handler
from swh.scheduler import get_scheduler, DEFAULT_CONFIG
ctx.ensure_object(dict)
- log_level = ctx.obj.get("log_level", logging.INFO)
-
- setup_log_handler(
- loglevel=log_level,
- colorize=False,
- format="[%(levelname)s] %(name)s -- %(message)s",
- log_console=not no_stdout,
- )
logger = logging.getLogger(__name__)
scheduler = None
conf = config.read(config_file, DEFAULT_CONFIG)
if "scheduler" not in conf:
raise ValueError("missing 'scheduler' configuration")
if database:
conf["scheduler"]["cls"] = "local"
conf["scheduler"]["args"]["db"] = database
elif url:
conf["scheduler"]["cls"] = "remote"
conf["scheduler"]["args"] = {"url": url}
sched_conf = conf["scheduler"]
try:
logger.debug("Instantiating scheduler with %s" % (sched_conf))
scheduler = get_scheduler(**sched_conf)
- except ValueError:
+ except (ValueError, OperationalError):
# it's the subcommand to decide whether not having a proper
# scheduler instance is a problem.
pass
ctx.obj["scheduler"] = scheduler
ctx.obj["config"] = conf
-from . import admin, task, task_type # noqa
+from . import admin, celery_monitor, task, task_type # noqa
def main():
import click.core
click.core.DEPRECATED_HELP_NOTICE = """
DEPRECATED! Please use the command 'swh scheduler'."""
cli.deprecated = True
return cli(auto_envvar_prefix="SWH_SCHEDULER")
if __name__ == "__main__":
main()
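The --database / --url options select the backend class at runtime. A hedged
sketch exercising this through click's test runner (the URL is illustrative):

    from click.testing import CliRunner

    from swh.scheduler.cli import cli

    runner = CliRunner()
    # --url forces cls='remote'; subcommands then talk to that RPC server
    result = runner.invoke(
        cli, ["--url", "http://localhost:5008/", "celery-monitor", "--help"]
    )
    print(result.output)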
diff --git a/swh/scheduler/cli/celery_monitor.py b/swh/scheduler/cli/celery_monitor.py
new file mode 100644
index 0000000..7204ead
--- /dev/null
+++ b/swh/scheduler/cli/celery_monitor.py
@@ -0,0 +1,157 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ast import literal_eval
+import csv
+import logging
+import sys
+import time
+from typing import Any, Dict, Optional
+
+import click
+
+from . import cli
+
+logger = logging.getLogger(__name__)
+
+
+def destination_from_pattern(ctx: click.Context, pattern: Optional[str]):
+ """Set the celery remote-control destination from the given pattern"""
+ if pattern is None:
+ logger.debug("Matching all workers")
+ elif "*" in pattern:
+ ctx.obj["inspect"].pattern = pattern
+ ctx.obj["inspect"].matcher = "glob"
+ logger.debug("Using glob pattern %s", pattern)
+ else:
+ destination = pattern.split(",")
+ ctx.obj["inspect"].destination = destination
+ logger.debug("Using destinations %s", ", ".join(destination))
+
+
+@cli.group("celery-monitor")
+@click.option(
+ "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
+)
+@click.option("--pattern", help="Celery destination pattern", default=None)
+@click.pass_context
+def celery_monitor(ctx: click.Context, timeout: float, pattern: Optional[str]) -> None:
+ """Monitoring of Celery"""
+ from swh.scheduler.celery_backend.config import app
+
+ ctx.obj["timeout"] = timeout
+ ctx.obj["inspect"] = app.control.inspect(timeout=timeout)
+
+ destination_from_pattern(ctx, pattern)
+
+
+@celery_monitor.command("ping-workers")
+@click.pass_context
+def ping_workers(ctx: click.Context) -> None:
+ """Check which workers respond to the celery remote control"""
+
+ response_times = {}
+
+ def ping_callback(response):
+ rtt = time.monotonic() - ping_time
+ for destination in response:
+ logger.debug("Got ping response from %s: %r", destination, response)
+ response_times[destination] = rtt
+
+ ctx.obj["inspect"].callback = ping_callback
+
+ ping_time = time.monotonic()
+ ret = ctx.obj["inspect"].ping()
+
+ if not ret:
+ logger.info("No response in %f seconds", time.monotonic() - ping_time)
+ ctx.exit(1)
+
+ for destination in ret:
+ logger.info(
+ "Got response from %s in %f seconds",
+ destination,
+ response_times[destination],
+ )
+
+ ctx.exit(0)
+
+
+@celery_monitor.command("list-running")
+@click.option(
+ "--format",
+ help="Output format",
+ default="pretty",
+ type=click.Choice(["pretty", "csv"]),
+)
+@click.pass_context
+def list_running(ctx: click.Context, format: str):
+ """List running tasks on the lister workers"""
+ response_times = {}
+
+ def active_callback(response):
+ rtt = time.monotonic() - active_time
+ for destination in response:
+ response_times[destination] = rtt
+
+ ctx.obj["inspect"].callback = active_callback
+
+ active_time = time.monotonic()
+ ret = ctx.obj["inspect"].active()
+
+ if not ret:
+ logger.info("No response in %f seconds", time.monotonic() - active_time)
+ ctx.exit(1)
+
+ def pretty_task_arguments(task: Dict[str, Any]) -> str:
+ arg_list = []
+ for arg in task["args"]:
+ arg_list.append(repr(arg))
+ for k, v in task["kwargs"].items():
+ arg_list.append(f"{k}={v!r}")
+
+ return f'{task["name"]}({", ".join(arg_list)})'
+
+ def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
+ duration = time.time() - task["time_start"]
+ return {
+ "worker": worker,
+ "name": task["name"],
+ "args": literal_eval(task["args"]),
+ "kwargs": literal_eval(task["kwargs"]),
+ "duration": duration,
+ "worker_pid": task["worker_pid"],
+ }
+
+ if format == "csv":
+ writer = csv.DictWriter(
+ sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
+ )
+ writer.writeheader()
+
+ def output(data: Dict[str, Any]):
+ writer.writerow(data)
+
+ elif format == "pretty":
+
+ def output(data: Dict[str, Any]):
+ print(
+ f"{data['worker']}: {pretty_task_arguments(data)} "
+ f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
+ )
+
+ else:
+ logger.error("Unknown format %s", format)
+ ctx.exit(127)
+
+ for worker, active in sorted(ret.items()):
+ if not active:
+ logger.info("%s: no active tasks", worker)
+ continue
+
+ for task in sorted(active, key=lambda t: t["time_start"]):
+ output(get_task_data(worker, task))
+
+ ctx.exit(0)
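Typical invocations would be "swh scheduler celery-monitor ping-workers" to
check worker liveness, and "list-running --format csv" for machine-readable
output. A hedged sketch consuming that CSV from a subprocess:

    import csv
    import io
    import subprocess

    out = subprocess.run(
        ["swh", "scheduler", "celery-monitor", "--timeout", "5",
         "list-running", "--format", "csv"],
        capture_output=True,
        text=True,
    ).stdout
    for row in csv.DictReader(io.StringIO(out)):
        print(row["worker"], row["name"], row["duration"])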
diff --git a/swh/scheduler/exc.py b/swh/scheduler/exc.py
new file mode 100644
index 0000000..0c92e43
--- /dev/null
+++ b/swh/scheduler/exc.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+__all__ = [
+ "SchedulerException",
+ "StaleData",
+]
+
+
+class SchedulerException(Exception):
+ pass
+
+
+class StaleData(SchedulerException):
+ pass
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
new file mode 100644
index 0000000..0ff5311
--- /dev/null
+++ b/swh/scheduler/interface.py
@@ -0,0 +1,290 @@
+# Copyright (C) 2015-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Iterable, List, Optional
+
+from swh.core.api import remote_api_endpoint
+
+from swh.scheduler.model import ListedOrigin, Lister
+
+
+class SchedulerInterface:
+ @remote_api_endpoint("task_type/create")
+ def create_task_type(self, task_type):
+ """Create a new task type ready for scheduling.
+
+ Args:
+ task_type (dict): a dictionary with the following keys:
+
+ - type (str): an identifier for the task type
+ - description (str): a human-readable description of what the
+ task does
+ - backend_name (str): the name of the task in the
+ job-scheduling backend
+ - default_interval (datetime.timedelta): the default interval
+ between two task runs
+ - min_interval (datetime.timedelta): the minimum interval
+ between two task runs
+ - max_interval (datetime.timedelta): the maximum interval
+ between two task runs
+ - backoff_factor (float): the factor by which the interval
+ changes at each run
+ - max_queue_length (int): the maximum length of the task queue
+ for this task type
+
+ """
+ ...
+
+ @remote_api_endpoint("task_type/get")
+ def get_task_type(self, task_type_name):
+ """Retrieve the task type with id task_type_name"""
+ ...
+
+ @remote_api_endpoint("task_type/get_all")
+ def get_task_types(self):
+ """Retrieve all registered task types"""
+ ...
+
+ @remote_api_endpoint("task/create")
+ def create_tasks(self, tasks, policy="recurring"):
+ """Create new tasks.
+
+ Args:
+ tasks (list): each task is a dictionary with the following keys:
+
+ - type (str): the task type
+ - arguments (dict): the arguments for the task runner, keys:
+
+ - args (list of str): arguments
+ - kwargs (dict str -> str): keyword arguments
+
+ - next_run (datetime.datetime): the next scheduled run for the
+ task
+
+ Returns:
+ a list of created tasks.
+
+ """
+ ...
+
+ @remote_api_endpoint("task/set_status")
+ def set_status_tasks(self, task_ids, status="disabled", next_run=None):
+ """Set the tasks' status whose ids are listed.
+
+ If given, also set the next_run date.
+ """
+ ...
+
+ @remote_api_endpoint("task/disable")
+ def disable_tasks(self, task_ids):
+ """Disable the tasks whose ids are listed."""
+ ...
+
+ @remote_api_endpoint("task/search")
+ def search_tasks(
+ self,
+ task_id=None,
+ task_type=None,
+ status=None,
+ priority=None,
+ policy=None,
+ before=None,
+ after=None,
+ limit=None,
+ ):
+ """Search tasks from the selected criteria"""
+ ...
+
+ @remote_api_endpoint("task/get")
+ def get_tasks(self, task_ids):
+ """Retrieve the info of tasks whose ids are listed."""
+ ...
+
+ @remote_api_endpoint("task/peek_ready")
+ def peek_ready_tasks(
+ self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None,
+ ):
+ """Fetch the list of ready tasks
+
+ Args:
+ task_type (str): filter tasks by their type
+ timestamp (datetime.datetime): peek tasks that need to be executed
+ before that timestamp
+ num_tasks (int): only peek at num_tasks tasks (with no priority)
+ num_tasks_priority (int): only peek at num_tasks_priority
+ tasks (with priority)
+
+ Returns:
+ a list of tasks
+
+ """
+ ...
+
+ @remote_api_endpoint("task/grab_ready")
+ def grab_ready_tasks(
+ self, task_type, timestamp=None, num_tasks=None, num_tasks_priority=None,
+ ):
+ """Fetch the list of ready tasks, and mark them as scheduled
+
+ Args:
+ task_type (str): filter tasks by their type
+ timestamp (datetime.datetime): grab tasks that need to be executed
+ before that timestamp
+ num_tasks (int): only grab num_tasks tasks (with no priority)
+ num_tasks_priority (int): only grab num_tasks_priority oneshot tasks
+ (with priority)
+
+ Returns:
+ a list of tasks
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/schedule_one")
+ def schedule_task_run(self, task_id, backend_id, metadata=None, timestamp=None):
+ """Mark a given task as scheduled, adding a task_run entry in the database.
+
+ Args:
+ task_id (int): the identifier for the task being scheduled
+ backend_id (str): the identifier of the job in the backend
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ a fresh task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/schedule")
+ def mass_schedule_task_runs(self, task_runs):
+ """Schedule a bunch of task runs.
+
+ Args:
+ task_runs (list): a list of dicts with keys:
+
+ - task (int): the identifier for the task being scheduled
+ - backend_id (str): the identifier of the job in the backend
+ - metadata (dict): metadata to add to the task_run entry
+ - scheduled (datetime.datetime): the instant the event occurred
+
+ Returns:
+ None
+ """
+ ...
+
+ @remote_api_endpoint("task_run/start")
+ def start_task_run(self, backend_id, metadata=None, timestamp=None):
+ """Mark a given task as started, updating the corresponding task_run
+ entry in the database.
+
+ Args:
+ backend_id (str): the identifier of the job in the backend
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ the updated task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/end")
+ def end_task_run(
+ self, backend_id, status, metadata=None, timestamp=None, result=None,
+ ):
+ """Mark a given task as ended, updating the corresponding task_run entry in the
+ database.
+
+ Args:
+ backend_id (str): the identifier of the job in the backend
+ status (str): how the task ended; one of: 'eventful', 'uneventful',
+ 'failed'
+ metadata (dict): metadata to add to the task_run entry
+ timestamp (datetime.datetime): the instant the event occurred
+
+ Returns:
+ the updated task_run entry
+
+ """
+ ...
+
+ @remote_api_endpoint("task/filter_for_archive")
+ def filter_task_to_archive(
+ self,
+ after_ts: str,
+ before_ts: str,
+ limit: int = 10,
+ page_token: Optional[str] = None,
+ ) -> Dict[str, Any]:
+ """Compute the tasks to archive within the datetime interval
+ [after_ts, before_ts). The method returns a paginated result.
+
+ Returns:
+ dict with the following keys:
+ - **next_page_token**: opaque token to be used as
+ `page_token` to retrieve the next page of results. If absent,
+ there are no more pages to gather.
+ - **tasks**: list of task dictionaries with the following keys:
+
+ **id** (str): origin task id
+ **started** (Optional[datetime]): started date
+ **scheduled** (datetime): scheduled date
+ **arguments** (json dict): task's arguments
+ ...
+
+ """
+ ...
+
+ @remote_api_endpoint("task/delete_archived")
+ def delete_archived_tasks(self, task_ids):
+ """Delete as many archived tasks as possible. Only the task_ids whose
+ associated task_run entries have been completely cleaned up will be deleted.
+
+ """
+ ...
+
+ @remote_api_endpoint("task_run/get")
+ def get_task_runs(self, task_ids, limit=None):
+ """Search task runs for the given task ids"""
+ ...
+
+ @remote_api_endpoint("lister/get_or_create")
+ def get_or_create_lister(
+ self, name: str, instance_name: Optional[str] = None
+ ) -> Lister:
+ """Retrieve information about the given instance of the lister from the
+ database, or create the entry if it did not exist.
+ """
+ ...
+
+ @remote_api_endpoint("lister/update")
+ def update_lister(self, lister: Lister) -> Lister:
+ """Update the state for the given lister instance in the database.
+
+ Returns:
+ a new Lister object, with all fields updated from the database
+
+ Raises:
+ StaleData if the `updated` timestamp for the lister instance in
+ the database doesn't match the one passed by the user.
+ """
+ ...
+
+ @remote_api_endpoint("origins/record")
+ def record_listed_origins(
+ self, listed_origins: Iterable[ListedOrigin]
+ ) -> List[ListedOrigin]:
+ """Record a set of origins that a lister has listed.
+
+ This performs an "upsert": origins with the same (lister_id, url,
+ visit_type) values are updated with new values for
+ extra_loader_arguments, last_update and last_seen.
+ """
+ ...
+
+ @remote_api_endpoint("priority_ratios/get")
+ def get_priority_ratios(self):
+ ...
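Each @remote_api_endpoint above maps to a URL route on the RPC server built in
swh/scheduler/api/server.py. A hedged client-side sketch (the server URL is an
assumption, and RemoteScheduler is expected to expose the same methods as this
interface):

    from swh.scheduler.api.client import RemoteScheduler

    sched = RemoteScheduler(url="http://localhost:5008/")  # assumed URL
    print(sched.get_task_types())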
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
new file mode 100644
index 0000000..211e769
--- /dev/null
+++ b/swh/scheduler/model.py
@@ -0,0 +1,162 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+from uuid import UUID
+from typing import Any, Dict, List, Optional, Tuple
+
+import attr
+import attr.converters
+from attrs_strict import type_validator
+
+
+@attr.s
+class BaseSchedulerModel:
+ """Base class for database-backed objects.
+
+ These database-backed objects are defined through attrs-based attributes
+ that match the columns of the database 1:1. This is a (very) lightweight
+ ORM.
+
+ These attrs-based attributes have metadata specific to the functionality
+ expected from these fields in the database:
+
+ - `primary_key`: the column is a primary key; it should be filtered out
+ when doing an `update` of the object
+ - `auto_primary_key`: the column is a primary key, which is automatically handled
+ by the database. No value is ever inserted for it. This must be matched
+ with a database-side default value.
+ - `auto_now_add`: the column is a timestamp that is set to the current time when
+ the object is inserted, and never updated afterwards. This must be matched with
+ a database-side default value.
+ - `auto_now`: the column is a timestamp that is set to the current time when
+ the object is inserted or updated.
+
+ """
+
+ _pk_cols: Optional[Tuple[str, ...]] = None
+ _select_cols: Optional[Tuple[str, ...]] = None
+ _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None
+
+ @classmethod
+ def primary_key_columns(cls) -> Tuple[str, ...]:
+ """Get the primary key columns for this object type"""
+ if cls._pk_cols is None:
+ columns: List[str] = []
+ for field in attr.fields(cls):
+ if any(
+ field.metadata.get(flag)
+ for flag in ("auto_primary_key", "primary_key")
+ ):
+ columns.append(field.name)
+ cls._pk_cols = tuple(sorted(columns))
+
+ return cls._pk_cols
+
+ @classmethod
+ def select_columns(cls) -> Tuple[str, ...]:
+ """Get all the database columns needed for a `select` on this object type"""
+ if cls._select_cols is None:
+ columns: List[str] = []
+ for field in attr.fields(cls):
+ columns.append(field.name)
+ cls._select_cols = tuple(sorted(columns))
+
+ return cls._select_cols
+
+ @classmethod
+ def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
+ """Get the database columns and metavars needed for an `insert` or `update` on
+ this object type.
+
+ This implements support for the `auto_*` field metadata attributes.
+ """
+ if cls._insert_cols_and_metavars is None:
+ zipped_cols_and_metavars: List[Tuple[str, str]] = []
+
+ for field in attr.fields(cls):
+ if any(
+ field.metadata.get(flag)
+ for flag in ("auto_now_add", "auto_primary_key")
+ ):
+ continue
+ elif field.metadata.get("auto_now"):
+ zipped_cols_and_metavars.append((field.name, "now()"))
+ else:
+ zipped_cols_and_metavars.append((field.name, f"%({field.name})s"))
+
+ zipped_cols_and_metavars.sort()
+
+ cols, metavars = zip(*zipped_cols_and_metavars)
+ cls._insert_cols_and_metavars = cols, metavars
+
+ return cls._insert_cols_and_metavars
+
+
+@attr.s
+class Lister(BaseSchedulerModel):
+ name = attr.ib(type=str, validator=[type_validator()])
+ instance_name = attr.ib(type=str, validator=[type_validator()])
+
+ # Populated by database
+ id = attr.ib(
+ type=Optional[UUID],
+ validator=type_validator(),
+ default=None,
+ metadata={"auto_primary_key": True},
+ )
+
+ current_state = attr.ib(
+ type=Dict[str, Any], validator=[type_validator()], factory=dict
+ )
+ created = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ updated = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
+
+
+@attr.s
+class ListedOrigin(BaseSchedulerModel):
+ """Basic information about a listed origin, output by a lister"""
+
+ lister_id = attr.ib(
+ type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ url = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ visit_type = attr.ib(
+ type=str, validator=[type_validator()], metadata={"primary_key": True}
+ )
+ extra_loader_arguments = attr.ib(
+ type=Dict[str, str], validator=[type_validator()], factory=dict
+ )
+
+ last_update = attr.ib(
+ type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+ )
+
+ enabled = attr.ib(type=bool, validator=[type_validator()], default=True)
+
+ first_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now_add": True},
+ )
+ last_seen = attr.ib(
+ type=Optional[datetime.datetime],
+ validator=[type_validator()],
+ default=None,
+ metadata={"auto_now": True},
+ )
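As a quick check of the introspection helpers defined above, here is what they
evaluate to for Lister (derived by reading the field metadata):

    from swh.scheduler.model import Lister

    assert Lister.primary_key_columns() == ("id",)
    assert Lister.select_columns() == (
        "created", "current_state", "id", "instance_name", "name", "updated",
    )
    # created (auto_now_add) and id (auto_primary_key) are skipped on insert;
    # updated (auto_now) maps to the database-side now()
    assert Lister.insert_columns_and_metavars() == (
        ("current_state", "instance_name", "name", "updated"),
        ("%(current_state)s", "%(instance_name)s", "%(name)s", "now()"),
    )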
diff --git a/swh/scheduler/sql/10-swh-init.sql b/swh/scheduler/sql/10-swh-init.sql
new file mode 100644
index 0000000..d159cc5
--- /dev/null
+++ b/swh/scheduler/sql/10-swh-init.sql
@@ -0,0 +1 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
diff --git a/swh/scheduler/sql/30-swh-schema.sql b/swh/scheduler/sql/30-swh-schema.sql
index a5f0b0e..0dfed9e 100644
--- a/swh/scheduler/sql/30-swh-schema.sql
+++ b/swh/scheduler/sql/30-swh-schema.sql
@@ -1,112 +1,161 @@
create table dbversion
(
version int primary key,
release timestamptz not null,
description text not null
);
comment on table dbversion is 'Schema update tracking';
comment on column dbversion.version is 'SQL schema version';
comment on column dbversion.release is 'Version deployment timestamp';
comment on column dbversion.description is 'Version description';
insert into dbversion (version, release, description)
- values (15, now(), 'Work In Progress');
+ values (16, now(), 'Work In Progress');
create table task_type (
type text primary key,
description text not null,
backend_name text not null,
default_interval interval,
min_interval interval,
max_interval interval,
backoff_factor float,
max_queue_length bigint,
num_retries bigint,
retry_delay interval
);
comment on table task_type is 'Types of schedulable tasks';
comment on column task_type.type is 'Short identifier for the task type';
comment on column task_type.description is 'Human-readable task description';
comment on column task_type.backend_name is 'Name of the task in the job-running backend';
comment on column task_type.default_interval is 'Default interval for newly scheduled tasks';
comment on column task_type.min_interval is 'Minimum interval between two runs of a task';
comment on column task_type.max_interval is 'Maximum interval between two runs of a task';
comment on column task_type.backoff_factor is 'Adjustment factor for the backoff between two task runs';
comment on column task_type.max_queue_length is 'Maximum length of the queue for this type of tasks';
comment on column task_type.num_retries is 'Default number of retries on transient failures';
comment on column task_type.retry_delay is 'Retry delay for the task';
create type task_status as enum ('next_run_not_scheduled', 'next_run_scheduled', 'completed', 'disabled');
comment on type task_status is 'Status of a given task';
create type task_policy as enum ('recurring', 'oneshot');
comment on type task_policy is 'Recurrence policy of the given task';
create type task_priority as enum('high', 'normal', 'low');
comment on type task_priority is 'Priority of the given task';
create table priority_ratio(
id task_priority primary key,
ratio float not null
);
comment on table priority_ratio is 'Oneshot task''s reading ratio per priority';
comment on column priority_ratio.id is 'Task priority id';
comment on column priority_ratio.ratio is 'Percentage of tasks to read per priority';
insert into priority_ratio (id, ratio) values ('high', 0.5);
insert into priority_ratio (id, ratio) values ('normal', 0.3);
insert into priority_ratio (id, ratio) values ('low', 0.2);
create table task (
id bigserial primary key,
type text not null references task_type(type),
arguments jsonb not null,
next_run timestamptz not null,
current_interval interval,
status task_status not null,
policy task_policy not null default 'recurring',
retries_left bigint not null default 0,
priority task_priority references priority_ratio(id),
check (policy <> 'recurring' or current_interval is not null)
);
comment on table task is 'Schedule of recurring tasks';
comment on column task.arguments is 'Arguments passed to the underlying job scheduler. '
'Contains two keys, ''args'' (list) and ''kwargs'' (object).';
comment on column task.next_run is 'The next run of this task should be run on or after that time';
comment on column task.current_interval is 'The interval between two runs of this task, '
'taking into account the backoff factor';
comment on column task.policy is 'Whether the task is one-shot or recurring';
comment on column task.retries_left is 'The number of "short delay" retries of the task in case of '
'transient failure';
comment on column task.priority is 'Policy of the given task';
comment on column task.id is 'Task Identifier';
comment on column task.type is 'References task_type table';
comment on column task.status is 'Task status (''next_run_not_scheduled'', ''next_run_scheduled'', ''completed'', ''disabled'')';
create type task_run_status as enum ('scheduled', 'started', 'eventful', 'uneventful', 'failed', 'permfailed', 'lost');
comment on type task_run_status is 'Status of a given task run';
create table task_run (
id bigserial primary key,
task bigint not null references task(id),
backend_id text,
scheduled timestamptz,
started timestamptz,
ended timestamptz,
metadata jsonb,
status task_run_status not null default 'scheduled'
);
comment on table task_run is 'History of task runs sent to the job-running backend';
comment on column task_run.backend_id is 'id of the task run in the job-running backend';
comment on column task_run.metadata is 'Useful metadata for the given task run. '
'For instance, the worker that took on the job, '
'or the logs for the run.';
comment on column task_run.id is 'Task run identifier';
comment on column task_run.task is 'References task table';
comment on column task_run.scheduled is 'Scheduled run time for task';
comment on column task_run.started is 'Task starting time';
comment on column task_run.ended is 'Task ending time';
+
+create table if not exists listers (
+ id uuid primary key default uuid_generate_v4(),
+ name text not null,
+ instance_name text not null,
+ created timestamptz not null default now(), -- auto_now_add in the model
+ current_state jsonb not null,
+ updated timestamptz not null
+);
+
+comment on table listers is 'Lister instances known to the origin visit scheduler';
+comment on column listers.name is 'Name of the lister (e.g. github, gitlab, debian, ...)';
+comment on column listers.instance_name is 'Name of the current instance of this lister (e.g. framagit, bitbucket, ...)';
+comment on column listers.created is 'Timestamp at which the lister was originally created';
+comment on column listers.current_state is 'Known current state of this lister';
+comment on column listers.updated is 'Timestamp at which the lister state was last updated';
+
+
+create table if not exists listed_origins (
+ -- Basic information
+ lister_id uuid not null references listers(id),
+ url text not null,
+ visit_type text not null,
+ extra_loader_arguments jsonb not null,
+
+ -- Whether this origin still exists or not
+ enabled boolean not null,
+
+ -- time-based information
+ first_seen timestamptz not null default now(),
+ last_seen timestamptz not null,
+
+ -- potentially provided by the lister
+ last_update timestamptz,
+
+ primary key (lister_id, url, visit_type)
+);
+
+comment on table listed_origins is 'Origins known to the origin visit scheduler';
+comment on column listed_origins.lister_id is 'Lister instance which owns this origin';
+comment on column listed_origins.url is 'URL of the origin listed';
+comment on column listed_origins.visit_type is 'Type of the visit which should be scheduled for the given url';
+comment on column listed_origins.extra_loader_arguments is 'Extra arguments that should be passed to the loader for this origin';
+
+comment on column listed_origins.enabled is 'Whether this origin has been seen during the last listing, and visits should be scheduled.';
+comment on column listed_origins.first_seen is 'Time at which the origin was first seen by a lister';
+comment on column listed_origins.last_seen is 'Time at which the origin was last seen by the lister';
+
+comment on column listed_origins.last_update is 'Time of the last update to the origin recorded by the remote';
diff --git a/swh/scheduler/sql/60-swh-indexes.sql b/swh/scheduler/sql/60-swh-indexes.sql
index 7da9519..690541c 100644
--- a/swh/scheduler/sql/60-swh-indexes.sql
+++ b/swh/scheduler/sql/60-swh-indexes.sql
@@ -1,13 +1,16 @@
create index on task(type);
create index on task(next_run);
-- used for quick equality checking
create index on task using btree(type, md5(arguments::text));
create index on task(priority);
create index on task_run(task);
create index on task_run(backend_id);
create index task_run_id_asc_idx on task_run(task asc, started asc);
+
+-- lister schema
+create unique index on listers (name, instance_name);
diff --git a/swh/scheduler/tests/common.py b/swh/scheduler/tests/common.py
index 258c80f..9c2c463 100644
--- a/swh/scheduler/tests/common.py
+++ b/swh/scheduler/tests/common.py
@@ -1,95 +1,104 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
TEMPLATES = {
"git": {
"type": "update-git",
"arguments": {"args": [], "kwargs": {},},
"next_run": None,
},
"hg": {
"type": "update-hg",
"arguments": {"args": [], "kwargs": {},},
"next_run": None,
"policy": "oneshot",
},
}
TASK_TYPES = {
"git": {
"type": "update-git",
"description": "Update a git repository",
"backend_name": "swh.loader.git.tasks.UpdateGitRepository",
"default_interval": datetime.timedelta(days=64),
"min_interval": datetime.timedelta(hours=12),
"max_interval": datetime.timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": datetime.timedelta(hours=2),
},
"hg": {
"type": "update-hg",
"description": "Update a mercurial repository",
"backend_name": "swh.loader.mercurial.tasks.UpdateHgRepository",
"default_interval": datetime.timedelta(days=64),
"min_interval": datetime.timedelta(hours=12),
"max_interval": datetime.timedelta(days=64),
"backoff_factor": 2,
"max_queue_length": None,
"num_retries": 7,
"retry_delay": datetime.timedelta(hours=2),
},
}
def tasks_from_template(template, max_timestamp, num, num_priority=0, priorities=None):
"""Build tasks from template
"""
def _task_from_template(template, next_run, priority, *args, **kwargs):
ret = copy.deepcopy(template)
ret["next_run"] = next_run
if priority:
ret["priority"] = priority
if args:
ret["arguments"]["args"] = list(args)
if kwargs:
ret["arguments"]["kwargs"] = kwargs
return ret
def _pop_priority(priorities):
if not priorities:
return None
for priority, remains in priorities.items():
if remains > 0:
priorities[priority] = remains - 1
return priority
return None
if num_priority and priorities:
priorities = {
priority: ratio * num_priority for priority, ratio in priorities.items()
}
tasks = []
for i in range(num + num_priority):
priority = _pop_priority(priorities)
tasks.append(
_task_from_template(
template,
max_timestamp - datetime.timedelta(microseconds=i),
priority,
"argument-%03d" % i,
- **{"kwarg%03d" % i: "bogus-kwarg"}
+ **{"kwarg%03d" % i: "bogus-kwarg"},
)
)
return tasks
+
+
+LISTERS = (
+ {"name": "github"},
+ {"name": "gitlab", "instance_name": "gitlab"},
+ {"name": "gitlab", "instance_name": "freedesktop"},
+ {"name": "npm"},
+ {"name": "pypi"},
+)
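tasks_from_template builds num + num_priority task dicts with strictly
decreasing next_run timestamps; a small self-check of the helper above:

    import datetime

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    tasks = tasks_from_template(TEMPLATES["git"], now, num=2)
    assert len(tasks) == 2
    assert tasks[0]["type"] == "update-git"
    assert tasks[0]["arguments"]["args"] == ["argument-000"]
    assert tasks[0]["arguments"]["kwargs"] == {"kwarg000": "bogus-kwarg"}
    assert tasks[1]["next_run"] == now - datetime.timedelta(microseconds=1)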
diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py
index b3be2e4..9aeb208 100644
--- a/swh/scheduler/tests/conftest.py
+++ b/swh/scheduler/tests/conftest.py
@@ -1,112 +1,134 @@
# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
import os
import pytest
import glob
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
import pkg_resources
+from typing import List
from swh.core.utils import numfile_sortkey as sortkey
from swh.scheduler import get_scheduler
from swh.scheduler.tests import SQL_DIR
+from swh.scheduler.model import ListedOrigin, Lister
+from swh.scheduler.tests.common import LISTERS
# make sure we are not fooled by CELERY_ config environment vars
for var in [x for x in os.environ.keys() if x.startswith("CELERY")]:
os.environ.pop(var)
# test_cli tests depend on an en/C locale, so ensure it is set
os.environ["LC_ALL"] = "C.UTF-8"
DUMP_FILES = os.path.join(SQL_DIR, "*.sql")
# celery tasks for testing purposes; the tasks themselves should be
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ["ping", "multiping", "add", "error", "echo"]
@pytest.fixture(scope="session")
def celery_enable_logging():
return True
@pytest.fixture(scope="session")
def celery_includes():
task_modules = [
"swh.scheduler.tests.tasks",
]
for entrypoint in pkg_resources.iter_entry_points("swh.workers"):
task_modules.extend(entrypoint.load()().get("task_modules", []))
return task_modules
@pytest.fixture(scope="session")
def celery_parameters():
return {
"task_cls": "swh.scheduler.task:SWHTask",
}
@pytest.fixture(scope="session")
def celery_config():
return {
"accept_content": ["application/x-msgpack", "application/json"],
"task_serializer": "msgpack",
"result_serializer": "json",
}
# use the celery_session_app fixture to monkeypatch the 'main'
# swh.scheduler.celery_backend.config.app Celery application
# with the test application
@pytest.fixture(scope="session")
def swh_app(celery_session_app):
from swh.scheduler.celery_backend import config
config.app = celery_session_app
yield celery_session_app
@pytest.fixture
def swh_scheduler_config(request, postgresql):
scheduler_config = {
"db": postgresql.dsn,
}
all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
cursor = postgresql.cursor()
for fname in all_dump_files:
with open(fname) as fobj:
cursor.execute(fobj.read())
postgresql.commit()
return scheduler_config
@pytest.fixture
def swh_scheduler(swh_scheduler_config):
scheduler = get_scheduler("local", swh_scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type(
{
"type": "swh-test-{}".format(taskname),
"description": "The {} testing task".format(taskname),
"backend_name": "swh.scheduler.tests.tasks.{}".format(taskname),
"default_interval": timedelta(days=1),
"min_interval": timedelta(hours=6),
"max_interval": timedelta(days=12),
}
)
return scheduler
# this alias makes it easy to instantiate a db-backed Scheduler,
# e.g. for the RPC client/server test suite.
swh_db_scheduler = swh_scheduler
+
+
+@pytest.fixture
+def stored_lister(swh_scheduler) -> Lister:
+ """Store a lister in the scheduler and return its information"""
+ return swh_scheduler.get_or_create_lister(**LISTERS[0])
+
+
+@pytest.fixture
+def listed_origins(stored_lister) -> List[ListedOrigin]:
+ """Return a (fixed) set of 1000 listed origins"""
+ return [
+ ListedOrigin(
+ lister_id=stored_lister.id,
+ url=f"https://example.com/{i:04d}.git",
+ visit_type="git",
+ last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+ )
+ for i in range(1000)
+ ]
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
index 73b3373..fa0dd62 100644
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -1,64 +1,68 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from flask import url_for
import swh.scheduler.api.server as server
from swh.scheduler.api.client import RemoteScheduler
from swh.scheduler.tests.test_scheduler import TestScheduler # noqa
# tests are executed using the imported TestScheduler class, relying on
# the overloaded swh_scheduler fixture below
# the Flask app used as server in these tests
@pytest.fixture
def app(swh_db_scheduler):
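+ # make sure the module-level attribute we are about to overwrite exists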
+ assert hasattr(server, "scheduler")
server.scheduler = swh_db_scheduler
yield server.app
# the RPCClient class used as the client in these tests
@pytest.fixture
def swh_rpc_client_class():
return RemoteScheduler
@pytest.fixture
def swh_scheduler(swh_rpc_client, app):
yield swh_rpc_client
def test_site_map(flask_app_client):
sitemap = flask_app_client.get(url_for("site_map"))
assert sitemap.headers["Content-Type"] == "application/json"
rules = set(x["rule"] for x in sitemap.json)
# we expect exactly these rules
expected_rules = set(
"/" + rule
for rule in (
- "set_status_tasks",
- "create_task_type",
- "get_task_type",
- "get_task_types",
- "create_tasks",
- "disable_tasks",
- "get_tasks",
- "search_tasks",
- "get_task_runs",
- "peek_ready_tasks",
- "grab_ready_tasks",
- "schedule_task_run",
- "mass_schedule_task_runs",
- "start_task_run",
- "end_task_run",
- "filter_task_to_archive",
- "delete_archived_tasks",
- "get_priority_ratios",
+ "lister/get_or_create",
+ "lister/update",
+ "origins/record",
+ "priority_ratios/get",
+ "task/create",
+ "task/delete_archived",
+ "task/disable",
+ "task/filter_for_archive",
+ "task/get",
+ "task/grab_ready",
+ "task/peek_ready",
+ "task/search",
+ "task/set_status",
+ "task_run/end",
+ "task_run/get",
+ "task_run/schedule",
+ "task_run/schedule_one",
+ "task_run/start",
+ "task_type/create",
+ "task_type/get",
+ "task_type/get_all",
)
)
assert rules == expected_rules
diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py
index 8edabbd..ed9539d 100644
--- a/swh/scheduler/tests/test_celery_tasks.py
+++ b/swh/scheduler/tests/test_celery_tasks.py
@@ -1,167 +1,164 @@
from time import sleep
from itertools import count
from celery.result import GroupResult
from celery.result import AsyncResult
import pytest
from swh.scheduler.utils import create_task_dict
from swh.scheduler.celery_backend.runner import run_ready_tasks
def test_ping(swh_app, celery_session_worker):
res = swh_app.send_task("swh.scheduler.tests.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
def test_ping_with_kw(swh_app, celery_session_worker):
res = swh_app.send_task("swh.scheduler.tests.tasks.ping", kwargs={"a": 1})
assert res
res.wait()
assert res.successful()
assert res.result == "OK (kw={'a': 1})"
def test_multiping(swh_app, celery_session_worker):
"Test that a task that spawns subtasks (group) works"
res = swh_app.send_task("swh.scheduler.tests.tasks.multiping", kwargs={"n": 5})
assert res
res.wait()
assert res.successful()
# retrieve the GroupResult for this task and wait for all the subtasks
# to complete
promise_id = res.result
assert promise_id
promise = GroupResult.restore(promise_id, app=swh_app)
for i in range(5):
if promise.ready():
break
sleep(1)
results = [x.get() for x in promise.results]
assert len(results) == 5
for i in range(5):
assert ("OK (kw={'i': %s})" % i) in results
-@pytest.mark.db
def test_scheduler_fixture(swh_app, celery_session_worker, swh_scheduler):
"Test that the scheduler fixture works properly"
task_type = swh_scheduler.get_task_type("swh-test-ping")
assert task_type
assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping"
swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")])
backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
assert backend_tasks
for task in backend_tasks:
# Make sure the task completed
AsyncResult(id=task["backend_id"]).get()
-@pytest.mark.db
def test_task_return_value(swh_app, celery_session_worker, swh_scheduler):
task_type = swh_scheduler.get_task_type("swh-test-add")
assert task_type
assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add"
swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)])
backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
assert len(backend_tasks) == 1
task = backend_tasks[0]
value = AsyncResult(id=task["backend_id"]).get()
assert value == 42
-@pytest.mark.db
def test_task_exception(swh_app, celery_session_worker, swh_scheduler):
task_type = swh_scheduler.get_task_type("swh-test-error")
assert task_type
assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error"
swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")])
backend_tasks = run_ready_tasks(swh_scheduler, swh_app)
assert len(backend_tasks) == 1
task = backend_tasks[0]
result = AsyncResult(id=task["backend_id"])
with pytest.raises(NotImplementedError):
result.get()
def test_statsd(swh_app, celery_session_worker, mocker):
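# capture outgoing statsd packets, and patch the time sources so that
# task timestamps and durations are deterministic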
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
res = swh_app.send_task("swh.scheduler.tests.tasks.echo")
assert res
res.wait()
assert res.successful()
assert res.result == {}
m.assert_any_call(
"swh_task_called_count:1|c|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_start_ts:0|g|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_end_ts:1|g|"
"#status:uneventful,task:swh.scheduler.tests.tasks.echo,"
"worker:unknown worker"
)
m.assert_any_call(
"swh_task_duration_seconds:1000|ms|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_success_count:1|c|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
def test_statsd_with_status(swh_app, celery_session_worker, mocker):
m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
mocker.patch("swh.scheduler.task.ts", side_effect=count())
mocker.patch("swh.core.statsd.monotonic", side_effect=count())
res = swh_app.send_task(
"swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
)
assert res
res.wait()
assert res.successful()
assert res.result == {"status": "eventful"}
m.assert_any_call(
"swh_task_called_count:1|c|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_start_ts:0|g|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_end_ts:1|g|"
"#status:eventful,task:swh.scheduler.tests.tasks.echo,"
"worker:unknown worker"
)
m.assert_any_call(
"swh_task_duration_seconds:1000|ms|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
m.assert_any_call(
"swh_task_success_count:1|c|"
"#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
)
diff --git a/swh/scheduler/tests/test_cli_celery_monitor.py b/swh/scheduler/tests/test_cli_celery_monitor.py
new file mode 100644
index 0000000..5a0ed24
--- /dev/null
+++ b/swh/scheduler/tests/test_cli_celery_monitor.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from click.testing import CliRunner
+import pytest
+
+from swh.scheduler.cli import cli
+
+
+def invoke(*args, catch_exceptions=False):
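+ """Invoke the celery-monitor CLI group with the given arguments."""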
+ result = CliRunner(mix_stderr=False).invoke(
+ cli, ["celery-monitor", *args], catch_exceptions=catch_exceptions,
+ )
+
+ return result
+
+
+def test_celery_monitor():
+ """Check that celery-monitor returns its help text"""
+
+ result = invoke()
+
+ assert "Commands:" in result.stdout
+ assert "Options:" in result.stdout
+
+
+def test_celery_monitor_ping(caplog, swh_app, celery_session_worker):
+ caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("--pattern", celery_session_worker.hostname, "ping-workers")
+
+ assert result.exit_code == 0
+
+ assert len(caplog.records) == 1
+
+ (record,) = caplog.records
+
+ assert record.levelname == "INFO"
+ assert f"response from {celery_session_worker.hostname}" in record.message
+
+
+@pytest.mark.parametrize(
+ "filter_args,filter_message,exit_code",
+ [
+ ((), "Matching all workers", 0),
+ (
+ ("--pattern", "celery@*.test-host"),
+ "Using glob pattern celery@*.test-host",
+ 1,
+ ),
+ (
+ ("--pattern", "celery@test-type.test-host"),
+ "Using destinations celery@test-type.test-host",
+ 1,
+ ),
+ (
+ ("--pattern", "celery@test-type.test-host,celery@test-type2.test-host"),
+ (
+ "Using destinations "
+ "celery@test-type.test-host, celery@test-type2.test-host"
+ ),
+ 1,
+ ),
+ ],
+)
+def test_celery_monitor_ping_filter(
+ caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code
+):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
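+ # use a short timeout: the test-host patterns match no live worker, so
+ # ping-workers waits for the full timeout before reporting no response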
+ result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
+
+ assert result.exit_code == exit_code, result.stdout
+
+ got_no_response_message = False
+ got_filter_message = False
+
+ for record in caplog.records:
+ # Check that the proper filter has been generated
+ if record.levelname == "DEBUG":
+ if filter_message in record.message:
+ got_filter_message = True
+ # Check that no worker responded
+ if record.levelname == "INFO":
+ if "No response in" in record.message:
+ got_no_response_message = True
+
+ assert got_filter_message
+
+ if filter_args:
+ assert got_no_response_message
+
+
+def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke("--pattern", celery_session_worker.hostname, "list-running")
+
+ assert result.exit_code == 0, result.stdout
+
+ for record in caplog.records:
+ if record.levelname != "INFO":
+ continue
+ assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+
+@pytest.mark.parametrize("format", ["csv", "pretty"])
+def test_celery_monitor_list_running_format(
+ caplog, swh_app, celery_session_worker, format
+):
+ caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
+
+ result = invoke(
+ "--pattern", celery_session_worker.hostname, "list-running", "--format", format
+ )
+
+ assert result.exit_code == 0, result.stdout
+
+ for record in caplog.records:
+ if record.levelname != "INFO":
+ continue
+ assert f"{celery_session_worker.hostname}: no active tasks" in record.message
+
+ if format == "csv":
+ lines = result.stdout.splitlines()
+ assert lines == ["worker,name,args,kwargs,duration,worker_pid"]
diff --git a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py
new file mode 100644
index 0000000..47bb618
--- /dev/null
+++ b/swh/scheduler/tests/test_model.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+import attr
+
+from swh.scheduler import model
+
+
+def test_select_columns():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str)
+ test1 = attr.ib(type=str)
+ a_first_attr = attr.ib(type=str)
+
+ @property
+ def test2(self):
+ """This property should not show up in the extracted columns"""
+ return self.test1
+
+ assert TestModel.select_columns() == ("a_first_attr", "id", "test1")
+
+
+def test_insert_columns():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str)
+ test1 = attr.ib(type=str)
+
+ @property
+ def test2(self):
+ """This property should not show up in the extracted columns"""
+ return self.test1
+
+ assert TestModel.insert_columns_and_metavars() == (
+ ("id", "test1"),
+ ("%(id)s", "%(test1)s"),
+ )
+
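+ # A minimal sketch of how the (columns, metavars) pair is typically
+ # consumed, assuming a psycopg2-style cursor `cur` and an attrs instance
+ # `instance` (illustrative only):
+ #
+ #   cols, metavars = TestModel.insert_columns_and_metavars()
+ #   query = "insert into test_model (%s) values (%s)" % (
+ #       ", ".join(cols), ", ".join(metavars))
+ #   cur.execute(query, attr.asdict(instance))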
+
+def test_insert_columns_auto_now_add():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str)
+ test1 = attr.ib(type=str)
+ added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True})
+
+ assert TestModel.insert_columns_and_metavars() == (
+ ("id", "test1"),
+ ("%(id)s", "%(test1)s"),
+ )
+
+
+def test_insert_columns_auto_now():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str)
+ test1 = attr.ib(type=str)
+ updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True})
+
+ assert TestModel.insert_columns_and_metavars() == (
+ ("id", "test1", "updated"),
+ ("%(id)s", "%(test1)s", "now()"),
+ )
+
+
+def test_insert_columns_primary_key():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str, metadata={"auto_primary_key": True})
+ test1 = attr.ib(type=str)
+
+ assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",))
+
+
+def test_insert_primary_key():
+ @attr.s
+ class TestModel(model.BaseSchedulerModel):
+ id = attr.ib(type=str, metadata={"auto_primary_key": True})
+ test1 = attr.ib(type=str)
+
+ assert TestModel.primary_key_columns() == ("id",)
+
+ @attr.s
+ class TestModel2(model.BaseSchedulerModel):
+ col1 = attr.ib(type=str, metadata={"primary_key": True})
+ col2 = attr.ib(type=str, metadata={"primary_key": True})
+ test1 = attr.ib(type=str)
+
+ assert TestModel2.primary_key_columns() == ("col1", "col2")
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
index c2cd1a4..6d58e8c 100644
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -1,587 +1,678 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import random
import uuid
from collections import defaultdict
+import inspect
from typing import Any, Dict
from arrow import utcnow
-
+import attr
import pytest
-from .common import tasks_from_template, TEMPLATES, TASK_TYPES
+from swh.scheduler.exc import StaleData
+from swh.scheduler.interface import SchedulerInterface
+
+from .common import tasks_from_template, TEMPLATES, TASK_TYPES, LISTERS
def subdict(d, keys=None, excl=()):
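"""Return a copy of d restricted to keys (all of them by default),
minus the keys listed in excl."""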
if keys is None:
keys = list(d.keys())
return {k: d[k] for k in keys if k not in excl}
-@pytest.mark.db
class TestScheduler:
+ def test_interface(self, swh_scheduler):
+ """Checks all methods of SchedulerInterface are implemented by this
+ backend, and that they have the same signature."""
+ # Create an instance of the protocol (it cannot be instantiated directly,
+ # so we create a subclass first, then instantiate that)
+ interface = type("_", (SchedulerInterface,), {})()
+
+ assert "create_task_type" in dir(interface)
+
+ missing_methods = []
+
+ for meth_name in dir(interface):
+ if meth_name.startswith("_"):
+ continue
+ interface_meth = getattr(interface, meth_name)
+ try:
+ concrete_meth = getattr(swh_scheduler, meth_name)
+ except AttributeError:
+ if not getattr(interface_meth, "deprecated_endpoint", False):
+ # The backend is missing a (non-deprecated) endpoint
+ missing_methods.append(meth_name)
+ continue
+
+ expected_signature = inspect.signature(interface_meth)
+ actual_signature = inspect.signature(concrete_meth)
+
+ assert expected_signature == actual_signature, meth_name
+
+ assert missing_methods == []
+
def test_get_priority_ratios(self, swh_scheduler):
assert swh_scheduler.get_priority_ratios() == {
"high": 0.5,
"normal": 0.3,
"low": 0.2,
}
def test_add_task_type(self, swh_scheduler):
tt = TASK_TYPES["git"]
swh_scheduler.create_task_type(tt)
assert tt == swh_scheduler.get_task_type(tt["type"])
tt2 = TASK_TYPES["hg"]
swh_scheduler.create_task_type(tt2)
assert tt == swh_scheduler.get_task_type(tt["type"])
assert tt2 == swh_scheduler.get_task_type(tt2["type"])
def test_create_task_type_idempotence(self, swh_scheduler):
tt = TASK_TYPES["git"]
swh_scheduler.create_task_type(tt)
swh_scheduler.create_task_type(tt)
assert tt == swh_scheduler.get_task_type(tt["type"])
def test_get_task_types(self, swh_scheduler):
tt, tt2 = TASK_TYPES["git"], TASK_TYPES["hg"]
swh_scheduler.create_task_type(tt)
swh_scheduler.create_task_type(tt2)
actual_task_types = swh_scheduler.get_task_types()
assert tt in actual_task_types
assert tt2 in actual_task_types
def test_create_tasks(self, swh_scheduler):
priority_ratio = self._priority_ratio(swh_scheduler)
self._create_task_types(swh_scheduler)
num_tasks_priority = 100
tasks_1 = tasks_from_template(TEMPLATES["git"], utcnow(), 100)
tasks_2 = tasks_from_template(
TEMPLATES["hg"],
utcnow(),
100,
num_tasks_priority,
priorities=priority_ratio,
)
tasks = tasks_1 + tasks_2
# tasks are returned only once with their ids
ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2)
set_ret1 = set([t["id"] for t in ret1])
# creating the same set results in the same ids
ret = swh_scheduler.create_tasks(tasks)
set_ret = set([t["id"] for t in ret])
# Idempotence: both calls return the same task ids
assert set_ret == set_ret1
assert len(ret) == len(ret1)
ids = set()
actual_priorities = defaultdict(int)
for task, orig_task in zip(ret, tasks):
task = copy.deepcopy(task)
task_type = TASK_TYPES[orig_task["type"].split("-")[-1]]
assert task["id"] not in ids
assert task["status"] == "next_run_not_scheduled"
assert task["current_interval"] == task_type["default_interval"]
assert task["policy"] == orig_task.get("policy", "recurring")
priority = task.get("priority")
if priority:
actual_priorities[priority] += 1
assert task["retries_left"] == (task_type["num_retries"] or 0)
ids.add(task["id"])
del task["id"]
del task["status"]
del task["current_interval"]
del task["retries_left"]
if "policy" not in orig_task:
del task["policy"]
if "priority" not in orig_task:
del task["priority"]
assert task == orig_task
assert dict(actual_priorities) == {
priority: int(ratio * num_tasks_priority)
for priority, ratio in priority_ratio.items()
}
def test_peek_ready_tasks_no_priority(self, swh_scheduler):
self._create_task_types(swh_scheduler)
t = utcnow()
task_type = TEMPLATES["git"]["type"]
tasks = tasks_from_template(TEMPLATES["git"], t, 100)
random.shuffle(tasks)
swh_scheduler.create_tasks(tasks)
ready_tasks = swh_scheduler.peek_ready_tasks(task_type)
assert len(ready_tasks) == len(tasks)
for i in range(len(ready_tasks) - 1):
assert ready_tasks[i]["next_run"] <= ready_tasks[i + 1]["next_run"]
# Only get the first few ready tasks
limit = random.randrange(5, 5 + len(tasks) // 2)
ready_tasks_limited = swh_scheduler.peek_ready_tasks(task_type, num_tasks=limit)
assert len(ready_tasks_limited) == limit
assert ready_tasks_limited == ready_tasks[:limit]
# Limit by timestamp
max_ts = tasks[limit - 1]["next_run"]
ready_tasks_timestamped = swh_scheduler.peek_ready_tasks(
task_type, timestamp=max_ts
)
for ready_task in ready_tasks_timestamped:
assert ready_task["next_run"] <= max_ts
# Make sure we get proper behavior for the first ready tasks
assert ready_tasks[: len(ready_tasks_timestamped)] == ready_tasks_timestamped
# Limit by both
ready_tasks_both = swh_scheduler.peek_ready_tasks(
task_type, timestamp=max_ts, num_tasks=limit // 3
)
assert len(ready_tasks_both) <= limit // 3
for ready_task in ready_tasks_both:
assert ready_task["next_run"] <= max_ts
assert ready_task in ready_tasks[: limit // 3]
def _priority_ratio(self, swh_scheduler):
return swh_scheduler.get_priority_ratios()
def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler):
priority_ratio = self._priority_ratio(swh_scheduler)
self._create_task_types(swh_scheduler)
t = utcnow()
task_type = TEMPLATES["git"]["type"]
num_tasks_priority = 100
num_tasks_no_priority = 100
# Create tasks with and without priorities
tasks = tasks_from_template(
TEMPLATES["git"],
t,
num=num_tasks_no_priority,
num_priority=num_tasks_priority,
priorities=priority_ratio,
)
random.shuffle(tasks)
swh_scheduler.create_tasks(tasks)
# take all available tasks
ready_tasks = swh_scheduler.peek_ready_tasks(task_type)
assert len(ready_tasks) == len(tasks)
assert num_tasks_priority + num_tasks_no_priority == len(ready_tasks)
count_tasks_per_priority = defaultdict(int)
for task in ready_tasks:
priority = task.get("priority")
if priority:
count_tasks_per_priority[priority] += 1
assert dict(count_tasks_per_priority) == {
priority: int(ratio * num_tasks_priority)
for priority, ratio in priority_ratio.items()
}
# Only get some ready tasks
num_tasks = random.randrange(5, 5 + num_tasks_no_priority // 2)
num_tasks_priority = random.randrange(5, num_tasks_priority // 2)
ready_tasks_limited = swh_scheduler.peek_ready_tasks(
task_type, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority
)
count_tasks_per_priority = defaultdict(int)
for task in ready_tasks_limited:
priority = task.get("priority")
count_tasks_per_priority[priority] += 1
import math
for priority, ratio in priority_ratio.items():
expected_count = math.ceil(ratio * num_tasks_priority)
actual_prio = count_tasks_per_priority[priority]
assert actual_prio == expected_count or actual_prio == expected_count + 1
assert count_tasks_per_priority[None] == num_tasks
def test_grab_ready_tasks(self, swh_scheduler):
priority_ratio = self._priority_ratio(swh_scheduler)
self._create_task_types(swh_scheduler)
t = utcnow()
task_type = TEMPLATES["git"]["type"]
num_tasks_priority = 100
num_tasks_no_priority = 100
# Create tasks with and without priorities
tasks = tasks_from_template(
TEMPLATES["git"],
t,
num=num_tasks_no_priority,
num_priority=num_tasks_priority,
priorities=priority_ratio,
)
random.shuffle(tasks)
swh_scheduler.create_tasks(tasks)
first_ready_tasks = swh_scheduler.peek_ready_tasks(
task_type, num_tasks=10, num_tasks_priority=10
)
grabbed_tasks = swh_scheduler.grab_ready_tasks(
task_type, num_tasks=10, num_tasks_priority=10
)
for peeked, grabbed in zip(first_ready_tasks, grabbed_tasks):
assert peeked["status"] == "next_run_not_scheduled"
del peeked["status"]
assert grabbed["status"] == "next_run_scheduled"
del grabbed["status"]
assert peeked == grabbed
assert peeked["priority"] == grabbed["priority"]
def test_get_tasks(self, swh_scheduler):
self._create_task_types(swh_scheduler)
t = utcnow()
tasks = tasks_from_template(TEMPLATES["git"], t, 100)
tasks = swh_scheduler.create_tasks(tasks)
random.shuffle(tasks)
while len(tasks) > 1:
length = random.randrange(1, len(tasks))
cur_tasks = sorted(tasks[:length], key=lambda x: x["id"])
tasks[:length] = []
ret = swh_scheduler.get_tasks(task["id"] for task in cur_tasks)
# result is not guaranteed to be sorted
ret.sort(key=lambda x: x["id"])
assert ret == cur_tasks
def test_search_tasks(self, swh_scheduler):
def make_real_dicts(lst):
"""RealDictRow is not a real dict."""
return [dict(d.items()) for d in lst]
self._create_task_types(swh_scheduler)
t = utcnow()
tasks = tasks_from_template(TEMPLATES["git"], t, 100)
tasks = swh_scheduler.create_tasks(tasks)
assert make_real_dicts(swh_scheduler.search_tasks()) == make_real_dicts(tasks)
def assert_filtered_task_ok(
self, task: Dict[str, Any], after: datetime.datetime, before: datetime.datetime
) -> None:
"""Ensure filtered tasks have the right expected properties
(within the range, recurring disabled, etc..)
"""
started = task["started"]
date = started if started is not None else task["scheduled"]
assert after <= date and date <= before
if task["task_policy"] == "oneshot":
assert task["task_status"] in ["completed", "disabled"]
if task["task_policy"] == "recurring":
assert task["task_status"] in ["disabled"]
def test_filter_task_to_archive(self, swh_scheduler):
"""Filtering only list disabled recurring or completed oneshot tasks
"""
self._create_task_types(swh_scheduler)
_time = utcnow()
recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
total_tasks = len(recurring) + len(oneshots)
# simulate scheduling tasks
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
backend_tasks = [
{
"task": task["id"],
"backend_id": str(uuid.uuid4()),
"scheduled": utcnow(),
}
for task in pending_tasks
]
swh_scheduler.mass_schedule_task_runs(backend_tasks)
# we simulate that the tasks have been executed
_tasks = []
for task in backend_tasks:
t = swh_scheduler.end_task_run(task["backend_id"], status="eventful")
_tasks.append(t)
# Randomly update the tasks' status per policy
status_per_policy = {"recurring": 0, "oneshot": 0}
status_choice = {
# policy: list of (1-if-elected-for-filtering, 'associated-status') tuples
"recurring": [
(1, "disabled"),
(0, "completed"),
(0, "next_run_not_scheduled"),
],
"oneshot": [
(0, "next_run_not_scheduled"),
(1, "disabled"),
(1, "completed"),
],
}
tasks_to_update = defaultdict(list)
_task_ids = defaultdict(list)
# randomly 'disable' recurring tasks or 'complete' oneshot tasks
for task in pending_tasks:
policy = task["policy"]
_task_ids[policy].append(task["id"])
status = random.choice(status_choice[policy])
if status[0] != 1:
continue
# elected for filtering
status_per_policy[policy] += status[0]
tasks_to_update[policy].append(task["id"])
swh_scheduler.disable_tasks(tasks_to_update["recurring"])
# hack: change the status to something other than completed/disabled
swh_scheduler.set_status_tasks(
_task_ids["oneshot"], status="next_run_not_scheduled"
)
# complete the tasks to update
swh_scheduler.set_status_tasks(tasks_to_update["oneshot"], status="completed")
total_tasks_filtered = (
status_per_policy["recurring"] + status_per_policy["oneshot"]
)
# no pagination scenario
# retrieve tasks to archive
after = _time.shift(days=-1)
after_ts = after.format("YYYY-MM-DD")
before = utcnow().shift(days=1)
before_ts = before.format("YYYY-MM-DD")
tasks_result = swh_scheduler.filter_task_to_archive(
after_ts=after_ts, before_ts=before_ts, limit=total_tasks
)
tasks_to_archive = tasks_result["tasks"]
assert len(tasks_to_archive) == total_tasks_filtered
assert tasks_result.get("next_page_token") is None
actual_filtered_per_status = {"recurring": 0, "oneshot": 0}
for task in tasks_to_archive:
self.assert_filtered_task_ok(task, after, before)
actual_filtered_per_status[task["task_policy"]] += 1
assert actual_filtered_per_status == status_per_policy
# pagination scenario
nb_tasks = 3
tasks_result = swh_scheduler.filter_task_to_archive(
after_ts=after_ts, before_ts=before_ts, limit=nb_tasks
)
tasks_to_archive2 = tasks_result["tasks"]
assert len(tasks_to_archive2) == nb_tasks
next_page_token = tasks_result["next_page_token"]
assert next_page_token is not None
all_tasks = tasks_to_archive2
while next_page_token is not None: # Retrieve paginated results
tasks_result = swh_scheduler.filter_task_to_archive(
after_ts=after_ts,
before_ts=before_ts,
limit=nb_tasks,
page_token=next_page_token,
)
tasks_to_archive2 = tasks_result["tasks"]
assert len(tasks_to_archive2) <= nb_tasks
all_tasks.extend(tasks_to_archive2)
next_page_token = tasks_result.get("next_page_token")
actual_filtered_per_status = {"recurring": 0, "oneshot": 0}
for task in all_tasks:
self.assert_filtered_task_ok(task, after, before)
actual_filtered_per_status[task["task_policy"]] += 1
assert actual_filtered_per_status == status_per_policy
def test_delete_archived_tasks(self, swh_scheduler):
self._create_task_types(swh_scheduler)
_time = utcnow()
recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
total_tasks = len(recurring) + len(oneshots)
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
backend_tasks = [
{
"task": task["id"],
"backend_id": str(uuid.uuid4()),
"scheduled": utcnow(),
}
for task in pending_tasks
]
swh_scheduler.mass_schedule_task_runs(backend_tasks)
_tasks = []
percent = random.randint(0, 100) # randomly elected removal boundary
for task in backend_tasks:
t = swh_scheduler.end_task_run(task["backend_id"], status="eventful")
c = random.randint(0, 100)
if c <= percent:
_tasks.append({"task_id": t["task"], "task_run_id": t["id"]})
swh_scheduler.delete_archived_tasks(_tasks)
all_tasks = [task["id"] for task in swh_scheduler.search_tasks()]
tasks_count = len(all_tasks)
tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks))
assert tasks_count == total_tasks - len(_tasks)
assert tasks_run_count == total_tasks - len(_tasks)
def test_get_task_runs_no_task(self, swh_scheduler):
"""No task exist in the scheduler's db, get_task_runs() should always return an
empty list.
"""
assert not swh_scheduler.get_task_runs(task_ids=())
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)
def test_get_task_runs_no_task_executed(self, swh_scheduler):
"""No task has been executed yet, get_task_runs() should always return an empty
list.
"""
self._create_task_types(swh_scheduler)
_time = utcnow()
recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
swh_scheduler.create_tasks(recurring + oneshots)
assert not swh_scheduler.get_task_runs(task_ids=())
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)
def test_get_task_runs_with_scheduled(self, swh_scheduler):
"""Some tasks have been scheduled but not executed yet, get_task_runs() should
not return an empty list. limit should behave as expected.
"""
self._create_task_types(swh_scheduler)
_time = utcnow()
recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
total_tasks = len(recurring) + len(oneshots)
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
backend_tasks = [
{
"task": task["id"],
"backend_id": str(uuid.uuid4()),
"scheduled": utcnow(),
}
for task in pending_tasks
]
swh_scheduler.mass_schedule_task_runs(backend_tasks)
assert not swh_scheduler.get_task_runs(task_ids=[total_tasks + 1])
btask = backend_tasks[0]
runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]])
assert len(runs) == 1
run = runs[0]
assert subdict(run, excl=("id",)) == {
"task": btask["task"],
"backend_id": btask["backend_id"],
"scheduled": btask["scheduled"],
"started": None,
"ended": None,
"metadata": None,
"status": "scheduled",
}
runs = swh_scheduler.get_task_runs(
task_ids=[bt["task"] for bt in backend_tasks], limit=2
)
assert len(runs) == 2
runs = swh_scheduler.get_task_runs(
task_ids=[bt["task"] for bt in backend_tasks]
)
assert len(runs) == total_tasks
keys = ("task", "backend_id", "scheduled")
assert (
sorted([subdict(x, keys) for x in runs], key=lambda x: x["task"])
== backend_tasks
)
def test_get_task_runs_with_executed(self, swh_scheduler):
"""Some tasks have been executed, get_task_runs() should
not return an empty list. limit should behave as expected.
"""
self._create_task_types(swh_scheduler)
_time = utcnow()
recurring = tasks_from_template(TEMPLATES["git"], _time, 12)
oneshots = tasks_from_template(TEMPLATES["hg"], _time, 12)
pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
backend_tasks = [
{
"task": task["id"],
"backend_id": str(uuid.uuid4()),
"scheduled": utcnow(),
}
for task in pending_tasks
]
swh_scheduler.mass_schedule_task_runs(backend_tasks)
btask = backend_tasks[0]
ts = utcnow()
swh_scheduler.start_task_run(
btask["backend_id"], metadata={"something": "stupid"}, timestamp=ts
)
runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]])
assert len(runs) == 1
assert subdict(runs[0], excl=("id")) == {
"task": btask["task"],
"backend_id": btask["backend_id"],
"scheduled": btask["scheduled"],
"started": ts,
"ended": None,
"metadata": {"something": "stupid"},
"status": "started",
}
ts2 = utcnow()
swh_scheduler.end_task_run(
btask["backend_id"],
metadata={"other": "stuff"},
timestamp=ts2,
status="eventful",
)
runs = swh_scheduler.get_task_runs(task_ids=[btask["task"]])
assert len(runs) == 1
assert subdict(runs[0], excl=("id")) == {
"task": btask["task"],
"backend_id": btask["backend_id"],
"scheduled": btask["scheduled"],
"started": ts,
"ended": ts2,
"metadata": {"something": "stupid", "other": "stuff"},
"status": "eventful",
}
+ def test_get_or_create_lister(self, swh_scheduler):
+ db_listers = []
+ for lister_args in LISTERS:
+ db_listers.append(swh_scheduler.get_or_create_lister(**lister_args))
+
+ for lister, lister_args in zip(db_listers, LISTERS):
+ assert lister.name == lister_args["name"]
+ assert lister.instance_name == lister_args.get("instance_name", "")
+
+ lister_get_again = swh_scheduler.get_or_create_lister(
+ lister.name, lister.instance_name
+ )
+
+ assert lister == lister_get_again
+
+ def test_update_lister(self, swh_scheduler, stored_lister):
+ lister = attr.evolve(stored_lister, current_state={"updated": "now"})
+
+ updated_lister = swh_scheduler.update_lister(lister)
+
+ assert updated_lister.updated > lister.updated
+ assert updated_lister == attr.evolve(lister, updated=updated_lister.updated)
+
+ def test_update_lister_stale(self, swh_scheduler, stored_lister):
+ swh_scheduler.update_lister(stored_lister)
+
+ with pytest.raises(StaleData) as exc:
+ swh_scheduler.update_lister(stored_lister)
+ assert "state not updated" in exc.value.args[0]
+
+ def test_record_listed_origins(self, swh_scheduler, listed_origins):
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert set(returned.url for returned in ret) == set(
+ origin.url for origin in listed_origins
+ )
+
+ assert all(origin.first_seen == origin.last_seen for origin in ret)
+
+ def test_record_listed_origins_upsert(self, swh_scheduler, listed_origins):
+ # First, insert `cutoff` origins
+ cutoff = 100
+ assert cutoff < len(listed_origins)
+
+ ret = swh_scheduler.record_listed_origins(listed_origins[:cutoff])
+ assert len(ret) == cutoff
+
+ # Then, insert all origins, including the `cutoff` first.
+ ret = swh_scheduler.record_listed_origins(listed_origins)
+
+ assert len(ret) == len(listed_origins)
+
+ # Two different "first seen" values
+ assert len(set(origin.first_seen for origin in ret)) == 2
+
+ # But a single "last seen" value
+ assert len(set(origin.last_seen for origin in ret)) == 1
+
def _create_task_types(self, scheduler):
for tt in TASK_TYPES.values():
scheduler.create_task_type(tt)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..b6252bd
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,42 @@
+[tox]
+envlist=black,flake8,mypy,py3
+
+[testenv]
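+# factor-conditional lines below (e.g. "dev:", "slow:", "!slow:") only apply
+# when the corresponding factor is (or is not) part of the environment name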
+extras =
+ testing
+deps =
+ pytest-cov
+ dev: ipdb
+setenv =
+ LC_ALL=C.UTF-8
+ LC_CTYPE=C.UTF-8
+ LANG=C.UTF-8
+commands =
+ pytest --doctest-modules \
+ !slow: --hypothesis-profile=fast \
+ slow: --hypothesis-profile=slow \
+ --cov={envsitepackagesdir}/swh/scheduler \
+ {envsitepackagesdir}/swh/scheduler \
+ --cov-branch {posargs}
+
+[testenv:black]
+skip_install = true
+deps =
+ black
+commands =
+ {envpython} -m black --check swh
+
+[testenv:flake8]
+skip_install = true
+deps =
+ flake8
+commands =
+ {envpython} -m flake8
+
+[testenv:mypy]
+extras =
+ testing
+deps =
+ mypy
+commands =
+ mypy swh